Commit 129d6577 by 罗长华

增加在线回调方法

parent 7db96678
...@@ -82,7 +82,12 @@ public class GeneralMessageHandler { ...@@ -82,7 +82,12 @@ public class GeneralMessageHandler {
// 处理自定义异常 // 处理自定义异常
ApiCode errorCode; ApiCode errorCode;
if (exception instanceof BusinessException) { if (exception instanceof BusinessException) {
errorCode = ApiCode.getApiCode(((BusinessException) exception).getErrorCode()); Integer apiCode = ((BusinessException) exception).getErrorCode();
if (apiCode == null) {
errorCode = ApiCode.BUSINESS_EXCEPTION;
} else {
errorCode = ApiCode.getApiCode(apiCode);
}
} else if (exception instanceof DaoException) { } else if (exception instanceof DaoException) {
errorCode = ApiCode.DAO_EXCEPTION; errorCode = ApiCode.DAO_EXCEPTION;
} else if (exception instanceof VerificationCodeException) { } else if (exception instanceof VerificationCodeException) {
......
package com.wecloud.im.event; package com.wecloud.im.event;
import lombok.Getter;
import org.springframework.context.ApplicationEvent; import org.springframework.context.ApplicationEvent;
/** /**
...@@ -8,9 +10,22 @@ import org.springframework.context.ApplicationEvent; ...@@ -8,9 +10,22 @@ import org.springframework.context.ApplicationEvent;
* @Date 2022年04月22日 09:44 * @Date 2022年04月22日 09:44
* @Version 1.0 * @Version 1.0
*/ */
@Getter
public class ClientOnlineStatusChangeEvent extends ApplicationEvent { public class ClientOnlineStatusChangeEvent extends ApplicationEvent {
private Long applicationId;
private String clientId;
private Integer status;
private Integer deviceType;
private Long time;
private String clientIp;
/** /**
* Create a new {@code ApplicationEvent}. * Create a new {@code ApplicationEvent}.
* @param source the object on which the event initially occurred or with * @param source the object on which the event initially occurred or with
...@@ -19,4 +34,15 @@ public class ClientOnlineStatusChangeEvent extends ApplicationEvent { ...@@ -19,4 +34,15 @@ public class ClientOnlineStatusChangeEvent extends ApplicationEvent {
public ClientOnlineStatusChangeEvent(Object source) { public ClientOnlineStatusChangeEvent(Object source) {
super(source); super(source);
} }
public ClientOnlineStatusChangeEvent(Long applicationId, String clientId,
Integer status, Integer deviceType, Long time, String clientIp) {
super(clientId);
this.applicationId = applicationId;
this.clientId = clientId;
this.status = status;
this.deviceType = deviceType;
this.time = time;
this.clientIp = clientIp;
}
} }
package com.wecloud.im.event.listener; package com.wecloud.im.event.listener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener; import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import com.wecloud.im.event.ClientOnlineStatusChangeEvent; import com.wecloud.im.event.ClientOnlineStatusChangeEvent;
import com.wecloud.im.service.ImCallbackService;
/** /**
* *
...@@ -14,8 +16,23 @@ import com.wecloud.im.event.ClientOnlineStatusChangeEvent; ...@@ -14,8 +16,23 @@ import com.wecloud.im.event.ClientOnlineStatusChangeEvent;
@Component @Component
public class ClientOnlineStatusChangeListener implements ApplicationListener<ClientOnlineStatusChangeEvent> { public class ClientOnlineStatusChangeListener implements ApplicationListener<ClientOnlineStatusChangeEvent> {
@Autowired
private ImCallbackService callbackService;
@Override @Override
public void onApplicationEvent(ClientOnlineStatusChangeEvent event) { public void onApplicationEvent(ClientOnlineStatusChangeEvent event) {
Long applicationId = event.getApplicationId();
String clientId = event.getClientId();
Integer status = event.getStatus();
Integer deviceType = event.getDeviceType();
Long time = event.getTime();
String clientIp = event.getClientIp();
callbackService.clientOnlineStatusChange(applicationId, clientId, status, deviceType, time, clientIp);
} }
} }
package com.wecloud.im.netty.core; package com.wecloud.im.netty.core;
import com.wecloud.dispatch.general.GeneralMessageHandler;
import com.wecloud.im.executor.BusinessThreadPool;
import com.wecloud.im.ws.manager.ChannelManager;
import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.timeout.IdleStateEvent; import io.netty.handler.timeout.IdleStateEvent;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.annotation.Resource; import javax.annotation.Resource;
import org.springframework.stereotype.Component;
import com.wecloud.dispatch.general.GeneralMessageHandler;
import com.wecloud.im.executor.BusinessThreadPool;
import com.wecloud.im.ws.manager.ChannelManager;
import static com.wecloud.im.ws.ImConstant.PING; import static com.wecloud.im.ws.ImConstant.PING;
import static com.wecloud.im.ws.ImConstant.PONG; import static com.wecloud.im.ws.ImConstant.PONG;
import static com.wecloud.im.ws.ImConstant.READ_IDLE_CLOSE_COUNT; import static com.wecloud.im.ws.ImConstant.READ_IDLE_CLOSE_COUNT;
...@@ -137,6 +139,7 @@ public class WsReadHandler extends SimpleChannelInboundHandler<TextWebSocketFram ...@@ -137,6 +139,7 @@ public class WsReadHandler extends SimpleChannelInboundHandler<TextWebSocketFram
Integer platform = ctx.channel().attr(ChannelManager.PLATFORM).get(); Integer platform = ctx.channel().attr(ChannelManager.PLATFORM).get();
log.info("CLIENT_ID:{}, handlerRemoved. channelId is {}", clientId, ctx.channel().id().asLongText()); log.info("CLIENT_ID:{}, handlerRemoved. channelId is {}", clientId, ctx.channel().id().asLongText());
// 关掉连接 // 关掉连接
channelManager.offline(clientId, platform, ctx); channelManager.offline(clientId, platform, ctx);
......
package com.wecloud.im.netty.handler; package com.wecloud.im.netty.handler;
import com.alibaba.fastjson.JSONObject;
import com.auth0.jwt.interfaces.DecodedJWT;
import com.wecloud.im.entity.ImApplication;
import com.wecloud.im.entity.ImClient;
import com.wecloud.im.netty.core.WsReadHandler;
import com.wecloud.im.service.ImApplicationService;
import com.wecloud.im.service.ImClientService;
import com.wecloud.im.ws.manager.ChannelManager;
import com.wecloud.im.ws.model.WsConstants;
import com.wecloud.im.ws.utils.FullHttpRequestUtils;
import com.wecloud.utils.JsonUtils;
import io.geekidea.springbootplus.config.constant.CommonConstant; import io.geekidea.springbootplus.config.constant.CommonConstant;
import io.geekidea.springbootplus.framework.common.api.ApiCode; import io.geekidea.springbootplus.framework.common.api.ApiCode;
import io.geekidea.springbootplus.framework.common.api.ApiResult; import io.geekidea.springbootplus.framework.common.api.ApiResult;
...@@ -23,15 +12,31 @@ import io.netty.channel.socket.nio.NioSocketChannel; ...@@ -23,15 +12,31 @@ import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.HttpResponseStatus;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.Base64; import java.util.Base64;
import java.util.Map; import java.util.Map;
import javax.annotation.Resource;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Component;
import com.alibaba.fastjson.JSONObject;
import com.auth0.jwt.interfaces.DecodedJWT;
import com.wecloud.im.entity.ImApplication;
import com.wecloud.im.entity.ImClient;
import com.wecloud.im.event.ClientOnlineStatusChangeEvent;
import com.wecloud.im.netty.core.WsReadHandler;
import com.wecloud.im.service.ImApplicationService;
import com.wecloud.im.service.ImClientService;
import com.wecloud.im.ws.manager.ChannelManager;
import com.wecloud.im.ws.model.WsConstants;
import com.wecloud.im.ws.utils.FullHttpRequestUtils;
import com.wecloud.utils.JsonUtils;
/** /**
* @Description 聊天模块 http请求处理 * @Description 聊天模块 http请求处理
* @Author hewei hwei1233@163.com * @Author hewei hwei1233@163.com
...@@ -56,6 +61,9 @@ public class NettyApiRequest { ...@@ -56,6 +61,9 @@ public class NettyApiRequest {
@Autowired @Autowired
private ImApplicationService imApplicationService; private ImApplicationService imApplicationService;
@Autowired
private ApplicationEventPublisher eventPublisher;
/** /**
* http请求接收 * http请求接收
* *
...@@ -155,6 +163,15 @@ public class NettyApiRequest { ...@@ -155,6 +163,15 @@ public class NettyApiRequest {
// 保存用户上下文对象 // 保存用户上下文对象
appUserChannelsService.online(client.getId(), jwtToken.getPlatform(), (NioSocketChannel) ctx.channel()); appUserChannelsService.online(client.getId(), jwtToken.getPlatform(), (NioSocketChannel) ctx.channel());
// 发布客户端在线状态变化事件
Long appId = app.getId();
String clientId = client.getClientId();
Integer platform = jwtToken.getPlatform();
long time = System.currentTimeMillis();
String clientIp = ctx.channel().remoteAddress().toString();
ClientOnlineStatusChangeEvent clientOnlineStatusChangeEvent = new ClientOnlineStatusChangeEvent(appId,
clientId, 1, platform, time, clientIp);
eventPublisher.publishEvent(clientOnlineStatusChangeEvent);
//移除当前api处理handler, 不再参与长连接处理 //移除当前api处理handler, 不再参与长连接处理
ctx.pipeline().remove("SingleHttpRequestHandler"); ctx.pipeline().remove("SingleHttpRequestHandler");
......
...@@ -5,9 +5,11 @@ import io.geekidea.springbootplus.framework.shiro.signature.SignUtils; ...@@ -5,9 +5,11 @@ import io.geekidea.springbootplus.framework.shiro.signature.SignUtils;
import java.util.Date; import java.util.Date;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus; import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity; import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate; import org.springframework.web.client.RestTemplate;
import cn.hutool.core.date.DateUtil; import cn.hutool.core.date.DateUtil;
...@@ -24,6 +26,7 @@ import com.wecloud.im.service.ImCallbackService; ...@@ -24,6 +26,7 @@ import com.wecloud.im.service.ImCallbackService;
* @Date 2022年04月22日 09:11 * @Date 2022年04月22日 09:11
* @Version 1.0 * @Version 1.0
*/ */
@Service
public class ImCallbackServiceImpl implements ImCallbackService { public class ImCallbackServiceImpl implements ImCallbackService {
@Autowired @Autowired
...@@ -48,41 +51,44 @@ public class ImCallbackServiceImpl implements ImCallbackService { ...@@ -48,41 +51,44 @@ public class ImCallbackServiceImpl implements ImCallbackService {
* 用户在线状态 * 用户在线状态
* @Author luozh * @Author luozh
* @Date 2022年04月22日 09:35:47 * @Date 2022年04月22日 09:35:47
* @param clientId * @param applicationId 应用id
* @param status * @param clientId 客户端id
* @param deviceType * @param status 状态:0:online 上线、1:offline 离线、2:logout 登出
* @param time * @param deviceType 设备类型
* @param clientIp * @param time 发生时间
* @param clientIp 用户当前的 IP 地址及端口
* @Return * @Return
*/ */
@Override @Override
public Boolean clientOnlineStatusChange(Long applicationId, String clientId, Integer status, public Boolean clientOnlineStatusChange(Long applicationId, String clientId, Integer status,
Integer deviceType, Long time, String clientIp) { Integer deviceType, Long time, String clientIp) {
ImApplication application = applicationService.getCacheById(applicationId); ImApplication application = applicationService.getById(applicationId);
if (application == null) { if (application == null) {
throw new BusinessException("application not exist"); throw new BusinessException("application not exist");
} }
String subscribeUrl = application.getOnlineStatusSubscribeUrl(); String subscribeUrl = application.getOnlineStatusSubscribeUrl();
String appKey = application.getAppKey(); if (StringUtils.isNotBlank(subscribeUrl)) {
String appSecret = application.getAppSecret(); String appKey = application.getAppKey();
String callbackUrl = buildCallbackUrl(subscribeUrl, appKey, appSecret); String appSecret = application.getAppSecret();
String callbackUrl = buildCallbackUrl(subscribeUrl, appKey, appSecret);
ClientOnlineStatusChangeDto body = ClientOnlineStatusChangeDto.builder() ClientOnlineStatusChangeDto body = ClientOnlineStatusChangeDto.builder()
.userId(clientId) .userId(clientId)
.status(status) .status(status)
.os("") .os("")
.time(time) .time(time)
.clientIp(clientIp) .clientIp(clientIp)
.build(); .build();
ResponseEntity<Object> response = restTemplate.postForEntity(callbackUrl, body, Object.class); ResponseEntity<Object> response = restTemplate.postForEntity(callbackUrl, body, Object.class);
// 同步在线状态时需要接收服务提供应答,只要有 HTTP 应答码 200 即认为状态已经同步 // 同步在线状态时需要接收服务提供应答,只要有 HTTP 应答码 200 即认为状态已经同步
if (response.getStatusCode().equals(HttpStatus.OK)) { if (response.getStatusCode().equals(HttpStatus.OK)) {
// do nothing // do nothing
} else { } else {
// 如果应答超时 5 秒,会再尝试推送 2 次,如果仍然失败,将不再同步此条状态。如短时间内有大面积超时,将暂停推送,1 分钟后会继续推送。 // 如果应答超时 5 秒,会再尝试推送 2 次,如果仍然失败,将不再同步此条状态。如短时间内有大面积超时,将暂停推送,1 分钟后会继续推送。
}
} }
return true; return true;
} }
......
...@@ -5,7 +5,7 @@ ...@@ -5,7 +5,7 @@
<groupId>com.wecloud</groupId> <groupId>com.wecloud</groupId>
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<artifactId>im-sdk</artifactId> <artifactId>im-sdk</artifactId>
<version>1.0</version> <version>1.0-SNAPSHOT</version>
<description> <description>
蔚可云IM SDK 蔚可云IM SDK
</description> </description>
...@@ -74,7 +74,24 @@ ...@@ -74,7 +74,24 @@
</execution> </execution>
</executions> </executions>
</plugin> </plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-deploy-plugin</artifactId>
<version>2.8.2</version>
</plugin>
</plugins> </plugins>
</build> </build>
<distributionManagement>
<repository>
<id>wecloud</id>
<url>http://124.71.68.175:8081/repository/maven-releases/</url>
</repository>
<snapshotRepository>
<id>wecloud</id>
<url>http://124.71.68.175:8081/repository/maven-snapshots/</url>
</snapshotRepository>
</distributionManagement>
</project> </project>
...@@ -619,22 +619,4 @@ ...@@ -619,22 +619,4 @@
</profile> </profile>
</profiles> </profiles>
<!-- <repositories>-->
<!-- <repository>-->
<!-- <id>aliyun</id>-->
<!-- <name>aliyun-maven</name>-->
<!-- <url>http://maven.aliyun.com/nexus/content/groups/public/</url>-->
<!-- </repository>-->
<!-- <repository>-->
<!-- <id>spring-milestones</id>-->
<!-- <name>Spring Milestones</name>-->
<!-- <url>https://maven.aliyun.com/repository/spring</url>-->
<!-- </repository>-->
<!-- <repository>-->
<!-- <id>central</id>-->
<!-- <name>maven-central</name>-->
<!-- <url>http://central.maven.org/maven2/</url>-->
<!-- </repository>-->
<!-- </repositories>-->
</project> </project>
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment