Commit 344d2d5e by Future

添加消息链

parent 4ec6121e
......@@ -29,7 +29,7 @@ spring:
redis:
database: 0
host: 127.0.0.1
password:
# password:
port: 6379
# database: 0
......
......@@ -164,6 +164,13 @@
<version>4.0.0</version>
</dependency>
<!-- 注解版分布式锁 -->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>lock4j-redisson-spring-boot-starter</artifactId>
<version>2.2.2</version>
</dependency>
</dependencies>
<!-- <build>-->
<!-- <resources>-->
......
......@@ -185,7 +185,7 @@ public class NormalChatAction {
}
// 响应发送方消息id等信息
response(reqId, imMessageOnlineSend.getMsgId(), request.getSenderChannel());
response(reqId, imMessageOnlineSend, request.getSenderChannel());
}
private void sendMsgToMember(ImApplication imApplication, ImConversationMembers member, ImMessageOnlineSend imMessageOnlineSend, PushVO push, Boolean isPush) {
......@@ -235,6 +235,7 @@ public class NormalChatAction {
// 封装响应的实体
ImMessageOnlineSend imMessageOnlineSend = new ImMessageOnlineSend();
imMessageOnlineSend.setMsgId(imMessage.getId());
imMessageOnlineSend.setPreMessageId(imMessage.getPreMessageId());
imMessageOnlineSend.setSender(imClientSender.getClientId());
Map<String, Object> content = JsonUtils.beanCopyDeep(data, Map.class);
//action的属性无需要返回
......@@ -255,16 +256,16 @@ public class NormalChatAction {
* 响应发送方消息id等信息
*
* @param reqId
* @param messageId
* @param imMessageOnlineSend
* @param channel
*/
private void response(String reqId, long messageId, Channel channel) {
private void response(String reqId, ImMessageOnlineSend imMessageOnlineSend, Channel channel) {
WsResponse<MsgVo> responseModel = new WsResponse<>();
ApiResult<Boolean> result = ApiResult.result(ApiCode.SUCCESS);
responseModel.setCmd(WsResponseCmdEnum.RES.getCmdCode());
responseModel.setCode(result.getCode());
responseModel.setMsg(result.getMessage());
responseModel.setData(new MsgVo(messageId));
responseModel.setData(new MsgVo(imMessageOnlineSend.getMsgId(), imMessageOnlineSend.getPreMessageId(), imMessageOnlineSend.getCreateTime()));
responseModel.setReqId(reqId);
// 响应发送方
channelSender.sendMsgLocal((NioSocketChannel) channel, responseModel);
......
......@@ -154,7 +154,7 @@ public class ChatRoomAction {
responseModel.setCmd(WsResponseCmdEnum.RES.getCmdCode());
responseModel.setCode(result.getCode());
responseModel.setMsg(result.getMessage());
responseModel.setData(new MsgVo(messageId));
responseModel.setData(new MsgVo(messageId, null, new Date()));
responseModel.setReqId(reqId);
// 响应发送方
channelSender.sendMsgLocal((NioSocketChannel) channel, responseModel);
......
......@@ -35,6 +35,9 @@ public class ImMessageOnlineSend extends BaseEntity {
@ApiModelProperty("消息id")
private Long msgId;
@ApiModelProperty("前一条消息的id")
private Long preMessageId;
@ApiModelProperty("创建时间")
private Date createTime;
......
......@@ -6,6 +6,7 @@ import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
import java.util.Date;
/**
* @Description TODO
......@@ -16,5 +17,16 @@ import java.io.Serializable;
@AllArgsConstructor
@NoArgsConstructor
public class MsgVo implements Serializable {
private Long msgId;
/**
* 上一条消息id
*/
private Long preMessageId;
/**
* 消息创建时间
*/
private Date createTime;
}
......@@ -42,7 +42,8 @@ public class Couriers {
ImMessageOnlineSend imMessageOnlineSend = new ImMessageOnlineSend();
imMessageOnlineSend.setMsgId(message.getId());
imMessageOnlineSend.setCreateTime(new Date());
imMessageOnlineSend.setPreMessageId(message.getPreMessageId());
imMessageOnlineSend.setCreateTime(message.getCreateTime());
imMessageOnlineSend.setType(message.getMsgType());
imMessageOnlineSend.setSender(sender.getClientId());
imMessageOnlineSend.setContent(content);
......
......@@ -54,16 +54,13 @@ public class MessageBuilder {
* @param type
* @param application
* @param sender
* @param receiver
* @Return
*/
public static ImMessage buildEventMessage(MsgTypeEnum type, ImApplication application,
ImClient sender, ImConversation conversation,
String content) {
long messageId = SnowflakeUtil.getId();
ImMessage imMessage = new ImMessage();
// 保存消息至消息表
imMessage.setId(messageId);
imMessage.setMsgType(type.getUriCode());
imMessage.setFkAppid(application.getId());
imMessage.setSender(sender.getId());
......@@ -73,7 +70,6 @@ public class MessageBuilder {
imMessage.setSystemFlag(false);
imMessage.setSendStatus(2);
imMessage.setFkConversationId(conversation.getId());
imMessage.setCreateTime(new Date());
return imMessage;
}
......
......@@ -47,10 +47,6 @@ public interface ImMessageService extends BaseService<ImMessage> {
*/
ApiResult<Boolean> restApiImMessageSend(ImMsgSendToOnlineClient imMsgSendToOnlineClient, ImApplication imApplication);
ImMessage saveImMessage(ImApplication imApplication, ImClient imClientSender, long messageId, ReceiveVO receiveVO, ReceiveDataVO sysParam);
@Transactional(rollbackFor = Exception.class)
ImMessage saveImMessage(ImClientSimpleDto client, ChatContentVo data);
......@@ -188,4 +184,11 @@ public interface ImMessageService extends BaseService<ImMessage> {
* @Return
*/
Boolean singleUserNotification(SingleUserNotificationParam param);
/**
* 保存消息到数据库
* @param imMessage
*/
void saveMessageToDb(ImMessage imMessage);
}
package com.wecloud.im.service.impl;
import com.wecloud.im.entity.ImClientBlacklist;
import com.wecloud.im.param.ImConversationQueryVo;
import com.wecloud.im.post.MessageBuilder;
import io.geekidea.springbootplus.framework.common.api.ApiCode;
import io.geekidea.springbootplus.framework.common.api.ApiResult;
import io.geekidea.springbootplus.framework.common.exception.BusinessException;
import io.geekidea.springbootplus.framework.common.service.impl.BaseServiceImpl;
import io.geekidea.springbootplus.framework.core.pagination.PageInfo;
import io.geekidea.springbootplus.framework.core.pagination.Paging;
import io.geekidea.springbootplus.framework.shiro.util.SecurityUtils;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.lock.annotation.Lock4j;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.core.metadata.OrderItem;
......@@ -69,6 +37,7 @@ import com.wecloud.im.param.add.ImMsgRecall;
import com.wecloud.im.param.add.ImMsgSendToOnlineClient;
import com.wecloud.im.param.add.ImMsgUpdate;
import com.wecloud.im.param.add.ServerImConversationCreate;
import com.wecloud.im.post.MessageBuilder;
import com.wecloud.im.sdk.enums.ChatTypeEnum;
import com.wecloud.im.service.ContextService;
import com.wecloud.im.service.ImApplicationService;
......@@ -89,10 +58,39 @@ import com.wecloud.im.ws.model.request.PushVO;
import com.wecloud.im.ws.model.request.ReceiveDataVO;
import com.wecloud.im.ws.model.request.ReceiveVO;
import com.wecloud.im.ws.sender.ChannelSender;
import com.wecloud.im.ws.utils.RedisUtils;
import com.wecloud.pushserver.client.model.constant.MqConstant;
import com.wecloud.pushserver.client.model.dto.PushDTO;
import com.wecloud.utils.JsonUtils;
import com.wecloud.utils.SnowflakeUtil;
import io.geekidea.springbootplus.framework.common.api.ApiCode;
import io.geekidea.springbootplus.framework.common.api.ApiResult;
import io.geekidea.springbootplus.framework.common.exception.BusinessException;
import io.geekidea.springbootplus.framework.common.service.impl.BaseServiceImpl;
import io.geekidea.springbootplus.framework.core.pagination.PageInfo;
import io.geekidea.springbootplus.framework.core.pagination.Paging;
import io.geekidea.springbootplus.framework.shiro.util.SecurityUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
......@@ -105,6 +103,8 @@ import com.wecloud.utils.SnowflakeUtil;
@Service
public class ImMessageServiceImpl extends BaseServiceImpl<ImMessageMapper, ImMessage> implements ImMessageService {
private static final String LAST_MSG_PREFIX = "last_msg_";
@Autowired
private ImMessageMapper imMessageMapper;
......@@ -138,6 +138,9 @@ public class ImMessageServiceImpl extends BaseServiceImpl<ImMessageMapper, ImMes
@Autowired
private ApplicationEventPublisher eventPublisher;
@Autowired
private RedisUtils redisUtils;
@Override
@Transactional(rollbackFor = Exception.class)
public ApiResult<Boolean> restApiImMessageSend(ImMsgSendToOnlineClient imMsgSendToOnlineClient, ImApplication imApplication) {
......@@ -192,34 +195,6 @@ public class ImMessageServiceImpl extends BaseServiceImpl<ImMessageMapper, ImMes
}
@Override
@Transactional(rollbackFor = Exception.class)
public ImMessage saveImMessage(ImApplication imApplication, ImClient imClientSender, long messageId, ReceiveVO receiveVO, ReceiveDataVO sysParam) {
ImMessage imMessage = new ImMessage();
// 数据库字段类型为JSON格式
// 因mysql关系型数据库非MongoDB文档类型数据库,第三方应用拓展的自定义参数名和值需使用json格式落库
String contentJsonString = JsonUtils.encodeJson(receiveVO.getData());
imMessage.setContent(contentJsonString);
imMessage.setId(messageId);
imMessage.setCreateTime(new Date());
imMessage.setFkAppid(imApplication.getId());
imMessage.setSender(imClientSender.getId());
imMessage.setWithdraw(false);
imMessage.setEvent(false);
imMessage.setSystemFlag(false);
imMessage.setSendStatus(2);
imMessage.setMsgType(sysParam.getType());
imMessage.setFkConversationId(sysParam.getToConversation());
this.save(imMessage);
// eventPublisher.publishEvent(new ClientSendMessageEvent(imApplication.getId(), imMessage));
return imMessage;
}
@Override
public ImMessage saveImMessage(ImClientSimpleDto client, ChatContentVo data) {
ImMessage imMessage = new ImMessage();
......@@ -228,9 +203,6 @@ public class ImMessageServiceImpl extends BaseServiceImpl<ImMessageMapper, ImMes
// 因mysql关系型数据库非MongoDB文档类型数据库,第三方应用拓展的自定义参数名和值需使用json格式落库
String contentJsonString = JsonUtils.encodeJson(data);
imMessage.setContent(contentJsonString);
imMessage.setId(SnowflakeUtil.getId());
imMessage.setCreateTime(new Date());
imMessage.setFkAppid(client.getFkAppid());
imMessage.setSender(client.getId());
imMessage.setWithdraw(false);
......@@ -240,7 +212,7 @@ public class ImMessageServiceImpl extends BaseServiceImpl<ImMessageMapper, ImMes
imMessage.setMsgType(data.getType());
imMessage.setAt(data.getAt());
imMessage.setFkConversationId(data.getToConversation());
this.save(imMessage);
this.saveMessageToDb(imMessage);
// eventPublisher.publishEvent(new ClientSendMessageEvent(client.getFkAppid(), imMessage));
......@@ -283,7 +255,7 @@ public class ImMessageServiceImpl extends BaseServiceImpl<ImMessageMapper, ImMes
content.put("msgOwner", msgOwner.getClientId());
ImMessage imMessage = MessageBuilder.buildEventMessage(MsgTypeEnum.MSG_WITHDRAW, imApplication,
imClientSender, new ImConversation().setId(messageById.getFkConversationId()), JsonUtils.encodeJson(content));
this.save(imMessage);
this.saveMessageToDb(imMessage);
if (saveOk) {
// 查询该会话所有成员
......@@ -373,7 +345,7 @@ public class ImMessageServiceImpl extends BaseServiceImpl<ImMessageMapper, ImMes
content.put("msgOwner", msgOwner.getClientId());
ImMessage imMessage = MessageBuilder.buildEventMessage(MsgTypeEnum.MSG_DELETE, imApplication,
imClientSender, new ImConversation().setId(message.getFkConversationId()), JsonUtils.encodeJson(content));
this.save(imMessage);
this.saveMessageToDb(imMessage);
if (saveOk) {
// 查询该会话所有成员
......@@ -631,7 +603,7 @@ public class ImMessageServiceImpl extends BaseServiceImpl<ImMessageMapper, ImMes
ImMessage message = assembleImMessage(appId, sender, conversation.getId(), param.getMessageType(), false,
JSONObject.toJSONString(param.getContent()));
// 持久化
this.save(message);
this.saveMessageToDb(message);
// 拼装发送消息体
ImMessageOnlineSend imMessageOnlineSend = assembleImMessageOnlineSend(message, sender, appId);
// 入库 保存收件箱
......@@ -729,7 +701,7 @@ public class ImMessageServiceImpl extends BaseServiceImpl<ImMessageMapper, ImMes
}
}
// 持久化
this.save(message);
this.saveMessageToDb(message);
// 拼装发送消息体
ImMessageOnlineSend imMessageOnlineSend = assembleImMessageOnlineSend(message, sender, appId);
......@@ -770,7 +742,7 @@ public class ImMessageServiceImpl extends BaseServiceImpl<ImMessageMapper, ImMes
// 组装消息
ImMessage message = assembleImMessage(appId, sender, conversation.getId(), param.getMessageType(), false, JSONObject.toJSONString(param.getContent()));
// 持久化
this.save(message);
this.saveMessageToDb(message);
// 拼装发送消息体
ImMessageOnlineSend imMessageOnlineSend = assembleImMessageOnlineSend(message, sender, appId);
......@@ -820,7 +792,7 @@ public class ImMessageServiceImpl extends BaseServiceImpl<ImMessageMapper, ImMes
ImMessage message = assembleImMessage(appId, sender, conversation.getId(), param.getMessageType(), true,
JSONObject.toJSONString(param.getContent()));
// 持久化
this.save(message);
this.saveMessageToDb(message);
// 拼装发送消息体
ImMessageOnlineSend imMessageOnlineSend = assembleImMessageOnlineSend(message, sender, appId);
......@@ -871,7 +843,7 @@ public class ImMessageServiceImpl extends BaseServiceImpl<ImMessageMapper, ImMes
ImMessage message = assembleImMessage(application.getId(), sender, conversation.getId(), param.getMessageType(),
false, param.getContent());
// 持久化
this.save(message);
this.saveMessageToDb(message);
// 拼装发送消息体
ImMessageOnlineSend imMessageOnlineSend = assembleImMessageOnlineSend(message, sender, application.getId());
......@@ -898,6 +870,53 @@ public class ImMessageServiceImpl extends BaseServiceImpl<ImMessageMapper, ImMes
return true;
}
/**
* 持久化消息到数据库并更新相关缓存
* expire 锁过期时间 防止死锁
*
* @param imMessage
*/
@Lock4j(keys = {"#imMessage.fkConversationId"}, expire = 3000, acquireTimeout = 1000)
@Override
public void saveMessageToDb(ImMessage imMessage) {
imMessage.setId(SnowflakeUtil.getId());
imMessage.setCreateTime(new Date());
this.setLastMessageIdAndFreshCash(imMessage);
this.save(imMessage);
}
/**
* 查找会话的最新一条消息
*
* @param message
* @return
*/
private void setLastMessageIdAndFreshCash(ImMessage message) {
try {
// 执行业务方法
String key = LAST_MSG_PREFIX + message.getFkConversationId();
String messageId = redisUtils.getKey(key);
if (StringUtils.isNotBlank(messageId)) {
// 将Redis中最新的messageId存入
message.setPreMessageId(Long.valueOf(messageId));
} else {
// 落库查
ImMessage preMessage = imMessageMapper.selectOne(new QueryWrapper<ImMessage>().lambda()
.eq(ImMessage::getFkConversationId, message.getFkConversationId())
.orderByDesc(ImMessage::getId)
.last("limit 1")
);
if (preMessage != null) {
message.setPreMessageId(preMessage.getId());
}
}
redisUtils.addKey(key, message.getId().toString(), Duration.ofMinutes(30));
} catch (Exception e) {
log.info("获取会话最后一条消息异常 ", e);
}
}
/**
*
* @Author luozh
......@@ -1067,13 +1086,8 @@ public class ImMessageServiceImpl extends BaseServiceImpl<ImMessageMapper, ImMes
*/
private ImMessage assembleImMessage(Long appId, ImClient sender, Long toConversationId, String messageType,
Boolean event, String content) {
long messageId = SnowflakeUtil.getId();
ImMessage imMessage = new ImMessage();
imMessage.setContent(content);
imMessage.setId(messageId);
imMessage.setCreateTime(new Date());
imMessage.setFkAppid(appId);
imMessage.setSender(sender.getId());
imMessage.setWithdraw(false);
......
......@@ -139,7 +139,7 @@ public class ThousandChatAction {
}
// 响应发送方消息id等信息
response(reqId, imMessageOnlineSend.getMsgId(), request.getSenderChannel());
response(reqId, imMessageOnlineSend, request.getSenderChannel());
}
......@@ -200,6 +200,7 @@ public class ThousandChatAction {
// 封装响应的实体
ImMessageOnlineSend imMessageOnlineSend = new ImMessageOnlineSend();
imMessageOnlineSend.setMsgId(imMessage.getId());
imMessageOnlineSend.setPreMessageId(imMessage.getPreMessageId());
imMessageOnlineSend.setSender(imClientSender.getClientId());
Map<String, Object> content = JsonUtils.beanCopyDeep(data, Map.class);
//action的属性无需要返回
......@@ -220,16 +221,16 @@ public class ThousandChatAction {
* 响应发送方消息id等信息
*
* @param reqId
* @param messageId
* @param imMessageOnlineSend
* @param channel
*/
private void response(String reqId, long messageId, Channel channel) {
private void response(String reqId, ImMessageOnlineSend imMessageOnlineSend, Channel channel) {
WsResponse<MsgVo> responseModel = new WsResponse<>();
ApiResult<Boolean> result = ApiResult.result(ApiCode.SUCCESS);
responseModel.setCmd(WsResponseCmdEnum.RES.getCmdCode());
responseModel.setCode(result.getCode());
responseModel.setMsg(result.getMessage());
responseModel.setData(new MsgVo(messageId));
responseModel.setData(new MsgVo(imMessageOnlineSend.getMsgId(), imMessageOnlineSend.getPreMessageId(), imMessageOnlineSend.getCreateTime()));
responseModel.setReqId(reqId);
// 响应发送方
channelSender.sendMsgLocal((NioSocketChannel)channel, responseModel);
......
......@@ -29,6 +29,9 @@ public class OfflineMsgDto implements Serializable {
@ApiModelProperty("消息id")
private Long msgId;
@ApiModelProperty("前一条消息的id")
private Long preMessageId;
@ApiModelProperty("创建时间")
private Date createTime;
......
......@@ -35,6 +35,7 @@
<select id="findOfflineMsgs" resultType="com.wecloud.im.thousandchat.param.ThousandOffineMsgDto">
<foreach collection="lastestMsgs" item="lastestMsg" separator=" union all ">
select msg.id AS msgId,
msg.pre_message_id AS preMessageId,
msg.create_time,
msg.withdraw_time,
msg.update_date,
......
-- 在feature-cluster 2021年12月22日之后,需要执行的的sql增量脚本
-- 在feature-cluster 2021年12月22日之后,需要执行的的sql增量脚本
......@@ -210,3 +210,6 @@ ALTER TABLE `im_message` ADD INDEX `idx_sender`(`sender`);
ALTER TABLE `im_message` ADD INDEX `idx_create_time`(`create_time`);
ALTER TABLE im_message
ADD COLUMN `pre_message_id` bigint(20) DEFAULT NULL COMMENT '前一条消息的id';
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment