Commit b159ce53 by hweeeeeei

添加dubbo服务发现;

路由找到当前client的远程地址,并通过RPC远程调用下发数据;
根据ClientId从redis获取client信息;
完成websocket的channel通道缓存改造;
维护client的多端数据:  值为set集合,netty的channel的id;
修改token同一个账号可多次登录;
parent 8a941b2c
......@@ -34,7 +34,7 @@ import org.springframework.transaction.annotation.EnableTransactionManagement;
* 项目启动入口
*/
@EnableDiscoveryClient
@EnableDubbo(scanBasePackages = {"com.wecloud.im.service.impl"})
@EnableDubbo(scanBasePackages = {"com.wecloud.im.service.impl", "com.wecloud.im.router"})
@EnableAsync
@EnableScheduling
@EnableTransactionManagement
......@@ -46,7 +46,7 @@ import org.springframework.transaction.annotation.EnableTransactionManagement;
public class SpringBootPlusApplication {
public static void main(String[] args) {
// 项目框架 集成logback 与nacos 自带的logback 冲突, nacos的logback 已经先于 项目启动,
// logback 与nacos 自带的logback 冲突, nacos的logback 已经先于 项目启动,
// logback 本身 在命名之后不允许二次修改contextName的名字,问题在阿里nacos 官方git上已有人提出,
// https://github.com/alibaba/nacos/issues/1959
System.setProperty("nacos.logging.default.config.enabled", "false");
......
......@@ -102,7 +102,7 @@ public class NettyApiRequest {
return;
}
// 验签token
// 从redis获取jwt的token 验签token
JwtToken jwtToken = shiroLoginService.getTJwtTokenForRedis(token);
if (jwtToken == null) {
......
package com.wecloud.im.router;
/**
* 路由找到当前client的远程地址
*/
public interface RouterSendService {
void rpcSend(String msg, String channelId);
}
package com.wecloud.im.router;
import com.wecloud.im.ws.service.MangerChannelService;
import org.apache.dubbo.config.annotation.DubboService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
@DubboService
public class RouterSendServiceImpl implements RouterSendService {
@Autowired
private MangerChannelService mangerChannelService;
@Override
public void rpcSend(String msg, String channelId) {
mangerChannelService.writeDataToChannel(msg, channelId);
}
}
package com.wecloud.im.ws.cache;
import com.wecloud.im.ws.model.redis.ClientChannelInfo;
import com.wecloud.im.ws.model.redis.ClientConnectionInfo;
import java.util.List;
/**
* @author hewei123@163.com
* @Description 用户与redis绑定
* @createTime 2020年04月14日 16:21:00
*/
public interface UserCacheService {
/**
* 用户上线绑定机器ip
*
* @param longChannelId
*/
public void online(String appKey, String clientId, String longChannelId);
/**
* 用户下线删除绑定机器ip
*
* @param longChannelId
*/
public void offline(String appKey, String clientId, String longChannelId);
/**
* 根据用户id获取存在redis中的数据 例如绑定的服务器ip地址
*
* @param clientId
* @return
*/
public ClientConnectionInfo getById(String appKey, String clientId);
/**
* 根据ClientId从redis获取client信息
*
* @param appKey
* @param clientId
* @return
*/
List<ClientChannelInfo> getIpByClientIdAndOnline(String appKey, String clientId);
}
package com.wecloud.im.ws.cache;
import com.wecloud.im.ws.model.redis.ClientChannelInfo;
import com.wecloud.im.ws.model.redis.ClientConnectionInfo;
import com.wecloud.im.ws.utils.InitIp;
import com.wecloud.im.ws.utils.RedisUtils;
......@@ -7,7 +8,10 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* @author hewei123@163.com
......@@ -16,7 +20,7 @@ import java.util.Map;
*/
@Service
@Slf4j
public class UserCache {
public class UserCacheServiceImpl implements UserCacheService {
/**
* 在线状态
......@@ -39,7 +43,7 @@ public class UserCache {
/**
* 用户在线状态fieldKey
*/
private static final String ONLINE_STATUS_KEY = "ost";
private static final String ONLINE_STATUS = "ost";
/**
* 设备类型 web,ios,android,ios,other
*/
......@@ -69,12 +73,13 @@ public class UserCache {
*
* @param longChannelId
*/
@Override
public void online(String appKey, String clientId, String longChannelId) {
log.info("ws用户上线保存redis连接ip:" + InitIp.lAN_IP, ",uid:" + longChannelId);
redisUtils.addForSet(CLIENTS, longChannelId);
redisUtils.addForSet(CLIENTS + appKey + clientId, longChannelId);
redisUtils.hashset(CLIENT_INFO + appKey + clientId, LAN_IP, InitIp.lAN_IP);
redisUtils.hashset(CLIENT_INFO + appKey + clientId, ONLINE_STATUS_KEY, ONLINE.toString());
redisUtils.hashset(CLIENT_INFO + longChannelId, LAN_IP, InitIp.lAN_IP);
redisUtils.hashset(CLIENT_INFO + longChannelId, ONLINE_STATUS, ONLINE.toString());
}
......@@ -83,13 +88,15 @@ public class UserCache {
*
* @param longChannelId
*/
@Override
public void offline(String appKey, String clientId, String longChannelId) {
log.info("ws用户离线删除redis key,uid:" + longChannelId);
redisUtils.removeForSet(CLIENTS, longChannelId);
redisUtils.removeForSet(CLIENTS + appKey + clientId, longChannelId);
// redisUtils.hashset(CLIENT_INFO + appKey + clientId, ONLINE_STATUS_KEY, OFFLINE.toString());
redisUtils.delKey(CLIENT_INFO + appKey + clientId);
redisUtils.delKey(CLIENT_INFO + longChannelId);
}
/**
......@@ -98,9 +105,10 @@ public class UserCache {
* @param clientId
* @return
*/
public ClientConnectionInfo getById(String clientId) {
@Override
public ClientConnectionInfo getById(String appKey, String clientId) {
Map<String, String> hgetll = redisUtils.hashgetll(CLIENT_INFO + clientId);
Map<String, String> hgetll = redisUtils.hashgetll(CLIENT_INFO + appKey + clientId);
if (hgetll.isEmpty()) {
return null;
}
......@@ -111,5 +119,33 @@ public class UserCache {
}
@Override
public List<ClientChannelInfo> getIpByClientIdAndOnline(String appKey, String clientId) {
// 获取所有 CLIENTS的 longChannelId
Set<String> longChannelIds = redisUtils.getForSetMembers(CLIENTS + appKey + clientId);
ArrayList<ClientChannelInfo> clientChannelInfos = new ArrayList<>();
// 根据 longChannelId 查询信息
for (String longChannelId : longChannelIds) {
String onlineStatus = redisUtils.hashget(CLIENT_INFO + appKey + clientId, ONLINE_STATUS);
String lanIp = redisUtils.hashget(CLIENT_INFO + longChannelId, LAN_IP);
ClientChannelInfo clientChannelInfo = new ClientChannelInfo();
clientChannelInfo.setChannelId(longChannelId);
clientChannelInfo.setLanIp(lanIp);
clientChannelInfo.setOnlineStatus(Integer.valueOf(onlineStatus));
clientChannelInfos.add(clientChannelInfo);
// //判断是否在线
// if (StringUtils.isNotBlank(onlineStatus) && Integer.valueOf(onlineStatus).equals(ONLINE)) {
// }
}
return clientChannelInfos;
}
}
package com.wecloud.im.ws.model.redis;
import lombok.Data;
import java.io.Serializable;
/**
* 维护client多端的ip及平台类型:
*/
@Data
public class ClientChannelInfo extends ClientConnectionInfo implements Serializable {
private String channelId;
private Integer onlineStatus;
}
......@@ -16,12 +16,12 @@ import java.util.concurrent.ConcurrentHashMap;
*/
public interface MangerChannelService {
/**
* channel对象
* 用户id为key
* context为值
*/
Map<String, NioSocketChannel> CHANNEL_MAP = new ConcurrentHashMap<>();
// /**
// * channel对象
// * 用户id为key
// * context为值
// */
// Map<String, NioSocketChannel> CHANNEL_MAP = new ConcurrentHashMap<>();
/**
* 本地维护 uid 对应多个 channel的shortID
......@@ -63,7 +63,7 @@ public interface MangerChannelService {
*
* @return
*/
NioSocketChannel get(String appKey, String clientId);
// NioSocketChannel get(String appKey, String clientId);
/**
* client上线
......@@ -113,7 +113,16 @@ public interface MangerChannelService {
* @param msg
* @return
*/
boolean writeData(String msg, String toAppKey, String toClientId);
// boolean writeData(String msg, String toAppKey, String toClientId);
/**
* 向指定channel_Id下发数据
*
* @param msg
* @return
*/
boolean writeDataToChannel(String msg, String toChannelId);
// /**
// * rpc异步下发数据
......
......@@ -4,6 +4,7 @@ package com.wecloud.im.ws.service;
import com.wecloud.im.ws.model.WsResponseModel;
import com.wecloud.im.ws.model.request.ReceiveModel;
import io.geekidea.springbootplus.framework.common.api.ApiCode;
import io.netty.channel.socket.nio.NioSocketChannel;
/**
* @Description ws响应数据,各种状态码封装
......@@ -50,4 +51,13 @@ public interface WriteDataService {
void write(WsResponseModel responseModel, String toAppKey, String toClientId);
/**
* 本地直接下发
*
* @param responseModel
* @param nioSocketChannel
*/
void response(WsResponseModel responseModel, NioSocketChannel nioSocketChannel);
}
package com.wecloud.im.ws.service.impl;
import com.wecloud.im.ws.cache.UserCache;
import com.wecloud.im.ws.cache.UserCacheService;
import com.wecloud.im.ws.model.ClientInfo;
import com.wecloud.im.ws.model.redis.ClientChannelInfo;
import com.wecloud.im.ws.service.MangerChannelService;
import com.wecloud.rtc.service.RtcService;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
......@@ -14,6 +13,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
/**
......@@ -28,58 +28,32 @@ public class MangerChannelServiceImpl implements MangerChannelService {
@Autowired
private RtcService rtcService;
// private final static ThreadFactory NAMED_THREAD_FACTORY = new ThreadFactoryBuilder()
// .setNamePrefix("rpcWrite-").build();
/**
* 远程调用ws下发数据线程池
* 属于IO密集型任务
* IO密集型任务线程并不是一直在执行任务,则应配置尽可能多的线程,如CPU核数*2
* 某大厂设置策略:IO密集型时,大部分线程都阻塞,故需要多配置线程数: 公式:CPU核数/1-阻塞系数 阻塞系数:0.8-0.9
* 比如8核CPU: 8/1-0.9 = 80 个线程数
* <p>
* 由于来自远程端调用下发数据 如果是群聊1000人群则调用1000次 为不要占用太多资源 需要排队下发
* 经过并发测试 200并发1000人群消息 需要调用200x1000=20w次 考虑单机cpu性能还要顾及本机api业务 设置阻塞队列
* 为避免过多占用本地io线程导致response慢,设置LinkedBlockingQueue数量多可以避免抢占, (队列数量需要测试调试到最优数量 )
* 最大线程数量不要设置太多 数量、优先级一定要比本地io线程低级
*
*
* <p>
* 后续优化待完善:消息发送投递至MQ, 消费方从MQ队列获取下发任务 本地队列不宜缓存太多(机器死机则会全丢失) 堆积的请求处理队列可能会耗费非常大的内存甚至死机
*/
// private final static ExecutorService THREAD_POOL_RPC_WRITE_EXECUTOR = new ThreadPoolExecutor(
// WsConstants.CPU_PROCESSORS * 100,
// WsConstants.CPU_PROCESSORS * 100,
// 1L, TimeUnit.MILLISECONDS,
// new LinkedBlockingQueue<Runnable>(), NAMED_THREAD_FACTORY, new ThreadPoolExecutor.CallerRunsPolicy());
@Autowired
private UserCache userCache;
private UserCacheService userCacheService;
@Override
public NioSocketChannel get(String appKey, String clientId) {
return this.CHANNEL_MAP.get(appKey + clientId);
}
// @Override
// public NioSocketChannel get(String appKey, String clientId) {
//
// return this.CHANNEL_MAP.get(appKey + clientId);
// }
@Override
public void online(String appKey, String clientId, NioSocketChannel channel) {
// 断掉旧链接
NioSocketChannel nioSocketChannel = get(appKey, clientId);
if (null != nioSocketChannel) {
log.info("put新连接关掉旧链接:" + appKey + "," + clientId + ",\nchannelId:" + nioSocketChannel.id().asLongText());
nioSocketChannel.close();
}
// NioSocketChannel nioSocketChannel = get(appKey, clientId);
// if (null != nioSocketChannel) {
// log.info("put新连接关掉旧链接:" + appKey + "," + clientId + ",\nchannelId:" + nioSocketChannel.id().asLongText());
// nioSocketChannel.close();
// }
this.CHANNEL_MAP.put(appKey + clientId, channel);
// this.CHANNEL_MAP.put(appKey + clientId, channel);
String longChannelId = channel.id().asLongText();
this.putClientsMap(appKey, clientId, longChannelId);
this.putSessionInfoMap(longChannelId, channel);
userCache.online(appKey, clientId, longChannelId);
userCacheService.online(appKey, clientId, longChannelId);
}
void putSessionInfoMap(String longChannelId, NioSocketChannel channel) {
......@@ -126,14 +100,16 @@ public class MangerChannelServiceImpl implements MangerChannelService {
// 关掉连接
channelHandlerContext.close();
// rtc清空缓存
rtcService.clientOffline(appKey, clientId);
// 移除本地维护的channel
delSessionInfoMap(longChannelId);
delClientsMap(appKey, clientId, longChannelId);
userCache.offline(appKey, clientId, longChannelId);
// 移除redis缓存
userCacheService.offline(appKey, clientId, longChannelId);
// rtc清空缓存
rtcService.clientOffline(appKey, clientId);
}
......@@ -158,49 +134,86 @@ public class MangerChannelServiceImpl implements MangerChannelService {
*/
@Override
public boolean getOnlineStatus(String toAppKey, String toClientId) {
NioSocketChannel nioSocketChannel = get(toAppKey, toClientId);
if (null == nioSocketChannel) {
if (log.isDebugEnabled()) {
log.info("writeData 不存在 连接为空:" + toAppKey + toClientId);
}
return false;
}
// 判断连接是否断开
if (nioSocketChannel.isShutdown()) {
if (log.isDebugEnabled()) {
log.info("writeData连接断开:" + toAppKey + toClientId + "channelId:" + nioSocketChannel.id().asLongText());
List<ClientChannelInfo> channelInfos = userCacheService.getIpByClientIdAndOnline(toAppKey, toClientId);
boolean flag = false;
for (ClientChannelInfo channelInfo : channelInfos) {
if (channelInfo.getOnlineStatus().equals(1)) {
return true;
}
return false;
}
return true;
// NioSocketChannel nioSocketChannel = get(toAppKey, toClientId);
// if (null == nioSocketChannel) {
// if (log.isDebugEnabled()) {
// log.info("writeData 不存在 连接为空:" + toAppKey + toClientId);
// }
// return false;
// }
// // 判断连接是否断开
// if (nioSocketChannel.isShutdown()) {
// if (log.isDebugEnabled()) {
// log.info("writeData连接断开:" + toAppKey + toClientId + "channelId:" + nioSocketChannel.id().asLongText());
// }
// return false;
// }
return flag;
}
// @Override
// public boolean writeData(String msg, String toAppKey, String toClientId) {
//
// NioSocketChannel nioSocketChannel = get(toAppKey, toClientId);
// if (null == nioSocketChannel) {
// log.info("writeData连接为空:" + toAppKey + toClientId + "," + msg);
// return false;
// }
// // 判断连接是否断开
// if (nioSocketChannel.isShutdown()) {
// log.info("writeData连接断开:" + toAppKey + toClientId + "," + msg + ",\nchannelId:" + nioSocketChannel.id().asLongText());
// return false;
// }
//
// log.info("writeData:" + toAppKey + "," + toClientId + "," + msg + ",\nchannelId:" + nioSocketChannel.id().asLongText());
//
// ChannelFuture channelFuture = nioSocketChannel.writeAndFlush(new TextWebSocketFrame(msg));
// channelFuture.addListener(
// //执行后回调的方法
// (ChannelFutureListener) channelFuture1 -> {
// if (log.isDebugEnabled()) {
// log.info("netty线程异步执行结果:" + channelFuture1.isDone() + ",业务执行结果:" + channelFuture1.isSuccess()
// + ";\nwriteData:" + toAppKey + toClientId + "," + msg + ",channelId:" + nioSocketChannel.id().asLongText());
// }
// });
// return true;
// }
@Override
public boolean writeData(String msg, String toAppKey, String toClientId) {
public boolean writeDataToChannel(String msg, String toChannelId) {
NioSocketChannel nioSocketChannel = get(toAppKey, toClientId);
ClientInfo clientInfo = SESSION_INFO_MAP.get(toChannelId);
if (clientInfo == null) {
return false;
}
NioSocketChannel nioSocketChannel = clientInfo.getNioSocketChannel();
if (null == nioSocketChannel) {
log.info("writeData连接为空:" + toAppKey + toClientId + "," + msg);
log.info("writeData连接为空:" + msg);
return false;
}
// 判断连接是否断开
if (nioSocketChannel.isShutdown()) {
log.info("writeData连接断开:" + toAppKey + toClientId + "," + msg + ",\nchannelId:" + nioSocketChannel.id().asLongText());
log.info("writeData连接断开:" + msg + ",\nchannelId:" + nioSocketChannel.id().asLongText());
return false;
}
log.info("writeData:" + toAppKey + "," + toClientId + "," + msg + ",\nchannelId:" + nioSocketChannel.id().asLongText());
ChannelFuture channelFuture = nioSocketChannel.writeAndFlush(new TextWebSocketFrame(msg));
channelFuture.addListener(
//执行后回调的方法
(ChannelFutureListener) channelFuture1 -> {
if (log.isDebugEnabled()) {
log.info("netty线程异步执行结果:" + channelFuture1.isDone() + ",业务执行结果:" + channelFuture1.isSuccess()
+ ";\nwriteData:" + toAppKey + toClientId + "," + msg + ",channelId:" + nioSocketChannel.id().asLongText());
}
});
log.info("writeData:" + ",\nchannelId:" + nioSocketChannel.id().asLongText());
nioSocketChannel.writeAndFlush(new TextWebSocketFrame(msg));
return true;
}
......
......@@ -3,22 +3,24 @@ package com.wecloud.im.ws.service.impl;
import cn.hutool.core.thread.ThreadFactoryBuilder;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.wecloud.im.router.RouterSendService;
import com.wecloud.im.ws.cache.UserCacheService;
import com.wecloud.im.ws.model.WsResponseModel;
import com.wecloud.im.ws.model.WsConstants;
import com.wecloud.im.ws.model.redis.ClientChannelInfo;
import com.wecloud.im.ws.model.request.ReceiveModel;
import com.wecloud.im.ws.service.MangerChannelService;
import com.wecloud.im.ws.service.WriteDataService;
import com.wecloud.im.ws.utils.InitIp;
import io.geekidea.springbootplus.framework.common.api.ApiCode;
import io.geekidea.springbootplus.framework.common.api.ApiResult;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.List;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @Description 下发数据
......@@ -35,14 +37,21 @@ public class WriteDataServiceImpl implements WriteDataService {
* 属于io密集型业务
* io密集型任务配置尽可能多的线程数量
*/
private final static ExecutorService WRITE_TASK_THREAD_POOL_EXECUTOR =
new ThreadPoolExecutor(WsConstants.CPU_PROCESSORS * 2, WsConstants.CPU_PROCESSORS * 3,
1L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(), WRITE_NAMED_THREAD_FACTORY, new ThreadPoolExecutor.CallerRunsPolicy());
// private final static ExecutorService WRITE_TASK_THREAD_POOL_EXECUTOR =
// new ThreadPoolExecutor(WsConstants.CPU_PROCESSORS * 2, WsConstants.CPU_PROCESSORS * 3,
// 1L, TimeUnit.MILLISECONDS,
// new LinkedBlockingQueue<Runnable>(), WRITE_NAMED_THREAD_FACTORY, new ThreadPoolExecutor.CallerRunsPolicy());
@Autowired
private MangerChannelService mangerChannelService;
@Autowired
private UserCacheService userCacheService;
@Autowired
private RouterSendService routerSendService;
@Override
public void successAndData(ReceiveModel receiveModel, Object data, String toAppKey, String toClientId) {
this.dataAndStatus(receiveModel, ApiCode.SUCCESS, data, toAppKey, toClientId);
......@@ -71,26 +80,67 @@ public class WriteDataServiceImpl implements WriteDataService {
this.write(responseModel, toAppKey, toClientId);
}
// @Override
// public void write(WsResponseModel responseModel, String toAppKey, String toClientId) {
//// WRITE_TASK_THREAD_POOL_EXECUTOR.execute(
//// () -> {
// JsonMapper jsonMapper = new JsonMapper();
// String json = null;
// try {
// json = jsonMapper.writeValueAsString(responseModel);
// } catch (JsonProcessingException e) {
// e.printStackTrace();
// }
// mangerChannelService.writeData(json, toAppKey, toClientId);
// }
// );
//}
@Override
public void write(WsResponseModel responseModel, String toAppKey, String toClientId) {
WRITE_TASK_THREAD_POOL_EXECUTOR.execute(
() -> {
JsonMapper jsonMapper = new JsonMapper();
String msgJson = null;
try {
msgJson = jsonMapper.writeValueAsString(responseModel);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
JsonMapper jsonMapper = new JsonMapper();
String json = null;
List<ClientChannelInfo> channelInfos = userCacheService.getIpByClientIdAndOnline(toAppKey, toClientId);
try {
json = jsonMapper.writeValueAsString(responseModel);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
mangerChannelService.writeData(json, toAppKey, toClientId);
for (ClientChannelInfo channelInfo : channelInfos) {
// 是否为当前机器的ip
if (InitIp.lAN_IP.equals(channelInfo.getLanIp())) {
// 调用本地下发
mangerChannelService.writeDataToChannel(msgJson, channelInfo.getChannelId());
} else {
// rpc调用下发
routerSendService.rpcSend(msgJson, channelInfo.getChannelId());
}
}
}
@Override
public void response(WsResponseModel responseModel, NioSocketChannel nioSocketChannel) {
}
);
JsonMapper jsonMapper = new JsonMapper();
String msgJson = null;
try {
msgJson = jsonMapper.writeValueAsString(responseModel);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
// 本地直接下发
nioSocketChannel.writeAndFlush(new TextWebSocketFrame(msgJson));
}
}
......@@ -27,6 +27,7 @@ import io.geekidea.springbootplus.framework.common.api.ApiCode;
import io.geekidea.springbootplus.framework.common.api.ApiResult;
import io.geekidea.springbootplus.framework.shiro.util.SnowflakeUtil;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
......@@ -76,6 +77,7 @@ public class ImChatConcrete extends ImCmdAbstract {
@Override
public void process(ReceiveModel receiveModel, ChannelHandlerContext ctx, String data, String appKey, String clientId) throws JsonProcessingException {
NioSocketChannel channel = (NioSocketChannel) ctx.channel();
// 查询imApplication
......@@ -128,15 +130,15 @@ public class ImChatConcrete extends ImCmdAbstract {
// 判断为单聊
if (membersList.size() == 1) {
// 拉黑逻辑
if (black(receiveModel, appKey, clientId, imClientSender, membersList)) {
// 判断是否被拉黑逻辑
if (black(receiveModel, appKey, clientId, imClientSender, membersList, channel)) {
return;
}
}
// 生成消息id
long messageId = SnowflakeUtil.getId();
// 保存消息至消息表
// 入库 保存消息至消息表
ImMessage imMessage = saveImMessage(imApplication, imClientSender, toConversationId, messageId, content);
// 封装响应的实体
......@@ -149,18 +151,9 @@ public class ImChatConcrete extends ImCmdAbstract {
// 遍历发送
for (ImConversationMembers conversationMembers : membersList) {
// 保存收件箱
// 入库 保存收件箱
long imInboxId = SnowflakeUtil.getId();
ImInbox imInbox = new ImInbox();
imInbox.setId(imInboxId);
imInbox.setCreateTime(new Date());
imInbox.setFkAppid(imApplication.getId());
imInbox.setReceiver(conversationMembers.getFkClientId());
imInbox.setFkMsgId(messageId);
imInbox.setReadMsgStatus(0);
imInbox.setReceiverMsgStatus(0);
imInbox.setFkConversationId(toConversationId);
imInboxService.save(imInbox);
saveImInbox(imApplication, toConversationId, messageId, conversationMembers, imInboxId);
// 查询接收方
ImClient imClientReceiver = imClientService.getOne(new QueryWrapper<ImClient>().lambda()
......@@ -180,12 +173,44 @@ public class ImChatConcrete extends ImCmdAbstract {
responseModel.setReqId(null);
writeDataService.write(responseModel, appKey, imClientReceiver.getClientId());
// , PushType.ALERT
// 异步推送系统通知消息
systemPush.push(pushMap, imClientReceiver, imApplication);
}
// 响应发送方消息id等信息
response(receiveModel, messageId, channel);
}
/**
* 入库 保存收件箱
*
* @param imApplication
* @param toConversationId
* @param messageId
* @param conversationMembers
* @param imInboxId
*/
private void saveImInbox(ImApplication imApplication, Long toConversationId, long messageId, ImConversationMembers conversationMembers, long imInboxId) {
ImInbox imInbox = new ImInbox();
imInbox.setId(imInboxId);
imInbox.setCreateTime(new Date());
imInbox.setFkAppid(imApplication.getId());
imInbox.setReceiver(conversationMembers.getFkClientId());
imInbox.setFkMsgId(messageId);
imInbox.setReadMsgStatus(0);
imInbox.setReceiverMsgStatus(0);
imInbox.setFkConversationId(toConversationId);
imInboxService.save(imInbox);
}
/**
* 响应发送方消息id等信息
*
* @param receiveModel
* @param messageId
*/
private void response(ReceiveModel receiveModel, long messageId, NioSocketChannel channel) {
WsResponseModel<HashMap<String, Long>> responseModel = new WsResponseModel<>();
ApiResult<Boolean> result = ApiResult.result(ApiCode.SUCCESS);
responseModel.setCmd(WsResponseCmdEnum.RES.getCmdCode());
......@@ -196,7 +221,8 @@ public class ImChatConcrete extends ImCmdAbstract {
responseModel.setData(stringHashMap);
responseModel.setReqId(receiveModel.getReqId());
// 响应发送方
writeDataService.write(responseModel, appKey, clientId);
// writeDataService.write(responseModel, appKey, clientId);
writeDataService.response(responseModel, channel);
}
private ImClient getClientSender(String clientUniId, ImApplication imApplication) {
......@@ -226,7 +252,17 @@ public class ImChatConcrete extends ImCmdAbstract {
return imMessage;
}
private boolean black(ReceiveModel receiveModel, String appKey, String clientUniId, ImClient imClientSender, List<ImConversationMembers> membersList) {
/**
* 判断是否被拉黑
*
* @param receiveModel
* @param appKey
* @param clientUniId
* @param imClientSender
* @param membersList
* @return
*/
private boolean black(ReceiveModel receiveModel, String appKey, String clientUniId, ImClient imClientSender, List<ImConversationMembers> membersList, NioSocketChannel channel) {
// 判断是否被拉黑
boolean beBlack = imClientBlacklistService.isBeBlack(membersList.get(0).getFkClientId(), imClientSender.getId());
if (beBlack) {
......@@ -239,7 +275,10 @@ public class ImChatConcrete extends ImCmdAbstract {
responseModel.setCode(result.getCode());
responseModel.setMsg(result.getMessage());
responseModel.setReqId(receiveModel.getReqId());
writeDataService.write(responseModel, appKey, clientUniId);
// writeDataService.write(responseModel, appKey, clientUniId);
writeDataService.response(responseModel, channel);
return true;
}
......@@ -254,7 +293,10 @@ public class ImChatConcrete extends ImCmdAbstract {
responseModel.setCode(result.getCode());
responseModel.setMsg(result.getMessage());
responseModel.setReqId(receiveModel.getReqId());
writeDataService.write(responseModel, appKey, clientUniId);
// writeDataService.write(responseModel, appKey, clientUniId);
writeDataService.response(responseModel, channel);
return true;
}
return false;
......
......@@ -30,7 +30,7 @@ public class InitIp {
/**
* 内网ip
*/
public static String lAN_IP = "";
public static String lAN_IP;
// 获取机器内网ip
static {
......
......@@ -229,7 +229,7 @@ spring-boot-plus:
# redis校验jwt token是否存在,可选
redis-check: true
# true: 同一个账号只能是最后一次登录token有效,false:同一个账号可多次登录
single-login: true
single-login: false
# 盐值校验,如果不加自定义盐值,则使用secret校验
salt-check: true
############################ JWT end ###############################
......
......@@ -43,7 +43,7 @@ public class JwtToken implements HostAuthenticationToken {
private String host;
/**
* 登录用户ID
* 登录clientId
*/
private String clientId;
......@@ -65,7 +65,7 @@ public class JwtToken implements HostAuthenticationToken {
*/
private Date createDate;
/**
* 多长时间过期,默认一小时
* 多长时间过期
*/
private long expireSecond;
/**
......
......@@ -50,6 +50,9 @@ public interface ShiroLoginService {
JwtTokenRedisVo getTokenInfoForRedis(String token);
/**
* 从redis获取token信息
*/
JwtToken getTJwtTokenForRedis(String token);
/**
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment