Commit fc359073 by Future

保存消息逻辑

parent 81c41b05
...@@ -180,10 +180,4 @@ public interface ImMessageService extends BaseService<ImMessage> { ...@@ -180,10 +180,4 @@ public interface ImMessageService extends BaseService<ImMessage> {
List<WeCloudMessageVo> syncListMessage(SyncListMessageParam param); List<WeCloudMessageVo> syncListMessage(SyncListMessageParam param);
/**
* 保存消息到数据库
* @param imMessage
*/
void saveMessageToDb(ImMessage imMessage);
} }
package com.wecloud.im.service;
import com.baomidou.lock.annotation.Lock4j;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.wecloud.im.entity.ImClient;
import com.wecloud.im.entity.ImConversationMembers;
import com.wecloud.im.entity.ImMessage;
import com.wecloud.im.param.ImConversationQueryVo;
import com.wecloud.im.ws.utils.RedisUtils;
import com.wecloud.utils.SnowflakeUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cache.Cache;
import org.springframework.cache.CacheManager;
import org.springframework.stereotype.Service;
import java.time.Duration;
import java.util.Date;
import java.util.List;
/**
* @Author Future
* @Date 2022/8/24 17:18
* @Description 本地缓存服务
*/
@Slf4j
@Service
public class SaveMessageService {
private static final String LAST_MSG_PREFIX = "last_msg_";
@Autowired
private ImMessageService imMessageService;
@Autowired
private RedisUtils redisUtils;
/**
* 持久化消息到数据库并更新相关缓存
* expire 锁过期时间 防止死锁
*
* @param imMessage
*/
@Lock4j(keys = {"#imMessage.fkConversationId"}, expire = 3000, acquireTimeout = 1000)
public void saveMessageToDb(ImMessage imMessage) {
imMessage.setId(SnowflakeUtil.getId());
imMessage.setCreateTime(new Date());
this.setLastMessageIdAndFreshCash(imMessage);
imMessageService.save(imMessage);
}
/**
* 查找会话的最新一条消息
*
* @param message
* @return
*/
private void setLastMessageIdAndFreshCash(ImMessage message) {
try {
// 执行业务方法
String key = LAST_MSG_PREFIX + message.getFkConversationId();
String messageId = redisUtils.getKey(key);
if (StringUtils.isNotBlank(messageId)) {
// 将Redis中最新的messageId存入
message.setPreMessageId(Long.valueOf(messageId));
} else {
// 落库查
ImMessage preMessage = imMessageService.getOne(new QueryWrapper<ImMessage>().lambda()
.eq(ImMessage::getFkConversationId, message.getFkConversationId())
.orderByDesc(ImMessage::getId)
.last("limit 1")
);
if (preMessage != null) {
message.setPreMessageId(preMessage.getId());
}
}
redisUtils.addKey(key, message.getId().toString(), Duration.ofMinutes(30));
} catch (Exception e) {
log.info("获取会话最后一条消息异常 ", e);
}
}
}
...@@ -62,6 +62,7 @@ import com.wecloud.im.service.ImConversationMembersService; ...@@ -62,6 +62,7 @@ import com.wecloud.im.service.ImConversationMembersService;
import com.wecloud.im.service.ImConversationService; import com.wecloud.im.service.ImConversationService;
import com.wecloud.im.service.ImInboxService; import com.wecloud.im.service.ImInboxService;
import com.wecloud.im.service.ImMessageService; import com.wecloud.im.service.ImMessageService;
import com.wecloud.im.service.SaveMessageService;
import com.wecloud.im.vo.ChatRoomMemberVo; import com.wecloud.im.vo.ChatRoomMemberVo;
import com.wecloud.im.vo.ConversationCountVo; import com.wecloud.im.vo.ConversationCountVo;
import com.wecloud.im.vo.ConversationMemberVo; import com.wecloud.im.vo.ConversationMemberVo;
...@@ -162,6 +163,9 @@ public class ImConversationServiceImpl extends BaseServiceImpl<ImConversationMap ...@@ -162,6 +163,9 @@ public class ImConversationServiceImpl extends BaseServiceImpl<ImConversationMap
@Resource @Resource
private EhcacheService ehcacheService; private EhcacheService ehcacheService;
@Autowired
private SaveMessageService saveMessageService;
@Override @Override
@Transactional(rollbackFor = Exception.class) @Transactional(rollbackFor = Exception.class)
public ImConversationCreateVo createImConversation(ImConversationCreate imConversationCreate) { public ImConversationCreateVo createImConversation(ImConversationCreate imConversationCreate) {
...@@ -341,7 +345,7 @@ public class ImConversationServiceImpl extends BaseServiceImpl<ImConversationMap ...@@ -341,7 +345,7 @@ public class ImConversationServiceImpl extends BaseServiceImpl<ImConversationMap
content.put("operator", imClientSender.getClientId()); //操作的client ID content.put("operator", imClientSender.getClientId()); //操作的client ID
content.put("passivityOperator", clientToConversation.getClientId()); //被操作的client ID content.put("passivityOperator", clientToConversation.getClientId()); //被操作的client ID
ImMessage imMessage = MessageBuilder.buildEventMessage(MsgTypeEnum.INVITE_CLIENT_JOIN_CONVERSATION, imApplication, createClient, imConversation, JsonUtils.encodeJson(content)); ImMessage imMessage = MessageBuilder.buildEventMessage(MsgTypeEnum.INVITE_CLIENT_JOIN_CONVERSATION, imApplication, createClient, imConversation, JsonUtils.encodeJson(content));
boolean save = imMessageService.save(imMessage); saveMessageService.saveMessageToDb(imMessage);
// 发送给在群内的成员 // 发送给在群内的成员
sendMsgToMembers(imConversation, membersList, createClient, imMessage, content); sendMsgToMembers(imConversation, membersList, createClient, imMessage, content);
// 发送给被邀请人 // 发送给被邀请人
...@@ -403,7 +407,7 @@ public class ImConversationServiceImpl extends BaseServiceImpl<ImConversationMap ...@@ -403,7 +407,7 @@ public class ImConversationServiceImpl extends BaseServiceImpl<ImConversationMap
content.put("passivityOperator", clientToBeRemove.getClientId()); //被操作的client ID content.put("passivityOperator", clientToBeRemove.getClientId()); //被操作的client ID
ImMessage imMessage = MessageBuilder.buildEventMessage(MsgTypeEnum.REMOVE_CLIENT_CONVERSATION, imApplication, createClient, imConversation, JsonUtils.encodeJson(content)); ImMessage imMessage = MessageBuilder.buildEventMessage(MsgTypeEnum.REMOVE_CLIENT_CONVERSATION, imApplication, createClient, imConversation, JsonUtils.encodeJson(content));
imMessageService.save(imMessage); saveMessageService.saveMessageToDb(imMessage);
membersList.removeIf(e -> e.getId().equals(members.getId())); membersList.removeIf(e -> e.getId().equals(members.getId()));
...@@ -451,7 +455,7 @@ public class ImConversationServiceImpl extends BaseServiceImpl<ImConversationMap ...@@ -451,7 +455,7 @@ public class ImConversationServiceImpl extends BaseServiceImpl<ImConversationMap
Map<String, Object> content = new HashMap<>(); Map<String, Object> content = new HashMap<>();
content.put("operator", currentClient.getClientId()); content.put("operator", currentClient.getClientId());
ImMessage imMessage = MessageBuilder.buildEventMessage(MsgTypeEnum.CONVERSATION_DISBAND, imApplication, currentClient, imConversation, JsonUtils.encodeJson(content)); ImMessage imMessage = MessageBuilder.buildEventMessage(MsgTypeEnum.CONVERSATION_DISBAND, imApplication, currentClient, imConversation, JsonUtils.encodeJson(content));
imMessageService.save(imMessage); saveMessageService.saveMessageToDb(imMessage);
sendMsgToMembers(imConversation, membersList, currentClient, imMessage, content); sendMsgToMembers(imConversation, membersList, currentClient, imMessage, content);
} }
...@@ -527,11 +531,7 @@ public class ImConversationServiceImpl extends BaseServiceImpl<ImConversationMap ...@@ -527,11 +531,7 @@ public class ImConversationServiceImpl extends BaseServiceImpl<ImConversationMap
// 生成消息id // 生成消息id
ImMessage imMessage = MessageBuilder.buildEventMessage(MsgTypeEnum.LEAVE_CONVERSATION, imApplication, currentClient, imConversation, ""); ImMessage imMessage = MessageBuilder.buildEventMessage(MsgTypeEnum.LEAVE_CONVERSATION, imApplication, currentClient, imConversation, "");
// 保存消息至消息表 // 保存消息至消息表
boolean save = imMessageService.save(imMessage); saveMessageService.saveMessageToDb(imMessage);
if (!save) {
throw new BusinessException("退出群聊错误");
}
sendMsgToMembers(imConversation, membersList, currentClient, imMessage, null); sendMsgToMembers(imConversation, membersList, currentClient, imMessage, null);
// 群主退出 转移给下一个人 // 群主退出 转移给下一个人
if (GroupRoleEnum.OWNER.getCode().equals(members.getRole())) { if (GroupRoleEnum.OWNER.getCode().equals(members.getRole())) {
...@@ -1022,7 +1022,7 @@ public class ImConversationServiceImpl extends BaseServiceImpl<ImConversationMap ...@@ -1022,7 +1022,7 @@ public class ImConversationServiceImpl extends BaseServiceImpl<ImConversationMap
if (!membersList.isEmpty()) { if (!membersList.isEmpty()) {
// 保存事件消息 // 保存事件消息
ImMessage imMessage = MessageBuilder.buildEventMessage(msgType, application, operatorClient, group, JsonUtils.encodeJson(content)); ImMessage imMessage = MessageBuilder.buildEventMessage(msgType, application, operatorClient, group, JsonUtils.encodeJson(content));
imMessageService.save(imMessage); saveMessageService.saveMessageToDb(imMessage);
// 发送消息至群成员 // 发送消息至群成员
sendMsgToMembers(group, membersList, operatorClient, imMessage, content); sendMsgToMembers(group, membersList, operatorClient, imMessage, content);
} }
...@@ -1541,7 +1541,7 @@ public class ImConversationServiceImpl extends BaseServiceImpl<ImConversationMap ...@@ -1541,7 +1541,7 @@ public class ImConversationServiceImpl extends BaseServiceImpl<ImConversationMap
content.put("operator", creator.getClientId()); //操作的client ID content.put("operator", creator.getClientId()); //操作的client ID
content.put("passivityOperator", member.getClientId()); //被操作的client ID content.put("passivityOperator", member.getClientId()); //被操作的client ID
ImMessage imMessage = MessageBuilder.buildEventMessage(MsgTypeEnum.CLIENT_JOIN_NEW_CONVERSATION, application, creator, conversation, JsonUtils.encodeJson(content)); ImMessage imMessage = MessageBuilder.buildEventMessage(MsgTypeEnum.CLIENT_JOIN_NEW_CONVERSATION, application, creator, conversation, JsonUtils.encodeJson(content));
imMessageService.save(imMessage); saveMessageService.saveMessageToDb(imMessage);
// 投递消息 // 投递消息
couriers.deliver(imMessage, content, creator, member, WsResponseCmdEnum.CONVERSATION_EVENT_MSG); couriers.deliver(imMessage, content, creator, member, WsResponseCmdEnum.CONVERSATION_EVENT_MSG);
} }
......
package com.wecloud.im.service.impl; package com.wecloud.im.service.impl;
import com.wecloud.im.service.SaveMessageService;
import io.geekidea.springbootplus.framework.common.exception.BusinessException; import io.geekidea.springbootplus.framework.common.exception.BusinessException;
import io.geekidea.springbootplus.framework.shiro.util.SecurityUtils; import io.geekidea.springbootplus.framework.shiro.util.SecurityUtils;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
...@@ -12,6 +13,7 @@ import java.util.List; ...@@ -12,6 +13,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
...@@ -86,10 +88,8 @@ public class ImGroupServiceImpl implements ImGroupService { ...@@ -86,10 +88,8 @@ public class ImGroupServiceImpl implements ImGroupService {
*/ */
private final ImConversationMapper imConversationMapper; private final ImConversationMapper imConversationMapper;
/** @Autowired
* 消息服务 private SaveMessageService saveMessageService;
*/
private final ImMessageService imMessageService;
@Override @Override
public Long createGroup(String creatorClientId, String groupName, List<String> memberIds) { public Long createGroup(String creatorClientId, String groupName, List<String> memberIds) {
...@@ -163,7 +163,7 @@ public class ImGroupServiceImpl implements ImGroupService { ...@@ -163,7 +163,7 @@ public class ImGroupServiceImpl implements ImGroupService {
ImMessage imMessage = MessageBuilder.buildEventMessage(MsgTypeEnum.CONVERSATION_DISBAND, application, ImMessage imMessage = MessageBuilder.buildEventMessage(MsgTypeEnum.CONVERSATION_DISBAND, application,
operator, conversation, JsonUtils.encodeJson(content)); operator, conversation, JsonUtils.encodeJson(content));
// 保存消息至消息表 // 保存消息至消息表
imMessageService.save(imMessage); saveMessageService.saveMessageToDb(imMessage);
conversationService.sendMsgToMembers(conversation, membersList, operator, imMessage, content); conversationService.sendMsgToMembers(conversation, membersList, operator, imMessage, content);
return true; return true;
...@@ -236,7 +236,7 @@ public class ImGroupServiceImpl implements ImGroupService { ...@@ -236,7 +236,7 @@ public class ImGroupServiceImpl implements ImGroupService {
content.put("passivityOperator", newMember.getClientId()); content.put("passivityOperator", newMember.getClientId());
ImMessage imMessage = MessageBuilder.buildEventMessage(MsgTypeEnum.INVITE_CLIENT_JOIN_CONVERSATION, ImMessage imMessage = MessageBuilder.buildEventMessage(MsgTypeEnum.INVITE_CLIENT_JOIN_CONVERSATION,
imApplication, inviter, conversation, JsonUtils.encodeJson(content)); imApplication, inviter, conversation, JsonUtils.encodeJson(content));
boolean save = imMessageService.save(imMessage); saveMessageService.saveMessageToDb(imMessage);
// 发送给在群内的成员 // 发送给在群内的成员
conversationService.sendMsgToMembers(conversation, oldMembers, inviter, imMessage, content); conversationService.sendMsgToMembers(conversation, oldMembers, inviter, imMessage, content);
// 发送给被邀请人 // 发送给被邀请人
...@@ -286,7 +286,7 @@ public class ImGroupServiceImpl implements ImGroupService { ...@@ -286,7 +286,7 @@ public class ImGroupServiceImpl implements ImGroupService {
content.put("operator", operator.getClientId()); content.put("operator", operator.getClientId());
// 自己主动退出 // 自己主动退出
ImMessage imMessage = MessageBuilder.buildEventMessage(MsgTypeEnum.LEAVE_CONVERSATION, imApplication, operator, conversation, JsonUtils.encodeJson(content)); ImMessage imMessage = MessageBuilder.buildEventMessage(MsgTypeEnum.LEAVE_CONVERSATION, imApplication, operator, conversation, JsonUtils.encodeJson(content));
imMessageService.save(imMessage); saveMessageService.saveMessageToDb(imMessage);
// 发送给在群内的成员 // 发送给在群内的成员
conversationService.sendMsgToMembers(conversation, membersList, operator, imMessage, content); conversationService.sendMsgToMembers(conversation, membersList, operator, imMessage, content);
} else { } else {
...@@ -294,7 +294,7 @@ public class ImGroupServiceImpl implements ImGroupService { ...@@ -294,7 +294,7 @@ public class ImGroupServiceImpl implements ImGroupService {
// 被操作的client ID // 被操作的client ID
content.put("passivityOperator", members.getClientId()); content.put("passivityOperator", members.getClientId());
ImMessage imMessage = MessageBuilder.buildEventMessage(MsgTypeEnum.REMOVE_CLIENT_CONVERSATION, imApplication, operator, conversation, JsonUtils.encodeJson(content)); ImMessage imMessage = MessageBuilder.buildEventMessage(MsgTypeEnum.REMOVE_CLIENT_CONVERSATION, imApplication, operator, conversation, JsonUtils.encodeJson(content));
imMessageService.save(imMessage); saveMessageService.saveMessageToDb(imMessage);
// 发送给在群内的成员 // 发送给在群内的成员
conversationService.sendMsgToMembers(conversation, membersList, operator, imMessage, content); conversationService.sendMsgToMembers(conversation, membersList, operator, imMessage, content);
} }
...@@ -394,7 +394,7 @@ public class ImGroupServiceImpl implements ImGroupService { ...@@ -394,7 +394,7 @@ public class ImGroupServiceImpl implements ImGroupService {
Map<String, Object> content = new HashMap<>(); Map<String, Object> content = new HashMap<>();
content.put("operator", newGroupOwner.getClientId()); content.put("operator", newGroupOwner.getClientId());
ImMessage imMessage = MessageBuilder.buildEventMessage(MsgTypeEnum.CONVERSATION_NEW_CREATOR, imApplication, newGroupOwnerClient, conversation, JsonUtils.encodeJson(content)); ImMessage imMessage = MessageBuilder.buildEventMessage(MsgTypeEnum.CONVERSATION_NEW_CREATOR, imApplication, newGroupOwnerClient, conversation, JsonUtils.encodeJson(content));
imMessageService.save(imMessage); saveMessageService.saveMessageToDb(imMessage);
// 发送给在群内的成员 // 发送给在群内的成员
List<ImConversationMembers> existMemberList = List<ImConversationMembers> existMemberList =
conversationMembersService.list(Wrappers.<ImConversationMembers>lambdaQuery() conversationMembersService.list(Wrappers.<ImConversationMembers>lambdaQuery()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment