Commit 22832b70 by 罗长华

完善回调功能

parent 544bcd9a
...@@ -16,7 +16,7 @@ public class ClientOnlineStatusChangeEvent extends ApplicationEvent { ...@@ -16,7 +16,7 @@ public class ClientOnlineStatusChangeEvent extends ApplicationEvent {
private Long applicationId; private Long applicationId;
private String clientId; private Long clientId;
private Integer status; private Integer status;
...@@ -35,7 +35,7 @@ public class ClientOnlineStatusChangeEvent extends ApplicationEvent { ...@@ -35,7 +35,7 @@ public class ClientOnlineStatusChangeEvent extends ApplicationEvent {
super(source); super(source);
} }
public ClientOnlineStatusChangeEvent(Long applicationId, String clientId, public ClientOnlineStatusChangeEvent(Long applicationId, Long clientId,
Integer status, Integer deviceType, Long time, String clientIp) { Integer status, Integer deviceType, Long time, String clientIp) {
super(clientId); super(clientId);
this.applicationId = applicationId; this.applicationId = applicationId;
......
...@@ -2,6 +2,7 @@ package com.wecloud.im.event.listener; ...@@ -2,6 +2,7 @@ package com.wecloud.im.event.listener;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener; import org.springframework.context.ApplicationListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import com.wecloud.im.event.ClientOnlineStatusChangeEvent; import com.wecloud.im.event.ClientOnlineStatusChangeEvent;
...@@ -19,11 +20,12 @@ public class ClientOnlineStatusChangeListener implements ApplicationListener<Cli ...@@ -19,11 +20,12 @@ public class ClientOnlineStatusChangeListener implements ApplicationListener<Cli
@Autowired @Autowired
private ImCallbackService callbackService; private ImCallbackService callbackService;
@Async
@Override @Override
public void onApplicationEvent(ClientOnlineStatusChangeEvent event) { public void onApplicationEvent(ClientOnlineStatusChangeEvent event) {
Long applicationId = event.getApplicationId(); Long applicationId = event.getApplicationId();
String clientId = event.getClientId(); Long clientId = event.getClientId();
Integer status = event.getStatus(); Integer status = event.getStatus();
......
...@@ -9,9 +9,12 @@ import lombok.extern.slf4j.Slf4j; ...@@ -9,9 +9,12 @@ import lombok.extern.slf4j.Slf4j;
import javax.annotation.Resource; import javax.annotation.Resource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import com.wecloud.dispatch.general.GeneralMessageHandler; import com.wecloud.dispatch.general.GeneralMessageHandler;
import com.wecloud.im.event.ClientOnlineStatusChangeEvent;
import com.wecloud.im.executor.BusinessThreadPool; import com.wecloud.im.executor.BusinessThreadPool;
import com.wecloud.im.ws.manager.ChannelManager; import com.wecloud.im.ws.manager.ChannelManager;
...@@ -35,6 +38,9 @@ public class WsReadHandler extends SimpleChannelInboundHandler<TextWebSocketFram ...@@ -35,6 +38,9 @@ public class WsReadHandler extends SimpleChannelInboundHandler<TextWebSocketFram
@Resource @Resource
private GeneralMessageHandler generalMessageHandler; private GeneralMessageHandler generalMessageHandler;
@Autowired
private ApplicationEventPublisher eventPublisher;
@Override @Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) { protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) {
// 读空闲的计数清零 // 读空闲的计数清零
...@@ -135,14 +141,21 @@ public class WsReadHandler extends SimpleChannelInboundHandler<TextWebSocketFram ...@@ -135,14 +141,21 @@ public class WsReadHandler extends SimpleChannelInboundHandler<TextWebSocketFram
@Override @Override
public void handlerRemoved(ChannelHandlerContext ctx) { public void handlerRemoved(ChannelHandlerContext ctx) {
Long appId = ctx.channel().attr(ChannelManager.APPLICATION_ID).get();
Long clientId = ctx.channel().attr(ChannelManager.CLIENT_ID).get(); Long clientId = ctx.channel().attr(ChannelManager.CLIENT_ID).get();
Integer platform = ctx.channel().attr(ChannelManager.PLATFORM).get(); Integer platform = ctx.channel().attr(ChannelManager.PLATFORM).get();
log.info("CLIENT_ID:{}, handlerRemoved. channelId is {}", clientId, ctx.channel().id().asLongText()); log.info("CLIENT_ID:{}, handlerRemoved. channelId is {}", clientId, ctx.channel().id().asLongText());
// 关掉连接 // 关掉连接
channelManager.offline(clientId, platform, ctx); channelManager.offline(clientId, platform, ctx);
// 下线通知
long time = System.currentTimeMillis();
String clientIp = ctx.channel().remoteAddress().toString();
ClientOnlineStatusChangeEvent clientOnlineStatusChangeEvent = new ClientOnlineStatusChangeEvent(appId,
clientId, 0, platform, time, clientIp);
eventPublisher.publishEvent(clientOnlineStatusChangeEvent);
} }
} }
...@@ -158,20 +158,21 @@ public class NettyApiRequest { ...@@ -158,20 +158,21 @@ public class NettyApiRequest {
// 设置属性值 userid - channel // 设置属性值 userid - channel
ctx.channel().attr(ChannelManager.CLIENT_ID).set(client.getId()); ctx.channel().attr(ChannelManager.CLIENT_ID).set(client.getId());
ctx.channel().attr(ChannelManager.PLATFORM).set(jwtToken.getPlatform()); ctx.channel().attr(ChannelManager.PLATFORM).set(jwtToken.getPlatform());
ctx.channel().attr(ChannelManager.APPLICATION_ID).set(app.getId());// 读空闲的计数=0
ctx.channel().attr(ChannelManager.READ_IDLE_TIMES).set(0);// 读空闲的计数=0 ctx.channel().attr(ChannelManager.READ_IDLE_TIMES).set(0);// 读空闲的计数=0
// 保存用户上下文对象 // 保存用户上下文对象
appUserChannelsService.online(client.getId(), jwtToken.getPlatform(), (NioSocketChannel) ctx.channel()); appUserChannelsService.online(client.getId(), jwtToken.getPlatform(), (NioSocketChannel) ctx.channel());
// 发布客户端在线状态变化事件 // 发布客户端在线状态变化-上线事件
// Long appId = app.getId(); Long appId = app.getId();
// String clientId = client.getClientId(); String clientId = client.getClientId();
// Integer platform = jwtToken.getPlatform(); Integer platform = jwtToken.getPlatform();
// long time = System.currentTimeMillis(); long time = System.currentTimeMillis();
// String clientIp = ctx.channel().remoteAddress().toString(); String clientIp = ctx.channel().remoteAddress().toString();
// ClientOnlineStatusChangeEvent clientOnlineStatusChangeEvent = new ClientOnlineStatusChangeEvent(appId, ClientOnlineStatusChangeEvent clientOnlineStatusChangeEvent = new ClientOnlineStatusChangeEvent(appId,
// clientId, 1, platform, time, clientIp); client.getId(), 1, platform, time, clientIp);
// eventPublisher.publishEvent(clientOnlineStatusChangeEvent); eventPublisher.publishEvent(clientOnlineStatusChangeEvent);
//移除当前api处理handler, 不再参与长连接处理 //移除当前api处理handler, 不再参与长连接处理
ctx.pipeline().remove("SingleHttpRequestHandler"); ctx.pipeline().remove("SingleHttpRequestHandler");
......
...@@ -24,6 +24,6 @@ public interface ImCallbackService { ...@@ -24,6 +24,6 @@ public interface ImCallbackService {
* @param * @param
* @Return * @Return
*/ */
Boolean clientOnlineStatusChange(Long applicationId, String clientId, Integer status, Boolean clientOnlineStatusChange(Long applicationId, Long clientId, Integer status,
Integer deviceType, Long time, String clientIp); Integer deviceType, Long time, String clientIp);
} }
package com.wecloud.im.service.impl; package com.wecloud.im.service.impl;
import io.geekidea.springbootplus.framework.common.exception.BusinessException;
import io.geekidea.springbootplus.framework.shiro.signature.SignUtils; import io.geekidea.springbootplus.framework.shiro.signature.SignUtils;
import java.util.Date; import java.util.Date;
...@@ -16,9 +15,11 @@ import cn.hutool.core.date.DateUtil; ...@@ -16,9 +15,11 @@ import cn.hutool.core.date.DateUtil;
import cn.hutool.core.lang.id.NanoId; import cn.hutool.core.lang.id.NanoId;
import com.wecloud.im.entity.ImApplication; import com.wecloud.im.entity.ImApplication;
import com.wecloud.im.entity.ImClient;
import com.wecloud.im.param.ClientOnlineStatusChangeDto; import com.wecloud.im.param.ClientOnlineStatusChangeDto;
import com.wecloud.im.service.ImApplicationService; import com.wecloud.im.service.ImApplicationService;
import com.wecloud.im.service.ImCallbackService; import com.wecloud.im.service.ImCallbackService;
import com.wecloud.im.service.ImClientService;
/** /**
* 回调服务实现类 * 回调服务实现类
...@@ -35,6 +36,9 @@ public class ImCallbackServiceImpl implements ImCallbackService { ...@@ -35,6 +36,9 @@ public class ImCallbackServiceImpl implements ImCallbackService {
@Autowired @Autowired
private ImApplicationService applicationService; private ImApplicationService applicationService;
@Autowired
private ImClientService imClientService;
/** /**
* 全量消息路由 * 全量消息路由
* @Author luozh * @Author luozh
...@@ -53,35 +57,38 @@ public class ImCallbackServiceImpl implements ImCallbackService { ...@@ -53,35 +57,38 @@ public class ImCallbackServiceImpl implements ImCallbackService {
* @Date 2022年04月22日 09:35:47 * @Date 2022年04月22日 09:35:47
* @param applicationId 应用id * @param applicationId 应用id
* @param clientId 客户端id * @param clientId 客户端id
* @param status 状态:0:online 上线、1:offline 离线、2:logout 登出 * @param status 状态:1:online 上线、0:offline 离线、2:logout 登出
* @param deviceType 设备类型 * @param deviceType 设备类型
* @param time 发生时间 * @param time 发生时间
* @param clientIp 用户当前的 IP 地址及端口 * @param clientIp 用户当前的 IP 地址及端口
* @Return * @Return
*/ */
@Override @Override
public Boolean clientOnlineStatusChange(Long applicationId, String clientId, Integer status, public Boolean clientOnlineStatusChange(Long applicationId, Long clientId, Integer status,
Integer deviceType, Long time, String clientIp) { Integer deviceType, Long time, String clientIp) {
ImApplication application = applicationService.getById(applicationId); ImApplication application = applicationService.getById(applicationId);
if (application == null) { if (application == null) {
throw new BusinessException("application not exist"); return false;
} }
ImClient client = imClientService.getCacheImClient(clientId);
String subscribeUrl = application.getOnlineStatusSubscribeUrl(); String subscribeUrl = application.getOnlineStatusSubscribeUrl();
if (StringUtils.isNotBlank(subscribeUrl)) { if (StringUtils.isNotBlank(subscribeUrl)) {
String appKey = application.getAppKey(); String appKey = application.getAppKey();
String appSecret = application.getAppSecret(); String appSecret = application.getAppSecret();
String callbackUrl = buildCallbackUrl(subscribeUrl, appKey, appSecret); String callbackUrl = buildCallbackUrl(subscribeUrl, appKey, appSecret);
ClientOnlineStatusChangeDto body = ClientOnlineStatusChangeDto.builder() ClientOnlineStatusChangeDto body = ClientOnlineStatusChangeDto.builder()
.userId(clientId) .userId(client.getClientId())
.status(status) .status(status)
.os("") .os("")
.time(time) .time(time)
.clientIp(clientIp) .clientIp(clientIp)
.build(); .build();
try {
ResponseEntity<Object> response = restTemplate.postForEntity(callbackUrl, body, Object.class); ResponseEntity<Object> response = restTemplate.postForEntity(callbackUrl, body, Object.class);
// 同步在线状态时需要接收服务提供应答,只要有 HTTP 应答码 200 即认为状态已经同步 // 同步在线状态时需要接收服务提供应答,只要有 HTTP 应答码 200 即认为状态已经同步
if (response.getStatusCode().equals(HttpStatus.OK)) { if (response.getStatusCode().equals(HttpStatus.OK)) {
...@@ -89,6 +96,9 @@ public class ImCallbackServiceImpl implements ImCallbackService { ...@@ -89,6 +96,9 @@ public class ImCallbackServiceImpl implements ImCallbackService {
} else { } else {
// 如果应答超时 5 秒,会再尝试推送 2 次,如果仍然失败,将不再同步此条状态。如短时间内有大面积超时,将暂停推送,1 分钟后会继续推送。 // 如果应答超时 5 秒,会再尝试推送 2 次,如果仍然失败,将不再同步此条状态。如短时间内有大面积超时,将暂停推送,1 分钟后会继续推送。
} }
} catch (Exception e) {
// do nothing is ok
}
} }
return true; return true;
} }
......
package com.wecloud.im.ws.manager; package com.wecloud.im.ws.manager;
import com.wecloud.im.ws.cache.UserStateListener;
import com.wecloud.im.ws.model.ClientInfo;
import com.wecloud.im.ws.utils.RedisUtils;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.AttributeKey; import io.netty.util.AttributeKey;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import org.springframework.stereotype.Component;
import com.wecloud.im.ws.cache.UserStateListener;
import com.wecloud.im.ws.model.ClientInfo;
import com.wecloud.im.ws.utils.RedisUtils;
/** /**
* channel内容管理,在线、离线等信息在channel里 * channel内容管理,在线、离线等信息在channel里
* @author lixiaozhong * @author lixiaozhong
...@@ -32,6 +34,8 @@ public class ChannelManager { ...@@ -32,6 +34,8 @@ public class ChannelManager {
*/ */
public static final AttributeKey<Long> CLIENT_ID = AttributeKey.valueOf("ci"); public static final AttributeKey<Long> CLIENT_ID = AttributeKey.valueOf("ci");
public static final AttributeKey<Long> APPLICATION_ID = AttributeKey.valueOf("ai");
public static final AttributeKey<Integer> READ_IDLE_TIMES = AttributeKey.valueOf("readIdleTimes"); public static final AttributeKey<Integer> READ_IDLE_TIMES = AttributeKey.valueOf("readIdleTimes");
/** /**
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment