Commit 0e37c019 by lixiaozhong

1、在线离线状态逻辑抽象;

2、群表添加千人群标志。添加群成员人数冗余
3、普通群升级千人群的
parent 2c365d27
...@@ -39,6 +39,12 @@ public class ImConversation extends BaseEntity { ...@@ -39,6 +39,12 @@ public class ImConversation extends BaseEntity {
@ApiModelProperty("对话中最后一条消息的发送或接收时间") @ApiModelProperty("对话中最后一条消息的发送或接收时间")
private Date lastMessage; private Date lastMessage;
@ApiModelProperty("群成员数量")
private Integer memberCount;
@ApiModelProperty("是否万人群")
private Boolean isThousand;
@NotNull(message = "应用appid不能为空") @NotNull(message = "应用appid不能为空")
@ApiModelProperty("应用appid") @ApiModelProperty("应用appid")
private Long fkAppid; private Long fkAppid;
......
...@@ -84,4 +84,21 @@ public interface ImConversationMapper extends BaseMapper<ImConversation> { ...@@ -84,4 +84,21 @@ public interface ImConversationMapper extends BaseMapper<ImConversation> {
* @return * @return
*/ */
ImConversation getRepetitionConversationInfo(@Param("clientId1") Long clientId1, @Param("clientId2") Long clientId2); ImConversation getRepetitionConversationInfo(@Param("clientId1") Long clientId1, @Param("clientId2") Long clientId2);
/**
* 增减成员数量
* @param appId fkAppid
* @param conversationId 会话id
* @param addCount 要增减的人数, 负数是减
* @return
*/
boolean addMemberCount(@Param("appId") Long appId, @Param("conversationId") Long conversationId, @Param("addCount") int addCount);
/**
* 升级成万人群
* @param appId fkAppId
* @param conversationId 会话id
* @return 成功则返回true
*/
boolean upgradeToThousandChat(@Param("appId") Long appId, @Param("conversationId") Long conversationId);
} }
...@@ -117,38 +117,15 @@ public class WsReadHandler extends SimpleChannelInboundHandler<TextWebSocketFram ...@@ -117,38 +117,15 @@ public class WsReadHandler extends SimpleChannelInboundHandler<TextWebSocketFram
} }
// /**
// * 检测到异常
// *
// * @param ctx
// * @param cause
// * @throws Exception
// */
// @Override
// public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// String userIdByChannel = mangerChannelService.getInfoByChannel(ctx);
// log.info("uid:" + userIdByChannel + ",ws异常,channelId:" + ctx.channel().id().asLongText(), cause);
// }
@Override @Override
public void handlerAdded(ChannelHandlerContext ctx) { public void handlerAdded(ChannelHandlerContext ctx) {
String userIdByChannel = channelManager.getStringInfoByChannel(ctx); if(log.isInfoEnabled()) {
log.info("连接WS成功handlerAdded,uid:" + userIdByChannel + "," + ",channelId:" + ctx.channel().id().asLongText()); String appKey = ctx.channel().attr(ChannelManager.APP_KEY).get();
String clientId = ctx.channel().attr(ChannelManager.CLIENT_ID).get();
log.info("HandlerAdded. uid is APP_KEY:{},CLIENT_ID:{}, channelId is {}", appKey, clientId, ctx.channel().id().asLongText());
}
} }
// /**
// * 客户端不活跃
// *
// * @param ctx
// * @throws Exception
// */
// @Override
// public void channelInactive(ChannelHandlerContext ctx) {
// String userIdByChannel = mangerChannelService.getInfoByChannel(ctx);
// log.info("uid:" + userIdByChannel + "," + "channelInactive" + ",channelId:" + ctx.channel().id().asLongText());
// }
/** /**
* 移除时触发, 不活跃的情况下会移除,会再次触发该事件 * 移除时触发, 不活跃的情况下会移除,会再次触发该事件
*/ */
...@@ -158,12 +135,10 @@ public class WsReadHandler extends SimpleChannelInboundHandler<TextWebSocketFram ...@@ -158,12 +135,10 @@ public class WsReadHandler extends SimpleChannelInboundHandler<TextWebSocketFram
String appKey = ctx.channel().attr(ChannelManager.APP_KEY).get(); String appKey = ctx.channel().attr(ChannelManager.APP_KEY).get();
String clientId = ctx.channel().attr(ChannelManager.CLIENT_ID).get(); String clientId = ctx.channel().attr(ChannelManager.CLIENT_ID).get();
String userIdByChannel = channelManager.getStringInfoByChannel(ctx); log.info("uid is APP_KEY:{},CLIENT_ID:{}, handlerRemoved. channelId is {}", appKey, clientId, ctx.channel().id().asLongText());
log.info("uid:" + userIdByChannel + "," + "handlerRemoved" + ",channelId:" + ctx.channel().id().asLongText());
// 关掉连接 // 关掉连接
channelManager.offline(ctx); channelManager.offline(appKey, clientId, ctx);
// rtc清空缓存
rtcService.clientOffline(appKey, clientId);
} }
} }
...@@ -47,5 +47,11 @@ public class ImConversationQueryVo implements Serializable { ...@@ -47,5 +47,11 @@ public class ImConversationQueryVo implements Serializable {
private String attributes; private String attributes;
@ApiModelProperty("可选 对话类型标志,是否是系统对话,后面会说明。") @ApiModelProperty("可选 对话类型标志,是否是系统对话,后面会说明。")
private Boolean system; private Boolean systemFlag;
}
\ No newline at end of file @ApiModelProperty("群成员数量")
private Integer memberCount;
@ApiModelProperty("是否万人群")
private Boolean isThousand;
}
...@@ -179,6 +179,8 @@ public class ImConversationServiceImpl extends BaseServiceImpl<ImConversationMap ...@@ -179,6 +179,8 @@ public class ImConversationServiceImpl extends BaseServiceImpl<ImConversationMap
imConversation.setLastMessage(null); imConversation.setLastMessage(null);
imConversation.setFkAppid(createClient.getFkAppid()); imConversation.setFkAppid(createClient.getFkAppid());
imConversation.setCreator(creator); imConversation.setCreator(creator);
imConversation.setMemberCount(imConversationCreate.getClientIds().size() + 1);
imConversation.setIsThousand(false);
imConversation.setName(imConversationCreate.getName()); imConversation.setName(imConversationCreate.getName());
imConversation.setSystemFlag(false); imConversation.setSystemFlag(false);
// 拓展数据 // 拓展数据
...@@ -242,6 +244,8 @@ public class ImConversationServiceImpl extends BaseServiceImpl<ImConversationMap ...@@ -242,6 +244,8 @@ public class ImConversationServiceImpl extends BaseServiceImpl<ImConversationMap
return ApiResult.fail(); return ApiResult.fail();
} }
int needAddCount = 0;
// 将他人添加到会话 // 将他人添加到会话
for (String id : imClientToConversation.getClientIds()) { for (String id : imClientToConversation.getClientIds()) {
...@@ -270,6 +274,8 @@ public class ImConversationServiceImpl extends BaseServiceImpl<ImConversationMap ...@@ -270,6 +274,8 @@ public class ImConversationServiceImpl extends BaseServiceImpl<ImConversationMap
imConversationMembers2.setFkClientId(client2.getId()); imConversationMembers2.setFkClientId(client2.getId());
imConversationMembersService.save(imConversationMembers2); imConversationMembersService.save(imConversationMembers2);
needAddCount++;
// ws邀请事件通知给群内其他人 ---------- // ws邀请事件通知给群内其他人 ----------
...@@ -330,6 +336,15 @@ public class ImConversationServiceImpl extends BaseServiceImpl<ImConversationMap ...@@ -330,6 +336,15 @@ public class ImConversationServiceImpl extends BaseServiceImpl<ImConversationMap
} }
} }
// 将群成员数量减
imConversationMapper.addMemberCount(imApplication.getId(), imClientToConversation.getConversationId(), needAddCount);
ImConversationQueryVo conversation = imConversationMapper.getImConversationById(imClientToConversation.getConversationId());
if(conversation.getMemberCount() > 2000) {
// 升级为万人群
imConversationMapper.upgradeToThousandChat(imApplication.getId(), imClientToConversation.getConversationId());
}
return ApiResult.ok(); return ApiResult.ok();
} }
...@@ -364,6 +379,8 @@ public class ImConversationServiceImpl extends BaseServiceImpl<ImConversationMap ...@@ -364,6 +379,8 @@ public class ImConversationServiceImpl extends BaseServiceImpl<ImConversationMap
return ApiResult.fail(); return ApiResult.fail();
} }
int needAddCount = 0;
// 将client从会话移除 // 将client从会话移除
for (String id : imClientToConversation.getClientIds()) { for (String id : imClientToConversation.getClientIds()) {
...@@ -383,6 +400,8 @@ public class ImConversationServiceImpl extends BaseServiceImpl<ImConversationMap ...@@ -383,6 +400,8 @@ public class ImConversationServiceImpl extends BaseServiceImpl<ImConversationMap
// ws移除事件通知给群内其他人 ---------- // ws移除事件通知给群内其他人 ----------
needAddCount--;
// 生成消息id // 生成消息id
long messageId = SnowflakeUtil.getId(); long messageId = SnowflakeUtil.getId();
...@@ -440,6 +459,10 @@ public class ImConversationServiceImpl extends BaseServiceImpl<ImConversationMap ...@@ -440,6 +459,10 @@ public class ImConversationServiceImpl extends BaseServiceImpl<ImConversationMap
} }
} }
// 将群成员数量减
imConversationMapper.addMemberCount(imApplication.getId(), imClientToConversation.getConversationId(), needAddCount);
return ApiResult.ok(); return ApiResult.ok();
} }
...@@ -486,6 +509,9 @@ public class ImConversationServiceImpl extends BaseServiceImpl<ImConversationMap ...@@ -486,6 +509,9 @@ public class ImConversationServiceImpl extends BaseServiceImpl<ImConversationMap
throw new Exception("deleteImConversationMembers"); throw new Exception("deleteImConversationMembers");
} }
// 将群成员数量减1
imConversationMapper.addMemberCount(imApplication.getId(), imClientToConversation.getConversationId(), -1);
// ws 退出事件通知给群内其他人 ---------- // ws 退出事件通知给群内其他人 ----------
......
package com.wecloud.im.thousandchat.cache;
import com.wecloud.im.ws.cache.UserStateListener;
import com.wecloud.im.ws.utils.InitIp;
import com.wecloud.im.ws.utils.RedisUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.Set;
/**
* 千人群的缓存(本地缓存待实现)
* @author lixiaozhong
* @date 2022年01月4日 17:00:00
*/
@Service
@Slf4j
public class GroupCacheManager extends UserStateListener {
/**
* 在线状态
*/
public static final Integer ONLINE = 1;
@Autowired
private RedisUtils redisUtils;
@Override
public void onLineEvent(String appKey, String clientId, String longChannelId) {
}
@Override
public void offlineEvent(String appKey, String clientId, String longChannelId) {
}
}
package com.wecloud.im.ws.cache;
import com.wecloud.im.ws.model.redis.ClientChannelInfo;
import com.wecloud.im.ws.model.redis.ClientConnectionInfo;
import java.util.List;
/**
* @author hewei123@163.com
* @Description 用户与redis绑定
* @createTime 2020年04月14日 16:21:00
*/
public interface UserCacheService {
/**
* 用户上线绑定机器ip
*
* @param longChannelId
*/
public void online(String appKey, String clientId, String longChannelId);
/**
* 用户下线删除绑定机器ip
*
* @param longChannelId
*/
public void offline(String appKey, String clientId, String longChannelId);
/**
* 根据ClientId从redis获取client信息
*
* @param appKey
* @param clientId
* @return
*/
List<ClientChannelInfo> getIpByClientIdAndOnline(String appKey, String clientId);
}
package com.wecloud.im.ws.cache; package com.wecloud.im.ws.cache;
import com.wecloud.im.ws.model.redis.ClientChannelInfo; import com.wecloud.im.ws.model.redis.ClientChannelInfo;
import com.wecloud.im.ws.model.redis.ClientConnectionInfo;
import com.wecloud.im.ws.utils.InitIp; import com.wecloud.im.ws.utils.InitIp;
import com.wecloud.im.ws.utils.RedisUtils; import com.wecloud.im.ws.utils.RedisUtils;
import com.wecloud.im.ws.utils.SpringBeanUtils;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.event.ApplicationStartedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Set; import java.util.Set;
/** /**
...@@ -20,7 +24,7 @@ import java.util.Set; ...@@ -20,7 +24,7 @@ import java.util.Set;
*/ */
@Service @Service
@Slf4j @Slf4j
public class UserCacheServiceImpl implements UserCacheService { public class UserStateCacheManager extends UserStateListener {
/** /**
* 在线状态 * 在线状态
...@@ -68,13 +72,8 @@ public class UserCacheServiceImpl implements UserCacheService { ...@@ -68,13 +72,8 @@ public class UserCacheServiceImpl implements UserCacheService {
@Autowired @Autowired
private RedisUtils redisUtils; private RedisUtils redisUtils;
/**
* 用户上线绑定机器ip
*
* @param longChannelId
*/
@Override @Override
public void online(String appKey, String clientId, String longChannelId) { public void onLineEvent(String appKey, String clientId, String longChannelId) {
log.info("ws用户上线保存redis连接ip:" + InitIp.lAN_IP, ",uid:" + longChannelId); log.info("ws用户上线保存redis连接ip:" + InitIp.lAN_IP, ",uid:" + longChannelId);
// 先进行历史数据清理 // 先进行历史数据清理
...@@ -87,17 +86,10 @@ public class UserCacheServiceImpl implements UserCacheService { ...@@ -87,17 +86,10 @@ public class UserCacheServiceImpl implements UserCacheService {
redisUtils.addForSet(CLIENTS + appKey + clientId, longChannelId); redisUtils.addForSet(CLIENTS + appKey + clientId, longChannelId);
redisUtils.hashset(CLIENT_INFO + longChannelId, LAN_IP, InitIp.lAN_IP); redisUtils.hashset(CLIENT_INFO + longChannelId, LAN_IP, InitIp.lAN_IP);
redisUtils.hashset(CLIENT_INFO + longChannelId, ONLINE_STATUS, ONLINE.toString()); redisUtils.hashset(CLIENT_INFO + longChannelId, ONLINE_STATUS, ONLINE.toString());
} }
/**
* 用户下线删除绑定机器ip
*
* @param longChannelId
*/
@Override @Override
public void offlineEvent(String appKey, String clientId, String longChannelId) {
public void offline(String appKey, String clientId, String longChannelId) {
log.info("ws用户离线删除redis key,uid:" + longChannelId); log.info("ws用户离线删除redis key,uid:" + longChannelId);
redisUtils.removeForSet(CLIENTS + appKey + clientId, longChannelId); redisUtils.removeForSet(CLIENTS + appKey + clientId, longChannelId);
...@@ -106,7 +98,12 @@ public class UserCacheServiceImpl implements UserCacheService { ...@@ -106,7 +98,12 @@ public class UserCacheServiceImpl implements UserCacheService {
redisUtils.delKey(CLIENT_INFO + longChannelId); redisUtils.delKey(CLIENT_INFO + longChannelId);
} }
@Override /**
* 根据clientId获取redis 的channel信息
* @param appKey
* @param clientId
* @return
*/
public List<ClientChannelInfo> getIpByClientIdAndOnline(String appKey, String clientId) { public List<ClientChannelInfo> getIpByClientIdAndOnline(String appKey, String clientId) {
// 获取所有 CLIENTS的 longChannelId // 获取所有 CLIENTS的 longChannelId
...@@ -134,5 +131,4 @@ public class UserCacheServiceImpl implements UserCacheService { ...@@ -134,5 +131,4 @@ public class UserCacheServiceImpl implements UserCacheService {
return clientChannelInfos; return clientChannelInfos;
} }
} }
package com.wecloud.im.ws.cache;
import com.wecloud.im.ws.utils.SpringBeanUtils;
import org.springframework.boot.context.event.ApplicationStartedEvent;
import org.springframework.context.event.EventListener;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
/**
* 事件源接口(被观察者)
*/
public abstract class UserStateListener {
private static final Set<UserStateListener> listeners = new HashSet<>();
@EventListener(ApplicationStartedEvent.class)
private void registerEventListener() {
Collection<UserStateListener> userStateListeners = SpringBeanUtils.getBeansOfType(UserStateListener.class);
for(UserStateListener listener : userStateListeners) {
listeners.add(listener);
}
}
public static void addListener(UserStateListener listener) {
listeners.add(listener);
}
public static void triggerOnlineEvent(String appKey, String clientId, String longChannelId) {
for(UserStateListener listener: listeners) {
listener.onLineEvent(appKey, clientId, longChannelId);
}
}
public static void triggerOfflineEvent(String appKey, String clientId, String longChannelId) {
for(UserStateListener listener: listeners) {
listener.offlineEvent(appKey, clientId, longChannelId);
}
}
public abstract void onLineEvent(String appKey, String clientId, String longChannelId);
public abstract void offlineEvent(String appKey, String clientId, String longChannelId);
}
...@@ -20,7 +20,11 @@ public enum WsRequestCmdEnum { ...@@ -20,7 +20,11 @@ public enum WsRequestCmdEnum {
/** /**
* 单人WebRTC音视频通话 * 单人WebRTC音视频通话
*/ */
SINGLE_RTC(3); SINGLE_RTC(3),
/**
* 单人WebRTC音视频通话
*/
THROUSAND_CHAT(4);
private final int cmdCode; private final int cmdCode;
......
package com.wecloud.im.ws.manager; package com.wecloud.im.ws.manager;
import com.wecloud.im.ws.cache.UserCacheService; import com.wecloud.im.ws.cache.UserStateCacheManager;
import com.wecloud.im.ws.cache.UserStateListener;
import com.wecloud.im.ws.model.ClientInfo; import com.wecloud.im.ws.model.ClientInfo;
import com.wecloud.im.ws.model.redis.ClientChannelInfo; import com.wecloud.im.ws.model.redis.ClientChannelInfo;
import com.wecloud.rtc.service.RtcService; import com.wecloud.rtc.service.RtcService;
...@@ -64,7 +65,7 @@ public class ChannelManager { ...@@ -64,7 +65,7 @@ public class ChannelManager {
private RtcService rtcService; private RtcService rtcService;
@Autowired @Autowired
private UserCacheService userCacheService; private UserStateCacheManager userStateCacheManager;
/** /**
* client上线 * client上线
...@@ -77,23 +78,19 @@ public class ChannelManager { ...@@ -77,23 +78,19 @@ public class ChannelManager {
this.putClientsMap(appKey, clientId, longChannelId); this.putClientsMap(appKey, clientId, longChannelId);
this.putSessionInfoMap(longChannelId, channel); this.putSessionInfoMap(longChannelId, channel);
userCacheService.online(appKey, clientId, longChannelId); UserStateListener.triggerOnlineEvent(appKey, clientId, longChannelId);
} }
/** /**
* 下线移除channel * 下线移除channel
* *
* @param appKey
* @param clientId
* @param channelHandlerContext * @param channelHandlerContext
*/ */
public void offline(ChannelHandlerContext channelHandlerContext) { public void offline(String appKey, String clientId, ChannelHandlerContext channelHandlerContext) {
String appKey = channelHandlerContext.channel().attr(ChannelManager.APP_KEY).get();
String clientId = channelHandlerContext.channel().attr(ChannelManager.CLIENT_ID).get();
String userIdByChannelString = this.getStringInfoByChannel(channelHandlerContext);
String longChannelId = channelHandlerContext.channel().id().asLongText(); String longChannelId = channelHandlerContext.channel().id().asLongText();
log.info("uid:" + userIdByChannelString + "," + "handlerRemoved" + ",channelId:" + longChannelId);
// 关掉连接 // 关掉连接
channelHandlerContext.close(); channelHandlerContext.close();
...@@ -103,23 +100,8 @@ public class ChannelManager { ...@@ -103,23 +100,8 @@ public class ChannelManager {
delSessionInfoMap(longChannelId); delSessionInfoMap(longChannelId);
delClientsMap(appKey, clientId, longChannelId); delClientsMap(appKey, clientId, longChannelId);
// 移除redis缓存 UserStateListener.triggerOfflineEvent(appKey, clientId, longChannelId);
userCacheService.offline(appKey, clientId, longChannelId);
// rtc清空缓存
rtcService.clientOffline(appKey, clientId);
}
/**
* 根据channel返回客户端key和id
*
* @param channelHandlerContext
* @return
*/
public String getStringInfoByChannel(ChannelHandlerContext channelHandlerContext) {
return "APP_KEY:" + channelHandlerContext.channel().attr(ChannelManager.APP_KEY).get()
+ ",CLIENT_ID:" + channelHandlerContext.channel().attr(ChannelManager.CLIENT_ID).get();
} }
/** /**
...@@ -131,7 +113,7 @@ public class ChannelManager { ...@@ -131,7 +113,7 @@ public class ChannelManager {
*/ */
public boolean getOnlineStatus(String toAppKey, String toClientId) { public boolean getOnlineStatus(String toAppKey, String toClientId) {
List<ClientChannelInfo> channelInfos = userCacheService.getIpByClientIdAndOnline(toAppKey, toClientId); List<ClientChannelInfo> channelInfos = userStateCacheManager.getIpByClientIdAndOnline(toAppKey, toClientId);
boolean flag = false; boolean flag = false;
for (ClientChannelInfo channelInfo : channelInfos) { for (ClientChannelInfo channelInfo : channelInfos) {
......
package com.wecloud.im.ws.sender; package com.wecloud.im.ws.sender;
import com.wecloud.im.router.RouterSendService; import com.wecloud.im.router.RouterSendService;
import com.wecloud.im.ws.cache.UserCacheService; import com.wecloud.im.ws.cache.UserStateCacheManager;
import com.wecloud.im.ws.model.ClientInfo; import com.wecloud.im.ws.model.ClientInfo;
import com.wecloud.im.ws.model.WsResponseModel; import com.wecloud.im.ws.model.WsResponseModel;
import com.wecloud.im.ws.model.redis.ClientChannelInfo; import com.wecloud.im.ws.model.redis.ClientChannelInfo;
...@@ -16,11 +16,14 @@ import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; ...@@ -16,11 +16,14 @@ import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.dubbo.config.annotation.DubboReference; import org.apache.dubbo.config.annotation.DubboReference;
import org.apache.dubbo.rpc.RpcContext; import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.cluster.router.address.Address;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/** /**
* @Description 下发数据 * @Description 下发数据
...@@ -32,9 +35,9 @@ import java.util.List; ...@@ -32,9 +35,9 @@ import java.util.List;
public class ChannelSender { public class ChannelSender {
@Autowired @Autowired
private UserCacheService userCacheService; private UserStateCacheManager userStateCacheManager;
@DubboReference @DubboReference(parameters = {"router", "address"})
private RouterSendService routerSendService; private RouterSendService routerSendService;
/** /**
...@@ -102,21 +105,30 @@ public class ChannelSender { ...@@ -102,21 +105,30 @@ public class ChannelSender {
String msgJson = JsonUtils.encodeJson(responseModel); String msgJson = JsonUtils.encodeJson(responseModel);
List<ClientChannelInfo> channelInfos = userCacheService.getIpByClientIdAndOnline(toAppKey, toClientId); List<ClientChannelInfo> channelInfos = userStateCacheManager.getIpByClientIdAndOnline(toAppKey, toClientId);
for (ClientChannelInfo channelInfo : channelInfos) { // 先进行分类,key是ip地址,value是channel的列表
Map<String, List<ClientChannelInfo>> ipChannels = channelInfos.stream().collect(Collectors.groupingBy(ClientChannelInfo::getLanIp));
for (Map.Entry<String, List<ClientChannelInfo>> channelInfoEntry : ipChannels.entrySet()) {
// 是否为当前机器的ip // 是否为当前机器的ip
if (InitIp.lAN_IP.equals(channelInfo.getLanIp())) { if (InitIp.lAN_IP.equals(channelInfoEntry.getKey())) {
// 调用本地下发 // 调用本地下发
// this.sendMsgLocal(channelInfo.getChannelId(), msgJson); for(ClientChannelInfo clientChannelInfo : channelInfoEntry.getValue()) {
// this.sendMsgLocal(clientChannelInfo.getChannelId(), msgJson);
// } else { }
// todo rpc调用下发 continue;
RpcContext.getContext().set("ip", channelInfo.getLanIp()); }
routerSendService.sendMsgRemote(channelInfo.getChannelId(), msgJson); // todo rpc调用下发,需要改成批量
// RpcContext.getContext().set("ip", channelInfo.getLanIp());
// 根据provider的ip,port创建Address实例
for(ClientChannelInfo clientChannelInfo : channelInfoEntry.getValue()) {
Address address = new Address(clientChannelInfo.getLanIp(), 20881);
RpcContext.getContext().setObjectAttachment("address", address);
routerSendService.sendMsgRemote(clientChannelInfo.getChannelId(), msgJson);
} }
} }
} }
......
...@@ -52,6 +52,12 @@ public abstract class AbstractImCmdStrategy { ...@@ -52,6 +52,12 @@ public abstract class AbstractImCmdStrategy {
throw new BusinessException("null == receiveVO || null == receiveVO.getCmd()"); throw new BusinessException("null == receiveVO || null == receiveVO.getCmd()");
} }
// 获取会话id
if (receiveVO.getData() == null || receiveVO.getData().getToConversation() == null) {
log.warn("会话消息reqId: {} 不合法,缺少会话id!", receiveVO.getReqId());
return;
}
WsRequestCmdEnum wsRequestPathEnum = WsRequestCmdEnum.getByCode(receiveVO.getCmd()); WsRequestCmdEnum wsRequestPathEnum = WsRequestCmdEnum.getByCode(receiveVO.getCmd());
// 使用策略模式, 根据不同类型请求调用不同实现类 // 使用策略模式, 根据不同类型请求调用不同实现类
AbstractImCmdStrategy cmdStrategy = imCmdStrategyFactory.getStrategy(wsRequestPathEnum); AbstractImCmdStrategy cmdStrategy = imCmdStrategyFactory.getStrategy(wsRequestPathEnum);
...@@ -72,6 +78,8 @@ public abstract class AbstractImCmdStrategy { ...@@ -72,6 +78,8 @@ public abstract class AbstractImCmdStrategy {
return; return;
} }
//查看接收者
cmdStrategy.process(imApplication, imClientSender, ctx, receiveVO); cmdStrategy.process(imApplication, imClientSender, ctx, receiveVO);
} }
......
...@@ -69,12 +69,6 @@ public class NormalChatStrategy extends AbstractImCmdStrategy { ...@@ -69,12 +69,6 @@ public class NormalChatStrategy extends AbstractImCmdStrategy {
public void process(ImApplication imApplication, ImClient imSender, ChannelHandlerContext ctx, ReceiveVO receiveVO) { public void process(ImApplication imApplication, ImClient imSender, ChannelHandlerContext ctx, ReceiveVO receiveVO) {
NioSocketChannel channel = (NioSocketChannel) ctx.channel(); NioSocketChannel channel = (NioSocketChannel) ctx.channel();
// 获取会话id
if (receiveVO.getData().getToConversation() == null) {
log.warn("会话消息reqId: {} 不合法,缺少会话id!", receiveVO.getReqId());
return;
}
Long toConversationId = receiveVO.getData().getToConversation(); Long toConversationId = receiveVO.getData().getToConversation();
// 查询该会话所有成员 // 查询该会话所有成员
......
package com.wecloud.im.ws.strategy.concrete;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.wecloud.im.entity.ImApplication;
import com.wecloud.im.entity.ImClient;
import com.wecloud.im.entity.ImConversationMembers;
import com.wecloud.im.entity.ImInbox;
import com.wecloud.im.entity.ImMessage;
import com.wecloud.im.entity.ImMessageOnlineSend;
import com.wecloud.im.service.ImClientBlacklistService;
import com.wecloud.im.service.ImClientService;
import com.wecloud.im.service.ImConversationMembersService;
import com.wecloud.im.service.ImInboxService;
import com.wecloud.im.service.ImMessageService;
import com.wecloud.im.ws.annotation.ImCmdType;
import com.wecloud.im.ws.enums.WsRequestCmdEnum;
import com.wecloud.im.ws.enums.WsResponseCmdEnum;
import com.wecloud.im.ws.model.WsResponseModel;
import com.wecloud.im.ws.model.request.ReceiveVO;
import com.wecloud.im.ws.sender.AsyncPush;
import com.wecloud.im.ws.sender.ChannelSender;
import com.wecloud.im.ws.strategy.AbstractImCmdStrategy;
import com.wecloud.utils.JsonUtils;
import io.geekidea.springbootplus.framework.common.api.ApiCode;
import io.geekidea.springbootplus.framework.common.api.ApiResult;
import io.geekidea.springbootplus.framework.shiro.util.SnowflakeUtil;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
/**
* @Description 处理app数据消息
*/
@ImCmdType(type = WsRequestCmdEnum.THROUSAND_CHAT)
@Service
@Slf4j
public class ThousandChatStrategy extends AbstractImCmdStrategy {
public static final String MSG_ID = "msgId";
@Autowired
private ImClientBlacklistService imClientBlacklistService;
@Autowired
private ChannelSender channelSender;
@Autowired
private ImMessageService imMessageService;
@Autowired
private ImInboxService imInboxService;
@Autowired
private ImConversationMembersService imConversationMembersService;
@Autowired
private ImClientService imClientService;
@Autowired
private AsyncPush systemPush;
@Override
public void process(ImApplication imApplication, ImClient imSender, ChannelHandlerContext ctx, ReceiveVO receiveVO) {
NioSocketChannel channel = (NioSocketChannel) ctx.channel();
Long toConversationId = receiveVO.getData().getToConversation();
// 查询该会话所有成员
List<ImConversationMembers> membersList = imConversationMembersService.list(
new QueryWrapper<ImConversationMembers>().lambda()
.eq(ImConversationMembers::getFkConversationId, toConversationId)
.notIn(ImConversationMembers::getFkClientId, imSender.getId())
);
if (membersList.isEmpty()) {
log.info("查询会话所有成员返回空,会话ID: {}", toConversationId);
return;
}
// 判断为单聊
if (membersList.size() == 1) {
// 判断是否被拉黑逻辑
if (black(receiveVO, imSender, membersList, channel)) {
return;
}
}
// 生成消息id
long messageId = SnowflakeUtil.getId();
// 入库 保存消息至消息表
ImMessage imMessage = imMessageService.saveImMessage(imApplication, imSender, toConversationId, messageId, receiveVO.getData());
// 封装响应的实体
ImMessageOnlineSend imMessageOnlineSend = new ImMessageOnlineSend();
imMessageOnlineSend.setMsgId(imMessage.getId());
imMessageOnlineSend.setSender(imSender.getClientId());
HashMap<String, Object> content = JsonUtils.decodeJson2Map(JsonUtils.encodeJson(receiveVO.getData()));
imMessageOnlineSend.setContent(content);
imMessageOnlineSend.setConversationId(toConversationId);
imMessageOnlineSend.setCreateTime(imMessage.getCreateTime());
imMessageOnlineSend.setWithdrawTime(imMessage.getWithdrawTime());
imMessageOnlineSend.setWithdraw(imMessage.getWithdraw());
imMessageOnlineSend.setEvent(imMessage.getEvent());
imMessageOnlineSend.setSystem(imMessage.getSystemFlag());
imMessageOnlineSend.setAt(imMessage.getAt());
// 遍历发送
for (ImConversationMembers conversationMembers : membersList) {
// 入库 保存收件箱
long imInboxId = SnowflakeUtil.getId();
saveImInbox(imApplication, toConversationId, messageId, conversationMembers, imInboxId);
// 查询接收方
ImClient imClientReceiver = imClientService.getOne(new QueryWrapper<ImClient>().lambda()
.eq(ImClient::getFkAppid, imApplication.getId())
.eq(ImClient::getId, conversationMembers.getFkClientId()));
if (imClientReceiver == null) {
continue;
}
// 向接收方推送
WsResponseModel<ImMessageOnlineSend> responseModel = new WsResponseModel<>();
responseModel.setCmd(WsResponseCmdEnum.ONLINE_MSG.getCmdCode());
ApiResult<Boolean> result = ApiResult.result(ApiCode.SUCCESS);
responseModel.setCode(result.getCode());
responseModel.setMsg(result.getMessage());
responseModel.setData(imMessageOnlineSend);
responseModel.setReqId(null);
channelSender.sendMsg(responseModel, imApplication.getAppKey(), imClientReceiver.getClientId());
// 异步推送系统通知消息
systemPush.push(receiveVO.getData().getPush(), imClientReceiver, imApplication);
}
// 响应发送方消息id等信息
response(receiveVO, messageId, channel);
}
/**
* 入库 保存收件箱
*
* @param imApplication
* @param toConversationId
* @param messageId
* @param conversationMembers
* @param imInboxId
*/
private void saveImInbox(ImApplication imApplication, Long toConversationId, long messageId, ImConversationMembers conversationMembers, long imInboxId) {
ImInbox imInbox = new ImInbox();
imInbox.setId(imInboxId);
imInbox.setCreateTime(new Date());
imInbox.setFkAppid(imApplication.getId());
imInbox.setReceiver(conversationMembers.getFkClientId());
imInbox.setFkMsgId(messageId);
imInbox.setReadMsgStatus(0);
imInbox.setReceiverMsgStatus(0);
imInbox.setFkConversationId(toConversationId);
imInboxService.save(imInbox);
}
/**
* 响应发送方消息id等信息
*
* @param receiveVO
* @param messageId
*/
private void response(ReceiveVO receiveVO, long messageId, NioSocketChannel channel) {
WsResponseModel<HashMap<String, Long>> responseModel = new WsResponseModel<>();
ApiResult<Boolean> result = ApiResult.result(ApiCode.SUCCESS);
responseModel.setCmd(WsResponseCmdEnum.RES.getCmdCode());
responseModel.setCode(result.getCode());
responseModel.setMsg(result.getMessage());
HashMap<String, Long> stringHashMap = new HashMap<>(3);
stringHashMap.put(MSG_ID, messageId);
responseModel.setData(stringHashMap);
responseModel.setReqId(receiveVO.getReqId());
// 响应发送方
// writeDataService.write(responseModel, appKey, clientId);
channelSender.sendMsgLocal(channel, responseModel);
}
/**
* 判断是否被拉黑
*
* @param receiveVO
* @param imClientSender
* @param membersList
* @return
*/
private boolean black(ReceiveVO receiveVO, ImClient imClientSender, List<ImConversationMembers> membersList, NioSocketChannel channel) {
// 判断是否被拉黑
boolean beBlack = imClientBlacklistService.isBeBlack(membersList.get(0).getFkClientId(), imClientSender.getId());
if (beBlack) {
log.info("被对方拉黑了");
// 响应发送方
WsResponseModel<HashMap<String, Long>> responseModel = new WsResponseModel<>();
ApiResult<Boolean> result = ApiResult.result(ApiCode.IS_BE_BLACK);
responseModel.setCmd(WsResponseCmdEnum.RES.getCmdCode());
responseModel.setCode(result.getCode());
responseModel.setMsg(result.getMessage());
responseModel.setReqId(receiveVO.getReqId());
// writeDataService.write(responseModel, appKey, clientUniId);
channelSender.sendMsgLocal(channel, responseModel);
return true;
}
// 是否把对方拉黑
boolean black = imClientBlacklistService.isBeBlack(imClientSender.getId(), membersList.get(0).getFkClientId());
if (black) {
log.info("你把对方拉黑了");
// 响应发送方
WsResponseModel<HashMap<String, Long>> responseModel = new WsResponseModel<>();
ApiResult<Boolean> result = ApiResult.result(ApiCode.IS_TO_BLACK);
responseModel.setCmd(WsResponseCmdEnum.RES.getCmdCode());
responseModel.setCode(result.getCode());
responseModel.setMsg(result.getMessage());
responseModel.setReqId(receiveVO.getReqId());
// writeDataService.write(responseModel, appKey, clientUniId);
channelSender.sendMsgLocal(channel, responseModel);
return true;
}
return false;
}
}
...@@ -5,6 +5,9 @@ import org.springframework.context.ApplicationContext; ...@@ -5,6 +5,9 @@ import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware; import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.Collection;
import java.util.List;
/** /**
* @Description 在非spring管理的类中获取spring注册的bean * @Description 在非spring管理的类中获取spring注册的bean
* @Author hewei hwei1233@163.com * @Author hewei hwei1233@163.com
...@@ -23,6 +26,10 @@ public class SpringBeanUtils implements ApplicationContextAware { ...@@ -23,6 +26,10 @@ public class SpringBeanUtils implements ApplicationContextAware {
return applicationContext.getBean(clazz); return applicationContext.getBean(clazz);
} }
public static <T> Collection<T> getBeansOfType(Class<T> clazz) {
return applicationContext.getBeansOfType(clazz).values();
}
@Override @Override
public void setApplicationContext(ApplicationContext context) throws BeansException { public void setApplicationContext(ApplicationContext context) throws BeansException {
......
...@@ -16,11 +16,6 @@ import io.geekidea.springbootplus.framework.common.api.ApiResult; ...@@ -16,11 +16,6 @@ import io.geekidea.springbootplus.framework.common.api.ApiResult;
public interface RtcService { public interface RtcService {
/** /**
* 客户端离线
*/
void clientOffline(String appKey, String clientId);
/**
* 创建一个频道,并向接收方发送系统推送 * 创建一个频道,并向接收方发送系统推送
*/ */
ApiResult<CreateRtcChannelResult> createAndCall(CreateRtcChannelParam createRtcChannelParam) throws JsonProcessingException; ApiResult<CreateRtcChannelResult> createAndCall(CreateRtcChannelParam createRtcChannelParam) throws JsonProcessingException;
......
...@@ -12,6 +12,7 @@ import com.wecloud.im.param.rtc.RejectRtcChannelParam; ...@@ -12,6 +12,7 @@ import com.wecloud.im.param.rtc.RejectRtcChannelParam;
import com.wecloud.im.param.rtc.SdpForwardParam; import com.wecloud.im.param.rtc.SdpForwardParam;
import com.wecloud.im.service.ImApplicationService; import com.wecloud.im.service.ImApplicationService;
import com.wecloud.im.service.ImClientService; import com.wecloud.im.service.ImClientService;
import com.wecloud.im.ws.cache.UserStateListener;
import com.wecloud.im.ws.manager.ChannelManager; import com.wecloud.im.ws.manager.ChannelManager;
import com.wecloud.rtc.entity.response.RtcCallResponse; import com.wecloud.rtc.entity.response.RtcCallResponse;
import com.wecloud.rtc.entity.response.RtcCandidateForwardResponse; import com.wecloud.rtc.entity.response.RtcCandidateForwardResponse;
...@@ -32,7 +33,7 @@ import java.util.List; ...@@ -32,7 +33,7 @@ import java.util.List;
@Slf4j @Slf4j
@Service @Service
public class RtcServiceImpl implements RtcService { public class RtcServiceImpl extends UserStateListener implements RtcService {
@Autowired @Autowired
private ImApplicationService imApplicationService; private ImApplicationService imApplicationService;
...@@ -53,8 +54,12 @@ public class RtcServiceImpl implements RtcService { ...@@ -53,8 +54,12 @@ public class RtcServiceImpl implements RtcService {
private ChannelManager channelManager; private ChannelManager channelManager;
@Override @Override
public void clientOffline(String appKey, String clientId) { public void onLineEvent(String appKey, String clientId, String longChannelId) {
// nothing need to do
}
@Override
public void offlineEvent(String appKey, String clientId, String longChannelId) {
// 根据appKey查询appid // 根据appKey查询appid
ImApplication imApplication = imApplicationService.getOneByAppKey(appKey); ImApplication imApplication = imApplicationService.getOneByAppKey(appKey);
ImClient client = imClientService.getCacheImClient(imApplication.getId(), clientId); ImClient client = imClientService.getCacheImClient(imApplication.getId(), clientId);
...@@ -69,7 +74,6 @@ public class RtcServiceImpl implements RtcService { ...@@ -69,7 +74,6 @@ public class RtcServiceImpl implements RtcService {
leaveRtcChannelParam.setChannelId(listByClientId); leaveRtcChannelParam.setChannelId(listByClientId);
// websocket离线逻辑 服务端踢出频道 // websocket离线逻辑 服务端踢出频道
this.leave(leaveRtcChannelParam, client, imApplication); this.leave(leaveRtcChannelParam, client, imApplication);
} }
@Override @Override
......
...@@ -5,7 +5,7 @@ ...@@ -5,7 +5,7 @@
<!-- 通用查询结果列 --> <!-- 通用查询结果列 -->
<sql id="Base_Column_List"> <sql id="Base_Column_List">
id id
, create_time, update_time, last_message, fk_appid, creator, name, attributes, system_flag , create_time, update_time, last_message, member_count, is_thousand, fk_appid, creator, name, attributes, system_flag
</sql> </sql>
<select id="getImConversationById" resultType="com.wecloud.im.param.ImConversationQueryVo"> <select id="getImConversationById" resultType="com.wecloud.im.param.ImConversationQueryVo">
...@@ -107,4 +107,12 @@ ...@@ -107,4 +107,12 @@
HAVING members_count = 2 LIMIT 1 HAVING members_count = 2 LIMIT 1
</select> </select>
<update id="addMemberCount">
update im_conversation set member_count = member_count + #{addCount} where fk_appid = #{appId} and id = #{conversationId}
</update>
<update id="updateMemberCount">
update im_conversation set is_thousand = 1 where fk_appid = #{appId} and id = #{conversationId}
</update>
</mapper> </mapper>
...@@ -217,3 +217,7 @@ FOREIGN_KEY_CHECKS = 1; ...@@ -217,3 +217,7 @@ FOREIGN_KEY_CHECKS = 1;
-- 在feature-cluster 2021年12月22日之后,需要执行的的sql增量脚本 -- 在feature-cluster 2021年12月22日之后,需要执行的的sql增量脚本
ALTER TABLE `im_conversation` CHANGE COLUMN `system` `system_flag` tinyint(1) NULL DEFAULT NULL COMMENT '可选 对话类型标志,是否是系统对话,后面会说明。' AFTER `attributes`; ALTER TABLE `im_conversation` CHANGE COLUMN `system` `system_flag` tinyint(1) NULL DEFAULT NULL COMMENT '可选 对话类型标志,是否是系统对话,后面会说明。' AFTER `attributes`;
ALTER TABLE `im_message` CHANGE COLUMN `system` `system_flag` tinyint(1) NULL DEFAULT 0 COMMENT '0非系统通知; 1为系统通知' AFTER `event`; ALTER TABLE `im_message` CHANGE COLUMN `system` `system_flag` tinyint(1) NULL DEFAULT 0 COMMENT '0非系统通知; 1为系统通知' AFTER `event`;
-- 在feature-cluster 2022年1月4日之后,需要执行的sql增量脚本
ALTER TABLE im_conversation`
ADD COLUMN `member_count` int NULL COMMENT '群成员数' AFTER `last_message`,
ADD COLUMN `is_thousand` tinyint NULL COMMENT '是否万人群' AFTER `member_count`;
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment