Commit ee454a17 by Future

性能优化

parent 6722c11c
......@@ -7,7 +7,12 @@ import com.google.common.collect.Lists;
import com.wecloud.dispatch.annotation.ActionMapping;
import com.wecloud.dispatch.common.BaseRequest;
import com.wecloud.dispatch.extend.ActionRequest;
import com.wecloud.im.entity.*;
import com.wecloud.im.entity.ImApplication;
import com.wecloud.im.entity.ImClient;
import com.wecloud.im.entity.ImConversationMembers;
import com.wecloud.im.entity.ImInbox;
import com.wecloud.im.entity.ImMessage;
import com.wecloud.im.entity.ImMessageOnlineSend;
import com.wecloud.im.friend.entity.ImFriend;
import com.wecloud.im.friend.service.ImFriendService;
import com.wecloud.im.mq.MqSender;
......@@ -19,7 +24,13 @@ import com.wecloud.im.sdk.enums.ChatTypeEnum;
import com.wecloud.im.sdk.enums.FriendStateEnum;
import com.wecloud.im.sdk.enums.GroupRoleEnum;
import com.wecloud.im.sdk.enums.MutedEnum;
import com.wecloud.im.service.*;
import com.wecloud.im.service.ImApplicationService;
import com.wecloud.im.service.ImClientBlacklistService;
import com.wecloud.im.service.ImClientService;
import com.wecloud.im.service.ImConversationMembersService;
import com.wecloud.im.service.ImConversationService;
import com.wecloud.im.service.ImInboxService;
import com.wecloud.im.service.ImMessageService;
import com.wecloud.im.ws.enums.MsgTypeEnum;
import com.wecloud.im.ws.enums.WsResponseCmdEnum;
import com.wecloud.im.ws.model.WsResponse;
......@@ -40,7 +51,12 @@ import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.*;
import java.util.Calendar;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
......@@ -119,7 +135,7 @@ public class NormalChatAction {
channelSender.sendMsgLocal((NioSocketChannel) request.getSenderChannel(), responseModel);
return;
}
Map<Long, ImConversationMembers> memberMap = membersList.stream().collect(Collectors.toMap(ImConversationMembers::getFkClientId, member -> member));
// 判断为单聊
if (membersList.size() == 2) {
......@@ -136,38 +152,28 @@ public class NormalChatAction {
if (beKickOut(reqId, imClientSender, membersList, request.getSenderChannel())) {
return;
}
if (muted(conversation, reqId, imClientSender, membersList, request.getSenderChannel())) {
if (muted(conversation, reqId, imClientSender, membersList, memberMap, request.getSenderChannel())) {
return;
}
if (baned(conversation, reqId, imClientSender, membersList, request.getSenderChannel())) {
return;
}
// 消息内容校验
if (!checkMsg(imClientSender, conversation, reqId, data, request.getSenderChannel())) {
if (!checkMsg(imClientSender, conversation, reqId, data, request.getSenderChannel(), memberMap)) {
return;
}
ImMessageOnlineSend imMessageOnlineSend = assembleImMessageOnlineSend(data, imClientSender, imApplication.getId());
// 再给所有人发 todo 需要改成批量
for (ImConversationMembers conversationMembers : membersList) {
// 查询接收方
ImClient imClientReceiver = imClientService.getOne(new QueryWrapper<ImClient>().lambda()
.eq(ImClient::getFkAppid, imApplication.getId())
.eq(ImClient::getId, conversationMembers.getFkClientId()));
if (imClientReceiver == null) {
continue;
}
if (imClientReceiver.getId().equals(imClientSender.getId())) {
for (ImConversationMembers member : membersList) {
if (member.getFkClientId().equals(imClientSender.getId())) {
// 不给自己发
continue;
}
// 入库 保存收件箱
saveImInbox(imApplication, conversation, imMessageOnlineSend.getMsgId(),
conversationMembers, SnowflakeUtil.getId());
member, SnowflakeUtil.getId());
// 入库成功后 判断是否是临时会话 如果是,双方会话display状态是否是1(显示),如果不是,需要修改为是
if (ChatTypeEnum.TEMP.getCode().equals(conversation.getChatType())) {
......@@ -179,11 +185,11 @@ public class NormalChatAction {
}
// 在线用户直接发消息
sendMsgForOnline(imClientReceiver.getId(), imMessageOnlineSend);
Boolean sendSuccess = sendMsgForOnline(member.getFkClientId(), imMessageOnlineSend);
if (!conversationMembers.getDoNotDisturb()) {
if (!sendSuccess && !member.getDoNotDisturb()) {
// 异步推送系统通知消息
PushDTO pushDTO = mqSender.buildPushDto(data.getPush(), imClientReceiver, imApplication);
PushDTO pushDTO = mqSender.buildPushDto(data.getPush(), member.getFkClientId(), member.getClientId(), imApplication);
if (pushDTO != null) {
mqSender.orderSend(MqConstant.Topic.IM_ORDER_MSG_TOPIC, MqConstant.Tag.IM_ORDER_MSG_TAG, pushDTO);
}
......@@ -200,7 +206,7 @@ public class NormalChatAction {
* @param receiverClientId
* @param imMessageOnlineSend
*/
private void sendMsgForOnline(Long receiverClientId, ImMessageOnlineSend imMessageOnlineSend) {
private Boolean sendMsgForOnline(Long receiverClientId, ImMessageOnlineSend imMessageOnlineSend) {
// 封装要推给接收方的消息
WsResponse<ImMessageOnlineSend> responseModel = new WsResponse<>();
responseModel.setCmd(WsResponseCmdEnum.ONLINE_MSG.getCmdCode());
......@@ -209,7 +215,7 @@ public class NormalChatAction {
responseModel.setMsg(result.getMessage());
responseModel.setData(imMessageOnlineSend);
responseModel.setReqId(null);
channelSender.sendMsg(responseModel, receiverClientId);
return channelSender.sendMsg(responseModel, receiverClientId);
}
/**
......@@ -478,10 +484,10 @@ public class NormalChatAction {
* @param channel
* @return 是-ture 否-false
*/
private boolean muted(ImConversationQueryVo conversation, String reqId, ImClient imClientSender, List<ImConversationMembers> membersList, Channel channel) {
private boolean muted(ImConversationQueryVo conversation, String reqId, ImClient imClientSender, List<ImConversationMembers> membersList, Map<Long, ImConversationMembers> memberMap, Channel channel) {
Long senderId = imClientSender.getId();
if (imConversationService.isBelongToRole(imClientSender.getClientId(), conversation.getId(),
Lists.newArrayList(GroupRoleEnum.OWNER.getCode(), GroupRoleEnum.ADMIN.getCode()))) {
if (Lists.newArrayList(GroupRoleEnum.OWNER.getCode(), GroupRoleEnum.ADMIN.getCode())
.contains(memberMap.get(senderId).getRole())) {
// 当前操作人属于群主或管理人员 - 不做禁言处理
return false;
}
......@@ -552,10 +558,9 @@ public class NormalChatAction {
* @Return
*/
private boolean checkMsg(ImClient sender, ImConversationQueryVo conversation, String reqId,
ChatContentVo data, Channel channel) {
if (imConversationService.isBelongToRole(sender.getClientId(), conversation.getId(),
Lists.newArrayList(GroupRoleEnum.OWNER.getCode(), GroupRoleEnum.ADMIN.getCode()))) {
ChatContentVo data, Channel channel, Map<Long, ImConversationMembers> memberMap) {
if (Lists.newArrayList(GroupRoleEnum.OWNER.getCode(), GroupRoleEnum.ADMIN.getCode())
.contains(memberMap.get(sender.getId()).getRole())) {
// 当前操作人属于群主或管理人员 - 不做处理
return true;
}
......
......@@ -67,7 +67,7 @@ public class FriendEventSender {
PushVO pushVO = new PushVO();
pushVO.setTitle(FRIEND_APPLY_TITLE);
pushVO.setSubTitle(FRIEND_APPLY_TITLE_SUB);
PushDTO pushDTO = mqSender.buildPushDto(pushVO, receiveClient, app);
PushDTO pushDTO = mqSender.buildPushDto(pushVO, receiveClient.getId(), receiveClient.getClientId(), app);
if (pushDTO != null) {
mqSender.orderSend(MqConstant.Topic.IM_ORDER_MSG_TOPIC, MqConstant.Tag.IM_ORDER_MSG_TAG, pushDTO);
}
......@@ -96,7 +96,7 @@ public class FriendEventSender {
PushVO pushVO = new PushVO();
pushVO.setTitle(FRIEND_APPROVE_TITLE);
pushVO.setSubTitle(isAgree ? FRIEND_APPROVE_TITLE_AGREE : FRIEND_APPROVE_TITLE_REJECT);
PushDTO pushDTO = mqSender.buildPushDto(pushVO, receiveClient, app);
PushDTO pushDTO = mqSender.buildPushDto(pushVO, receiveClient.getId(), receiveClient.getClientId(), app);
if (pushDTO != null) {
mqSender.orderSend(MqConstant.Topic.IM_ORDER_MSG_TOPIC, MqConstant.Tag.IM_ORDER_MSG_TAG, pushDTO);
}
......
......@@ -86,8 +86,7 @@ public class MqSender {
}
}
public PushDTO buildPushDto(PushVO pushVO, ImClient imClientReceiver, ImApplication imApplication) {
public PushDTO buildPushDto(PushVO pushVO, Long fkClientId, String clientId, ImApplication imApplication) {
PushDTO pushDTO = new PushDTO();
MessageDTO messageDTO = new MessageDTO();
if (pushVO == null) {
......@@ -102,12 +101,12 @@ public class MqSender {
// int badge = imInboxService.countMyNotReadCount(imClientReceiver.getId());
// messageDTO.setBadge(badge);
ClientDTO clientDTO = new ClientDTO();
clientDTO.setClientId(imClientReceiver.getClientId());
clientDTO.setClientId(clientId);
List<ImClientDevice> clientDeviceList = imClientDeviceService.list(
new QueryWrapper<ImClientDevice>().lambda()
.eq(ImClientDevice::getFkClientId, imClientReceiver.getId()));
.eq(ImClientDevice::getFkClientId, fkClientId));
if (CollectionUtils.isEmpty(clientDeviceList)) {
log.info("接收人信息 {} 查无推送设备", imClientReceiver.getId());
log.info("接收人信息 {} 查无推送设备", fkClientId);
return null;
}
List<ClientDeviceDTO> deviceDTOList = Lists.newArrayList();
......
......@@ -13,7 +13,7 @@ public interface RouterSendService {
* @param platform
* @param msg
*/
void sendMsgRemote(Long clientId, Integer platform, String msg);
Boolean sendMsgRemote(Long clientId, Integer platform, String msg);
/**
* 通过rpc调用 批量发送,解决channel不在本机时调用
......
......@@ -18,9 +18,9 @@ public class RouterSendServiceImpl implements RouterSendService {
private ChannelSender channelSender;
@Override
public void sendMsgRemote(Long clientId, Integer platform, String msg) {
public Boolean sendMsgRemote(Long clientId, Integer platform, String msg) {
log.info("接收到rpc调用,本地ip {}", IpUtil.getLocalhostIp());
channelSender.sendMsgLocal(clientId, platform, msg);
return channelSender.sendMsgLocal(clientId, platform, msg);
}
@Override
......
......@@ -342,7 +342,7 @@ public class ImMessageServiceImpl extends BaseServiceImpl<ImMessageMapper, ImMes
channelSender.sendMsg(responseModel, imClientReceiver.getId());
// 异步推送系统通知消息
PushDTO pushDTO = mqSender.buildPushDto(imMsgRecall.getPush(), imClientReceiver, imApplication);
PushDTO pushDTO = mqSender.buildPushDto(imMsgRecall.getPush(), imClientReceiver.getId(), imClientReceiver.getClientId(), imApplication);
if (pushDTO != null) {
mqSender.orderSend(MqConstant.Topic.IM_ORDER_MSG_TOPIC, MqConstant.Tag.IM_ORDER_MSG_TAG, pushDTO);
}
......@@ -1160,7 +1160,7 @@ public class ImMessageServiceImpl extends BaseServiceImpl<ImMessageMapper, ImMes
ImClient receiver = new ImClient();
receiver.setId(members.getFkClientId());
receiver.setClientId(members.getClientId());
PushDTO pushDTO = mqSender.buildPushDto(pushVO, receiver, application);
PushDTO pushDTO = mqSender.buildPushDto(pushVO, receiver.getId(), receiver.getClientId(), application);
if (pushDTO != null) {
mqSender.orderSend(MqConstant.Topic.IM_ORDER_MSG_TOPIC, MqConstant.Tag.IM_ORDER_MSG_TAG, pushDTO);
}
......
......@@ -132,7 +132,7 @@ public class ThousandChatAction {
continue;
}
// 异步推送系统通知消息
PushDTO pushDTO = mqSender.buildPushDto(data.getPush(), imClientReceiver, imApplication);
PushDTO pushDTO = mqSender.buildPushDto(data.getPush(), imClientReceiver.getId(), imClientReceiver.getClientId(), imApplication);
if (pushDTO != null) {
mqSender.orderSend(MqConstant.Topic.IM_ORDER_MSG_TOPIC, MqConstant.Tag.IM_ORDER_MSG_TAG, pushDTO);
}
......
......@@ -11,6 +11,7 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.dubbo.config.annotation.DubboReference;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcException;
......@@ -130,13 +131,17 @@ public class ChannelSender {
* @param responseModel
* @param toClientId
*/
public void sendMsg(WsResponse responseModel, Long toClientId) {
public Boolean sendMsg(WsResponse responseModel, Long toClientId) {
String msgJson = JsonUtils.encodeJson(responseModel);
List<ClientChannelInfo> channelInfos = userStateCacheManager.findOnlineInfosByClientId(toClientId);
// log.info("获取在线用户入参 {}, 结果 {}", toClientId, JSON.toJSONString(channelInfos));
if (CollectionUtils.isEmpty(channelInfos)) {
return false;
}
Boolean sendSuccess = false;
// 一个用户存在多端的情况,所以先进行分类,key是ip地址,value是channel的列表
Map<String, List<ClientChannelInfo>> ipChannels = channelInfos.stream().collect(Collectors.groupingBy(ClientChannelInfo::getLanIp));
......@@ -152,7 +157,7 @@ public class ChannelSender {
// log.info("在线用户入参 {}, 具体ip结果 {}", toClientId, JSON.toJSONString(channelInfoEntry));
for(ClientChannelInfo clientChannelInfo : channelInfoEntry.getValue()) {
// log.info("客户端 {}, 推送消息内容 {}", toClientId, msgJson);
this.sendMsgLocal(toClientId, clientChannelInfo.getPlatform(), msgJson);
sendSuccess = this.sendMsgLocal(toClientId, clientChannelInfo.getPlatform(), msgJson);
}
continue;
}
......@@ -165,7 +170,7 @@ public class ChannelSender {
RpcContext.getContext().setObjectAttachment("address", address);
// log.info("dubbo指定ip调用 {}", JSON.toJSONString(address));
try {
routerSendService.sendMsgRemote(toClientId, clientChannelInfo.getPlatform(), msgJson);
sendSuccess = routerSendService.sendMsgRemote(toClientId, clientChannelInfo.getPlatform(), msgJson);
} catch (RpcException exception) {
// do nothing is ok
tryFindAndSendAndRefresh(toClientId, clientChannelInfo.getPlatform(), msgJson);
......@@ -173,7 +178,7 @@ public class ChannelSender {
}
}
}
return sendSuccess;
}
/**
......
......@@ -146,7 +146,7 @@ public class RtcServiceImpl extends UserStateListener implements RtcService {
// 异步推送系统通知消息
ImApplication imApplication = contextService.getImApplicationIfNotNullOrThrow(currentClient.getFkAppid());
PushDTO pushDTO = mqSender.buildPushDto(new PushVO("音视频通话", "点击查看", null), toClient, imApplication);
PushDTO pushDTO = mqSender.buildPushDto(new PushVO("音视频通话", "点击查看", null), toClient.getId(), toClient.getClientId(), imApplication);
if (pushDTO != null) {
mqSender.orderSend(MqConstant.Topic.IM_ORDER_MSG_TOPIC, MqConstant.Tag.IM_ORDER_MSG_TAG, pushDTO);
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment