Commit 74045943 by Future

发消息性能优化

parent beb6bbdb
...@@ -182,31 +182,32 @@ public class NormalChatAction { ...@@ -182,31 +182,32 @@ public class NormalChatAction {
final Boolean isPush = ehcacheService.getIsPush("push" + conversation.getId()); final Boolean isPush = ehcacheService.getIsPush("push" + conversation.getId());
// 多线程处理消息下发 // 多线程处理消息下发
long time5 = System.currentTimeMillis(); long time5 = System.currentTimeMillis();
// 构造发送响应
WsResponse<ImMessageOnlineSend> responseModel = new WsResponse<>();
responseModel.setCmd(WsResponseCmdEnum.ONLINE_MSG.getCmdCode());
responseModel.setCode(200);
responseModel.setMsg(null);
responseModel.setData(imMessageOnlineSend);
responseModel.setReqId(null);
for (ImConversationMembers member : membersList) { for (ImConversationMembers member : membersList) {
if (member.getFkClientId().equals(imClientSender.getId())) { if (member.getFkClientId().equals(imClientSender.getId())) {
// 不给自己发 // 不给自己发
continue; continue;
} }
SendMsgThreadPool.SEND_MSG_THREAD_POOL_EXECUTOR.execute(() -> { SendMsgThreadPool.SEND_MSG_THREAD_POOL_EXECUTOR.execute(() -> {
this.sendMsgToMember(imApplication, member, imMessageOnlineSend, data.getPush(), isPush); this.sendMsgToMember(imApplication, member, responseModel, data.getPush(), isPush);
}); });
} }
log.info("会话id{} 多线程发消息耗时 {}", conversation.getId(), System.currentTimeMillis()-time5); log.info("会话id{} 多线程发消息耗时 {}", conversation.getId(), System.currentTimeMillis()-time5);
// 响应发送方消息id等信息 // 响应发送方消息id等信息
long time6 = System.currentTimeMillis();
response(reqId, imMessageOnlineSend, request.getSenderChannel()); response(reqId, imMessageOnlineSend, request.getSenderChannel());
log.info("会话id{} 构造响应耗时 {}", conversation.getId(), System.currentTimeMillis()-time6);
} }
private void sendMsgToMember(ImApplication imApplication, ImConversationMembers member, ImMessageOnlineSend imMessageOnlineSend, PushVO push, Boolean isPush) { private void sendMsgToMember(ImApplication imApplication, ImConversationMembers member, WsResponse<ImMessageOnlineSend> responseModel, PushVO push, Boolean isPush) {
// 在线用户直接发消息 // 在线用户直接发消息
sendMsgForOnline(member.getFkClientId(), responseModel);
long time1 = System.currentTimeMillis();
sendMsgForOnline(member.getFkClientId(), imMessageOnlineSend);
log.info("会话id{} 在线发消息耗时 {} isPush {}", imMessageOnlineSend.getConversationId(), System.currentTimeMillis()-time1, isPush);
if (isPush && !member.getDoNotDisturb()) { if (isPush && !member.getDoNotDisturb()) {
long time2 = System.currentTimeMillis();
try { try {
// 异步推送系统通知消息 5分钟内推一次消息 // 异步推送系统通知消息 5分钟内推一次消息
PushDTO pushDTO = mqSender.buildPushDto(push, member.getFkClientId(), member.getClientId(), imApplication); PushDTO pushDTO = mqSender.buildPushDto(push, member.getFkClientId(), member.getClientId(), imApplication);
...@@ -216,7 +217,6 @@ public class NormalChatAction { ...@@ -216,7 +217,6 @@ public class NormalChatAction {
} catch (Exception e) { } catch (Exception e) {
log.info("发送系统消息失败异常 ", e); log.info("发送系统消息失败异常 ", e);
} }
log.info("会话id{} mq推送耗时 {}", imMessageOnlineSend.getConversationId(), System.currentTimeMillis()-time2);
} }
} }
...@@ -224,17 +224,9 @@ public class NormalChatAction { ...@@ -224,17 +224,9 @@ public class NormalChatAction {
* 发送消息给在线客户 * 发送消息给在线客户
* *
* @param receiverClientId * @param receiverClientId
* @param imMessageOnlineSend * @param responseModel
*/ */
private Boolean sendMsgForOnline(Long receiverClientId, ImMessageOnlineSend imMessageOnlineSend) { private Boolean sendMsgForOnline(Long receiverClientId, WsResponse<ImMessageOnlineSend> responseModel) {
// 封装要推给接收方的消息
WsResponse<ImMessageOnlineSend> responseModel = new WsResponse<>();
responseModel.setCmd(WsResponseCmdEnum.ONLINE_MSG.getCmdCode());
responseModel.setCode(200);
responseModel.setMsg(null);
responseModel.setData(imMessageOnlineSend);
responseModel.setReqId(null);
return channelSender.sendMsg(responseModel, receiverClientId); return channelSender.sendMsg(responseModel, receiverClientId);
} }
...@@ -280,10 +272,9 @@ public class NormalChatAction { ...@@ -280,10 +272,9 @@ public class NormalChatAction {
*/ */
private void response(String reqId, ImMessageOnlineSend imMessageOnlineSend, Channel channel) { private void response(String reqId, ImMessageOnlineSend imMessageOnlineSend, Channel channel) {
WsResponse<MsgVo> responseModel = new WsResponse<>(); WsResponse<MsgVo> responseModel = new WsResponse<>();
ApiResult<Boolean> result = ApiResult.result(ApiCode.SUCCESS);
responseModel.setCmd(WsResponseCmdEnum.RES.getCmdCode()); responseModel.setCmd(WsResponseCmdEnum.RES.getCmdCode());
responseModel.setCode(result.getCode()); responseModel.setCode(200);
responseModel.setMsg(result.getMessage()); responseModel.setMsg(null);
responseModel.setData(new MsgVo(imMessageOnlineSend.getMsgId(), imMessageOnlineSend.getPreMessageId(), imMessageOnlineSend.getCreateTime())); responseModel.setData(new MsgVo(imMessageOnlineSend.getMsgId(), imMessageOnlineSend.getPreMessageId(), imMessageOnlineSend.getCreateTime()));
responseModel.setReqId(reqId); responseModel.setReqId(reqId);
// 响应发送方 // 响应发送方
......
...@@ -4,6 +4,7 @@ import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; ...@@ -4,6 +4,7 @@ import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.wecloud.im.entity.ImClient; import com.wecloud.im.entity.ImClient;
import com.wecloud.im.entity.ImConversationMembers; import com.wecloud.im.entity.ImConversationMembers;
import com.wecloud.im.param.ImConversationQueryVo; import com.wecloud.im.param.ImConversationQueryVo;
import com.wecloud.im.ws.utils.RedisUtils;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
...@@ -11,6 +12,7 @@ import org.springframework.cache.Cache; ...@@ -11,6 +12,7 @@ import org.springframework.cache.Cache;
import org.springframework.cache.CacheManager; import org.springframework.cache.CacheManager;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.time.Duration;
import java.util.List; import java.util.List;
...@@ -35,20 +37,25 @@ public class EhcacheService { ...@@ -35,20 +37,25 @@ public class EhcacheService {
@Autowired @Autowired
private ImConversationMembersService imConversationMembersService; private ImConversationMembersService imConversationMembersService;
@Autowired
private RedisUtils redisUtils;
/** /**
* 本地缓存中获取 * 缓存中获取
* *
* @param key * @param key
* @return * @return
*/ */
public Boolean getIsPush(String key) { public Boolean getIsPush(String key) {
try { try {
Cache cache = cacheManager.getCache("push"); String value = redisUtils.getKey(key);
String value = cache.get(key, String.class);
if (StringUtils.isNotBlank(value)) { if (StringUtils.isNotBlank(value)) {
// 缓存中有数据 不做系统消息推送
return false; return false;
} else { } else {
cache.put(key, "1"); // 缓存中无数据, 设置缓存 3分钟, 并且需要推送
redisUtils.addKey(key, "1", Duration.ofMinutes(3));
return true; return true;
} }
} catch (Exception e) { } catch (Exception e) {
......
<ehcache xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="../config/ehcache.xsd">
<diskStore path="java.io.tmpdir"/>
<defaultCache
maxElementsInMemory="10000"
eternal="false"
timeToIdleSeconds="120"
timeToLiveSeconds="120"
maxElementsOnDisk="10000000"
diskExpiryThreadIntervalSeconds="120"
memoryStoreEvictionPolicy="LRU">
<persistence strategy="localTempSwap"/>
</defaultCache>
<cache name="business"
maxElementsInMemory="100000"
eternal="false"
timeToIdleSeconds="86400"
timeToLiveSeconds="86400"
maxElementsOnDisk="10000000"
diskExpiryThreadIntervalSeconds="120"
memoryStoreEvictionPolicy="LRU">
<persistence strategy="localTempSwap"/>
</cache>
<cache name="push"
maxElementsInMemory="10000"
eternal="false"
timeToIdleSeconds="180"
timeToLiveSeconds="180"
maxElementsOnDisk="10000000"
diskExpiryThreadIntervalSeconds="120"
memoryStoreEvictionPolicy="LRU">
<persistence strategy="localTempSwap"/>
</cache>
</ehcache>
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment