Commit 16ad36d4 by giaogiao

添加心跳机制

parent c00e1b05
...@@ -8,10 +8,12 @@ import io.netty.handler.codec.http.HttpObjectAggregator; ...@@ -8,10 +8,12 @@ import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler; import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.handler.timeout.ReadTimeoutHandler; import io.netty.handler.timeout.ReadTimeoutHandler;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;
@Component @Component
public class NettyChannelInitializer extends ChannelInitializer<SocketChannel> { public class NettyChannelInitializer extends ChannelInitializer<SocketChannel> {
...@@ -34,6 +36,16 @@ public class NettyChannelInitializer extends ChannelInitializer<SocketChannel> { ...@@ -34,6 +36,16 @@ public class NettyChannelInitializer extends ChannelInitializer<SocketChannel> {
// "/appws"路径 升级长连接 // "/appws"路径 升级长连接
pipeline.addLast("appWebSocketServerotocolHandler", new WebSocketServerProtocolHandler(WsConstants.WS_URL)); pipeline.addLast("appWebSocketServerotocolHandler", new WebSocketServerProtocolHandler(WsConstants.WS_URL));
/*
* 心跳机制
* observeOutput -当评估写空闲时是否应该考虑字节的消耗。默认为false。
* readerIdleTime—状态为IdleState的IdleStateEvent。当在指定的时间内没有执行读操作时,将触发READER_IDLE。指定0禁用。
* writerIdleTime—状态为IdleState的IdleStateEvent。当在指定的时间内没有执行写操作时,会触发WRITER_IDLE。指定0禁用。
* allIdleTime—状态为IdleState的IdleStateEvent。在一定时间内不读不写会触发ALL_IDLE。指定0禁用。
* unit—readerIdleTime、writeIdleTime和allIdleTime的时间单位
*/
pipeline.addLast(new IdleStateHandler(2, 2, 10, TimeUnit.SECONDS));
} }
} }
...@@ -8,6 +8,7 @@ import io.netty.channel.ChannelHandler; ...@@ -8,6 +8,7 @@ import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.timeout.IdleStateEvent;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
...@@ -28,6 +29,9 @@ import java.util.concurrent.TimeUnit; ...@@ -28,6 +29,9 @@ import java.util.concurrent.TimeUnit;
@Slf4j @Slf4j
public class WsReadHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> { public class WsReadHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
private static final String PING = "ping";
private static final String PONG = "pong";
@Resource @Resource
private ReadWsData readWsData; private ReadWsData readWsData;
...@@ -48,6 +52,8 @@ public class WsReadHandler extends SimpleChannelInboundHandler<TextWebSocketFram ...@@ -48,6 +52,8 @@ public class WsReadHandler extends SimpleChannelInboundHandler<TextWebSocketFram
@Override @Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) { protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) {
ctx.channel().attr(MangerChannelService.READ_IDLE_TIMES).set(0);// 读空闲的计数清零
String data = msg.text(); String data = msg.text();
try { try {
if (data.isEmpty()) { if (data.isEmpty()) {
...@@ -62,7 +68,7 @@ public class WsReadHandler extends SimpleChannelInboundHandler<TextWebSocketFram ...@@ -62,7 +68,7 @@ public class WsReadHandler extends SimpleChannelInboundHandler<TextWebSocketFram
} catch (Exception e) { } catch (Exception e) {
//返回错误 //返回错误
ctx.channel().writeAndFlush(new TextWebSocketFrame("error=" + e.toString() + ",data=" + data)); ctx.channel().writeAndFlush(new TextWebSocketFrame("error=" + e + ",data=" + data));
log.error(e.getMessage() + data, e); log.error(e.getMessage() + data, e);
} }
} }
...@@ -71,12 +77,60 @@ public class WsReadHandler extends SimpleChannelInboundHandler<TextWebSocketFram ...@@ -71,12 +77,60 @@ public class WsReadHandler extends SimpleChannelInboundHandler<TextWebSocketFram
String appKey = ctx.channel().attr(MangerChannelService.APP_KEY).get(); String appKey = ctx.channel().attr(MangerChannelService.APP_KEY).get();
String clientId = ctx.channel().attr(MangerChannelService.CLIENT_ID).get(); String clientId = ctx.channel().attr(MangerChannelService.CLIENT_ID).get();
try { try {
if (PING.equals(data)) {
log.info("收到心跳clientId:" + clientId);
ctx.channel().writeAndFlush(new TextWebSocketFrame(PONG));
return;
}
if (PONG.equals(data)) {
log.info("收到心跳应用Pong,clientId:" + clientId);
return;
}
readWsData.convertModel(data, ctx, appKey, clientId); readWsData.convertModel(data, ctx, appKey, clientId);
} catch (Exception e) { } catch (Exception e) {
log.error("系统繁忙data:" + data + ",appKey:" + appKey + ",clientId:" + clientId + log.error("系统繁忙data:" + data + ",appKey:" + appKey + ",clientId:" + clientId +
",channelId:" + ctx.channel().id().asShortText(), e); ",channelId:" + ctx.channel().id().asShortText(), e);
} }
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
String clientId = ctx.channel().attr(MangerChannelService.CLIENT_ID).get();
//读超时计时器
Integer readIdleTimes = ctx.channel().attr(MangerChannelService.READ_IDLE_TIMES).get();
IdleStateEvent event = (IdleStateEvent) evt;
String eventType = null;
switch (event.state()) {
case READER_IDLE:
eventType = "读空闲:readIdleTimes=" + readIdleTimes;
ctx.channel().attr(MangerChannelService.READ_IDLE_TIMES).set(readIdleTimes + 1);// 读空闲的计数加1
// 发ping
ctx.channel().writeAndFlush(new TextWebSocketFrame(PING));
break;
case WRITER_IDLE:
eventType = "写空闲";
// 不处理
break;
case ALL_IDLE:
eventType = "读写空闲";
// 不处理
break;
}
log.info(clientId + "超时事件:" + eventType);
if (readIdleTimes > 5) {
log.info(clientId + ". [server]读空闲超过3次,关闭连接");
// ctx.channel().writeAndFlush("you are out");
ctx.channel().close();
}
} }
...@@ -118,9 +172,8 @@ public class WsReadHandler extends SimpleChannelInboundHandler<TextWebSocketFram ...@@ -118,9 +172,8 @@ public class WsReadHandler extends SimpleChannelInboundHandler<TextWebSocketFram
@Override @Override
public void handlerRemoved(ChannelHandlerContext ctx) { public void handlerRemoved(ChannelHandlerContext ctx) {
String userIdByChannel = mangerChannelService.getInfoByChannel(ctx); String userIdByChannel = mangerChannelService.getInfoByChannel(ctx);
log.info("uid:" + userIdByChannel + "," + "handlerRemoved" + ",channelId:" + ctx.channel().id().asShortText()); log.info("uid:" + userIdByChannel + "," + "handlerRemoved" + ",channelId:" + ctx.channel().id());
// 关掉连接 // 关掉连接
ctx.close(); ctx.close();
} }
} }
...@@ -128,6 +128,7 @@ public class NettyApiRequest { ...@@ -128,6 +128,7 @@ public class NettyApiRequest {
// 设置属性值 userid - channel // 设置属性值 userid - channel
ctx.channel().attr(MangerChannelService.CLIENT_ID).set(clientId); ctx.channel().attr(MangerChannelService.CLIENT_ID).set(clientId);
ctx.channel().attr(MangerChannelService.APP_KEY).set(appKey); ctx.channel().attr(MangerChannelService.APP_KEY).set(appKey);
ctx.channel().attr(MangerChannelService.READ_IDLE_TIMES).set(0);// 读空闲的计数=0
// 添加长连接handler // 添加长连接handler
ctx.pipeline().addLast("appImHandler", appImReadHandler); ctx.pipeline().addLast("appImHandler", appImReadHandler);
......
...@@ -31,7 +31,6 @@ public class ReadWsData { ...@@ -31,7 +31,6 @@ public class ReadWsData {
@Resource @Resource
private WriteDataService writeDataService; private WriteDataService writeDataService;
private static final String PING = "ping";
/** /**
* 在此开始进入业务流程子线程,将不占netty的io线程 * 在此开始进入业务流程子线程,将不占netty的io线程
...@@ -43,10 +42,7 @@ public class ReadWsData { ...@@ -43,10 +42,7 @@ public class ReadWsData {
log.info("appWS收到data:" + data + "\nappKey+clientId:" + appKey + ":" + clientId + log.info("appWS收到data:" + data + "\nappKey+clientId:" + appKey + ":" + clientId +
",channelId:" + ctx.channel().id().asShortText()); ",channelId:" + ctx.channel().id().asShortText());
if (PING.equals(data)) {
log.info("收到心跳clientId:" + clientId);
return;
}
// 解析json // 解析json
JsonMapper jsonMapper = new JsonMapper(); JsonMapper jsonMapper = new JsonMapper();
......
...@@ -34,15 +34,17 @@ public interface MangerChannelService { ...@@ -34,15 +34,17 @@ public interface MangerChannelService {
/** /**
* LANGUAGE * LANGUAGE
*/ */
AttributeKey<String> LANGUAGE = AttributeKey.valueOf("lan"); AttributeKey<String> LANGUAGE = AttributeKey.valueOf("la");
/** /**
* APP_VERSION * APP_VERSION
*/ */
// AttributeKey<String> APP_VERSION = AttributeKey.valueOf("appVersion"); AttributeKey<String> APP_VERSION = AttributeKey.valueOf("av");
// AttributeKey<String> TOKEN = AttributeKey.valueOf("TOKEN"); AttributeKey<String> TOKEN = AttributeKey.valueOf("to");
// AttributeKey<String> DEVICEID = AttributeKey.valueOf("DEVICEID"); AttributeKey<String> DEVICEID = AttributeKey.valueOf("dc");
// AttributeKey<String> PLATFORM = AttributeKey.valueOf("PLATFORM"); AttributeKey<String> PLATFORM = AttributeKey.valueOf("pt");
AttributeKey<Integer> READ_IDLE_TIMES = AttributeKey.valueOf("readIdleTimes");
/** /**
* 根据userID获取channel * 根据userID获取channel
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment