Commit b64ef6f0 by giaogiao

channel关闭优化

parent 4b9d8085
...@@ -42,9 +42,9 @@ public class WsReadHandler extends SimpleChannelInboundHandler<TextWebSocketFram ...@@ -42,9 +42,9 @@ public class WsReadHandler extends SimpleChannelInboundHandler<TextWebSocketFram
* io密集型任务配置尽可能多的线程数量 * io密集型任务配置尽可能多的线程数量
*/ */
private final static ExecutorService TASK_THREAD_POOL_EXECUTOR = private final static ExecutorService TASK_THREAD_POOL_EXECUTOR =
new ThreadPoolExecutor(WsConstants.CPU_PROCESSORS * 5, WsConstants.CPU_PROCESSORS * 10, new ThreadPoolExecutor(WsConstants.CPU_PROCESSORS * 5, WsConstants.CPU_PROCESSORS * 50,
3L, TimeUnit.MILLISECONDS, 3L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(1024), NAMED_THREAD_FACTORY, new ThreadPoolExecutor.CallerRunsPolicy()); new LinkedBlockingQueue<Runnable>(1), NAMED_THREAD_FACTORY, new ThreadPoolExecutor.CallerRunsPolicy());
@Override @Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) { protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) {
...@@ -88,19 +88,17 @@ public class WsReadHandler extends SimpleChannelInboundHandler<TextWebSocketFram ...@@ -88,19 +88,17 @@ public class WsReadHandler extends SimpleChannelInboundHandler<TextWebSocketFram
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
log.info("检测到异常exceptionCaught", cause); log.info("检测到异常exceptionCaught", cause);
// //排除当客户端意外关闭的情况,不是发送指定指令通知服务器退出,就会产生此错误。 //排除当客户端意外关闭的情况,不是发送指定指令通知服务器退出,就会产生此错误。
// if (ctx.channel().isActive()) { if (ctx.channel().isActive()) {
// Long userIdByChannel = appUserChannelsService.getUserIdByChannel(ctx); String userIdByChannel = mangerChannelService.getInfoByChannel(ctx);
// log.error("uid:" + userIdByChannel + ",ws异常,channelId:" + ctx.channel().id().asLongText(), cause); log.error("uid:" + userIdByChannel + ",ws异常,channelId:" + ctx.channel().id().asLongText(), cause);
// } }
} }
@Override @Override
public void handlerAdded(ChannelHandlerContext ctx) { public void handlerAdded(ChannelHandlerContext ctx) {
String userIdByChannel = mangerChannelService.getInfoByChannel(ctx);
String userIdByChannel = mangerChannelService.getUserIdByChannel(ctx); log.info("连接WS成功handlerAdded,uid:" + userIdByChannel + "," + ",channelId:" + ctx.channel().id().asLongText());
log.info("uid:" + userIdByChannel + "," + ",channelId:" + ctx.channel().id().asLongText());
log.info("连接WS成功handlerAdded");
} }
...@@ -113,10 +111,8 @@ public class WsReadHandler extends SimpleChannelInboundHandler<TextWebSocketFram ...@@ -113,10 +111,8 @@ public class WsReadHandler extends SimpleChannelInboundHandler<TextWebSocketFram
@Override @Override
public void channelInactive(ChannelHandlerContext ctx) { public void channelInactive(ChannelHandlerContext ctx) {
log.info("客户端不活跃channelInactive"); log.info("客户端不活跃channelInactive");
String userIdByChannel = mangerChannelService.getInfoByChannel(ctx);
// Long userIdByChannel = appUserChannelsService.getUserIdByChannel(ctx); log.info("uid:" + userIdByChannel + "," + "不活跃" + ",channelId:" + ctx.channel().id().asLongText());
// log.info("uid:" + userIdByChannel + "," + "不活跃" + ",channelId:" + ctx.channel().id().asLongText());
// mangerChannelService.remove(ctx);
} }
/** /**
...@@ -124,11 +120,10 @@ public class WsReadHandler extends SimpleChannelInboundHandler<TextWebSocketFram ...@@ -124,11 +120,10 @@ public class WsReadHandler extends SimpleChannelInboundHandler<TextWebSocketFram
*/ */
@Override @Override
public void handlerRemoved(ChannelHandlerContext ctx) { public void handlerRemoved(ChannelHandlerContext ctx) {
String userIdByChannel = mangerChannelService.getInfoByChannel(ctx);
log.info("handlerRemoved"); log.info("uid:" + userIdByChannel + "," + "handlerRemoved" + ",channelId:" + ctx.channel().id().asLongText());
// 关掉连接
// Long userIdByChannel = appUserChannelsService.getUserIdByChannel(ctx); ctx.close();
// log.info("uid:" + userIdByChannel + "," + "handlerRemoved" + ",channelId:" + ctx.channel().id().asLongText());
} }
} }
...@@ -65,12 +65,12 @@ public interface MangerChannelService { ...@@ -65,12 +65,12 @@ public interface MangerChannelService {
void remove(ChannelHandlerContext channelHandlerContext); void remove(ChannelHandlerContext channelHandlerContext);
/** /**
* 根据channel返回userId * 根据channel返回客户端key和id
* *
* @param channelHandlerContext * @param channelHandlerContext
* @return * @return
*/ */
String getUserIdByChannel(ChannelHandlerContext channelHandlerContext); String getInfoByChannel(ChannelHandlerContext channelHandlerContext);
/** /**
* 下发数据 * 下发数据
......
...@@ -59,6 +59,13 @@ public class MangerChannelServiceImpl implements MangerChannelService { ...@@ -59,6 +59,13 @@ public class MangerChannelServiceImpl implements MangerChannelService {
@Override @Override
public void put(String appKey, String clientId, NioSocketChannel channel) { public void put(String appKey, String clientId, NioSocketChannel channel) {
// 断掉旧链接
NioSocketChannel nioSocketChannel = get(appKey, clientId);
if (null != nioSocketChannel) {
log.info("put新连接关掉旧链接:" + appKey + "," + clientId + "," + ",\nchannelId:" + nioSocketChannel.id().asLongText());
nioSocketChannel.close();
}
// AppHashValueModel appHashValueModel = new AppHashValueModel(); // AppHashValueModel appHashValueModel = new AppHashValueModel();
// appHashValueModel.setOnlineStatus(UserCache.ONLINE); // appHashValueModel.setOnlineStatus(UserCache.ONLINE);
// userCache.online(userId); // userCache.online(userId);
...@@ -78,20 +85,18 @@ public class MangerChannelServiceImpl implements MangerChannelService { ...@@ -78,20 +85,18 @@ public class MangerChannelServiceImpl implements MangerChannelService {
// !channel.isWritable() 通道是否可写数据 // !channel.isWritable() 通道是否可写数据
*/ */
if (channel != null && !channel.isActive() && !channel.isOpen()) { if (channel != null && !channel.isActive() && !channel.isOpen()) {
String userId = String.valueOf(this.getUserIdByChannel(channelHandlerContext)); String userId = String.valueOf(this.getInfoByChannel(channelHandlerContext));
userCache.offline(userId); userCache.offline(userId);
log.info("不活跃remove,uid:" + userId);
log.debug("不活跃remove,uid:" + userId);
MangerChannelService.CHANNEL_MAP.remove(userId); MangerChannelService.CHANNEL_MAP.remove(userId);
channelHandlerContext.channel().close(); channelHandlerContext.channel().close();
} }
} }
@Override @Override
public String getUserIdByChannel(ChannelHandlerContext channelHandlerContext) { public String getInfoByChannel(ChannelHandlerContext channelHandlerContext) {
return channelHandlerContext.channel().attr(MangerChannelService.CLIENT_ID).get(); return "APP_KEY:" + channelHandlerContext.channel().attr(MangerChannelService.APP_KEY).get()
// return 1L; + ",CLIENT_ID:" + channelHandlerContext.channel().attr(MangerChannelService.CLIENT_ID).get();
} }
// @Override // @Override
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment