Commit 18f91149 by giaogiao

消息存储表,维护netty用户channel对象

parent 91783cbb
package com.wecloud.im.controller;
import com.wecloud.im.entity.ImMessage;
import com.wecloud.im.param.ImMessagePageParam;
import com.wecloud.im.param.ImMessageQueryVo;
import com.wecloud.im.service.ImMessageService;
import io.geekidea.springbootplus.framework.common.api.ApiResult;
import io.geekidea.springbootplus.framework.common.controller.BaseController;
import io.geekidea.springbootplus.framework.core.pagination.Paging;
import io.geekidea.springbootplus.framework.core.validator.groups.Add;
import io.geekidea.springbootplus.framework.core.validator.groups.Update;
import io.geekidea.springbootplus.framework.log.annotation.OperationLog;
import io.geekidea.springbootplus.framework.log.enums.OperationLogType;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* 消息存储表 控制器
*
* @author wei
* @since 2021-04-29
*/
@Slf4j
@RestController
@RequestMapping("/imMessage")
@Api(value = "消息存储表API", tags = {"消息存储表"})
public class ImMessageController extends BaseController {
@Autowired
private ImMessageService imMessageService;
/**
* 添加消息存储表
*/
@PostMapping("/add")
@OperationLog(name = "添加消息存储表", type = OperationLogType.ADD)
@ApiOperation(value = "添加消息存储表")
public ApiResult<Boolean> addImMessage(@Validated(Add.class) @RequestBody ImMessage imMessage)throws Exception{
boolean flag= imMessageService.saveImMessage(imMessage);
return ApiResult.result(flag);
}
/**
* 修改消息存储表
*/
@PostMapping("/update")
@OperationLog(name = "修改消息存储表", type = OperationLogType.UPDATE)
@ApiOperation(value = "修改消息存储表")
public ApiResult<Boolean> updateImMessage(@Validated(Update.class) @RequestBody ImMessage imMessage)throws Exception{
boolean flag= imMessageService.updateImMessage(imMessage);
return ApiResult.result(flag);
}
/**
* 删除消息存储表
*/
@PostMapping("/delete/{id}")
@OperationLog(name = "删除消息存储表", type = OperationLogType.DELETE)
@ApiOperation(value = "删除消息存储表")
public ApiResult<Boolean> deleteImMessage(@PathVariable("id") Long id)throws Exception{
boolean flag= imMessageService.deleteImMessage(id);
return ApiResult.result(flag);
}
/**
* 获取消息存储表详情
*/
@GetMapping("/info/{id}")
@OperationLog(name = "消息存储表详情", type = OperationLogType.INFO)
@ApiOperation(value = "消息存储表详情")
public ApiResult<ImMessageQueryVo> getImMessage(@PathVariable("id") Long id)throws Exception{
ImMessageQueryVo imMessageQueryVo = imMessageService.getImMessageById(id);
return ApiResult.ok(imMessageQueryVo);
}
/**
* 消息存储表分页列表
*/
@PostMapping("/getPageList")
@OperationLog(name = "消息存储表分页列表", type = OperationLogType.PAGE)
@ApiOperation(value = "消息存储表分页列表")
public ApiResult<Paging<ImMessageQueryVo>>getImMessagePageList(@Validated @RequestBody ImMessagePageParam imMessagePageParam)throws Exception{
Paging<ImMessageQueryVo> paging = imMessageService.getImMessagePageList(imMessagePageParam);
return ApiResult.ok(paging);
}
}
package com.wecloud.im.entity;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import io.geekidea.springbootplus.framework.common.entity.BaseEntity;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.experimental.Accessors;
import javax.validation.constraints.NotNull;
import java.util.Date;
/**
* 消息存储表
*
* @author wei
* @since 2021-04-29
*/
@Data
@Accessors(chain = true)
@EqualsAndHashCode(callSuper = true)
@ApiModel(value = "ImMessage对象")
public class ImMessage extends BaseEntity {
private static final long serialVersionUID = 1L;
@NotNull(message = "消息id不能为空")
@ApiModelProperty("消息id")
@TableId(value = "id", type = IdType.INPUT)
private Long id;
@ApiModelProperty("创建时间")
private Date createTime;
@ApiModelProperty("撤回时间")
private Date withdrawTime;
@ApiModelProperty("修改时间")
private Date updateDate;
@NotNull(message = "应用appid不能为空")
@ApiModelProperty("应用appid")
private Long fkAppid;
@ApiModelProperty("发送者客户端id")
private Long sender;
@ApiModelProperty("内容")
private String content;
@ApiModelProperty("0未撤回; 1已撤回")
private Boolean withdraw;
@ApiModelProperty("0非事件; 1为事件")
private Boolean event;
@ApiModelProperty("0非系统通知; 1为系统通知")
private Boolean system;
@ApiModelProperty("at他人,传入客户端id数组")
private String at;
@ApiModelProperty("发送状态, 0AVIMMessageStatusNone(未知) 1AVIMMessageStatusSending(发送中) 2AVIMMessageStatusSent(发送成功) 3AVIMMessageStatusReceipt(被接收) 4AVIMMessageStatusFailed(失败)")
private Integer sendStatus;
@ApiModelProperty("会话id")
private Long fkConversationId;
}
package com.wecloud.im.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.wecloud.im.entity.ImMessage;
import com.wecloud.im.param.ImMessagePageParam;
import com.wecloud.im.param.ImMessageQueryVo;
import org.springframework.stereotype.Repository;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.apache.ibatis.annotations.Param;
import java.io.Serializable;
/**
* 消息存储表 Mapper 接口
*
* @author wei
* @since 2021-04-29
*/
@Repository
public interface ImMessageMapper extends BaseMapper<ImMessage> {
/**
* 根据ID获取查询对象
*
* @param id
* @return
*/
ImMessageQueryVo getImMessageById(Serializable id);
/**
* 获取分页对象
*
* @param page
* @param imMessagePageParam
* @return
*/
IPage<ImMessageQueryVo> getImMessagePageList(@Param("page") Page page,@Param("param") ImMessagePageParam imMessagePageParam);
}
package com.wecloud.im.param;
import io.swagger.annotations.ApiModel;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.experimental.Accessors;
import io.geekidea.springbootplus.framework.core.pagination.BasePageOrderParam;
/**
* <pre>
* 消息存储表 分页参数对象
* </pre>
*
* @author wei
* @date 2021-04-29
*/
@Data
@Accessors(chain = true)
@EqualsAndHashCode(callSuper = true)
@ApiModel(value = "消息存储表分页参数")
public class ImMessagePageParam extends BasePageOrderParam{
private static final long serialVersionUID=1L;
}
package com.wecloud.im.param;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import lombok.experimental.Accessors;
import java.io.Serializable;
import java.util.Date;
/**
* <pre>
* 消息存储表 查询结果对象
* </pre>
*
* @author wei
* @date 2021-04-29
*/
@Data
@Accessors(chain = true)
@ApiModel(value = "ImMessageQueryVo对象")
public class ImMessageQueryVo implements Serializable {
private static final long serialVersionUID = 1L;
@ApiModelProperty("消息id")
private Long id;
@ApiModelProperty("创建时间")
private Date createTime;
@ApiModelProperty("撤回时间")
private Date withdrawTime;
@ApiModelProperty("修改时间")
private Date updateDate;
@ApiModelProperty("应用appid")
private Long fkAppid;
@ApiModelProperty("发送者客户端id")
private Long sender;
@ApiModelProperty("内容")
private String content;
@ApiModelProperty("0未撤回; 1已撤回")
private Boolean withdraw;
@ApiModelProperty("0非事件; 1为事件")
private Boolean event;
@ApiModelProperty("0非系统通知; 1为系统通知")
private Boolean system;
@ApiModelProperty("at他人,传入客户端id数组")
private String at;
@ApiModelProperty("发送状态, 0AVIMMessageStatusNone(未知) 1AVIMMessageStatusSending(发送中) 2AVIMMessageStatusSent(发送成功) 3AVIMMessageStatusReceipt(被接收) 4AVIMMessageStatusFailed(失败)")
private Integer sendStatus;
@ApiModelProperty("会话id")
private Long fkConversationId;
}
\ No newline at end of file
package com.wecloud.im.service;
import com.wecloud.im.entity.ImMessage;
import com.wecloud.im.param.ImMessagePageParam;
import io.geekidea.springbootplus.framework.common.service.BaseService;
import com.wecloud.im.param.ImMessageQueryVo;
import io.geekidea.springbootplus.framework.core.pagination.Paging;
/**
* 消息存储表 服务类
*
* @author wei
* @since 2021-04-29
*/
public interface ImMessageService extends BaseService<ImMessage> {
/**
* 保存
*
* @param imMessage
* @return
* @throws Exception
*/
boolean saveImMessage(ImMessage imMessage)throws Exception;
/**
* 修改
*
* @param imMessage
* @return
* @throws Exception
*/
boolean updateImMessage(ImMessage imMessage)throws Exception;
/**
* 删除
*
* @param id
* @return
* @throws Exception
*/
boolean deleteImMessage(Long id)throws Exception;
/**
* 根据ID获取查询对象
*
* @param id
* @return
* @throws Exception
*/
ImMessageQueryVo getImMessageById(Long id)throws Exception;
/**
* 获取分页对象
*
* @param imMessagePageParam
* @return
* @throws Exception
*/
Paging<ImMessageQueryVo> getImMessagePageList(ImMessagePageParam imMessagePageParam) throws Exception;
}
package com.wecloud.im.service.impl;
import com.wecloud.im.entity.ImMessage;
import com.wecloud.im.mapper.ImMessageMapper;
import com.wecloud.im.service.ImMessageService;
import com.wecloud.im.param.ImMessagePageParam;
import com.wecloud.im.param.ImMessageQueryVo;
import io.geekidea.springbootplus.framework.common.service.impl.BaseServiceImpl;
import io.geekidea.springbootplus.framework.core.pagination.Paging;
import io.geekidea.springbootplus.framework.core.pagination.PageInfo;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.core.metadata.OrderItem;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.springframework.transaction.annotation.Transactional;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.beans.factory.annotation.Autowired;
/**
* 消息存储表 服务实现类
*
* @author wei
* @since 2021-04-29
*/
@Slf4j
@Service
public class ImMessageServiceImpl extends BaseServiceImpl<ImMessageMapper, ImMessage> implements ImMessageService {
@Autowired
private ImMessageMapper imMessageMapper;
@Transactional(rollbackFor = Exception.class)
@Override
public boolean saveImMessage(ImMessage imMessage)throws Exception{
return super.save(imMessage);
}
@Transactional(rollbackFor = Exception.class)
@Override
public boolean updateImMessage(ImMessage imMessage)throws Exception{
return super.updateById(imMessage);
}
@Transactional(rollbackFor = Exception.class)
@Override
public boolean deleteImMessage(Long id)throws Exception{
return super.removeById(id);
}
@Override
public ImMessageQueryVo getImMessageById(Long id)throws Exception{
return imMessageMapper.getImMessageById(id);
}
@Override
public Paging<ImMessageQueryVo> getImMessagePageList(ImMessagePageParam imMessagePageParam)throws Exception{
Page<ImMessageQueryVo> page=new PageInfo<>(imMessagePageParam,OrderItem.desc(getLambdaColumn(ImMessage::getCreateTime)));
IPage<ImMessageQueryVo> iPage= imMessageMapper.getImMessagePageList(page, imMessagePageParam);
return new Paging<ImMessageQueryVo>(iPage);
}
}
package com.wecloud.im.tillo.app_ws; package com.wecloud.im.tillo.app_ws;
import cn.hutool.core.lang.Snowflake;
import cn.hutool.core.thread.ThreadFactoryBuilder; import cn.hutool.core.thread.ThreadFactoryBuilder;
import com.wecloud.im.tillo.app_ws.model.Constants; import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.wecloud.im.entity.ImMessage;
import com.wecloud.im.service.ImMessageService;
import com.wecloud.im.tillo.app_ws.model.WsConstants;
import com.wecloud.im.tillo.app_ws.receive.ReadWsData; import com.wecloud.im.tillo.app_ws.receive.ReadWsData;
import com.wecloud.im.tillo.app_ws.service.AppUserChannelsService; import com.wecloud.im.tillo.app_ws.service.MangerChannelService;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.util.Date;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
...@@ -27,12 +34,16 @@ import java.util.concurrent.TimeUnit; ...@@ -27,12 +34,16 @@ import java.util.concurrent.TimeUnit;
@Component @Component
@ChannelHandler.Sharable @ChannelHandler.Sharable
@Slf4j @Slf4j
public class AppImHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> { public class WsHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
@Autowired
private ImMessageService imMessageService;
@Resource @Resource
private ReadWsData readWsData; private ReadWsData readWsData;
@Resource @Resource
private AppUserChannelsService appUserChannelsService; private MangerChannelService appUserChannelsService;
private final static ThreadFactory NAMED_THREAD_FACTORY = new ThreadFactoryBuilder() private final static ThreadFactory NAMED_THREAD_FACTORY = new ThreadFactoryBuilder()
.setNamePrefix("business-").build(); .setNamePrefix("business-").build();
...@@ -44,7 +55,7 @@ public class AppImHandler extends SimpleChannelInboundHandler<TextWebSocketFrame ...@@ -44,7 +55,7 @@ public class AppImHandler extends SimpleChannelInboundHandler<TextWebSocketFrame
private final static ExecutorService THREAD_POOL_EXECUTOR = private final static ExecutorService THREAD_POOL_EXECUTOR =
new ThreadPoolExecutor(Constants.CPU_PROCESSORS * 120, Constants.CPU_PROCESSORS * 130 + 2, new ThreadPoolExecutor(WsConstants.CPU_PROCESSORS * 120, WsConstants.CPU_PROCESSORS * 130 + 2,
1L, TimeUnit.MILLISECONDS, 1L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(), NAMED_THREAD_FACTORY, new ThreadPoolExecutor.CallerRunsPolicy()); new LinkedBlockingQueue<Runnable>(), NAMED_THREAD_FACTORY, new ThreadPoolExecutor.CallerRunsPolicy());
...@@ -52,6 +63,33 @@ public class AppImHandler extends SimpleChannelInboundHandler<TextWebSocketFrame ...@@ -52,6 +63,33 @@ public class AppImHandler extends SimpleChannelInboundHandler<TextWebSocketFrame
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) { protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) {
String data = msg.text(); String data = msg.text();
ImMessage imMessage = new ImMessage();
imMessage.setId(new Snowflake(1L, 1L).nextId());
imMessage.setCreateTime(new Date());
imMessage.setWithdrawTime(new Date());
imMessage.setUpdateDate(new Date());
imMessage.setFkAppid(0L);
imMessage.setSender(0L);
imMessage.setContent(data);
imMessage.setWithdraw(false);
imMessage.setEvent(false);
imMessage.setSystem(false);
imMessage.setAt("");
imMessage.setSendStatus(0);
imMessage.setFkConversationId(0L);
imMessageService.save(imMessage);
JsonMapper jsonMapper = new JsonMapper();
try {
JsonNode jsonNode = jsonMapper.readTree(data);
int i = 0;
} catch (JsonProcessingException e) {
e.printStackTrace();
}
ChannelFuture channelFuture = ctx.writeAndFlush(new TextWebSocketFrame(data)); ChannelFuture channelFuture = ctx.writeAndFlush(new TextWebSocketFrame(data));
try { try {
...@@ -78,7 +116,7 @@ public class AppImHandler extends SimpleChannelInboundHandler<TextWebSocketFrame ...@@ -78,7 +116,7 @@ public class AppImHandler extends SimpleChannelInboundHandler<TextWebSocketFrame
// log.debug("appWS收到" + userIdByChannel + ":" + data + ",channelId:" + ctx.channel().id().asLongText()); // log.debug("appWS收到" + userIdByChannel + ":" + data + ",channelId:" + ctx.channel().id().asLongText());
log.debug("WS收到" + data); log.debug("WS收到" + data);
String language = ctx.channel().attr(AppUserChannelsService.LANGUAGE).get(); String language = ctx.channel().attr(MangerChannelService.LANGUAGE).get();
// readWsData.convertModel(data, userIdByChannel, language); // readWsData.convertModel(data, userIdByChannel, language);
} }
......
package com.wecloud.im.tillo.app_ws.model;
import lombok.extern.slf4j.Slf4j;
/**
* @author wjm
* @Description 常量
* @date 2018/12/20
*/
@Slf4j
public class Constants {
/*
* 当前服务器cpu核心数量()
*/
public static final Integer CPU_PROCESSORS = Runtime.getRuntime().availableProcessors();
static {
log.info("CPU_PROCESSORS:" + CPU_PROCESSORS);
}
/**
* 加密消息
*/
public static final Integer IS_CRYPTO_MESSAGE = 1;
/**
* 加密消息类型
*/
public static final Integer CRYPTO_TYPE = 3;
/**
* 网页版长连接url
*/
public static final String WEB_WS_URL = "/webws";
/**
* WEBWS_V_2
*/
public static final String WEBWS_V_2 = "/webwsV2";
/**
* app长连接url
*/
public static final String APP_WS_URL = "/appws";
/**
* 存储当前登录用户id的字段名
*/
public static final String CURRENT_USER_ID = "CURRENT_USER_ID";
/**
* token有效期(小时)30天
*/
public static final int TOKEN_EXPIRES_HOUR = 720;
/**
* token
*/
public static final String TOKEN = "token";
/**
* 密码加密盐
*/
public static final String SALT = "tillo2018";
/**
* http字符传
*/
public static final String HTTP = "http";
/**
* 标点符号--逗号
*/
public static final String COMMA = ",";
/**
* 标点符号--百分号
*/
public static final String PERCENT = "%";
/**
* 标点符号--问号
*/
public static final String MARK = "?";
/**
* 群聊取模数
*/
public static final Integer GROUP_MODULUS = 32;
/**
* 字符串0
*/
public static final String ZERO = "0";
/**
* 横线
*/
public static final String LINE = "-";
}
package com.wecloud.im.tillo.app_ws.model;
import lombok.extern.slf4j.Slf4j;
/**
* ws相关的常量
*/
@Slf4j
public class WsConstants {
/*
* 当前服务器cpu核心数量()
*/
public static final Integer CPU_PROCESSORS = Runtime.getRuntime().availableProcessors();
static {
log.info("CPU_PROCESSORS:" + CPU_PROCESSORS);
}
/**
* 长连接url
*/
public static final String WS_URL = "/ws";
/**
* token
*/
public static final String TOKEN = "token";
}
...@@ -8,11 +8,11 @@ import java.util.Map; ...@@ -8,11 +8,11 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
/** /**
* @Description 维护netty用户channel对象 * @Description 管理netty用户channel对象
* @Author hewei hwei1233@163.com * @Author hewei hwei1233@163.com
* @Date 2019-08-01 * @Date 2019-08-01
*/ */
public interface AppUserChannelsService { public interface MangerChannelService {
/** /**
* channel对象 * channel对象
...@@ -22,9 +22,11 @@ public interface AppUserChannelsService { ...@@ -22,9 +22,11 @@ public interface AppUserChannelsService {
Map<String, NioSocketChannel> CHANNEL_MAP = new ConcurrentHashMap<>(); Map<String, NioSocketChannel> CHANNEL_MAP = new ConcurrentHashMap<>();
/** /**
* USER_ID * CLIENT_ID
*/ */
AttributeKey<Long> USER_ID = AttributeKey.valueOf("userId"); AttributeKey<String> CLIENT_ID = AttributeKey.valueOf("CLIENTID");
AttributeKey<String> APP_KEY = AttributeKey.valueOf("APPKEY");
/** /**
* LANGUAGE * LANGUAGE
...@@ -60,7 +62,7 @@ public interface AppUserChannelsService { ...@@ -60,7 +62,7 @@ public interface AppUserChannelsService {
void remove(ChannelHandlerContext channelHandlerContext); void remove(ChannelHandlerContext channelHandlerContext);
/** /**
* 根据channel返回userIds * 根据channel返回userId
* *
* @param channelHandlerContext * @param channelHandlerContext
* @return * @return
...@@ -74,35 +76,43 @@ public interface AppUserChannelsService { ...@@ -74,35 +76,43 @@ public interface AppUserChannelsService {
* @param userId * @param userId
* @return * @return
*/ */
boolean writeData(String msg, Long userId); // boolean writeData(String msg, Long userId);
/**
* rpc异步下发数据
*
* @param msg
* @param userId
* @return
*/
boolean rpcWriteData(String msg, Long userId);
/** /**
* rpc-异步下发踢人数据及关闭旧通道 * 下发数据
* *
* @param msg * @param msg
* @param userId
* @return * @return
*/ */
boolean rpcKickWriteData(String msg, Long userId); boolean writeData(String msg, String toAppKey,String toClientId);
/** // /**
* rpc-异步关闭旧通道 // * rpc异步下发数据
* // *
* @param userId // * @param msg
* @return // * @param userId
*/ // * @return
boolean rpcCloseOldChannel(Long userId); // */
// boolean rpcWriteData(String msg, Long userId);
Boolean isOnLocal(Long userId); //
// /**
// * rpc-异步下发踢人数据及关闭旧通道
// *
// * @param msg
// * @param userId
// * @return
// */
// boolean rpcKickWriteData(String msg, Long userId);
//
// /**
// * rpc-异步关闭旧通道
// *
// * @param userId
// * @return
// */
// boolean rpcCloseOldChannel(Long userId);
//
// Boolean isOnLocal(Long userId);
} }
...@@ -2,8 +2,8 @@ package com.wecloud.im.tillo.app_ws.service.impl; ...@@ -2,8 +2,8 @@ package com.wecloud.im.tillo.app_ws.service.impl;
import cn.hutool.core.thread.ThreadFactoryBuilder; import cn.hutool.core.thread.ThreadFactoryBuilder;
import com.wecloud.im.tillo.app_ws.cache.UserCache; import com.wecloud.im.tillo.app_ws.cache.UserCache;
import com.wecloud.im.tillo.app_ws.model.Constants; import com.wecloud.im.tillo.app_ws.model.WsConstants;
import com.wecloud.im.tillo.app_ws.service.AppUserChannelsService; import com.wecloud.im.tillo.app_ws.service.MangerChannelService;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelFutureListener;
...@@ -14,9 +14,7 @@ import lombok.extern.slf4j.Slf4j; ...@@ -14,9 +14,7 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
...@@ -29,11 +27,10 @@ import java.util.concurrent.TimeUnit; ...@@ -29,11 +27,10 @@ import java.util.concurrent.TimeUnit;
*/ */
@Component @Component
@Slf4j @Slf4j
public class AppUserChannelsServiceImpl implements AppUserChannelsService { public class AppUserChannelsServiceImpl implements MangerChannelService {
private final static ThreadFactory NAMED_THREAD_FACTORY = new ThreadFactoryBuilder() private final static ThreadFactory NAMED_THREAD_FACTORY = new ThreadFactoryBuilder()
.setNamePrefix("rpcWrite-").build(); .setNamePrefix("rpcWrite-").build();
// .setPriority(3)
/** /**
* 远程调用ws下发数据线程池 * 远程调用ws下发数据线程池
* 属于IO密集型任务 * 属于IO密集型任务
...@@ -48,23 +45,13 @@ public class AppUserChannelsServiceImpl implements AppUserChannelsService { ...@@ -48,23 +45,13 @@ public class AppUserChannelsServiceImpl implements AppUserChannelsService {
* *
* *
* <p> * <p>
* TODO 待完善:消息发送投递至MQ, 消费方从MQ队列获取下发任务 本地队列不宜缓存太多(机器死机则会全丢失) 堆积的请求处理队列可能会耗费非常大的内存甚至死机 * 后续优化待完善:消息发送投递至MQ, 消费方从MQ队列获取下发任务 本地队列不宜缓存太多(机器死机则会全丢失) 堆积的请求处理队列可能会耗费非常大的内存甚至死机
*/ */
// private final static ExecutorService THREAD_POOL_RPC_WRITE_EXECUTOR = private final static ExecutorService THREAD_POOL_RPC_WRITE_EXECUTOR = new ThreadPoolExecutor(
// new ThreadPoolExecutor(Constants.CPU_PROCESSORS * 4, Constants.CPU_PROCESSORS * 4 + 1, WsConstants.CPU_PROCESSORS * 100,
// 1L, TimeUnit.MILLISECONDS, WsConstants.CPU_PROCESSORS * 100,
// new LinkedBlockingQueue<Runnable>(), NAMED_THREAD_FACTORY, new ThreadPoolExecutor.CallerRunsPolicy()); 1L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(), NAMED_THREAD_FACTORY, new ThreadPoolExecutor.CallerRunsPolicy());
// private final static ExecutorService THREAD_POOL_RPC_WRITE_EXECUTOR =
// new ThreadPoolExecutor(4, 6,
// 0L, TimeUnit.MILLISECONDS,
// new LinkedBlockingQueue<Runnable>(), NAMED_THREAD_FACTORY, new ThreadPoolExecutor.AbortPolicy());
private final static ExecutorService THREAD_POOL_RPC_WRITE_EXECUTOR =
new ThreadPoolExecutor(Constants.CPU_PROCESSORS * 100, Constants.CPU_PROCESSORS * 100,
1L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(), NAMED_THREAD_FACTORY, new ThreadPoolExecutor.CallerRunsPolicy());
@Autowired @Autowired
...@@ -74,7 +61,7 @@ public class AppUserChannelsServiceImpl implements AppUserChannelsService { ...@@ -74,7 +61,7 @@ public class AppUserChannelsServiceImpl implements AppUserChannelsService {
@Override @Override
public NioSocketChannel get(String userId) { public NioSocketChannel get(String userId) {
return AppUserChannelsService.CHANNEL_MAP.get(userId); return MangerChannelService.CHANNEL_MAP.get(userId);
} }
@Override @Override
...@@ -83,7 +70,7 @@ public class AppUserChannelsServiceImpl implements AppUserChannelsService { ...@@ -83,7 +70,7 @@ public class AppUserChannelsServiceImpl implements AppUserChannelsService {
// AppHashValueModel appHashValueModel = new AppHashValueModel(); // AppHashValueModel appHashValueModel = new AppHashValueModel();
// appHashValueModel.setOnlineStatus(UserCache.ONLINE); // appHashValueModel.setOnlineStatus(UserCache.ONLINE);
userCache.online(userId); userCache.online(userId);
AppUserChannelsService.CHANNEL_MAP.put(userId, channel); MangerChannelService.CHANNEL_MAP.put(userId, channel);
} }
...@@ -104,166 +91,212 @@ public class AppUserChannelsServiceImpl implements AppUserChannelsService { ...@@ -104,166 +91,212 @@ public class AppUserChannelsServiceImpl implements AppUserChannelsService {
log.debug("不活跃remove,uid:" + userId); log.debug("不活跃remove,uid:" + userId);
AppUserChannelsService.CHANNEL_MAP.remove(userId); MangerChannelService.CHANNEL_MAP.remove(userId);
channelHandlerContext.channel().close(); channelHandlerContext.channel().close();
} }
} }
@Override @Override
public Long getUserIdByChannel(ChannelHandlerContext channelHandlerContext) { public Long getUserIdByChannel(ChannelHandlerContext channelHandlerContext) {
return channelHandlerContext.channel().attr(AppUserChannelsService.USER_ID).get(); // return channelHandlerContext.channel().attr(MangerChannelService.USER_ID).get();
} return 1L;
@Override
public Boolean isOnLocal(Long userId) {
NioSocketChannel nioSocketChannel = this.get(String.valueOf(userId));
return null != nioSocketChannel;
}
/**
* TODO 待完成: 根据ACK回执 以及线程等待超时机制来判断客户端是否离线和超时;
* TODO 待完成: 发送后阻塞当前子线程2秒后获取ack回执 如客户端发起ack回执则需要主动唤醒当前子线程 立马唤醒当前子线程, 判断如果已回执则返回发送成功, 如果未回执则判断客户端是否断线或发送错误
*
* @param msg
* @param userId
* @return
*/
@Override
public boolean rpcWriteData(String msg, Long userId) {
Future<Boolean> future = THREAD_POOL_RPC_WRITE_EXECUTOR.submit(() -> {
NioSocketChannel nioSocketChannel = get(String.valueOf(userId));
if (null == nioSocketChannel) {
userCache.offline(String.valueOf(userId));
log.debug("rpc-writeData连接为空:" + userId + "," + msg);
return false;
}
// 判断连接是否断开
if (nioSocketChannel.isShutdown()) {
log.debug("rpc-writeData连接断开:" + userId + "," + msg + ",\nchannelId:" + nioSocketChannel.id().asLongText());
return false;
}
if (log.isInfoEnabled()) {
log.info("rpc-writeData:" + userId + "," + msg + ",\nchannelId:" + nioSocketChannel.id().asLongText());
}
ChannelFuture channelFuture = nioSocketChannel.writeAndFlush(new TextWebSocketFrame(msg));
channelFuture.addListener(
//执行后回调的方法
(ChannelFutureListener) channelFuture1 -> {
if (log.isInfoEnabled()) {
log.info("rpc-netty线程异步执行结果:" + channelFuture1.isDone() + ",业务执行结果:" + channelFuture1.isSuccess()
+ ";\nwriteData:" + userId + "," + msg + ",channelId:" + nioSocketChannel.id().asLongText());
}
});
channelFuture.get();
return true;
});
boolean resultStatus = false;
try {
resultStatus = future.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
return resultStatus;
} }
@Override // @Override
public boolean rpcKickWriteData(String msg, Long userId) { // public Boolean isOnLocal(Long userId) {
Future<Boolean> future = THREAD_POOL_RPC_WRITE_EXECUTOR.submit(() -> { // NioSocketChannel nioSocketChannel = this.get(String.valueOf(userId));
// return null != nioSocketChannel;
NioSocketChannel nioSocketChannel = get(String.valueOf(userId)); //
if (null == nioSocketChannel) { // }
log.debug("rpc-kickWriteData连接为空:" + userId + "," + msg); //
return false; //
} // /**
// * TODO 待完成: 根据ACK回执 以及线程等待超时机制来判断客户端是否离线和超时;
// 判断连接是否断开 // * TODO 待完成: 发送后阻塞当前子线程2秒后获取ack回执 如客户端发起ack回执则需要主动唤醒当前子线程 立马唤醒当前子线程, 判断如果已回执则返回发送成功, 如果未回执则判断客户端是否断线或发送错误
if (nioSocketChannel.isShutdown()) { // *
log.debug("rpc-kickWriteData连接断开:" + userId + "," + msg + ",\nchannelId:" + nioSocketChannel.id().asLongText()); // * @param msg
nioSocketChannel.close(); // * @param userId
return false; // * @return
} // */
// @Override
if (log.isDebugEnabled()) { // public boolean rpcWriteData(String msg, Long userId) {
log.debug("rpc-kickWriteData:" + userId + "," + msg + ",\nchannelId:" + nioSocketChannel.id().asLongText()); //
} // Future<Boolean> future = THREAD_POOL_RPC_WRITE_EXECUTOR.submit(() -> {
//
ChannelFuture channelFuture = nioSocketChannel.writeAndFlush(new TextWebSocketFrame(msg)); // NioSocketChannel nioSocketChannel = get(String.valueOf(userId));
channelFuture.addListener( // if (null == nioSocketChannel) {
//执行后回调的方法 // userCache.offline(String.valueOf(userId));
(ChannelFutureListener) channelFuture1 -> { // log.debug("rpc-writeData连接为空:" + userId + "," + msg);
if (log.isDebugEnabled()) { // return false;
log.debug("rpc-netty踢人线程异步执行结果:" + channelFuture1.isDone() + ",业务执行结果:" + channelFuture1.isSuccess() // }
+ ";\nkickWriteData:" + userId + "," + msg + ",channelId:" + nioSocketChannel.id().asLongText()); //
} // // 判断连接是否断开
}); // if (nioSocketChannel.isShutdown()) {
// log.debug("rpc-writeData连接断开:" + userId + "," + msg + ",\nchannelId:" + nioSocketChannel.id().asLongText());
channelFuture.get(); // return false;
// 关闭 // }
nioSocketChannel.close(); //
return true; // if (log.isInfoEnabled()) {
}); // log.info("rpc-writeData:" + userId + "," + msg + ",\nchannelId:" + nioSocketChannel.id().asLongText());
// }
boolean resultStatus = false; //
try { // ChannelFuture channelFuture = nioSocketChannel.writeAndFlush(new TextWebSocketFrame(msg));
resultStatus = future.get(); // channelFuture.addListener(
} catch (InterruptedException | ExecutionException e) { // //执行后回调的方法
e.printStackTrace(); // (ChannelFutureListener) channelFuture1 -> {
} // if (log.isInfoEnabled()) {
return resultStatus; // log.info("rpc-netty线程异步执行结果:" + channelFuture1.isDone() + ",业务执行结果:" + channelFuture1.isSuccess()
} // + ";\nwriteData:" + userId + "," + msg + ",channelId:" + nioSocketChannel.id().asLongText());
// }
@Override // });
public boolean rpcCloseOldChannel(Long userId) { //
Future<Boolean> future = THREAD_POOL_RPC_WRITE_EXECUTOR.submit(() -> { // channelFuture.get();
NioSocketChannel nioSocketChannel = get(String.valueOf(userId)); //
if (null == nioSocketChannel) { // return true;
log.debug("rpc-closeOldChannel连接为空:" + userId); // });
return false; //
} // boolean resultStatus = false;
// 关闭 // try {
nioSocketChannel.close(); // resultStatus = future.get();
return true; // } catch (InterruptedException | ExecutionException e) {
}); // e.printStackTrace();
boolean resultStatus = false; // }
try { // return resultStatus;
resultStatus = future.get(); //
} catch (InterruptedException | ExecutionException e) { // }
e.printStackTrace(); //
} // @Override
return resultStatus; // public boolean rpcKickWriteData(String msg, Long userId) {
} // Future<Boolean> future = THREAD_POOL_RPC_WRITE_EXECUTOR.submit(() -> {
//
// NioSocketChannel nioSocketChannel = get(String.valueOf(userId));
// if (null == nioSocketChannel) {
// log.debug("rpc-kickWriteData连接为空:" + userId + "," + msg);
// return false;
// }
//
// // 判断连接是否断开
// if (nioSocketChannel.isShutdown()) {
// log.debug("rpc-kickWriteData连接断开:" + userId + "," + msg + ",\nchannelId:" + nioSocketChannel.id().asLongText());
// nioSocketChannel.close();
// return false;
// }
//
// if (log.isDebugEnabled()) {
// log.debug("rpc-kickWriteData:" + userId + "," + msg + ",\nchannelId:" + nioSocketChannel.id().asLongText());
// }
//
// ChannelFuture channelFuture = nioSocketChannel.writeAndFlush(new TextWebSocketFrame(msg));
// channelFuture.addListener(
// //执行后回调的方法
// (ChannelFutureListener) channelFuture1 -> {
// if (log.isDebugEnabled()) {
// log.debug("rpc-netty踢人线程异步执行结果:" + channelFuture1.isDone() + ",业务执行结果:" + channelFuture1.isSuccess()
// + ";\nkickWriteData:" + userId + "," + msg + ",channelId:" + nioSocketChannel.id().asLongText());
// }
// });
//
// channelFuture.get();
// // 关闭
// nioSocketChannel.close();
// return true;
// });
//
// boolean resultStatus = false;
// try {
// resultStatus = future.get();
// } catch (InterruptedException | ExecutionException e) {
// e.printStackTrace();
// }
// return resultStatus;
// }
//
// @Override
// public boolean rpcCloseOldChannel(Long userId) {
// Future<Boolean> future = THREAD_POOL_RPC_WRITE_EXECUTOR.submit(() -> {
// NioSocketChannel nioSocketChannel = get(String.valueOf(userId));
// if (null == nioSocketChannel) {
// log.debug("rpc-closeOldChannel连接为空:" + userId);
// return false;
// }
// // 关闭
// nioSocketChannel.close();
// return true;
// });
// boolean resultStatus = false;
// try {
// resultStatus = future.get();
// } catch (InterruptedException | ExecutionException e) {
// e.printStackTrace();
// }
// return resultStatus;
// }
//
// @Override
// public boolean writeData(String msg, Long userId) {
//
//// Future<Boolean> future = THREAD_POOL_EXECUTOR.submit(() -> {
//
// NioSocketChannel nioSocketChannel = get(String.valueOf(userId));
// if (null == nioSocketChannel) {
// userCache.offline(String.valueOf(userId));
// log.debug("writeData连接为空:" + userId + "," + msg);
// return false;
// }
//
// // 判断连接是否断开
// if (nioSocketChannel.isShutdown()) {
// log.debug("writeData连接断开:" + userId + "," + msg + ",\nchannelId:" + nioSocketChannel.id().asLongText());
// return false;
// }
//
// if (log.isDebugEnabled()) {
// log.debug("writeData:" + userId + "," + msg + ",\nchannelId:" + nioSocketChannel.id().asLongText());
// }
//
// ChannelFuture channelFuture = nioSocketChannel.writeAndFlush(new TextWebSocketFrame(msg));
// channelFuture.addListener(
// //执行后回调的方法
// (ChannelFutureListener) channelFuture1 -> {
// if (log.isDebugEnabled()) {
// log.debug("netty线程异步执行结果:" + channelFuture1.isDone() + ",业务执行结果:" + channelFuture1.isSuccess()
// + ";\nwriteData:" + userId + "," + msg + ",channelId:" + nioSocketChannel.id().asLongText());
// }
// });
//
//// channelFuture.get();
//
// return true;
//// });
//
//// Boolean resultStatus = false;
//// try {
//// resultStatus = future.get();
//// } catch (InterruptedException | ExecutionException e) {
//// e.printStackTrace();
//// }
//// return resultStatus;
//
// }
@Override @Override
public boolean writeData(String msg, Long userId) { public boolean writeData(String msg, String toAppKey, String toClientId) {
// Future<Boolean> future = THREAD_POOL_EXECUTOR.submit(() -> {
NioSocketChannel nioSocketChannel = get(String.valueOf(userId)); NioSocketChannel nioSocketChannel = get(String.valueOf(toAppKey + toClientId));
if (null == nioSocketChannel) { if (null == nioSocketChannel) {
userCache.offline(String.valueOf(userId)); userCache.offline(String.valueOf(toAppKey + toClientId));
log.debug("writeData连接为空:" + userId + "," + msg); log.debug("writeData连接为空:" + toAppKey + toClientId + "," + msg);
return false; return false;
} }
// 判断连接是否断开 // 判断连接是否断开
if (nioSocketChannel.isShutdown()) { if (nioSocketChannel.isShutdown()) {
log.debug("writeData连接断开:" + userId + "," + msg + ",\nchannelId:" + nioSocketChannel.id().asLongText()); log.debug("writeData连接断开:" + toAppKey + toClientId + "," + msg + ",\nchannelId:" + nioSocketChannel.id().asLongText());
return false; return false;
} }
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("writeData:" + userId + "," + msg + ",\nchannelId:" + nioSocketChannel.id().asLongText()); log.debug("writeData:" + toAppKey + toClientId + "," + msg + ",\nchannelId:" + nioSocketChannel.id().asLongText());
} }
ChannelFuture channelFuture = nioSocketChannel.writeAndFlush(new TextWebSocketFrame(msg)); ChannelFuture channelFuture = nioSocketChannel.writeAndFlush(new TextWebSocketFrame(msg));
...@@ -272,22 +305,10 @@ public class AppUserChannelsServiceImpl implements AppUserChannelsService { ...@@ -272,22 +305,10 @@ public class AppUserChannelsServiceImpl implements AppUserChannelsService {
(ChannelFutureListener) channelFuture1 -> { (ChannelFutureListener) channelFuture1 -> {
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("netty线程异步执行结果:" + channelFuture1.isDone() + ",业务执行结果:" + channelFuture1.isSuccess() log.debug("netty线程异步执行结果:" + channelFuture1.isDone() + ",业务执行结果:" + channelFuture1.isSuccess()
+ ";\nwriteData:" + userId + "," + msg + ",channelId:" + nioSocketChannel.id().asLongText()); + ";\nwriteData:" + toAppKey + toClientId + "," + msg + ",channelId:" + nioSocketChannel.id().asLongText());
} }
}); });
// channelFuture.get();
return true; return true;
// });
// Boolean resultStatus = false;
// try {
// resultStatus = future.get();
// } catch (InterruptedException | ExecutionException e) {
// e.printStackTrace();
// }
// return resultStatus;
} }
......
...@@ -4,7 +4,7 @@ import com.alibaba.fastjson.JSON; ...@@ -4,7 +4,7 @@ import com.alibaba.fastjson.JSON;
import com.wecloud.im.tillo.app_ws.model.ResponseModel; import com.wecloud.im.tillo.app_ws.model.ResponseModel;
import com.wecloud.im.tillo.app_ws.model.ResultStatus; import com.wecloud.im.tillo.app_ws.model.ResultStatus;
import com.wecloud.im.tillo.app_ws.model.request.ReceiveModel; import com.wecloud.im.tillo.app_ws.model.request.ReceiveModel;
import com.wecloud.im.tillo.app_ws.service.AppUserChannelsService; import com.wecloud.im.tillo.app_ws.service.MangerChannelService;
import com.wecloud.im.tillo.app_ws.service.WriteDataService; import com.wecloud.im.tillo.app_ws.service.WriteDataService;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
...@@ -20,7 +20,7 @@ import java.util.HashMap; ...@@ -20,7 +20,7 @@ import java.util.HashMap;
public class WriteDataServiceImpl implements WriteDataService { public class WriteDataServiceImpl implements WriteDataService {
@Autowired @Autowired
private AppUserChannelsService appUserChannelsService; private MangerChannelService mangerChannelService;
@Override @Override
public void successAndData(ReceiveModel requestModel, Object data, Long userId, String language) { public void successAndData(ReceiveModel requestModel, Object data, Long userId, String language) {
...@@ -54,7 +54,7 @@ public class WriteDataServiceImpl implements WriteDataService { ...@@ -54,7 +54,7 @@ public class WriteDataServiceImpl implements WriteDataService {
@Override @Override
public void write(ResponseModel responseModel, Long userId) { public void write(ResponseModel responseModel, Long userId) {
String json = JSON.toJSONString(responseModel); String json = JSON.toJSONString(responseModel);
appUserChannelsService.writeData(json, userId); // mangerChannelService.writeData(json, userId);
} }
} }
...@@ -16,10 +16,10 @@ import java.net.InetAddress; ...@@ -16,10 +16,10 @@ import java.net.InetAddress;
@ChannelHandler.Sharable @ChannelHandler.Sharable
public class ChannelInboundHandler extends ChannelInboundHandlerAdapter { public class ChannelInboundHandler extends ChannelInboundHandlerAdapter {
private final Logger logger = LoggerFactory.getLogger(ChannelInboundHandler.class); private final Logger logger = LoggerFactory.getLogger(ChannelInboundHandler.class);
private final NettyApiRequest singleRequest; private final NettyApiRequest nettyApiRequest;
public ChannelInboundHandler(NettyApiRequest singleRequest) { public ChannelInboundHandler(NettyApiRequest nettyApiRequest) {
this.singleRequest = singleRequest; this.nettyApiRequest = nettyApiRequest;
} }
...@@ -30,7 +30,7 @@ public class ChannelInboundHandler extends ChannelInboundHandlerAdapter { ...@@ -30,7 +30,7 @@ public class ChannelInboundHandler extends ChannelInboundHandlerAdapter {
public void channelRead(ChannelHandlerContext ctx, Object msg) { public void channelRead(ChannelHandlerContext ctx, Object msg) {
FullHttpRequest httpRequest = (FullHttpRequest) msg; FullHttpRequest httpRequest = (FullHttpRequest) msg;
try { try {
singleRequest.handle(ctx, msg, httpRequest); nettyApiRequest.handle(ctx, msg, httpRequest);
} catch (Exception e) { } catch (Exception e) {
logger.error("SingleNettyServer处理请求失败!", e); logger.error("SingleNettyServer处理请求失败!", e);
// this.sendBad(ctx); // this.sendBad(ctx);
......
package com.wecloud.im.tillo.netty.core; package com.wecloud.im.tillo.netty.core;
import com.wecloud.im.tillo.app_ws.model.Constants; import com.wecloud.im.tillo.app_ws.model.WsConstants;
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannel;
...@@ -28,7 +28,7 @@ public class NettyChannelInitializer extends ChannelInitializer<SocketChannel> { ...@@ -28,7 +28,7 @@ public class NettyChannelInitializer extends ChannelInitializer<SocketChannel> {
// 服务端api接口 // 服务端api接口
pipeline.addLast("SingleHttpRequestHandler", channelInboundHandler); pipeline.addLast("SingleHttpRequestHandler", channelInboundHandler);
// "/appws"路径 升级长连接 // "/appws"路径 升级长连接
pipeline.addLast("appWebSocketServerProtocolHandler", new WebSocketServerProtocolHandler(Constants.APP_WS_URL)); pipeline.addLast("appWebSocketServerotocolHandler", new WebSocketServerProtocolHandler(WsConstants.WS_URL));
} }
} }
...@@ -3,18 +3,20 @@ package com.wecloud.im.tillo.netty.handler; ...@@ -3,18 +3,20 @@ package com.wecloud.im.tillo.netty.handler;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.auth0.jwt.interfaces.DecodedJWT; import com.auth0.jwt.interfaces.DecodedJWT;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.wecloud.im.entity.ImApplication;
import com.wecloud.im.service.ImApplicationService; import com.wecloud.im.service.ImApplicationService;
import com.wecloud.im.service.ImClientService; import com.wecloud.im.service.ImClientService;
import com.wecloud.im.tillo.app_ws.AppImHandler; import com.wecloud.im.tillo.app_ws.WsHandler;
import com.wecloud.im.tillo.app_ws.model.Constants; import com.wecloud.im.tillo.app_ws.model.WsConstants;
import com.wecloud.im.tillo.app_ws.model.RequestHeaderConstants; import com.wecloud.im.tillo.app_ws.service.MangerChannelService;
import com.wecloud.im.tillo.app_ws.utils.FullHttpRequestUtils; import com.wecloud.im.tillo.app_ws.utils.FullHttpRequestUtils;
import io.geekidea.springbootplus.framework.shiro.util.JwtUtil; import io.geekidea.springbootplus.framework.shiro.util.JwtUtil;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.HttpResponseStatus;
import org.slf4j.Logger; import lombok.extern.slf4j.Slf4j;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
...@@ -29,16 +31,17 @@ import java.util.Map; ...@@ -29,16 +31,17 @@ import java.util.Map;
* @Date 2019-07-19 * @Date 2019-07-19
*/ */
@Component @Component
@Slf4j
public class NettyApiRequest { public class NettyApiRequest {
private final Logger logger = LoggerFactory.getLogger(NettyApiRequest.class);
// @Resource @Resource
// private AppUserChannelsService appUserChannelsService; private MangerChannelService appUserChannelsService;
@Autowired @Autowired
private StringRedisTemplate redisTemplate; private StringRedisTemplate redisTemplate;
@Resource @Resource
private AppImHandler appImHandler; private WsHandler appImHandler;
@Autowired @Autowired
private ImClientService imClientService; private ImClientService imClientService;
...@@ -56,7 +59,7 @@ public class NettyApiRequest { ...@@ -56,7 +59,7 @@ public class NettyApiRequest {
*/ */
public void handle(ChannelHandlerContext ctx, Object msg, FullHttpRequest httpRequest) throws Exception { public void handle(ChannelHandlerContext ctx, Object msg, FullHttpRequest httpRequest) throws Exception {
if (!(msg instanceof FullHttpRequest)) { if (!(msg instanceof FullHttpRequest)) {
// String context = JsonUtil.obj2Json(ResultModel.error(ResultStatus.REQUEST_ERROR)); // String context = JsonUtil.obj2Json(ResultModel.error(ResultStatus.REQUEST_ERROR));
String context = "JsonUtil.obj2Json(ResultModel.error(ResultStatus.REQUEST_ERROR))"; String context = "JsonUtil.obj2Json(ResultModel.error(ResultStatus.REQUEST_ERROR))";
FullHttpRequestUtils.send(ctx, context, HttpResponseStatus.OK); FullHttpRequestUtils.send(ctx, context, HttpResponseStatus.OK);
return; return;
...@@ -64,66 +67,66 @@ public class NettyApiRequest { ...@@ -64,66 +67,66 @@ public class NettyApiRequest {
String path = httpRequest.uri(); String path = httpRequest.uri();
String body = FullHttpRequestUtils.getBody(httpRequest); String body = FullHttpRequestUtils.getBody(httpRequest);
if (log.isDebugEnabled()) {
if (logger.isDebugEnabled()) { log.debug("httpRequest:\n" + httpRequest.toString() + "\n" + body);
logger.debug("httpRequest:\n" + httpRequest.toString() + "\n" + body);
} }
if (path.contains(Constants.APP_WS_URL)) { if (path.contains(WsConstants.WS_URL)) {
/* /*
app聊天http升级webSocket app聊天http升级webSocket
*/ */
this.initAppWs(ctx, httpRequest); this.initWs(ctx, httpRequest);
} }
} }
/** /**
* app 初始化websocket * 初始化websocket
*
* @param ctx
* @param httpRequest
* @throws org.apache.http.MethodNotSupportedException
*/ */
private void initAppWs(ChannelHandlerContext ctx, FullHttpRequest httpRequest) throws Exception { private void initWs(ChannelHandlerContext ctx, FullHttpRequest httpRequest) throws Exception {
Map<String, String> paramMap = FullHttpRequestUtils.parameterParse(httpRequest); Map<String, String> paramMap = FullHttpRequestUtils.parameterParse(httpRequest);
String token = paramMap.get(Constants.TOKEN); String token = paramMap.get(WsConstants.TOKEN);
String deviceId = paramMap.get(RequestHeaderConstants.DEVICE_ID); // String deviceId = paramMap.get(RequestHeaderConstants.DEVICE_ID);
DecodedJWT jwtInfo = JwtUtil.getJwtInfo(token); DecodedJWT jwtInfo = JwtUtil.getJwtInfo(token);
String payload = jwtInfo.getPayload(); String payload = jwtInfo.getPayload();
Base64.Decoder decoder = Base64.getDecoder(); Base64.Decoder decoder = Base64.getDecoder();
payload = new String(decoder.decode(payload), "UTF-8"); payload = new String(decoder.decode(payload), "UTF-8");
JSONObject jsonObject = JSONObject.parseObject(payload); JSONObject jsonObject = JSONObject.parseObject(payload);
String appKey = (String) jsonObject.get("appKey"); String appKey = (String) jsonObject.get("appKey");
String id = (String) jsonObject.get("username"); String clientId = (String) jsonObject.get("username");
// 验签token
ImApplication imApplication = imApplicationService.getOne(new QueryWrapper<ImApplication>().lambda()
.eq(ImApplication::getAppKey, appKey));
if (!JwtUtil.verifyToken(token, imApplication.getAppSecret())) {
log.debug("验签token不通过");
}
// redisTemplate.opsForValue().set("client:" + imApplication.getAppKey() + ":" + clientId, generateToken); String redisKey = "client:" + appKey + ":" + clientId;
String redisToken = redisTemplate.opsForValue().get(redisKey);
// 判断token和redis是否一致
if (!token.equals(redisToken)) {
log.debug("token和redis不一致");
}
// 设置uri前缀 // 设置uri前缀
httpRequest.setUri(Constants.APP_WS_URL); httpRequest.setUri(WsConstants.WS_URL);
// 保持当前连接 // 保持当前连接
ctx.fireChannelRead(httpRequest.retain()); ctx.fireChannelRead(httpRequest.retain());
// 设置属性值 userid - channel // 设置属性值 userid - channel
// ctx.channel().attr(AppUserChannelsService.USER_ID).set(userId); ctx.channel().attr(MangerChannelService.CLIENT_ID).set(clientId);
// ctx.channel().attr(AppUserChannelsService.LANGUAGE).set(language); ctx.channel().attr(MangerChannelService.APP_KEY).set(appKey);
// 添加长连接handler // 添加长连接handler
ctx.pipeline().addLast("appImHandler", appImHandler); ctx.pipeline().addLast("appImHandler", appImHandler);
// 保存用户上下文对象 // 保存用户上下文对象
// appUserChannelsService.put(String.valueOf(headerUserId), (NioSocketChannel) ctx.channel()); appUserChannelsService.put((appKey + clientId), (NioSocketChannel) ctx.channel());
//移除当前api处理handler, 不再参与长连接处理 //移除当前api处理handler, 不再参与长连接处理
ctx.pipeline().remove("SingleHttpRequestHandler"); ctx.pipeline().remove("SingleHttpRequestHandler");
......
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.wecloud.im.mapper.ImMessageMapper">
<!-- 通用查询结果列 -->
<sql id="Base_Column_List">
id, create_time, withdraw_time, update_date, fk_appid, sender, content, withdraw, event, system, at, send_status, fk_conversation_id
</sql>
<select id="getImMessageById" resultType="com.wecloud.im.param.ImMessageQueryVo">
select
<include refid="Base_Column_List"/>
from im_message where id = #{id}
</select>
<select id="getImMessagePageList" parameterType="com.wecloud.im.param.ImMessagePageParam" resultType="com.wecloud.im.param.ImMessageQueryVo">
select
<include refid="Base_Column_List"/>
from im_message
</select>
</mapper>
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment