Commit 4233f56a by lixiaozhong

1、消息路由重新设计

2、内部全部用数据库主键app的Id和client的id,外部才用appKey 和 clientId
3、新增批量send功能
4、万人群功能之收发消息(还差批量push,离线消息的拉取)
parent 652277cd
......@@ -65,8 +65,8 @@ public class RedisCacheConfig extends CachingConfigurerSupport {
// 生成一个默认配置,通过config对象即可对缓存进行自定义配置
RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig();
// 设置缓存的默认过期时间,也是使用Duration设置
// 过期时间5分钟
config = config.entryTtl(Duration.ofMinutes(5));
// 过期时间30分钟
config = config.entryTtl(Duration.ofMinutes(30));
// 设置一个初始化的缓存空间set集合
Set<String> cacheNames = new HashSet<>();
......
......@@ -51,7 +51,7 @@ public class LoginTest {
private String getToken(String timestemp, String clientId, String appKey, String sign) throws Exception {
// 根据appKey从数据库查询密钥
ImApplication imApplication = imApplicationService.getOneByAppKey(appKey);
ImApplication imApplication = imApplicationService.getCacheAppByAppKey(appKey);
// 生成以数据库为准的签名
String mySign = new MD5().digestHex(timestemp + clientId + imApplication.getAppKey() + imApplication.getAppSecret());
......
package com.wecloud.im.controller;
import com.wecloud.im.entity.ImApplication;
import com.wecloud.im.entity.ImClient;
import com.wecloud.im.param.GetClientInfoParam;
import com.wecloud.im.param.GetOnlineStatusParam;
import com.wecloud.im.param.add.ImClientDeviceInfoAdd;
......@@ -12,7 +13,6 @@ import com.wecloud.im.service.ImClientService;
import com.wecloud.im.vo.GetInfoListVo;
import com.wecloud.im.vo.ImOnlineStatusVo;
import com.wecloud.im.ws.cache.UserStateCacheManager;
import com.wecloud.im.ws.manager.ChannelManager;
import io.geekidea.springbootplus.framework.common.api.ApiResult;
import io.geekidea.springbootplus.framework.common.controller.BaseController;
import io.geekidea.springbootplus.framework.core.validator.groups.Add;
......@@ -119,13 +119,14 @@ public class ImClientController extends BaseController {
JwtToken curentJwtToken = JwtUtil.getCurentJwtToken();
// 根据appKey查询appid
ImApplication imApplication = imApplicationService.getOneByAppKey(curentJwtToken.getAppKey());
ImApplication imApplication = imApplicationService.getCacheAppByAppKey(curentJwtToken.getAppKey());
ArrayList<ImOnlineStatusVo> imOnlineStatusVos = new ArrayList<ImOnlineStatusVo>();
for (String clientId : getOnlineStatusParam.getClientIds()) {
ImOnlineStatusVo imOnlineStatusVo = new ImOnlineStatusVo();
// todo 需要改成批量
imOnlineStatusVo.setStatus(userStateCacheManager.isOnline(imApplication.getAppKey(), clientId));
ImClient client = imClientService.getCacheImClient(imApplication.getId(), clientId);
imOnlineStatusVo.setStatus(userStateCacheManager.isOnline(client.getId()));
imOnlineStatusVo.setClientId(clientId);
imOnlineStatusVos.add(imOnlineStatusVo);
}
......
......@@ -51,7 +51,7 @@ public class ApiImConversationMembersController extends BaseController {
// 根据appKey从数据库查询密钥
ImApplication imApplication = imApplicationService.getOneByAppKey(appkey);
ImApplication imApplication = imApplicationService.getCacheAppByAppKey(appkey);
if (imApplication == null) {
return ApiResult.result(ApiCode.FAIL, null);
......
......@@ -44,7 +44,7 @@ public class ApiImMessageController extends BaseController {
// 根据appKey从数据库查询密钥
ImApplication imApplication = imApplicationService.getOneByAppKey(appkey);
ImApplication imApplication = imApplicationService.getCacheAppByAppKey(appkey);
if (imApplication == null) {
return ApiResult.result(ApiCode.FAIL, null);
......
......@@ -22,7 +22,7 @@ public class BusinessThreadPool {
*/
public static final ExecutorService BUSINESS_TASK_THREAD_POOL_EXECUTOR =
new ThreadPoolExecutor(WsConstants.CPU_PROCESSORS, WsConstants.CPU_PROCESSORS * 2,
60L, TimeUnit.MILLISECONDS,
1000L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(1024 * 2), BUSINESS_THREAD_FACTORY, new ThreadPoolExecutor.CallerRunsPolicy());
}
package com.wecloud.im.executor;
import cn.hutool.core.thread.ThreadFactoryBuilder;
import com.wecloud.im.ws.model.WsConstants;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* 业务处理线程池
*/
public class SendMsgThreadPool {
private static final ThreadFactory SEND_MSG_THREAD_FACTORY = new ThreadFactoryBuilder()
.setNamePrefix("-sendMsg-").build();
/**
* 业务处理线程池
*/
public static final ExecutorService SEND_MSG_THREAD_POOL_EXECUTOR =
new ThreadPoolExecutor(WsConstants.CPU_PROCESSORS, WsConstants.CPU_PROCESSORS * 3,
1000L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(102400), SEND_MSG_THREAD_FACTORY, new ThreadPoolExecutor.CallerRunsPolicy());
}
......@@ -28,7 +28,7 @@ public interface ImClientMapper extends BaseMapper<ImClient> {
* @param id
* @return
*/
ImClientQueryVo getImClientById(Serializable id);
ImClient getImClientById(Serializable id);
/**
* 获取分页对象
......
......@@ -46,4 +46,5 @@ public interface ImConversationMembersMapper extends BaseMapper<ImConversationMe
*/
IPage<ImConversationMembersQueryVo> getImConversationMembersPageList(@Param("page") Page page, @Param("param") ImConversationMembersPageParam imConversationMembersPageParam);
List<Long> findThousandGroupsByClientId(@Param("clientId") Long clientId);
}
......@@ -53,8 +53,7 @@ public class WsReadHandler extends SimpleChannelInboundHandler<TextWebSocketFram
}
private void execute(ChannelHandlerContext ctx, String data) {
String appKey = ctx.channel().attr(ChannelManager.APP_KEY).get();
String clientId = ctx.channel().attr(ChannelManager.CLIENT_ID).get();
Long clientId = ctx.channel().attr(ChannelManager.CLIENT_ID).get();
try {
if (PING.equals(data)) {
......@@ -68,17 +67,17 @@ public class WsReadHandler extends SimpleChannelInboundHandler<TextWebSocketFram
return;
}
AbstractImCmdStrategy.process(appKey, clientId, ctx, data);
AbstractImCmdStrategy.process(clientId, ctx, data);
} catch (Exception e) {
log.error("系统繁忙data:" + data + ",appKey:" + appKey + ",clientId:" + clientId +
log.error("系统繁忙data:" + data + ",clientId:" + clientId +
",channelId:" + ctx.channel().id().asLongText(), e);
}
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
String clientId = ctx.channel().attr(ChannelManager.CLIENT_ID).get();
Long clientId = ctx.channel().attr(ChannelManager.CLIENT_ID).get();
//读超时计时器
Integer readIdleTimes = ctx.channel().attr(ChannelManager.READ_IDLE_TIMES).get();
......@@ -116,9 +115,8 @@ public class WsReadHandler extends SimpleChannelInboundHandler<TextWebSocketFram
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
if(log.isInfoEnabled()) {
String appKey = ctx.channel().attr(ChannelManager.APP_KEY).get();
String clientId = ctx.channel().attr(ChannelManager.CLIENT_ID).get();
log.info("HandlerAdded. uid is APP_KEY:{},CLIENT_ID:{}, channelId is {}", appKey, clientId, ctx.channel().id().asLongText());
Long clientId = ctx.channel().attr(ChannelManager.CLIENT_ID).get();
log.info("HandlerAdded. CLIENT_ID:{}, channelId is {}", clientId, ctx.channel().id().asLongText());
}
}
......@@ -128,13 +126,12 @@ public class WsReadHandler extends SimpleChannelInboundHandler<TextWebSocketFram
@Override
public void handlerRemoved(ChannelHandlerContext ctx) {
String appKey = ctx.channel().attr(ChannelManager.APP_KEY).get();
String clientId = ctx.channel().attr(ChannelManager.CLIENT_ID).get();
Long clientId = ctx.channel().attr(ChannelManager.CLIENT_ID).get();
Integer platform = ctx.channel().attr(ChannelManager.PLATFORM).get();
log.info("uid is APP_KEY:{},CLIENT_ID:{}, handlerRemoved. channelId is {}", appKey, clientId, ctx.channel().id().asLongText());
log.info("CLIENT_ID:{}, handlerRemoved. channelId is {}", clientId, ctx.channel().id().asLongText());
// 关掉连接
channelManager.offline(appKey, clientId, platform, ctx);
channelManager.offline(clientId, platform, ctx);
}
......
......@@ -3,9 +3,13 @@ package com.wecloud.im.netty.handler;
import com.alibaba.fastjson.JSONObject;
import com.auth0.jwt.interfaces.DecodedJWT;
import com.wecloud.im.entity.ImApplication;
import com.wecloud.im.entity.ImClient;
import com.wecloud.im.netty.core.WsReadHandler;
import com.wecloud.im.ws.model.WsConstants;
import com.wecloud.im.service.ImApplicationService;
import com.wecloud.im.service.ImClientService;
import com.wecloud.im.ws.manager.ChannelManager;
import com.wecloud.im.ws.model.WsConstants;
import com.wecloud.im.ws.utils.FullHttpRequestUtils;
import com.wecloud.utils.JsonUtils;
import io.geekidea.springbootplus.config.constant.CommonConstant;
......@@ -46,6 +50,12 @@ public class NettyApiRequest {
@Resource
private WsReadHandler appImReadHandler;
@Autowired
private ImClientService imClientService;
@Autowired
private ImApplicationService imApplicationService;
/**
* http请求接收
*
......@@ -100,22 +110,22 @@ public class NettyApiRequest {
payload = new String(decoder.decode(payload), StandardCharsets.UTF_8);
JSONObject jsonObject = JSONObject.parseObject(payload);
String appKey = (String) jsonObject.get(CommonConstant.APP_KEY);
String clientId = (String) jsonObject.get(CommonConstant.CLIENT_ID);
String outClientId = (String) jsonObject.get(CommonConstant.CLIENT_ID);
// 从redis获取jwt的token 验签token
JwtToken jwtToken = shiroLoginService.getJwtTokenForRedis(token);
if (jwtToken == null) {
log.info("jwtToken == null ,token和redis不一致, clientId:" + clientId + ",token:" + token);
log.info("jwtToken == null ,token和redis不一致, outClientId:" + outClientId + ",token:" + token);
String context = JsonUtils.encodeJson(ApiResult.result(ApiCode.FAIL, "jwtToken == null ,token和redis不一致", (Object) null));
FullHttpRequestUtils.send(ctx, context, HttpResponseStatus.OK);
return;
}
if ((!jwtToken.getClientId().equals(clientId)) || (!jwtToken.getAppKey().equals(appKey))) {
log.info("clientId appKey 不一致");
String context = JsonUtils.encodeJson(ApiResult.result(ApiCode.FAIL, "clientId appKey 不一致", (Object) null));
if ((!jwtToken.getClientId().equals(outClientId)) || (!jwtToken.getAppKey().equals(appKey))) {
log.info("outClientId appKey 不一致");
String context = JsonUtils.encodeJson(ApiResult.result(ApiCode.FAIL, "outClientId appKey 不一致", (Object) null));
FullHttpRequestUtils.send(ctx, context, HttpResponseStatus.OK);
return;
}
......@@ -126,17 +136,24 @@ public class NettyApiRequest {
// 保持当前连接
ctx.fireChannelRead(httpRequest.retain());
// 添加长连接handler
ctx.pipeline().addLast("appImHandler", appImReadHandler);
ImApplication app = imApplicationService.getCacheAppByAppKey(appKey);
if (app == null) {
log.warn("根据appKey: {} 查找不到 imApplication!", appKey);
return;
}
ImClient client = imClientService.getCacheImClient(app.getId(), outClientId);
// 设置属性值 userid - channel
ctx.channel().attr(ChannelManager.APP_KEY).set(appKey);
ctx.channel().attr(ChannelManager.CLIENT_ID).set(clientId);
ctx.channel().attr(ChannelManager.CLIENT_ID).set(client.getId());
ctx.channel().attr(ChannelManager.PLATFORM).set(jwtToken.getPlatform());
ctx.channel().attr(ChannelManager.READ_IDLE_TIMES).set(0);// 读空闲的计数=0
// 添加长连接handler
ctx.pipeline().addLast("appImHandler", appImReadHandler);
// 保存用户上下文对象
appUserChannelsService.online(appKey, clientId, jwtToken.getPlatform(), (NioSocketChannel) ctx.channel());
appUserChannelsService.online(client.getId(), jwtToken.getPlatform(), (NioSocketChannel) ctx.channel());
//移除当前api处理handler, 不再参与长连接处理
ctx.pipeline().remove("SingleHttpRequestHandler");
......
package com.wecloud.im.router;
import java.util.List;
/**
* 路由找到当前client的远程地址
*/
......@@ -7,11 +9,17 @@ public interface RouterSendService {
/**
* 通过rpc调用发送,解决channel不在本机时调用
* @param appKey
* @param clientId
* @param platform
* @param msg
*/
void sendMsgRemote(String appKey, String clientId, Integer platform, String msg);
void sendMsgRemote(Long clientId, Integer platform, String msg);
/**
* 通过rpc调用 批量发送,解决channel不在本机时调用
* @param sendRouterKeys
* @param msg
*/
void batchSendMsgRemote(List<String> sendRouterKeys, String msg);
}
package com.wecloud.im.router;
import com.wecloud.im.ws.manager.ChannelManager;
import com.wecloud.im.ws.sender.ChannelSender;
import org.apache.dubbo.config.annotation.DubboService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
@Service
@DubboService
public class RouterSendServiceImpl implements RouterSendService {
......@@ -14,8 +15,12 @@ public class RouterSendServiceImpl implements RouterSendService {
private ChannelSender channelSender;
@Override
public void sendMsgRemote(String appKey, String clientId, Integer platform, String msg) {
channelSender.sendMsgLocal(appKey, clientId, platform, msg);
public void sendMsgRemote(Long clientId, Integer platform, String msg) {
channelSender.sendMsgLocal(clientId, platform, msg);
}
@Override
public void batchSendMsgRemote(List<String> sendRouterKeys, String msg) {
channelSender.batchSendMsgLocal(sendRouterKeys, msg);
}
}
......@@ -21,7 +21,7 @@ public interface ImApplicationService extends BaseService<ImApplication> {
* @param appKey
* @return
*/
ImApplication getOneByAppKey(String appKey);
ImApplication getCacheAppByAppKey(String appKey);
/**
* 保存
......
......@@ -79,15 +79,6 @@ public interface ImClientService extends BaseService<ImClient> {
boolean deleteImClient(Long id) throws Exception;
/**
* 根据ID获取查询对象
*
* @param id
* @return
* @throws Exception
*/
ImClientQueryVo getImClientById(Long id) throws Exception;
/**
* 移除旧的设备token
*
* @param appId
......@@ -117,4 +108,13 @@ public interface ImClientService extends BaseService<ImClient> {
void deleteCacheImClient(Long applicationId, String clientId);
/**
* 根据ID获取client对象,有缓存
*
* @param id
* @return
* @throws Exception
*/
ImClient getCacheImClient(Long id);
}
......@@ -94,4 +94,11 @@ public interface ImConversationMembersService extends BaseService<ImConversation
* @throws Exception
*/
Paging<ImConversationMembersQueryVo> getImConversationMembersPageList(ImConversationMembersPageParam imConversationMembersPageParam) throws Exception;
/**
* 根据用户id 获取千人群id列表
* @param clientId 用户id
* @return
*/
List<Long> findThousandGroupsByClientId(Long clientId);
}
......@@ -92,7 +92,7 @@ public interface ImConversationService extends BaseService<ImConversation> {
* @return
* @throws Exception
*/
boolean updateDisplayConversation(ImConversationDisplayUpdate imConversationDisplayUpdate) throws Exception;
boolean updateDisplayConversation(ImConversationDisplayUpdate imConversationDisplayUpdate);
/**
* 根据ID获取查询对象
......@@ -101,7 +101,7 @@ public interface ImConversationService extends BaseService<ImConversation> {
* @return
* @throws Exception
*/
ImConversationQueryVo getImConversationById(Long id) throws Exception;
ImConversationQueryVo getImConversationById(Long id);
/**
* 获取分页对象
......
......@@ -35,7 +35,7 @@ public class ImApplicationServiceImpl extends BaseServiceImpl<ImApplicationMappe
@Override
@Cacheable(key = "#p0")
public ImApplication getOneByAppKey(String appKey) {
public ImApplication getCacheAppByAppKey(String appKey) {
ImApplication imApplication = this.getOne(
new QueryWrapper<ImApplication>().lambda()
.eq(ImApplication::getAppKey, appKey)
......
......@@ -61,7 +61,7 @@ public class ImClientLoginServiceImpl implements ImClientLoginService {
public ApiResult<TokenVo> verifySign(ImTokenVerify imTokenVerify) throws NacosException {
// 根据appKey从数据库查询密钥
ImApplication imApplication = imApplicationService.getOneByAppKey(imTokenVerify.getAppKey());
ImApplication imApplication = imApplicationService.getCacheAppByAppKey(imTokenVerify.getAppKey());
if (imApplication == null) {
log.info("imApplication == null,getAppKey:" + imTokenVerify.getAppKey());
......
......@@ -150,7 +150,7 @@ public class ImClientServiceImpl extends BaseServiceImpl<ImClientMapper, ImClien
// shiro线程中获取当前token
JwtToken curentJwtToken = JwtUtil.getCurentJwtToken();
// 根据appKey查询appid
ImApplication imApplication = imApplicationService.getOneByAppKey(curentJwtToken.getAppKey());
ImApplication imApplication = imApplicationService.getCacheAppByAppKey(curentJwtToken.getAppKey());
// 清除旧client的redis缓存
ImClient imClient = this.getOne(new QueryWrapper<ImClient>().lambda()
......@@ -199,7 +199,6 @@ public class ImClientServiceImpl extends BaseServiceImpl<ImClientMapper, ImClien
return super.updateById(imClient);
}
@Transactional(rollbackFor = Exception.class)
@Override
public boolean deleteImClient(Long id) throws Exception {
return super.removeById(id);
......@@ -213,11 +212,6 @@ public class ImClientServiceImpl extends BaseServiceImpl<ImClientMapper, ImClien
}
@Override
public ImClientQueryVo getImClientById(Long id) throws Exception {
return imClientMapper.getImClientById(id);
}
@Override
public Paging<ImClientQueryVo> getImClientPageList(ImClientPageParam imClientPageParam) throws Exception {
Page<ImClientQueryVo> page = new PageInfo<>(imClientPageParam, OrderItem.desc(getLambdaColumn(ImClient::getCreateTime)));
IPage<ImClientQueryVo> iPage = imClientMapper.getImClientPageList(page, imClientPageParam);
......@@ -230,7 +224,7 @@ public class ImClientServiceImpl extends BaseServiceImpl<ImClientMapper, ImClien
// shiro线程中获取当前token
JwtToken curentJwtToken = JwtUtil.getCurentJwtToken();
// 根据appKey查询appid
ImApplication imApplication = imApplicationService.getOneByAppKey(curentJwtToken.getAppKey());
ImApplication imApplication = imApplicationService.getCacheAppByAppKey(curentJwtToken.getAppKey());
return getCacheImClient(imApplication.getId(), curentJwtToken.getClientId());
}
......@@ -247,4 +241,10 @@ public class ImClientServiceImpl extends BaseServiceImpl<ImClientMapper, ImClien
public void deleteCacheImClient(Long applicationId, String clientId) {
}
@Override
@Cacheable(key = "#p0")
public ImClient getCacheImClient(Long id) {
return imClientMapper.selectById(id);
}
}
......@@ -85,7 +85,7 @@ public class ImConversationMembersServiceImpl extends BaseServiceImpl<ImConversa
// shiro线程中获取当前token
JwtToken curentJwtToken = JwtUtil.getCurentJwtToken();
// 根据appKey查询application
ImApplication imApplication = imApplicationService.getOneByAppKey(curentJwtToken.getAppKey());
ImApplication imApplication = imApplicationService.getCacheAppByAppKey(curentJwtToken.getAppKey());
ImClient imClientSender = imClientService.getCurentClient();
......@@ -141,7 +141,7 @@ public class ImConversationMembersServiceImpl extends BaseServiceImpl<ImConversa
// shiro线程中获取当前token
JwtToken curentJwtToken = JwtUtil.getCurentJwtToken();
// 根据appKey查询application
ImApplication imApplication = imApplicationService.getOneByAppKey(curentJwtToken.getAppKey());
ImApplication imApplication = imApplicationService.getCacheAppByAppKey(curentJwtToken.getAppKey());
ImClient imClientSender = imClientService.getCurentClient();
......@@ -214,4 +214,9 @@ public class ImConversationMembersServiceImpl extends BaseServiceImpl<ImConversa
return new Paging<ImConversationMembersQueryVo>(iPage);
}
@Override
public List<Long> findThousandGroupsByClientId(Long clientId) {
return imConversationMembersMapper.findThousandGroupsByClientId(clientId);
}
}
......@@ -115,7 +115,7 @@ public class ImConversationServiceImpl extends BaseServiceImpl<ImConversationMap
// shiro线程中获取当前token
JwtToken curentJwtToken = JwtUtil.getCurentJwtToken();
// 根据appKey查询application
ImApplication imApplication = imApplicationService.getOneByAppKey(curentJwtToken.getAppKey());
ImApplication imApplication = imApplicationService.getCacheAppByAppKey(curentJwtToken.getAppKey());
// 该应用 是否允许创建重复一对一会话 0不允许 1允许
if (imApplication.getRepeatSessionStatus() != null && imApplication.getRepeatSessionStatus() == 0) {
......@@ -227,7 +227,7 @@ public class ImConversationServiceImpl extends BaseServiceImpl<ImConversationMap
// shiro线程中获取当前token
JwtToken curentJwtToken = JwtUtil.getCurentJwtToken();
// 根据appKey查询application
ImApplication imApplication = imApplicationService.getOneByAppKey(curentJwtToken.getAppKey());
ImApplication imApplication = imApplicationService.getCacheAppByAppKey(curentJwtToken.getAppKey());
ImClient imClientSender = imClientService.getCurentClient();
......@@ -331,7 +331,7 @@ public class ImConversationServiceImpl extends BaseServiceImpl<ImConversationMap
responseModel.setMsg(result.getMessage());
responseModel.setData(imMessageOnlineSend);
responseModel.setReqId(null);
channelSender.sendMsg(responseModel, imApplication.getAppKey(), imClientReceiver.getClientId());
channelSender.sendMsg(responseModel, imClientReceiver.getId());
}
......@@ -356,7 +356,7 @@ public class ImConversationServiceImpl extends BaseServiceImpl<ImConversationMap
// shiro线程中获取当前token
JwtToken curentJwtToken = JwtUtil.getCurentJwtToken();
// 根据appKey查询application
ImApplication imApplication = imApplicationService.getOneByAppKey(curentJwtToken.getAppKey());
ImApplication imApplication = imApplicationService.getCacheAppByAppKey(curentJwtToken.getAppKey());
ImClient imClientSender = imClientService.getCurentClient();
......@@ -454,7 +454,7 @@ public class ImConversationServiceImpl extends BaseServiceImpl<ImConversationMap
responseModel.setMsg(result.getMessage());
responseModel.setData(imMessageOnlineSend);
responseModel.setReqId(null);
channelSender.sendMsg(responseModel, imApplication.getAppKey(), imClientReceiver.getClientId());
channelSender.sendMsg(responseModel, imClientReceiver.getId());
}
......@@ -474,7 +474,7 @@ public class ImConversationServiceImpl extends BaseServiceImpl<ImConversationMap
// shiro线程中获取当前token
JwtToken curentJwtToken = JwtUtil.getCurentJwtToken();
// 根据appKey查询application
ImApplication imApplication = imApplicationService.getOneByAppKey(curentJwtToken.getAppKey());
ImApplication imApplication = imApplicationService.getCacheAppByAppKey(curentJwtToken.getAppKey());
ImClient imClientSender = imClientService.getCurentClient();
......@@ -568,7 +568,7 @@ public class ImConversationServiceImpl extends BaseServiceImpl<ImConversationMap
responseModel.setMsg(result.getMessage());
responseModel.setData(imMessageOnlineSend);
responseModel.setReqId(null);
channelSender.sendMsg(responseModel, imApplication.getAppKey(), imClientReceiver.getClientId());
channelSender.sendMsg(responseModel, imClientReceiver.getId());
}
......@@ -594,7 +594,7 @@ public class ImConversationServiceImpl extends BaseServiceImpl<ImConversationMap
// shiro线程中获取当前token
JwtToken curentJwtToken = JwtUtil.getCurentJwtToken();
// 根据appKey查询application
ImApplication imApplication = imApplicationService.getOneByAppKey(curentJwtToken.getAppKey());
ImApplication imApplication = imApplicationService.getCacheAppByAppKey(curentJwtToken.getAppKey());
ImClient imClientSender = imClientService.getCurentClient();
......@@ -655,7 +655,7 @@ public class ImConversationServiceImpl extends BaseServiceImpl<ImConversationMap
responseModel.setMsg(result.getMessage());
responseModel.setData(imMessageOnlineSend);
responseModel.setReqId(null);
channelSender.sendMsg(responseModel, imApplication.getAppKey(), imClientReceiver.getClientId());
channelSender.sendMsg(responseModel, imClientReceiver.getId());
}
return ApiResult.ok();
......@@ -674,7 +674,7 @@ public class ImConversationServiceImpl extends BaseServiceImpl<ImConversationMap
// shiro线程中获取当前token
JwtToken curentJwtToken = JwtUtil.getCurentJwtToken();
// 根据appKey查询application
ImApplication imApplication = imApplicationService.getOneByAppKey(curentJwtToken.getAppKey());
ImApplication imApplication = imApplicationService.getCacheAppByAppKey(curentJwtToken.getAppKey());
ImClient imClientSender = imClientService.getCurentClient();
......@@ -736,7 +736,7 @@ public class ImConversationServiceImpl extends BaseServiceImpl<ImConversationMap
responseModel.setMsg(result.getMessage());
responseModel.setData(imMessageOnlineSend);
responseModel.setReqId(null);
channelSender.sendMsg(responseModel, imApplication.getAppKey(), imClientReceiver.getClientId());
channelSender.sendMsg(responseModel, imClientReceiver.getId());
}
......@@ -760,11 +760,10 @@ public class ImConversationServiceImpl extends BaseServiceImpl<ImConversationMap
* 是否单向隐藏会话","云端聊天记录不删除;假设有A和B两个用户,A删会话,B还能发; 如果B发了消息,A这边要重新把会话显示出来,并能显示之前的聊天记录"
*
* @return
* @throws Exception
*/
@Transactional(rollbackFor = Exception.class)
@Override
public boolean updateDisplayConversation(ImConversationDisplayUpdate imConversationDisplayUpdate) throws Exception {
public boolean updateDisplayConversation(ImConversationDisplayUpdate imConversationDisplayUpdate) {
ImClient curentClient = imClientService.getCurentClient();
for (Long id : imConversationDisplayUpdate.getConversationIds()) {
// 修改为删除隐藏状态
......@@ -779,7 +778,7 @@ public class ImConversationServiceImpl extends BaseServiceImpl<ImConversationMap
}
@Override
public ImConversationQueryVo getImConversationById(Long id) throws Exception {
public ImConversationQueryVo getImConversationById(Long id) {
return imConversationMapper.getImConversationById(id);
}
......
......@@ -213,7 +213,7 @@ public class ImInboxServiceImpl extends BaseServiceImpl<ImInboxMapper, ImInbox>
responseModel.setMsg(result.getMessage());
responseModel.setData(imMessageOnlineSend);
responseModel.setReqId(null);
channelSender.sendMsg(responseModel, application.getAppKey(), imClientReceiver.getClientId());
channelSender.sendMsg(responseModel, imClientReceiver.getId());
}
}
}
......
......@@ -123,7 +123,7 @@ public class ImMessageServiceImpl extends BaseServiceImpl<ImMessageMapper, ImMes
responseModel.setReqId(null);
// 向接收方推送
channelSender.sendMsg(responseModel, imApplication.getAppKey(), imClientReceiver.getClientId());
channelSender.sendMsg(responseModel, imClientReceiver.getId());
}
......@@ -234,7 +234,7 @@ public class ImMessageServiceImpl extends BaseServiceImpl<ImMessageMapper, ImMes
responseModel.setMsg(result.getMessage());
responseModel.setData(imMessageOnlineSend);
responseModel.setReqId(null);
channelSender.sendMsg(responseModel, imApplication.getAppKey(), imClientReceiver.getClientId());
channelSender.sendMsg(responseModel, imClientReceiver.getId());
// 获取自定义推送字段
PushVO pushVO = imMsgRecall.getPush();
......
package com.wecloud.im.thousandchat.cache;
import com.wecloud.im.entity.ImApplication;
import com.wecloud.im.entity.ImClient;
import com.wecloud.im.service.ImClientService;
import com.wecloud.im.service.ImConversationMembersService;
import com.wecloud.im.ws.cache.UserStateListener;
import com.wecloud.im.ws.utils.InitIp;
import com.wecloud.im.ws.utils.RedisUtils;
......@@ -7,7 +11,9 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.Set;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* 千人群的缓存(本地缓存待实现)
......@@ -19,20 +25,40 @@ import java.util.Set;
public class GroupCacheManager extends UserStateListener {
/**
* 在线状态
* redis的群键 key
*/
public static final Integer ONLINE = 1;
public static final String GROUP_KEY = "gp:";
@Autowired
private RedisUtils redisUtils;
@Autowired
private ImConversationMembersService imConversationMembersService;
@Override
public void onLineEvent(String appKey, String clientId, Integer platform, String longChannelId) {
public void onLineEvent(Long clientId, Integer platform, String longChannelId) {
List<Long> thousandGroupIds = imConversationMembersService.findThousandGroupsByClientId(clientId);
for(Long thousandGroupId : thousandGroupIds) {
redisUtils.hashset(GROUP_KEY + thousandGroupId, clientId + RedisUtils.SPLIT + platform, InitIp.lAN_IP,
150, TimeUnit.DAYS);
}
}
@Override
public void offlineEvent(String appKey, String clientId, Integer platform, String longChannelId) {
public void offlineEvent(Long clientId, Integer platform, String longChannelId) {
List<Long> thousandGroupIds = imConversationMembersService.findThousandGroupsByClientId(clientId);
for(Long thousandGroupId : thousandGroupIds) {
redisUtils.hashdel(GROUP_KEY + thousandGroupId, clientId + RedisUtils.SPLIT + platform);
}
}
/**
* 根据群ID 获取 万人群的 在线成员的key-val
* @param conversionId
* @return 在线成员的key-val,其中key是 client的主键id:platform, val是 ip
*/
public Map<String, String> findOnlineClientsByThousandGroupId(Long conversionId) {
return redisUtils.hashgetll(GROUP_KEY + conversionId);
}
......
......@@ -3,17 +3,11 @@ package com.wecloud.im.ws.cache;
import com.wecloud.im.ws.model.redis.ClientChannelInfo;
import com.wecloud.im.ws.utils.InitIp;
import com.wecloud.im.ws.utils.RedisUtils;
import com.wecloud.im.ws.utils.SpringBeanUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.event.ApplicationStartedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
......@@ -32,59 +26,41 @@ public class UserStateCacheManager extends UserStateListener {
* 值为set集合,netty的channel的id
*/
private static final String CLIENTS = "cis:";
/**
* 设备类型 web,ios,android,ios,other
*/
private static final String DEVICE_TYPE = "ty";
/**
* 用户公网ip,在公网部署集群需要用到
*/
private static final String PUBLIC_IP = "pip";
/**
* 推送token
*/
private static final String DEVICE_TOKEN = "dt";
/**
* 设备ID
*/
private static final String DEVICE_ID = "di";
@Autowired
private RedisUtils redisUtils;
@Override
public void onLineEvent(String appKey, String clientId, Integer platform, String longChannelId) {
public void onLineEvent(Long clientId, Integer platform, String longChannelId) {
log.info("ws用户上线保存redis连接ip: {}, uid: {}", InitIp.lAN_IP, longChannelId);
// 先删除旧的重复的platform
Set<String> platformAndIps = redisUtils.getForSetMembers(getUserStateCacheKey(appKey, clientId));
Set<String> platformAndIps = redisUtils.getForSetMembers(getUserStateCacheKey(clientId));
for(String platformAndIp : platformAndIps) {
String[] split = platformAndIp.split(RedisUtils.SPLIT);
String innerPlatform = split[0];
if(innerPlatform.equals(String.valueOf(platform))) {
redisUtils.removeForSet(getUserStateCacheKey(appKey, clientId), platformAndIp);
redisUtils.removeForSet(getUserStateCacheKey(clientId), platformAndIp);
}
}
redisUtils.addForSet(getUserStateCacheKey(appKey, clientId), platform + RedisUtils.SPLIT + InitIp.lAN_IP, 10, TimeUnit.DAYS);
redisUtils.addForSet(getUserStateCacheKey(clientId), platform + RedisUtils.SPLIT + InitIp.lAN_IP, 10, TimeUnit.DAYS);
}
@Override
public void offlineEvent(String appKey, String clientId, Integer platform, String longChannelId) {
public void offlineEvent(Long clientId, Integer platform, String longChannelId) {
log.info("ws用户离线删除redis key,uid:" + longChannelId);
redisUtils.removeForSet(getUserStateCacheKey(appKey, clientId), platform + RedisUtils.SPLIT + InitIp.lAN_IP);
redisUtils.removeForSet(getUserStateCacheKey(clientId), platform + RedisUtils.SPLIT + InitIp.lAN_IP);
}
/**
* 根据clientId获取在线用户信息
* @param appKey
* @param clientId
* @return
*/
public List<ClientChannelInfo> findOnlineInfosByClientId(String appKey, String clientId) {
public List<ClientChannelInfo> findOnlineInfosByClientId(Long clientId) {
// 获取所有 CLIENTS的 <platform>:<ip>
Set<String> platformAndIs = redisUtils.getForSetMembers(getUserStateCacheKey(appKey, clientId));
Set<String> platformAndIs = redisUtils.getForSetMembers(getUserStateCacheKey(clientId));
ArrayList<ClientChannelInfo> clientChannelInfos = new ArrayList<>();
for(String platformAndIp : platformAndIs) {
......@@ -100,17 +76,16 @@ public class UserStateCacheManager extends UserStateListener {
/**
* 判断用户是否在线
* @param appKey
* @param clientId
* @return true表示在线,false表示离线
*/
public boolean isOnline(String appKey, String clientId) {
Set<String> platformAndIs = redisUtils.getForSetMembers(getUserStateCacheKey(appKey, clientId));
public boolean isOnline(Long clientId) {
Set<String> platformAndIs = redisUtils.getForSetMembers(getUserStateCacheKey(clientId));
return platformAndIs.size() > 0;
}
private String getUserStateCacheKey(String appKey, String clientId) {
return CLIENTS + appKey + RedisUtils.SPLIT + clientId;
private String getUserStateCacheKey(Long clientId) {
return CLIENTS + clientId;
}
}
package com.wecloud.im.ws.cache;
import com.wecloud.im.entity.ImClient;
import com.wecloud.im.ws.utils.SpringBeanUtils;
import org.springframework.boot.context.event.ApplicationStartedEvent;
import org.springframework.context.event.EventListener;
......@@ -27,19 +28,19 @@ public abstract class UserStateListener {
listeners.add(listener);
}
public static void triggerOnlineEvent(String appKey, String clientId, Integer platform, String longChannelId) {
public static void triggerOnlineEvent(Long clientId, Integer platform, String longChannelId) {
for(UserStateListener listener: listeners) {
listener.onLineEvent(appKey, clientId, platform, longChannelId);
listener.onLineEvent(clientId, platform, longChannelId);
}
}
public static void triggerOfflineEvent(String appKey, String clientId, Integer platform, String longChannelId) {
public static void triggerOfflineEvent(Long clientId, Integer platform, String longChannelId) {
for(UserStateListener listener: listeners) {
listener.offlineEvent(appKey, clientId, platform, longChannelId);
listener.offlineEvent(clientId, platform, longChannelId);
}
}
public abstract void onLineEvent(String appKey, String clientId, Integer platform, String longChannelId);
public abstract void onLineEvent(Long clientId, Integer platform, String longChannelId);
public abstract void offlineEvent(String appKey, String clientId, Integer platform, String longChannelId);
public abstract void offlineEvent(Long clientId, Integer platform, String longChannelId);
}
......@@ -8,9 +8,9 @@ package com.wecloud.im.ws.enums;
public enum WsRequestCmdEnum {
/**
* 数据
* 普通聊天
*/
DATA(1),
NORMAL_CHAT(1),
/**
* ping
......
package com.wecloud.im.ws.manager;
import com.wecloud.im.ws.cache.UserStateCacheManager;
import com.wecloud.im.entity.ImApplication;
import com.wecloud.im.entity.ImClient;
import com.wecloud.im.ws.cache.UserStateListener;
import com.wecloud.im.ws.model.ClientInfo;
import com.wecloud.im.ws.model.redis.ClientChannelInfo;
import com.wecloud.im.ws.utils.RedisUtils;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.AttributeKey;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
......@@ -33,12 +32,7 @@ public class ChannelManager {
/**
* CLIENT_ID,是客户端的字符串id
*/
public static final AttributeKey<String> CLIENT_ID = AttributeKey.valueOf("ci");
/**
* 是app的字符串id
*/
public static final AttributeKey<String> APP_KEY = AttributeKey.valueOf("ak");
public static final AttributeKey<Long> CLIENT_ID = AttributeKey.valueOf("ci");
public static final AttributeKey<Integer> READ_IDLE_TIMES = AttributeKey.valueOf("readIdleTimes");
......@@ -58,28 +52,25 @@ public class ChannelManager {
/**
* client上线
* userID绑定channel
*
* @param appKey
* @param clientId
* @param platform
* @param channel
*/
public void online(String appKey, String clientId, Integer platform, NioSocketChannel channel) {
public void online(Long clientId, Integer platform, NioSocketChannel channel) {
String longChannelId = channel.id().asLongText();
this.putSessionInfoMap(appKey, clientId, platform, channel);
this.putSessionInfoMap(clientId, platform, channel);
UserStateListener.triggerOnlineEvent(appKey, clientId, platform, longChannelId);
UserStateListener.triggerOnlineEvent(clientId, platform, longChannelId);
}
/**
* 下线移除channel
*
* @param appKey
* @param clientId
* @param platform
* @param channelHandlerContext
*/
public void offline(String appKey, String clientId, Integer platform, ChannelHandlerContext channelHandlerContext) {
public void offline(Long clientId, Integer platform, ChannelHandlerContext channelHandlerContext) {
String longChannelId = channelHandlerContext.channel().id().asLongText();
......@@ -88,26 +79,26 @@ public class ChannelManager {
// 移除本地维护的channel
delSessionInfoMap(appKey, clientId, platform);
delSessionInfoMap(clientId, platform);
UserStateListener.triggerOfflineEvent(appKey, clientId, platform, longChannelId);
UserStateListener.triggerOfflineEvent(clientId, platform, longChannelId);
}
public static String genKeyForSessionInfoMap(String appKey, String clientId, Integer platform) {
return new StringBuilder(appKey).append("-").append(clientId).append("-").append(platform).toString();
public static String genKeyForSessionInfoMap(Long clientId, Integer platform) {
return clientId + RedisUtils.SPLIT + platform;
}
private void putSessionInfoMap(String appKey, String clientId, Integer platform, NioSocketChannel channel) {
private void putSessionInfoMap(Long clientId, Integer platform, NioSocketChannel channel) {
ClientInfo clientInfo = new ClientInfo();
clientInfo.setDeviceId("");
clientInfo.setNioSocketChannel(channel);
clientInfo.setToken("");
ChannelManager.SESSION_INFO_MAP.put(genKeyForSessionInfoMap(appKey, clientId, platform), clientInfo);
ChannelManager.SESSION_INFO_MAP.put(genKeyForSessionInfoMap(clientId, platform), clientInfo);
}
private void delSessionInfoMap(String appKey, String clientId, Integer platform) {
ChannelManager.SESSION_INFO_MAP.remove(genKeyForSessionInfoMap(appKey, clientId, platform));
private void delSessionInfoMap(Long clientId, Integer platform) {
ChannelManager.SESSION_INFO_MAP.remove(genKeyForSessionInfoMap(clientId, platform));
}
}
package com.wecloud.im.ws.sender;
import com.wecloud.im.executor.SendMsgThreadPool;
import com.wecloud.im.router.RouterSendService;
import com.wecloud.im.ws.cache.UserStateCacheManager;
import com.wecloud.im.ws.model.ClientInfo;
......@@ -8,6 +9,7 @@ import com.wecloud.im.ws.model.redis.ClientChannelInfo;
import com.wecloud.im.ws.model.request.ReceiveVO;
import com.wecloud.im.ws.manager.ChannelManager;
import com.wecloud.im.ws.utils.InitIp;
import com.wecloud.im.ws.utils.RedisUtils;
import com.wecloud.utils.JsonUtils;
import io.geekidea.springbootplus.framework.common.api.ApiCode;
import io.geekidea.springbootplus.framework.common.api.ApiResult;
......@@ -45,11 +47,10 @@ public class ChannelSender {
*
* @param receiveVO
* @param data
* @param toAppKey
* @param toClientId
*/
public void sendMsgData(ReceiveVO receiveVO, Object data, String toAppKey, String toClientId) {
this.sendMsg(receiveVO, ApiCode.SUCCESS, data, toAppKey, toClientId);
public void sendMsgData(ReceiveVO receiveVO, Object data, Long toClientId) {
this.sendMsg(receiveVO, ApiCode.SUCCESS, data, toClientId);
}
/**
......@@ -57,21 +58,19 @@ public class ChannelSender {
*
* @param receiveVO
* @param apiCode
* @param toAppKey
* @param toClientId
*/
public void sendMsgSucess(ReceiveVO receiveVO, ApiCode apiCode, String toAppKey, String toClientId) {
this.sendMsg(receiveVO, apiCode, new HashMap<>(1), toAppKey, toClientId);
public void sendMsgSucess(ReceiveVO receiveVO, ApiCode apiCode, Long toClientId) {
this.sendMsg(receiveVO, apiCode, new HashMap<>(1), toClientId);
}
/**
* 固定"参数错误"状态码 无data
*
* @param receiveVO
* @param toAppKey
* @param toClientId
*/
public void sendMsgIllegeArgs(ReceiveVO receiveVO, String toAppKey, String toClientId) {
public void sendMsgIllegeArgs(ReceiveVO receiveVO, Long toClientId) {
// this.nullDataSuccess(requestModel, ResultStatus.PARAM_ERROR, userId);
}
......@@ -82,7 +81,7 @@ public class ChannelSender {
* @param receiveVO
* @param data
*/
public void sendMsg(ReceiveVO receiveVO, ApiCode apiCode, Object data, String toAppKey, String toClientId) {
public void sendMsg(ReceiveVO receiveVO, ApiCode apiCode, Object data, Long toClientId) {
ApiResult<Boolean> apiResult = ApiResult.result(apiCode);
WsResponseModel responseModel = new WsResponseModel();
responseModel.setMsg(apiResult.getMessage());
......@@ -90,22 +89,37 @@ public class ChannelSender {
responseModel.setReqId(receiveVO.getReqId());
responseModel.setData(data);
responseModel.setCode(apiResult.getCode());
this.sendMsg(responseModel, toAppKey, toClientId);
this.sendMsg(responseModel, toClientId);
}
/**
* 批量发消息
* @param responseModel
* @param toIp
* @param toClientIdAndPlatforms
*/
public void batchSendMsg(WsResponseModel responseModel, String toIp, List<String> toClientIdAndPlatforms) {
// 是否为当前机器的ip
if (InitIp.lAN_IP.equals(toIp)) {
String msgJson = JsonUtils.encodeJson(responseModel);
batchSendMsgLocal(toClientIdAndPlatforms, msgJson);
} else {
String msgJson = JsonUtils.encodeJson(responseModel);
routerSendService.batchSendMsgRemote(toClientIdAndPlatforms, msgJson);
}
}
/**
* 调用ws处理响应逻辑
*
* @param responseModel
* @param toAppKey
* @param toClientId
*/
public void sendMsg(WsResponseModel responseModel, String toAppKey, String toClientId) {
public void sendMsg(WsResponseModel responseModel, Long toClientId) {
String msgJson = JsonUtils.encodeJson(responseModel);
List<ClientChannelInfo> channelInfos = userStateCacheManager.findOnlineInfosByClientId(toAppKey, toClientId);
List<ClientChannelInfo> channelInfos = userStateCacheManager.findOnlineInfosByClientId(toClientId);
// 一个用户存在多端的情况,所以先进行分类,key是ip地址,value是channel的列表
Map<String, List<ClientChannelInfo>> ipChannels = channelInfos.stream().collect(Collectors.groupingBy(ClientChannelInfo::getLanIp));
......@@ -116,7 +130,7 @@ public class ChannelSender {
if (InitIp.lAN_IP.equals(channelInfoEntry.getKey())) {
// 调用本地下发
for(ClientChannelInfo clientChannelInfo : channelInfoEntry.getValue()) {
this.sendMsgLocal(toAppKey, toClientId, clientChannelInfo.getPlatform(), msgJson);
this.sendMsgLocal(toClientId, clientChannelInfo.getPlatform(), msgJson);
}
continue;
}
......@@ -126,7 +140,7 @@ public class ChannelSender {
for(ClientChannelInfo clientChannelInfo : channelInfoEntry.getValue()) {
Address address = new Address(clientChannelInfo.getLanIp(), 20881);
RpcContext.getContext().setObjectAttachment("address", address);
routerSendService.sendMsgRemote(toAppKey, toClientId, clientChannelInfo.getPlatform(), msgJson);
routerSendService.sendMsgRemote(toClientId, clientChannelInfo.getPlatform(), msgJson);
}
}
......@@ -147,17 +161,38 @@ public class ChannelSender {
}
/**
* 批量发送
* @param toClientIdAndPlatforms list里的 结构为 client的主键id:platform
* @param msgJson
*/
public void batchSendMsgLocal(List<String> toClientIdAndPlatforms, String msgJson) {
for(String arrStr : toClientIdAndPlatforms) {
SendMsgThreadPool.SEND_MSG_THREAD_POOL_EXECUTOR.execute(()->{
this.sendMsgLocal(arrStr, msgJson);
});
}
}
/**
* 向指定channelId下发数据,限定本机有的channel
*
* @param appKey
* @param clientId
* @param platform
* @param msg
* @return
*/
public boolean sendMsgLocal(String appKey, String clientId, Integer platform, String msg) {
public boolean sendMsgLocal(Long clientId, Integer platform, String msg) {
String key = ChannelManager.genKeyForSessionInfoMap(clientId, platform);
return sendMsgLocal(key, msg);
}
String key = ChannelManager.genKeyForSessionInfoMap(appKey, clientId, platform);
/**
* 本地的channel 推送
* @param key 结构为 client的主键id:platform
* @param msgJsonStr json结构
* @return
*/
private boolean sendMsgLocal(String key, String msgJsonStr) {
ClientInfo clientInfo = ChannelManager.SESSION_INFO_MAP.get(key);
if (clientInfo == null) {
return false;
......@@ -165,18 +200,18 @@ public class ChannelSender {
NioSocketChannel nioSocketChannel = clientInfo.getNioSocketChannel();
if (null == nioSocketChannel) {
log.info("writeData连接为空:" + msg);
log.info("writeData连接为空: {}", msgJsonStr);
return false;
}
// 判断连接是否断开
if (nioSocketChannel.isShutdown()) {
log.info("writeData连接断开:" + msg + ",\nchannelId:" + nioSocketChannel.id().asLongText());
log.warn("writeData 已连接断开: {}\n channelId: {}", msgJsonStr, nioSocketChannel.id().asLongText());
return false;
}
log.info("writeData:" + ",\nchannelId:" + nioSocketChannel.id().asLongText());
log.info("writeData, channelId: {}", nioSocketChannel.id().asLongText());
nioSocketChannel.writeAndFlush(new TextWebSocketFrame(msg));
nioSocketChannel.writeAndFlush(new TextWebSocketFrame(msgJsonStr));
return true;
}
......
package com.wecloud.im.ws.strategy;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.wecloud.im.entity.ImApplication;
import com.wecloud.im.entity.ImClient;
import com.wecloud.im.param.ImConversationQueryVo;
import com.wecloud.im.service.ImApplicationService;
import com.wecloud.im.service.ImClientService;
import com.wecloud.im.service.ImConversationService;
import com.wecloud.im.ws.enums.WsRequestCmdEnum;
import com.wecloud.im.ws.model.request.ReceiveVO;
import com.wecloud.im.ws.utils.SpringBeanUtils;
......@@ -14,6 +14,7 @@ import com.wecloud.utils.JsonUtils;
import io.geekidea.springbootplus.framework.common.exception.BusinessException;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.BooleanUtils;
/**
* @Description 处理Cmd请求
......@@ -27,10 +28,11 @@ public abstract class AbstractImCmdStrategy {
private static ImCmdStrategyFactory imCmdStrategyFactory;
private static ImApplicationService imApplicationService;
private static ImClientService imClientService;
private static ImConversationService imConversationService;
public static void process(String appKey, String clientId, ChannelHandlerContext ctx, String data) throws JsonProcessingException {
public static void process(Long senderClientId, ChannelHandlerContext ctx, String data) {
if(log.isDebugEnabled()) {
log.debug("appWS收到data: {}\n appKey:{}, clientId:{}, channelId:{}", data, appKey, clientId, ctx.channel().id().asLongText());
log.debug("appWS收到data: {}\n senderClientId:{}, channelId:{}", data, senderClientId, ctx.channel().id().asLongText());
}
if(imCmdStrategyFactory == null) {
......@@ -40,6 +42,7 @@ public abstract class AbstractImCmdStrategy {
imCmdStrategyFactory = SpringBeanUtils.getBean(ImCmdStrategyFactory.class);
imApplicationService = SpringBeanUtils.getBean(ImApplicationService.class);
imClientService = SpringBeanUtils.getBean(ImClientService.class);
imConversationService = SpringBeanUtils.getBean(ImConversationService.class);
}
}
}
......@@ -59,26 +62,35 @@ public abstract class AbstractImCmdStrategy {
}
WsRequestCmdEnum wsRequestPathEnum = WsRequestCmdEnum.getByCode(receiveVO.getCmd());
// 使用策略模式, 根据不同类型请求调用不同实现类
AbstractImCmdStrategy cmdStrategy = imCmdStrategyFactory.getStrategy(wsRequestPathEnum);
// 查询imApplication
ImApplication imApplication = imApplicationService.getOneByAppKey(appKey);
if (imApplication == null) {
log.warn("根据appKey: {} 查找不到 imApplication!", appKey);
//查看接收的群属性,是否万人群
ImConversationQueryVo conversation = imConversationService.getImConversationById(receiveVO.getData().getToConversation());
if(conversation == null) {
log.warn("会话消息reqId: {} 会话id不合法!", receiveVO.getReqId());
return;
}
if(BooleanUtils.isTrue(conversation.getIsThousand()) && WsRequestCmdEnum.NORMAL_CHAT == wsRequestPathEnum) {
// 普通群升级为万人群
wsRequestPathEnum = WsRequestCmdEnum.THROUSAND_CHAT;
}
// 使用策略模式, 根据不同类型请求调用不同实现类
AbstractImCmdStrategy cmdStrategy = imCmdStrategyFactory.getStrategy(wsRequestPathEnum);
// 查询发送者client
ImClient imClientSender = imClientService.getOne(new QueryWrapper<ImClient>().lambda()
.eq(ImClient::getFkAppid, imApplication.getId())
.eq(ImClient::getClientId, clientId));
ImClient imClientSender = imClientService.getCacheImClient(senderClientId);
if (imClientSender == null) {
log.warn("根据appKey: {} 查找不到 imClientSender!", imApplication.getAppKey());
log.warn("根据senderClientId: {} 查找不到 imClientSender!", senderClientId);
return;
}
//查看接收者
// 查询imApplication
ImApplication imApplication = imApplicationService.getById(imClientSender.getFkAppid());
if (imApplication == null) {
log.warn("根据appId: {} 查找不到 imApplication!", imClientSender.getFkAppid());
return;
}
cmdStrategy.process(imApplication, imClientSender, ctx, receiveVO);
......
......@@ -37,7 +37,7 @@ import java.util.List;
/**
* @Description 处理app数据消息
*/
@ImCmdType(type = WsRequestCmdEnum.DATA)
@ImCmdType(type = WsRequestCmdEnum.NORMAL_CHAT)
@Service
@Slf4j
public class NormalChatStrategy extends AbstractImCmdStrategy {
......@@ -131,7 +131,7 @@ public class NormalChatStrategy extends AbstractImCmdStrategy {
responseModel.setMsg(result.getMessage());
responseModel.setData(imMessageOnlineSend);
responseModel.setReqId(null);
channelSender.sendMsg(responseModel, imApplication.getAppKey(), imClientReceiver.getClientId());
channelSender.sendMsg(responseModel, imClientReceiver.getId());
// 异步推送系统通知消息
systemPush.push(receiveVO.getData().getPush(), imClientReceiver, imApplication);
......
......@@ -12,6 +12,7 @@ import com.wecloud.im.service.ImClientService;
import com.wecloud.im.service.ImConversationMembersService;
import com.wecloud.im.service.ImInboxService;
import com.wecloud.im.service.ImMessageService;
import com.wecloud.im.thousandchat.cache.GroupCacheManager;
import com.wecloud.im.ws.annotation.ImCmdType;
import com.wecloud.im.ws.enums.WsRequestCmdEnum;
import com.wecloud.im.ws.enums.WsResponseCmdEnum;
......@@ -30,9 +31,11 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @Description 处理app数据消息
......@@ -64,6 +67,8 @@ public class ThousandChatStrategy extends AbstractImCmdStrategy {
@Autowired
private AsyncPush systemPush;
@Autowired
private GroupCacheManager groupCacheManager;
@Override
public void process(ImApplication imApplication, ImClient imSender, ChannelHandlerContext ctx, ReceiveVO receiveVO) {
......@@ -75,20 +80,7 @@ public class ThousandChatStrategy extends AbstractImCmdStrategy {
List<ImConversationMembers> membersList = imConversationMembersService.list(
new QueryWrapper<ImConversationMembers>().lambda()
.eq(ImConversationMembers::getFkConversationId, toConversationId)
.notIn(ImConversationMembers::getFkClientId, imSender.getId())
);
if (membersList.isEmpty()) {
log.info("查询会话所有成员返回空,会话ID: {}", toConversationId);
return;
}
// 判断为单聊
if (membersList.size() == 1) {
// 判断是否被拉黑逻辑
if (black(receiveVO, imSender, membersList, channel)) {
return;
}
}
// 生成消息id
long messageId = SnowflakeUtil.getId();
......@@ -109,11 +101,32 @@ public class ThousandChatStrategy extends AbstractImCmdStrategy {
imMessageOnlineSend.setSystem(imMessage.getSystemFlag());
imMessageOnlineSend.setAt(imMessage.getAt());
// 遍历发送
// 在线用户直接发消息
Map<String, String> onlineClientIpMap = groupCacheManager.findOnlineClientsByThousandGroupId(toConversationId);
Map<String /** ip **/, List<String /** client的主键ID:platform **/>> onlineIpClientMap = new HashMap<>();
onlineClientIpMap.forEach((clientIdAndPlatforms, ip) -> {
onlineIpClientMap.putIfAbsent(ip, new ArrayList<>());
onlineIpClientMap.get(ip).add(clientIdAndPlatforms);
});
// 封装要推给接收方的消息
WsResponseModel<ImMessageOnlineSend> responseModel = new WsResponseModel<>();
responseModel.setCmd(WsResponseCmdEnum.ONLINE_MSG.getCmdCode());
ApiResult<Boolean> result = ApiResult.result(ApiCode.SUCCESS);
responseModel.setCode(result.getCode());
responseModel.setMsg(result.getMessage());
responseModel.setData(imMessageOnlineSend);
responseModel.setReqId(null);
onlineIpClientMap.forEach((ip, clientIdAndPlatforms) -> {
channelSender.batchSendMsg(responseModel, ip, clientIdAndPlatforms);
});
// 给所有人(在线+离线)遍历发送
//todo 需要改成批量
for (ImConversationMembers conversationMembers : membersList) {
// 入库 保存收件箱
long imInboxId = SnowflakeUtil.getId();
saveImInbox(imApplication, toConversationId, messageId, conversationMembers, imInboxId);
// long imInboxId = SnowflakeUtil.getId();
// saveImInbox(imApplication, toConversationId, messageId, conversationMembers, imInboxId);
// 查询接收方
ImClient imClientReceiver = imClientService.getOne(new QueryWrapper<ImClient>().lambda()
......@@ -122,17 +135,6 @@ public class ThousandChatStrategy extends AbstractImCmdStrategy {
if (imClientReceiver == null) {
continue;
}
// 向接收方推送
WsResponseModel<ImMessageOnlineSend> responseModel = new WsResponseModel<>();
responseModel.setCmd(WsResponseCmdEnum.ONLINE_MSG.getCmdCode());
ApiResult<Boolean> result = ApiResult.result(ApiCode.SUCCESS);
responseModel.setCode(result.getCode());
responseModel.setMsg(result.getMessage());
responseModel.setData(imMessageOnlineSend);
responseModel.setReqId(null);
channelSender.sendMsg(responseModel, imApplication.getAppKey(), imClientReceiver.getClientId());
// 异步推送系统通知消息
systemPush.push(receiveVO.getData().getPush(), imClientReceiver, imApplication);
}
......
......@@ -12,10 +12,7 @@ import java.io.Serializable;
public class RtcChannelInfo implements Serializable {
@ApiModelProperty("当前房主")
private String owner;
private String appKey;
// private String appId;
private Long owner;
@ApiModelProperty("创建时间")
private Long createTimestamp;
......
......@@ -40,21 +40,20 @@ public interface MangerRtcCacheService {
/**
* 创建一个频道
*
* @param appKey
* @param clientId
* @param rtcChannelId 雪花算法生成频道id
*/
void create(String appKey, String clientId, Long rtcChannelId) throws JsonProcessingException;
void create(Long clientId, Long rtcChannelId) throws JsonProcessingException;
/**
* 加入频道
*/
void join(String appKey, String clientId, Long rtcChannelId);
void join(Long clientId, Long rtcChannelId);
/**
* 退出频道
*/
void leave(String appKey, String clientId, Long rtcChannelId);
void leave(Long clientId, Long rtcChannelId);
/**
* 根据频道ID获取频道内所有client
......@@ -64,15 +63,14 @@ public interface MangerRtcCacheService {
/**
* 根据客户端ID获取该客户端加入的频道ID
*/
Long getRtcChannelIdListByClientId(String appKey, String clientId);
Long getRtcChannelIdListByClientId(Long clientId);
/**
* 获取客户端忙线/空闲状态
*
* @param appKey
* @param clientId
* @return true:忙线,false空闲
*/
boolean getBusyStatus(String appKey, String clientId);
boolean getBusyStatus(Long clientId);
}
......@@ -15,31 +15,31 @@ public interface WsRtcWrite {
/**
* 接收到RTC邀请
*/
void rtcCall(RtcCallResponse rtcCallResponse, String toAppKey, String toClientId);
void rtcCall(RtcCallResponse rtcCallResponse, Long toClientId);
/**
* 用户状态更新事件(用户加入频道)
*/
void clientJoin(RtcClientJoinResponse rtcClientJoinResponse, String toAppKey, String toClientId);
void clientJoin(RtcClientJoinResponse rtcClientJoinResponse, Long toClientId);
/**
* 用户状态更新事件(用户退出频道)
*/
void clientLeave(RtcClientLeaveResponse rtcClientLeaveResponse, String toAppKey, String toClientId);
void clientLeave(RtcClientLeaveResponse rtcClientLeaveResponse, Long toClientId);
/**
* 用户状态更新事件(用户拒接邀请;不同意进入频道)
*/
void clientReject(RtcClientRejectResponse rtcClientRejectResponse, String toAppKey, String toClientId);
void clientReject(RtcClientRejectResponse rtcClientRejectResponse, Long toClientId);
/**
* SDP数据转发
*/
void sdpForward(RtcSdpForwardResponse rtcSdpForwardResponse, String toAppKey, String toClientId);
void sdpForward(RtcSdpForwardResponse rtcSdpForwardResponse, Long toClientId);
/**
* candidate候选者数据转发
*/
void candidateForward(RtcCandidateForwardResponse rtcCandidateForwardResponse, String toAppKey, String toClientId);
void candidateForward(RtcCandidateForwardResponse rtcCandidateForwardResponse, Long toClientId);
}
......@@ -54,12 +54,10 @@ public class MangerRtcCacheServiceImpl implements MangerRtcCacheService {
@Override
public void create(String appKey, String clientId, Long rtcChannelId) throws JsonProcessingException {
public void create(Long clientId, Long rtcChannelId) {
// --- 频道信息
RtcChannelInfo rtcChannelInfo = new RtcChannelInfo();
rtcChannelInfo.setAppKey(appKey);
// rtcChannelInfo.setAppId("");
//当前房主
rtcChannelInfo.setOwner(clientId);
......@@ -73,47 +71,47 @@ public class MangerRtcCacheServiceImpl implements MangerRtcCacheService {
redisUtils.addKey(channelKey, rtcChannelInfoJson, Duration.ofDays(10));
//用户当前在线的频道ID
String userJoinChannelKey = String.format(RtcRedisKey.USER_JOIN_CHANNEL, appKey + clientId);
String userJoinChannelKey = String.format(RtcRedisKey.USER_JOIN_CHANNEL, clientId);
redisUtils.addKey(userJoinChannelKey, rtcChannelId.toString(), Duration.ofDays(10));
//频道中存在的用户
String rtcChannelUsers = String.format(RtcRedisKey.RTC_CHANNEL_USERS, rtcChannelId);
redisUtils.addForSet(rtcChannelUsers, clientId, 10, TimeUnit.DAYS);
redisUtils.addForSet(rtcChannelUsers, clientId + "", 10, TimeUnit.DAYS);
}
@Override
public void join(String appKey, String clientId, Long rtcChannelId) {
public void join(Long clientId, Long rtcChannelId) {
//用户当前在线的频道ID
String userJoinChannelKey = String.format(RtcRedisKey.USER_JOIN_CHANNEL, appKey + clientId);
String userJoinChannelKey = String.format(RtcRedisKey.USER_JOIN_CHANNEL, clientId);
redisUtils.addKey(userJoinChannelKey, rtcChannelId.toString(), Duration.ofDays(10));
//频道中存在的用户
String rtcChannelUsers = String.format(RtcRedisKey.RTC_CHANNEL_USERS, rtcChannelId);
redisUtils.addForSet(rtcChannelUsers, clientId, 10, TimeUnit.DAYS);
redisUtils.addForSet(rtcChannelUsers, clientId + "", 10, TimeUnit.DAYS);
}
@Override
public void leave(String appKey, String clientId, Long rtcChannelId) {
public void leave(Long clientId, Long rtcChannelId) {
//用户当前在线的频道ID
String userJoinChannelKey = String.format(RtcRedisKey.USER_JOIN_CHANNEL, appKey + clientId);
String userJoinChannelKey = String.format(RtcRedisKey.USER_JOIN_CHANNEL, clientId);
redisUtils.delKey(userJoinChannelKey);
//频道中存在的用户
String rtcChannelUsers = String.format(RtcRedisKey.RTC_CHANNEL_USERS, rtcChannelId);
redisUtils.removeForSet(rtcChannelUsers, clientId);
String rtcChannelUserskey = String.format(RtcRedisKey.RTC_CHANNEL_USERS, rtcChannelId);
redisUtils.removeForSet(rtcChannelUserskey, clientId + "");
}
@Override
public List<String> getClientListByRtcChannelId(Long rtcChannelId) {
//频道中存在的用户
String rtcChannelUsers = String.format(RtcRedisKey.RTC_CHANNEL_USERS, rtcChannelId);
String key = String.format(RtcRedisKey.RTC_CHANNEL_USERS, rtcChannelId);
Set<String> forSetMembers = redisUtils.getForSetMembers(rtcChannelUsers);
Set<String> forSetMembers = redisUtils.getForSetMembers(key);
return new ArrayList<>(forSetMembers);
......@@ -121,10 +119,10 @@ public class MangerRtcCacheServiceImpl implements MangerRtcCacheService {
@Override
public Long getRtcChannelIdListByClientId(String appKey, String clientId) {
public Long getRtcChannelIdListByClientId(Long clientId) {
//用户当前在线的频道ID
String userJoinChannelKey = String.format(RtcRedisKey.USER_JOIN_CHANNEL, appKey + clientId);
String userJoinChannelKey = String.format(RtcRedisKey.USER_JOIN_CHANNEL, clientId);
String key = redisUtils.getKey(userJoinChannelKey);
if (StringUtils.isBlank(key)) {
return null;
......@@ -133,10 +131,10 @@ public class MangerRtcCacheServiceImpl implements MangerRtcCacheService {
}
@Override
public boolean getBusyStatus(String appKey, String clientId) {
public boolean getBusyStatus(Long clientId) {
//用户当前在线的频道ID
String userJoinChannelKey = String.format(RtcRedisKey.USER_JOIN_CHANNEL, appKey + clientId);
String userJoinChannelKey = String.format(RtcRedisKey.USER_JOIN_CHANNEL, clientId);
String key = redisUtils.getKey(userJoinChannelKey);
......
......@@ -14,7 +14,6 @@ import com.wecloud.im.service.ImApplicationService;
import com.wecloud.im.service.ImClientService;
import com.wecloud.im.ws.cache.UserStateCacheManager;
import com.wecloud.im.ws.cache.UserStateListener;
import com.wecloud.im.ws.manager.ChannelManager;
import com.wecloud.rtc.entity.response.RtcCallResponse;
import com.wecloud.rtc.entity.response.RtcCandidateForwardResponse;
import com.wecloud.rtc.entity.response.RtcClientJoinResponse;
......@@ -55,18 +54,17 @@ public class RtcServiceImpl extends UserStateListener implements RtcService {
private UserStateCacheManager userStateCacheManager;
@Override
public void onLineEvent(String appKey, String clientId, Integer platform, String longChannelId) {
public void onLineEvent(Long client, Integer platform, String longChannelId) {
// nothing need to do
}
@Override
public void offlineEvent(String appKey, String clientId, Integer platform, String longChannelId) {
public void offlineEvent(Long clientId, Integer platform, String longChannelId) {
// 根据appKey查询appid
ImApplication imApplication = imApplicationService.getOneByAppKey(appKey);
ImClient client = imClientService.getCacheImClient(imApplication.getId(), clientId);
ImClient client = imClientService.getCacheImClient(clientId);
// 获取该客户端加入的频道ID
Long listByClientId = mangerRtcCacheService.getRtcChannelIdListByClientId(imApplication.getAppKey(), client.getClientId());
Long listByClientId = mangerRtcCacheService.getRtcChannelIdListByClientId(client.getId());
if (listByClientId == null) {
return;
}
......@@ -74,7 +72,7 @@ public class RtcServiceImpl extends UserStateListener implements RtcService {
LeaveRtcChannelParam leaveRtcChannelParam = new LeaveRtcChannelParam();
leaveRtcChannelParam.setChannelId(listByClientId);
// websocket离线逻辑 服务端踢出频道
this.leave(leaveRtcChannelParam, client, imApplication);
this.leave(leaveRtcChannelParam);
}
@Override
......@@ -82,18 +80,15 @@ public class RtcServiceImpl extends UserStateListener implements RtcService {
ImClient client = imClientService.getCurentClient();
Long rtcChannelId = SnowflakeUtil.getId();
// 根据appKey查询appid
ImApplication imApplication = imApplicationService.getById(client.getFkAppid());
// 判断发起方必须在线
boolean onlineStatus = userStateCacheManager.isOnline(imApplication.getAppKey(), client.getClientId());
boolean onlineStatus = userStateCacheManager.isOnline(client.getId());
if (!onlineStatus) {
log.info("发起方必须在线" + imApplication.getAppKey() + client.getClientId());
log.info("发起方必须在线" + client.getFkAppid() + client.getClientId());
ApiResult.fail();
}
// 添加缓存
mangerRtcCacheService.create(imApplication.getAppKey(), client.getClientId(), rtcChannelId);
mangerRtcCacheService.create(client.getId(), rtcChannelId);
CreateRtcChannelResult createRtcChannelResult = new CreateRtcChannelResult();
createRtcChannelResult.setChannelId(rtcChannelId);
......@@ -105,7 +100,9 @@ public class RtcServiceImpl extends UserStateListener implements RtcService {
rtcCallResponse.setChannelId(rtcChannelId);
rtcCallResponse.setClientId(client.getClientId());
rtcCallResponse.setTimestamp(System.currentTimeMillis());
wsRtcWrite.rtcCall(rtcCallResponse, imApplication.getAppKey(), createRtcChannelParam.getToClient());
ImClient toClient = imClientService.getCacheImClient(client.getFkAppid(), createRtcChannelParam.getToClient());
wsRtcWrite.rtcCall(rtcCallResponse, toClient.getId());
// TODO 待开发 下发安卓和ios系统推送
......@@ -119,11 +116,8 @@ public class RtcServiceImpl extends UserStateListener implements RtcService {
ImClient client = imClientService.getCurentClient();
// 根据appKey查询appid
ImApplication imApplication = imApplicationService.getById(client.getFkAppid());
// 修改缓存
mangerRtcCacheService.join(imApplication.getAppKey(), client.getClientId(), joinRtcChannelParam.getChannelId());
mangerRtcCacheService.join(client.getId(), joinRtcChannelParam.getChannelId());
//获取频道内所有client
List<String> clientListByRtcChannelId = mangerRtcCacheService.getClientListByRtcChannelId(joinRtcChannelParam.getChannelId());
......@@ -138,7 +132,7 @@ public class RtcServiceImpl extends UserStateListener implements RtcService {
rtcSdpForwardResponse.setChannelId(joinRtcChannelParam.getChannelId());
rtcSdpForwardResponse.setClientId(client.getClientId());
rtcSdpForwardResponse.setTimestamp(System.currentTimeMillis());
wsRtcWrite.clientJoin(rtcSdpForwardResponse, imApplication.getAppKey(), toClientId);
wsRtcWrite.clientJoin(rtcSdpForwardResponse, Long.valueOf(toClientId));
}
return ApiResult.ok(true);
......@@ -164,7 +158,7 @@ public class RtcServiceImpl extends UserStateListener implements RtcService {
rtcClientRejectResponse.setChannelId(rejectRtcChannelParam.getChannelId());
rtcClientRejectResponse.setClientId(client.getClientId());
rtcClientRejectResponse.setTimestamp(System.currentTimeMillis());
wsRtcWrite.clientReject(rtcClientRejectResponse, imApplication.getAppKey(), toClientId);
wsRtcWrite.clientReject(rtcClientRejectResponse, Long.valueOf(toClientId));
}
return ApiResult.ok(true);
}
......@@ -176,14 +170,14 @@ public class RtcServiceImpl extends UserStateListener implements RtcService {
// 根据appKey查询appid
ImApplication imApplication = imApplicationService.getById(client.getFkAppid());
this.leave(leaveRtcChannelParam, client, imApplication);
this.leave(leaveRtcChannelParam, client);
return ApiResult.ok(true);
}
private void leave(LeaveRtcChannelParam leaveRtcChannelParam, ImClient client, ImApplication imApplication) {
private void leave(LeaveRtcChannelParam leaveRtcChannelParam, ImClient client) {
// 修改缓存
mangerRtcCacheService.leave(imApplication.getAppKey(), client.getClientId(), leaveRtcChannelParam.getChannelId());
mangerRtcCacheService.leave(client.getId(), leaveRtcChannelParam.getChannelId());
//获取频道内所有client
List<String> clientListByRtcChannelId = mangerRtcCacheService.getClientListByRtcChannelId(leaveRtcChannelParam.getChannelId());
......@@ -198,7 +192,7 @@ public class RtcServiceImpl extends UserStateListener implements RtcService {
rtcClientLeaveResponse.setChannelId(leaveRtcChannelParam.getChannelId());
rtcClientLeaveResponse.setClientId(client.getClientId());
rtcClientLeaveResponse.setTimestamp(System.currentTimeMillis());
wsRtcWrite.clientLeave(rtcClientLeaveResponse, imApplication.getAppKey(), toClientId);
wsRtcWrite.clientLeave(rtcClientLeaveResponse, Long.valueOf(toClientId));
}
// 判断频道内是否无其他人了
......@@ -218,7 +212,7 @@ public class RtcServiceImpl extends UserStateListener implements RtcService {
ImApplication imApplication = imApplicationService.getById(client.getFkAppid());
// 判断发起方必须在线
boolean onlineStatus = userStateCacheManager.isOnline(imApplication.getAppKey(), client.getClientId());
boolean onlineStatus = userStateCacheManager.isOnline(client.getId());
if (!onlineStatus) {
log.info("发起方必须在线" + imApplication.getAppKey() + client.getClientId());
ApiResult.fail();
......@@ -242,7 +236,7 @@ public class RtcServiceImpl extends UserStateListener implements RtcService {
rtcSdpForwardResponse.setChannelId(rtcChannelId);
rtcSdpForwardResponse.setClientId(client.getClientId());
rtcSdpForwardResponse.setTimestamp(System.currentTimeMillis());
wsRtcWrite.sdpForward(rtcSdpForwardResponse, imApplication.getAppKey(), toClientId);
wsRtcWrite.sdpForward(rtcSdpForwardResponse, Long.valueOf(toClientId));
}
......@@ -259,7 +253,7 @@ public class RtcServiceImpl extends UserStateListener implements RtcService {
ImApplication imApplication = imApplicationService.getById(client.getFkAppid());
// 判断发起方必须在线
boolean onlineStatus = userStateCacheManager.isOnline(imApplication.getAppKey(), client.getClientId());
boolean onlineStatus = userStateCacheManager.isOnline(client.getId());
if (!onlineStatus) {
log.info("发起方必须在线" + imApplication.getAppKey() + client.getClientId());
ApiResult.fail();
......@@ -283,7 +277,7 @@ public class RtcServiceImpl extends UserStateListener implements RtcService {
rtcCandidateForwardResponse.setChannelId(rtcChannelId);
rtcCandidateForwardResponse.setClientId(client.getClientId());
rtcCandidateForwardResponse.setTimestamp(System.currentTimeMillis());
wsRtcWrite.candidateForward(rtcCandidateForwardResponse, imApplication.getAppKey(), toClientId);
wsRtcWrite.candidateForward(rtcCandidateForwardResponse, Long.valueOf(toClientId));
}
return ApiResult.ok(true);
......
......@@ -24,7 +24,7 @@ public class WsRtcWriteImpl implements WsRtcWrite {
private ChannelSender channelSender;
@Override
public void rtcCall(RtcCallResponse rtcCallResponse, String toAppKey, String toClientId) {
public void rtcCall(RtcCallResponse rtcCallResponse, Long toClientId) {
RtcResponseBase<RtcCallResponse> rtcResponseBase = new RtcResponseBase<>();
rtcResponseBase.setSubCmd(WsRtcResponseSubCmdEnum.RTC_CALL.getCmdCode());
......@@ -39,12 +39,12 @@ public class WsRtcWriteImpl implements WsRtcWrite {
responseModel.setMsg(result.getMessage());
responseModel.setData(rtcResponseBase);
responseModel.setReqId(null);
channelSender.sendMsg(responseModel, toAppKey, toClientId);
channelSender.sendMsg(responseModel, toClientId);
}
@Override
public void clientJoin(RtcClientJoinResponse rtcClientJoinResponse, String toAppKey, String toClientId) {
public void clientJoin(RtcClientJoinResponse rtcClientJoinResponse, Long toClientId) {
RtcResponseBase<RtcClientJoinResponse> rtcResponseBase = new RtcResponseBase<>();
rtcResponseBase.setSubCmd(WsRtcResponseSubCmdEnum.CLIENT_JOIN.getCmdCode());
......@@ -59,13 +59,13 @@ public class WsRtcWriteImpl implements WsRtcWrite {
responseModel.setMsg(result.getMessage());
responseModel.setData(rtcResponseBase);
responseModel.setReqId(null);
channelSender.sendMsg(responseModel, toAppKey, toClientId);
channelSender.sendMsg(responseModel, toClientId);
}
@Override
public void clientLeave(RtcClientLeaveResponse rtcClientLeaveResponse, String toAppKey, String toClientId) {
public void clientLeave(RtcClientLeaveResponse rtcClientLeaveResponse, Long toClientId) {
RtcResponseBase<RtcClientLeaveResponse> rtcResponseBase = new RtcResponseBase<>();
......@@ -81,12 +81,12 @@ public class WsRtcWriteImpl implements WsRtcWrite {
responseModel.setMsg(result.getMessage());
responseModel.setData(rtcResponseBase);
responseModel.setReqId(null);
channelSender.sendMsg(responseModel, toAppKey, toClientId);
channelSender.sendMsg(responseModel, toClientId);
}
@Override
public void clientReject(RtcClientRejectResponse rtcClientRejectResponse, String toAppKey, String toClientId) {
public void clientReject(RtcClientRejectResponse rtcClientRejectResponse, Long toClientId) {
RtcResponseBase<RtcClientRejectResponse> rtcResponseBase = new RtcResponseBase<>();
rtcResponseBase.setSubCmd(WsRtcResponseSubCmdEnum.CLIENT_REJECT.getCmdCode());
......@@ -101,11 +101,11 @@ public class WsRtcWriteImpl implements WsRtcWrite {
responseModel.setMsg(result.getMessage());
responseModel.setData(rtcResponseBase);
responseModel.setReqId(null);
channelSender.sendMsg(responseModel, toAppKey, toClientId);
channelSender.sendMsg(responseModel, toClientId);
}
@Override
public void sdpForward(RtcSdpForwardResponse rtcSdpForwardResponse, String toAppKey, String toClientId) {
public void sdpForward(RtcSdpForwardResponse rtcSdpForwardResponse, Long toClientId) {
RtcResponseBase<RtcSdpForwardResponse> rtcResponseBase = new RtcResponseBase<>();
rtcResponseBase.setSubCmd(WsRtcResponseSubCmdEnum.SDP_FORWARD.getCmdCode());
......@@ -120,11 +120,11 @@ public class WsRtcWriteImpl implements WsRtcWrite {
responseModel.setMsg(result.getMessage());
responseModel.setData(rtcResponseBase);
responseModel.setReqId(null);
channelSender.sendMsg(responseModel, toAppKey, toClientId);
channelSender.sendMsg(responseModel, toClientId);
}
@Override
public void candidateForward(RtcCandidateForwardResponse rtcCandidateForwardResponse, String toAppKey, String toClientId) {
public void candidateForward(RtcCandidateForwardResponse rtcCandidateForwardResponse, Long toClientId) {
RtcResponseBase<RtcCandidateForwardResponse> rtcResponseBase = new RtcResponseBase<>();
rtcResponseBase.setSubCmd(WsRtcResponseSubCmdEnum.CANDIDATE_FORWARD.getCmdCode());
......@@ -139,7 +139,7 @@ public class WsRtcWriteImpl implements WsRtcWrite {
responseModel.setMsg(result.getMessage());
responseModel.setData(rtcResponseBase);
responseModel.setReqId(null);
channelSender.sendMsg(responseModel, toAppKey, toClientId);
channelSender.sendMsg(responseModel, toClientId);
}
......
......@@ -5,7 +5,7 @@
<!-- 通用查询结果列 -->
<sql id="Base_Column_List">
id
, create_time, update_time, fk_appid, attributes,device_type,valid
, create_time, update_time, fk_appid, attributes,client_id,device_type,valid,device_token,head_portrait,nickname
</sql>
<update id="removeOldToken">
UPDATE im_client
......@@ -14,12 +14,6 @@
AND fk_appid = #{appId}
</update>
<select id="getImClientById" resultType="com.wecloud.im.param.ImClientQueryVo">
select
<include refid="Base_Column_List"/>
from im_client where id = #{id}
</select>
<select id="getImClientPageList" parameterType="com.wecloud.im.param.ImClientPageParam"
resultType="com.wecloud.im.param.ImClientQueryVo">
select
......
......@@ -43,4 +43,10 @@
</select>
<select id="findThousandGroupsByClientId" resultType="java.lang.Long">
select a.id from im_conversation a
inner join im_conversation_members b on a.id = b.fk_conversation_id
where b.fk_client_id = #{clientId} and a.is_thousand = 1
</select>
</mapper>
......@@ -221,3 +221,4 @@ ALTER TABLE `im_message` CHANGE COLUMN `system` `system_flag` tinyint(1) NULL DE
ALTER TABLE im_conversation`
ADD COLUMN `member_count` int NULL COMMENT '群成员数' AFTER `last_message`,
ADD COLUMN `is_thousand` tinyint NULL COMMENT '是否万人群' AFTER `member_count`;
ALTER TABLE `im_conversation_members` ADD INDEX `fk_client_id`(`fk_client_id`);
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment