Commit 61dd8b49 by hweeeeeei

1.IM相关常量移动到ImConstant类;

2.优化接口:getInfoList参数判空;
3.修改线程池名称"-business-"改为"-bs-";
4.修改线程池名称"-sendMsg-"改为"-sendM-";
parent 5cdd4f3b
...@@ -15,7 +15,7 @@ import java.util.concurrent.TimeUnit; ...@@ -15,7 +15,7 @@ import java.util.concurrent.TimeUnit;
public class BusinessThreadPool { public class BusinessThreadPool {
private static final ThreadFactory BUSINESS_THREAD_FACTORY = new ThreadFactoryBuilder() private static final ThreadFactory BUSINESS_THREAD_FACTORY = new ThreadFactoryBuilder()
.setNamePrefix("-business-").build(); .setNamePrefix("-bs-").build();
/** /**
* 业务处理线程池 * 业务处理线程池
......
...@@ -10,12 +10,12 @@ import java.util.concurrent.ThreadPoolExecutor; ...@@ -10,12 +10,12 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
/** /**
* 业务处理线程池 * ws消息下发 线程池
*/ */
public class SendMsgThreadPool { public class SendMsgThreadPool {
private static final ThreadFactory SEND_MSG_THREAD_FACTORY = new ThreadFactoryBuilder() private static final ThreadFactory SEND_MSG_THREAD_FACTORY = new ThreadFactoryBuilder()
.setNamePrefix("-sendMsg-").build(); .setNamePrefix("-sendM-").build();
/** /**
* 业务处理线程池 * 业务处理线程池
......
...@@ -14,27 +14,36 @@ import org.springframework.stereotype.Component; ...@@ -14,27 +14,36 @@ import org.springframework.stereotype.Component;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static com.wecloud.im.ws.ImConstant.READER_IDLE_TIME;
@Component @Component
public class NettyChannelInitializer extends ChannelInitializer<SocketChannel> { public class NettyChannelInitializer extends ChannelInitializer<SocketChannel> {
@Resource @Resource
private ChannelInboundHandler channelInboundHandler; private ChannelInboundHandler channelInboundHandler;
public static final String SINGLE_HTTP_REQUEST_HANDLER = "SingleHttpRequestHandler";
public static final String APP_WEB_SOCKET_SERVEROTOCOL_HANDLER = "appWebSocketServerotocolHandler";
public static final int MAX_CONTENT_LENGTH = 2048576000;
@Override @Override
protected void initChannel(SocketChannel ch) { protected void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline(); ChannelPipeline pipeline = ch.pipeline();
// http // http
pipeline.addLast(new HttpServerCodec()); pipeline.addLast(new HttpServerCodec());
pipeline.addLast(new ChunkedWriteHandler()); pipeline.addLast(new ChunkedWriteHandler());
pipeline.addLast(new HttpObjectAggregator(2048576000)); pipeline.addLast(new HttpObjectAggregator(MAX_CONTENT_LENGTH));
// 服务端api接口 // 服务端api接口
pipeline.addLast("SingleHttpRequestHandler", channelInboundHandler); pipeline.addLast(SINGLE_HTTP_REQUEST_HANDLER, channelInboundHandler);
// 连接超时管理 (判断通道是否有数据写入) // 连接超时管理 (判断通道是否有数据写入)
// pipeline.addLast("readTimeoutHandler", new ReadTimeoutHandler(60)); // pipeline.addLast("readTimeoutHandler", new ReadTimeoutHandler(60));
// "/appws"路径 升级长连接 // "/appws"路径 升级长连接
pipeline.addLast("appWebSocketServerotocolHandler", new WebSocketServerProtocolHandler(WsConstants.WS_URL)); pipeline.addLast(APP_WEB_SOCKET_SERVEROTOCOL_HANDLER, new WebSocketServerProtocolHandler(WsConstants.WS_URL));
/* /*
* 心跳机制 * 心跳机制
...@@ -44,7 +53,7 @@ public class NettyChannelInitializer extends ChannelInitializer<SocketChannel> { ...@@ -44,7 +53,7 @@ public class NettyChannelInitializer extends ChannelInitializer<SocketChannel> {
* allIdleTime—状态为IdleState的IdleStateEvent。在一定时间内不读不写会触发ALL_IDLE。指定0禁用。 * allIdleTime—状态为IdleState的IdleStateEvent。在一定时间内不读不写会触发ALL_IDLE。指定0禁用。
* unit—readerIdleTime、writeIdleTime和allIdleTime的时间单位 * unit—readerIdleTime、writeIdleTime和allIdleTime的时间单位
*/ */
pipeline.addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS)); pipeline.addLast(new IdleStateHandler(READER_IDLE_TIME, 0, 0, TimeUnit.SECONDS));
} }
} }
...@@ -15,6 +15,7 @@ import javax.annotation.Resource; ...@@ -15,6 +15,7 @@ import javax.annotation.Resource;
import static com.wecloud.im.ws.ImConstant.PING; import static com.wecloud.im.ws.ImConstant.PING;
import static com.wecloud.im.ws.ImConstant.PONG; import static com.wecloud.im.ws.ImConstant.PONG;
import static com.wecloud.im.ws.ImConstant.READ_IDLE_CLOSE_COUNT;
/** /**
* @Description app端 长连接事件处理 * @Description app端 长连接事件处理
...@@ -31,7 +32,8 @@ public class WsReadHandler extends SimpleChannelInboundHandler<TextWebSocketFram ...@@ -31,7 +32,8 @@ public class WsReadHandler extends SimpleChannelInboundHandler<TextWebSocketFram
@Override @Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) { protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) {
ctx.channel().attr(ChannelManager.READ_IDLE_TIMES).set(0);// 读空闲的计数清零 // 读空闲的计数清零
ctx.channel().attr(ChannelManager.READ_IDLE_TIMES).set(0);
String data = msg.text(); String data = msg.text();
try { try {
...@@ -86,11 +88,11 @@ public class WsReadHandler extends SimpleChannelInboundHandler<TextWebSocketFram ...@@ -86,11 +88,11 @@ public class WsReadHandler extends SimpleChannelInboundHandler<TextWebSocketFram
String eventType = null; String eventType = null;
switch (event.state()) { switch (event.state()) {
case READER_IDLE: case READER_IDLE:
eventType = "读空闲:readIdleTimes=" + readIdleTimes; eventType = "读空闲:readIdleTimes=" + readIdleTimes;
ctx.channel().attr(ChannelManager.READ_IDLE_TIMES).set(readIdleTimes + 1);// 读空闲的计数加1 // 读空闲的计数加1
ctx.channel().attr(ChannelManager.READ_IDLE_TIMES).set(readIdleTimes + 1);
// 发ping // 发ping
ctx.channel().writeAndFlush(new TextWebSocketFrame(PING)); ctx.channel().writeAndFlush(new TextWebSocketFrame(PING));
...@@ -103,9 +105,10 @@ public class WsReadHandler extends SimpleChannelInboundHandler<TextWebSocketFram ...@@ -103,9 +105,10 @@ public class WsReadHandler extends SimpleChannelInboundHandler<TextWebSocketFram
eventType = "读写空闲"; eventType = "读写空闲";
// 不处理 // 不处理
break; break;
default:
} }
log.info(clientId + "超时事件:" + eventType); log.info(clientId + "超时事件:" + eventType);
if (readIdleTimes >= 5) { if (readIdleTimes >= READ_IDLE_CLOSE_COUNT) {
log.info(clientId + ".读空闲超过5次关闭连接"); log.info(clientId + ".读空闲超过5次关闭连接");
ctx.channel().close(); ctx.channel().close();
} }
......
...@@ -20,6 +20,7 @@ import com.wecloud.im.service.ImApplicationService; ...@@ -20,6 +20,7 @@ import com.wecloud.im.service.ImApplicationService;
import com.wecloud.im.service.ImClientService; import com.wecloud.im.service.ImClientService;
import com.wecloud.im.vo.GetInfoListVo; import com.wecloud.im.vo.GetInfoListVo;
import io.geekidea.springbootplus.framework.common.api.ApiResult; import io.geekidea.springbootplus.framework.common.api.ApiResult;
import io.geekidea.springbootplus.framework.common.exception.BusinessException;
import io.geekidea.springbootplus.framework.common.service.impl.BaseServiceImpl; import io.geekidea.springbootplus.framework.common.service.impl.BaseServiceImpl;
import io.geekidea.springbootplus.framework.core.pagination.PageInfo; import io.geekidea.springbootplus.framework.core.pagination.PageInfo;
import io.geekidea.springbootplus.framework.core.pagination.Paging; import io.geekidea.springbootplus.framework.core.pagination.Paging;
...@@ -97,6 +98,10 @@ public class ImClientServiceImpl extends BaseServiceImpl<ImClientMapper, ImClien ...@@ -97,6 +98,10 @@ public class ImClientServiceImpl extends BaseServiceImpl<ImClientMapper, ImClien
public ApiResult<List<GetInfoListVo>> getInfoList(GetClientInfoParam getClientInfoParam) throws Exception { public ApiResult<List<GetInfoListVo>> getInfoList(GetClientInfoParam getClientInfoParam) throws Exception {
ImClient curentClient = getCurentClient(); ImClient curentClient = getCurentClient();
if (getClientInfoParam.getClientIds() == null || getClientInfoParam.getClientIds().isEmpty()) {
throw new BusinessException("getClientInfoParam.getClientIds() == null");
}
// List<ImClient> imClients = this.list(new QueryWrapper<ImClient>().lambda() // List<ImClient> imClients = this.list(new QueryWrapper<ImClient>().lambda()
// .eq(ImClient::getFkAppid, curentClient.getFkAppid()) // .eq(ImClient::getFkAppid, curentClient.getFkAppid())
// .in(ImClient::getClientId, getClientInfoParam.getClientId()) // .in(ImClient::getClientId, getClientInfoParam.getClientId())
......
...@@ -11,6 +11,17 @@ import java.io.Serializable; ...@@ -11,6 +11,17 @@ import java.io.Serializable;
public class ImConstant implements Serializable { public class ImConstant implements Serializable {
/** /**
* channel超时n次后断开连接
*/
public static final int READ_IDLE_CLOSE_COUNT = 5;
/**
* channel 读取时间 心跳检查间隔实际 (秒)
*/
public static final int READER_IDLE_TIME = 5;
/**
* 心跳 * 心跳
*/ */
public static final String PING = "ping"; public static final String PING = "ping";
......
...@@ -64,7 +64,7 @@ public class AsyncPush { ...@@ -64,7 +64,7 @@ public class AsyncPush {
*/ */
@Async @Async
public void push(PushVO pushVO, ImClient imClientReceiver, ImApplication imApplication) { public void push(PushVO pushVO, ImClient imClientReceiver, ImApplication imApplication) {
log.info("push:" + imClientReceiver.getClientId()); log.info("push: {}", imClientReceiver.getClientId());
if (pushVO == null) { if (pushVO == null) {
pushVO = new PushVO(); pushVO = new PushVO();
...@@ -75,7 +75,7 @@ public class AsyncPush { ...@@ -75,7 +75,7 @@ public class AsyncPush {
// 校验参数 // 校验参数
if (imClientReceiver.getValid() == null || imClientReceiver.getDeviceToken() == null || imClientReceiver.getDeviceType() == null) { if (imClientReceiver.getValid() == null || imClientReceiver.getDeviceToken() == null || imClientReceiver.getDeviceType() == null) {
log.info("push参数错误"); log.info("{} 应用未配置push,或client无DeviceToken {}", imApplication.getId(), imClientReceiver.getClientId());
return; return;
} }
// 设备不想收到推送提醒, 1想, 0不想 // 设备不想收到推送提醒, 1想, 0不想
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment