Commit 61f36d6a by lixiaozhong

1、实现万人群已读回执,

2、实现万人群已接收回执,
2、实现万人群离线消息拉取功能
parent d8313c69
......@@ -47,7 +47,7 @@ public class ImMessageOnlineSend extends BaseEntity {
private Boolean event;
@ApiModelProperty("0非系统通知; 1为系统通知")
private Boolean system;
private Boolean systemFlag;
@ApiModelProperty("at他人,传入客户端id数组")
private String at;
......
......@@ -21,6 +21,7 @@ import com.wecloud.im.service.ImClientService;
import com.wecloud.im.service.ImConversationMembersService;
import com.wecloud.im.service.ImConversationService;
import com.wecloud.im.service.ImMessageService;
import com.wecloud.im.thousandchat.service.ThousandChatService;
import com.wecloud.im.vo.ImMessageOfflineListVo;
import com.wecloud.im.vo.OfflineMsgDto;
import com.wecloud.im.ws.enums.WsResponseCmdEnum;
......@@ -36,6 +37,8 @@ import io.geekidea.springbootplus.framework.common.service.impl.BaseServiceImpl;
import io.geekidea.springbootplus.framework.core.pagination.PageInfo;
import io.geekidea.springbootplus.framework.core.pagination.Paging;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.BooleanUtils;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
......@@ -77,6 +80,9 @@ public class ImMessageServiceImpl extends BaseServiceImpl<ImMessageMapper, ImMes
@Autowired
private ChannelSender channelSender;
@Autowired
private ThousandChatService thousandChatService;
@Override
@Transactional(rollbackFor = Exception.class)
public ApiResult<Boolean> restApiImMessageSend(ImMsgSendToOnlineClient imMsgSendToOnlineClient, ImApplication imApplication) {
......@@ -333,9 +339,16 @@ public class ImMessageServiceImpl extends BaseServiceImpl<ImMessageMapper, ImMes
// 获取加入的所有会话
List<ImConversation> myImConversationList = imConversationService.getMyImConversationList();
List<ImConversation> thousandConversations = new ArrayList<>();
// 遍历会话列表, 查询每个会话列表的离线消息
for (ImConversation imConversation : myImConversationList) {
//万人群 暂时跳过,后面统一处理
if(BooleanUtils.isTrue(imConversation.getIsThousand())) {
thousandConversations.add(imConversation);
continue;
}
//根据客户端id与会话id 查询离线消息
List<OfflineMsgDto> offlineListByClientAndConversation = this.getOfflineListByClientAndConversation(client.getId(), imConversation.getId());
......@@ -348,6 +361,11 @@ public class ImMessageServiceImpl extends BaseServiceImpl<ImMessageMapper, ImMes
imMessageOfflineListVo.setConversationId(imConversation.getId());
imMessageOfflineListVoList.add(imMessageOfflineListVo);
}
// 万人群的部分
if(CollectionUtils.isNotEmpty(thousandConversations)) {
List<ImMessageOfflineListVo> offlineMsgs = thousandChatService.findOfflineMsgs(thousandConversations);
imMessageOfflineListVoList.addAll(offlineMsgs);
}
return imMessageOfflineListVoList;
}
......
package com.wecloud.im.thousandchat.cache;
import com.wecloud.im.entity.ImApplication;
import com.wecloud.im.entity.ImClient;
import com.wecloud.im.service.ImClientService;
import com.wecloud.im.service.ImConversationMembersService;
import com.wecloud.im.ws.cache.UserStateListener;
import com.wecloud.im.ws.utils.InitIp;
......@@ -11,18 +8,20 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* 人群的缓存(本地缓存待实现)
* 人群的缓存(本地缓存待实现)
* @author lixiaozhong
* @date 2022年01月4日 17:00:00
*/
@Service
@Slf4j
public class GroupCacheManager extends UserStateListener {
public class ThousandChatCacheManager extends UserStateListener {
/**
* redis的群键 key
......@@ -61,5 +60,21 @@ public class GroupCacheManager extends UserStateListener {
return redisUtils.hashgetll(GROUP_KEY + conversionId);
}
/**
* 根据群ID 获取 万人群的 在线成员的ip以及对应的成员信息,key-vaul
* @param conversionId
* @return 在线成员的key-val,其中key是 ip地址, val是 client的主键id:platform
*/
public Map<String, List<String>> findOnlineHostsByThousandGroupId(Long conversionId) {
Map<String /** client的主键ID:platform **/, String /** ip **/> onlineClientIpMap = findOnlineClientsByThousandGroupId(conversionId);
Map<String /** ip **/, List<String /** client的主键ID:platform **/>> onlineIpClientMap = new HashMap<>();
onlineClientIpMap.forEach((clientIdAndPlatforms, ip) -> {
onlineIpClientMap.putIfAbsent(ip, new ArrayList<>());
onlineIpClientMap.get(ip).add(clientIdAndPlatforms);
});
return onlineIpClientMap;
}
}
package com.wecloud.im.thousandchat.controller;
import com.wecloud.im.thousandchat.param.LastestReceivedMsg;
import com.wecloud.im.thousandchat.service.ThousandChatService;
import io.geekidea.springbootplus.framework.common.api.ApiResult;
import io.geekidea.springbootplus.framework.common.controller.BaseController;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* 万人群消息处理controller
*/
@Slf4j
@RestController
@RequestMapping("/imState")
@Api(value = "消息收件箱表API", tags = {"消息收件箱表"})
public class ThousandChatController extends BaseController {
@Autowired
private ThousandChatService thousandChatService;
/**
* 消息修改为已接收状态
*/
@PostMapping("/msgReceivedUpdate")
@ApiOperation(value = "消息修改为已接收状态")
public ApiResult<Boolean> updateImMsgReceived(@RequestBody @Validated LastestReceivedMsg lastestReceivedMsg) {
return thousandChatService.updateImMsgReceived(lastestReceivedMsg);
}
/**
* 消息修改为已读状态
*/
@PostMapping("/msgReadUpdate")
@ApiOperation(value = "消息修改为已读状态")
public ApiResult<Boolean> updateInMsgReadUpdate(Long lastestMsgId) {
return thousandChatService.updateImMsgRead(lastestMsgId);
}
}
package com.wecloud.im.thousandchat.entity;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import io.geekidea.springbootplus.framework.common.entity.BaseEntity;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.experimental.Accessors;
import javax.validation.constraints.NotNull;
import java.util.Date;
/**
* @author lixiaozhong
* 消息最新已读寻址表
*/
@Data
@Accessors(chain = true)
@EqualsAndHashCode(callSuper = true)
@TableName("im_msg_read_lastest")
@ApiModel(value = "ImMsgReadLastest对象")
public class ImMsgReadLastest extends BaseEntity {
private static final long serialVersionUID = 1L;
@NotNull(message = "收件id不能为空")
@ApiModelProperty("收件id")
@TableId(value = "id", type = IdType.INPUT)
private Long id;
@ApiModelProperty("创建时间")
private Date createTime;
@ApiModelProperty("修改时间")
private Date updateTime;
@ApiModelProperty("读取时间")
private Date readTime;
@ApiModelProperty("接收时间")
private Date receiveTime;
@NotNull(message = "应用appid不能为空")
@ApiModelProperty("应用appid")
private Long fkAppid;
@NotNull(message = "接收客户端id不能为空")
@ApiModelProperty("接收客户端id")
private Long fkClientId;
@NotNull(message = "会话id不能为空")
@ApiModelProperty("会话id")
private Long fkConversationId;
@ApiModelProperty("最后一条已接收消息id")
private Long fkReceiveMsgId;
@ApiModelProperty("最后一条已读消息id")
private Long fkReadMsgId;
}
package com.wecloud.im.thousandchat.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.wecloud.im.entity.ImInbox;
import com.wecloud.im.param.ImInboxPageParam;
import com.wecloud.im.param.ImInboxQueryVo;
import com.wecloud.im.thousandchat.entity.ImMsgReadLastest;
import com.wecloud.im.thousandchat.param.ConversationLastestReadMsg;
import com.wecloud.im.thousandchat.param.ThousandOffineMsgDto;
import org.apache.ibatis.annotations.Param;
import org.springframework.stereotype.Repository;
import java.io.Serializable;
import java.util.List;
/**
* @author lixiaozhong
* 主要针对万人群的消息状态处理dao类
*/
@Repository
public interface ImMsgReadLastestMapper extends BaseMapper<ImMsgReadLastest> {
/**
* 获取分页对象
*
* @param page
* @param imInboxPageParam
* @return
*/
IPage<ImInboxQueryVo> getImInboxPageList(@Param("page") Page page, @Param("param") ImInboxPageParam imInboxPageParam);
/**
* 万人群的消息已送达的插入/更新
* @param lastestRead
* @return
*/
Long upsertImMsgReceive(ImMsgReadLastest lastestRead);
/**
* 万人群消息已读的插入/更新
* @param lastestRead
* @return
*/
Long upsertImMsgRead(ImMsgReadLastest lastestRead);
/**
* 查找最新的已读记录,如果查回来是空,表示会话消息全未读
* @param clientId
* @return
*/
List<ConversationLastestReadMsg> findLastestReadMsgs(@Param("clientId") Long clientId);
/**
* 查找万人群的离线消息
* @param allConversationOfLastestReadMsgs
*/
List<ThousandOffineMsgDto> findOfflineMsgs(@Param("lastestMsgs") List<ConversationLastestReadMsg> allConversationOfLastestReadMsgs);
}
package com.wecloud.im.thousandchat.param;
import io.geekidea.springbootplus.framework.common.entity.BaseEntity;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.experimental.Accessors;
import javax.validation.constraints.NotNull;
/**
* @author lixiaozhong
* 最新一条只读消息
*/
@Data
@Accessors(chain = true)
@EqualsAndHashCode(callSuper = true)
@ApiModel(value = "ConversationLastestReadMsg")
public class ConversationLastestReadMsg extends BaseEntity {
private static final long serialVersionUID = 1L;
@ApiModelProperty(value = "最新的一条已读消息")
private Long lastestReadMsgId;
@ApiModelProperty(value = "最新的一条已接收消息")
private Long lastestReceiveMsgId;
@ApiModelProperty(value = "会话id")
private Long conversationId;
}
package com.wecloud.im.thousandchat.param;
import io.geekidea.springbootplus.framework.common.entity.BaseEntity;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.experimental.Accessors;
import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;
/**
* @author lixiaozhong
* 最后一条已接收消息
*/
@Data
@Accessors(chain = true)
@EqualsAndHashCode(callSuper = true)
@ApiModel(value = "ImMsgReceivedStatusUpdate")
public class LastestReceivedMsg extends BaseEntity {
private static final long serialVersionUID = 1L;
@ApiModelProperty(value = "消息id,只需要发送最新的一条已接收消息", required = true)
@NotNull(message = "消息ID不能为空")
private Long lastestMsgId;
@ApiModelProperty(value = "是否同时修改为已读状态", required = false)
private Boolean readStatus = false;
}
package com.wecloud.im.thousandchat.param;
import com.wecloud.im.vo.OfflineMsgDto;
import lombok.Data;
/**
* @Description 万人群离线消息dto类
* @Author lixiaozhong
* @Date 2022/1/11 10:00 上午
*/
@Data
public class ThousandOffineMsgDto extends OfflineMsgDto {
/**
* 会话id
*/
private Long conversationId;
}
package com.wecloud.im.thousandchat.service;
import com.wecloud.im.entity.ImClient;
import com.wecloud.im.entity.ImConversation;
import com.wecloud.im.entity.ImMessage;
import com.wecloud.im.entity.ImMessageOnlineSend;
import com.wecloud.im.service.ImClientService;
import com.wecloud.im.service.ImMessageService;
import com.wecloud.im.thousandchat.cache.ThousandChatCacheManager;
import com.wecloud.im.thousandchat.entity.ImMsgReadLastest;
import com.wecloud.im.thousandchat.mapper.ImMsgReadLastestMapper;
import com.wecloud.im.thousandchat.param.ConversationLastestReadMsg;
import com.wecloud.im.thousandchat.param.LastestReceivedMsg;
import com.wecloud.im.thousandchat.param.ThousandOffineMsgDto;
import com.wecloud.im.vo.ImMessageOfflineListVo;
import com.wecloud.im.vo.OfflineMsgDto;
import com.wecloud.im.ws.enums.MsgTypeEnum;
import com.wecloud.im.ws.enums.WsResponseCmdEnum;
import com.wecloud.im.ws.model.WsResponseModel;
import com.wecloud.im.ws.sender.ChannelSender;
import com.wecloud.utils.JsonUtils;
import io.geekidea.springbootplus.framework.common.api.ApiCode;
import io.geekidea.springbootplus.framework.common.api.ApiResult;
import io.geekidea.springbootplus.framework.common.service.impl.BaseServiceImpl;
import io.geekidea.springbootplus.framework.shiro.util.SnowflakeUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.BooleanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* 消息收件箱表 服务实现类
*
* @author wei
* @since 2021-05-06
*/
@Slf4j
@Service
public class ThousandChatService extends BaseServiceImpl<ImMsgReadLastestMapper, ImMsgReadLastest> {
@Autowired
private ImClientService imClientService;
@Autowired
private ImMessageService imMessageService;
@Autowired
private ChannelSender channelSender;
@Autowired
private ImMsgReadLastestMapper imMsgReadLastestMapper;
@Autowired
private ThousandChatCacheManager thousandChatCacheManager;
/**
* 更新消息已接收,已接收的最新那条作为消息id
* @param lastestReceivedMsg
* @return
*/
public ApiResult<Boolean> updateImMsgReceived(LastestReceivedMsg lastestReceivedMsg) {
ImClient curentClient = imClientService.getCurentClient();
ImMessage lastestMsg = imMessageService.getById(lastestReceivedMsg.getLastestMsgId());
if(lastestMsg == null) {
return ApiResult.ok(false);
}
// 生成消息id
long readLastestId = SnowflakeUtil.getId();
ImMsgReadLastest lastest = new ImMsgReadLastest();
lastest.setId(readLastestId);
lastest.setFkAppid(curentClient.getFkAppid());
lastest.setFkClientId(curentClient.getId());
lastest.setFkConversationId(lastestMsg.getFkConversationId());
lastest.setFkReceiveMsgId(lastestReceivedMsg.getLastestMsgId());
if(BooleanUtils.isTrue(lastestReceivedMsg.getReadStatus())) {
lastest.setFkReadMsgId(lastestReceivedMsg.getLastestMsgId());
}
// 修改已接收状态
imMsgReadLastestMapper.upsertImMsgReceive(lastest);
// 内容
HashMap<String, String> stringStringHashMap = new HashMap<>();
stringStringHashMap.put("type", MsgTypeEnum.CLIENT_RECEIVED_MSG.getUriCode() + "");
stringStringHashMap.put("receiverId", curentClient.getClientId());
// 推送给接收方
sendMsgStatus(curentClient, stringStringHashMap, lastestMsg);
return ApiResult.ok();
}
/**
* 更新消息已读,读到最新的那条作为消息id
* @param lastestMsgId 消息id
* @return
*/
public ApiResult<Boolean> updateImMsgRead(Long lastestMsgId) {
ImClient curentClient = imClientService.getCurentClient();
ImMessage lastestMsg = imMessageService.getById(lastestMsgId);
if(lastestMsg == null) {
return ApiResult.ok(false);
}
// 生成消息id
long readLastestId = SnowflakeUtil.getId();
ImMsgReadLastest lastest = new ImMsgReadLastest();
lastest.setId(readLastestId);
lastest.setFkAppid(curentClient.getFkAppid());
lastest.setFkClientId(curentClient.getId());
lastest.setFkConversationId(lastestMsg.getFkConversationId());
lastest.setFkReadMsgId(lastestMsgId);
// 修改已读状态
imMsgReadLastestMapper.upsertImMsgRead(lastest);
// 内容
HashMap<String, String> stringStringHashMap = new HashMap<>();
stringStringHashMap.put("type", MsgTypeEnum.CLIENT_READ_MSG + "");
stringStringHashMap.put("receiverId", curentClient.getClientId());
sendMsgStatus(curentClient, stringStringHashMap, lastestMsg);
return ApiResult.ok();
}
public List<ImMessageOfflineListVo> findOfflineMsgs(List<ImConversation> thousandConversations) {
if(CollectionUtils.isEmpty(thousandConversations)) {
return new ArrayList<>();
}
ImClient curentClient = imClientService.getCurentClient();
List<ConversationLastestReadMsg> lastestReadMsgs = imMsgReadLastestMapper.findLastestReadMsgs(curentClient.getId());
Map<Long, ConversationLastestReadMsg> conversationMsgMap = lastestReadMsgs.stream().collect(Collectors.toMap(ConversationLastestReadMsg::getConversationId, v->v,(a,b)->a));
List<ConversationLastestReadMsg> allConversationOfLastestReadMsgs = new ArrayList<>();
thousandConversations.forEach(conversation -> {
ConversationLastestReadMsg conversationLastestReadMsg = conversationMsgMap.get(conversation.getId());
if(conversationLastestReadMsg == null) {
ConversationLastestReadMsg msg = new ConversationLastestReadMsg();
msg.setLastestReadMsgId(0L);
msg.setLastestReceiveMsgId(0L);
msg.setConversationId(conversation.getId());
allConversationOfLastestReadMsgs.add(msg);
} else {
allConversationOfLastestReadMsgs.add(conversationLastestReadMsg);
}
});
List<ImMessageOfflineListVo> result = new ArrayList<>();
if(CollectionUtils.isEmpty(allConversationOfLastestReadMsgs)) {
return result;
}
List<ThousandOffineMsgDto> offlineMsgs = imMsgReadLastestMapper.findOfflineMsgs(allConversationOfLastestReadMsgs);
if(CollectionUtils.isEmpty(offlineMsgs)) {
return result;
}
Map<Long, List<ThousandOffineMsgDto>> longListMap = offlineMsgs.stream().collect(Collectors.groupingBy(ThousandOffineMsgDto::getConversationId));
for (Map.Entry<Long, List<ThousandOffineMsgDto>> offlineMsg : longListMap.entrySet()) {
ImMessageOfflineListVo msg = new ImMessageOfflineListVo();
msg.setConversationId(offlineMsg.getKey());
List<OfflineMsgDto> collect = offlineMsg.getValue().stream().map(p -> (OfflineMsgDto) p).collect(Collectors.toList());
msg.setMsgList(collect);
result.add(msg);
}
return result;
}
/**
* 下发状态类型消息
*
* @param curentClient 当前客户端
* @param stringStringHashMap 消息content内容
* @param lastestMsg 最新一条待处理消息
*/
private void sendMsgStatus(ImClient curentClient, HashMap<String, String> stringStringHashMap, ImMessage lastestMsg) {
Map<String /** ip **/, List<String /** client的主键ID:platform **/>> onlineIpClientMap =
thousandChatCacheManager.findOnlineHostsByThousandGroupId(lastestMsg.getFkConversationId());
// 封装响应的实体
ImMessageOnlineSend imMessageOnlineSend = new ImMessageOnlineSend();
imMessageOnlineSend.setMsgId(lastestMsg.getId());
imMessageOnlineSend.setSender(curentClient.getClientId());
imMessageOnlineSend.setContent(stringStringHashMap);
imMessageOnlineSend.setConversationId(lastestMsg.getFkConversationId());
imMessageOnlineSend.setCreateTime(new Date());
imMessageOnlineSend.setWithdraw(false);
imMessageOnlineSend.setEvent(true);
imMessageOnlineSend.setSystemFlag(false);
// 向接收方推送
WsResponseModel<ImMessageOnlineSend> responseModel = new WsResponseModel<>();
responseModel.setCmd(WsResponseCmdEnum.ONLINE_EVENT_MSG.getCmdCode());
ApiResult<Boolean> result = ApiResult.result(ApiCode.SUCCESS);
responseModel.setCode(result.getCode());
responseModel.setMsg(result.getMessage());
responseModel.setData(imMessageOnlineSend);
responseModel.setReqId(null);
// 遍历发送给该会话的每个在线成员
onlineIpClientMap.forEach((ip, clientIdAndPlatforms) -> {
channelSender.batchSendMsg(responseModel, ip, clientIdAndPlatforms);
});
}
}
......@@ -38,7 +38,7 @@ public class OfflineMsgDto implements Serializable {
private Boolean event;
@ApiModelProperty("0非系统通知; 1为系统通知")
private Boolean system;
private Boolean systemFlag;
@ApiModelProperty("at他人,传入客户端id数组")
private String at;
......
......@@ -106,7 +106,7 @@ public class NormalChatStrategy extends AbstractImCmdStrategy {
imMessageOnlineSend.setWithdrawTime(imMessage.getWithdrawTime());
imMessageOnlineSend.setWithdraw(imMessage.getWithdraw());
imMessageOnlineSend.setEvent(imMessage.getEvent());
imMessageOnlineSend.setSystem(imMessage.getSystemFlag());
imMessageOnlineSend.setSystemFlag(imMessage.getSystemFlag());
imMessageOnlineSend.setAt(imMessage.getAt());
// 遍历发送
......
......@@ -12,7 +12,7 @@ import com.wecloud.im.service.ImClientService;
import com.wecloud.im.service.ImConversationMembersService;
import com.wecloud.im.service.ImInboxService;
import com.wecloud.im.service.ImMessageService;
import com.wecloud.im.thousandchat.cache.GroupCacheManager;
import com.wecloud.im.thousandchat.cache.ThousandChatCacheManager;
import com.wecloud.im.ws.annotation.ImCmdType;
import com.wecloud.im.ws.enums.WsRequestCmdEnum;
import com.wecloud.im.ws.enums.WsResponseCmdEnum;
......@@ -31,7 +31,6 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
......@@ -68,7 +67,7 @@ public class ThousandChatStrategy extends AbstractImCmdStrategy {
@Autowired
private AsyncPush systemPush;
@Autowired
private GroupCacheManager groupCacheManager;
private ThousandChatCacheManager thousandChatCacheManager;
@Override
public void process(ImApplication imApplication, ImClient imSender, ChannelHandlerContext ctx, ReceiveVO receiveVO) {
......@@ -98,16 +97,12 @@ public class ThousandChatStrategy extends AbstractImCmdStrategy {
imMessageOnlineSend.setWithdrawTime(imMessage.getWithdrawTime());
imMessageOnlineSend.setWithdraw(imMessage.getWithdraw());
imMessageOnlineSend.setEvent(imMessage.getEvent());
imMessageOnlineSend.setSystem(imMessage.getSystemFlag());
imMessageOnlineSend.setSystemFlag(imMessage.getSystemFlag());
imMessageOnlineSend.setAt(imMessage.getAt());
// 在线用户直接发消息
Map<String, String> onlineClientIpMap = groupCacheManager.findOnlineClientsByThousandGroupId(toConversationId);
Map<String /** ip **/, List<String /** client的主键ID:platform **/>> onlineIpClientMap = new HashMap<>();
onlineClientIpMap.forEach((clientIdAndPlatforms, ip) -> {
onlineIpClientMap.putIfAbsent(ip, new ArrayList<>());
onlineIpClientMap.get(ip).add(clientIdAndPlatforms);
});
Map<String /** ip **/, List<String /** client的主键ID:platform **/>> onlineIpClientMap =
thousandChatCacheManager.findOnlineHostsByThousandGroupId(toConversationId);
// 封装要推给接收方的消息
WsResponseModel<ImMessageOnlineSend> responseModel = new WsResponseModel<>();
responseModel.setCmd(WsResponseCmdEnum.ONLINE_MSG.getCmdCode());
......
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.wecloud.im.thousandchat.mapper.ImMsgReadLastestMapper">
<!-- 通用查询结果列 -->
<sql id="Base_Column_List">
id, create_time, update_time, read_time, receive_time, fk_appid, fk_client_id, fk_conversation_id, fk_read_msg_id, fk_receive_msg_id
</sql>
<update id="upsertImMsgReceive">
insert into im_msg_read_lastest(id, create_time, read_time, receive_time, fk_appid, fk_client_id, fk_conversation_id, fk_read_msg_id, fk_receive_msg_id)
values(#{id}, NOW(), NOW(), NOW(), #{fkAppid}, #{fkClientId}, #{fkConversationId}, #{fkReadMsgId}, #{fkReceiveMsgId})
ON DUPLICATE KEY UPDATE
<if test="fkReadMsgId">
read_time = if(fk_read_msg_id is null or fk_read_msg_id &lt; #{fkReadMsgId}, NOW(), read_time),
fk_read_msg_id = if(fk_read_msg_id is null or fk_read_msg_id &lt; #{fkReadMsgId}, #{fkReadMsgId}, fk_read_msg_id),
</if>
receive_time = if(fk_receive_msg_id is null or fk_receive_msg_id &lt; #{fkReceiveMsgId}, NOW(), receive_time),
fk_receive_msg_id = if(fk_receive_msg_id is null or fk_receive_msg_id &lt; #{fkReceiveMsgId}, #{fkReceiveMsgId}, fk_receive_msg_id)
</update>
<update id="upsertImMsgRead">
insert into im_msg_read_lastest(id, create_time, read_time, fk_appid, fk_client_id, fk_conversation_id, fk_read_msg_id)
values(#{id}, NOW(), NOW(), #{fkAppid}, #{fkClientId}, #{fkConversationId}, #{fkReadMsgId})
ON DUPLICATE KEY UPDATE
read_time = if(fk_read_msg_id is null or fk_read_msg_id &lt; #{fkReadMsgId}, NOW(), read_time),
fk_read_msg_id = if(fk_read_msg_id is null or fk_read_msg_id &lt; #{fkReadMsgId}, #{fkReadMsgId}, fk_read_msg_id)
</update>
<select id="findLastestReadMsgs" resultType="com.wecloud.im.thousandchat.param.ConversationLastestReadMsg">
select fk_conversation_id as conversationId, fk_read_msg_id as lastestReadMsgId, fk_receive_msg_id as lastestReceiveMsgId
from im_msg_read_lastest
where fk_client_id = #{clientId}
</select>
<select id="findOfflineMsgs" resultType="com.wecloud.im.thousandchat.param.ThousandOffineMsgDto">
<foreach collection="lastestMsgs" item="lastestMsg" separator=" union all ">
select msg.id AS msgId,
msg.create_time,
msg.withdraw_time,
msg.update_date,
client.client_id AS sender,
msg.content,
msg.withdraw,
msg.`event`,
msg.system_flag,
msg.`at`,
msg.send_status,
msg.fk_conversation_id as conversationId,
(SELECT COUNT(a.id) FROM im_message a WHERE a.fk_conversation_id = msg.fk_conversation_id and a.id > #{lastestMsg.lastestReadMsgId})
AS not_read_count,
(SELECT COUNT(b.id) FROM im_message b WHERE b.fk_conversation_id = msg.fk_conversation_id and b.id >= msg.id)
AS not_receiver_count
from im_message msg INNER JOIN `im_client` client ON client.id = msg.sender
where
(msg.fk_conversation_id = #{lastestMsg.conversationId} and msg.id > #{lastestMsg.lastestReceiveMsgId} )
</foreach>
</select>
</mapper>
......@@ -222,3 +222,21 @@ ALTER TABLE im_conversation`
ADD COLUMN `member_count` int NULL COMMENT '群成员数' AFTER `last_message`,
ADD COLUMN `is_thousand` tinyint NULL COMMENT '是否万人群' AFTER `member_count`;
ALTER TABLE `im_conversation_members` ADD INDEX `fk_client_id`(`fk_client_id`);
-- 在feature-cluster 2022年1月10日之后,需要执行的sql增量脚本
-- 消息最新已读寻址表
CREATE TABLE `im_msg_read_lastest` (
`id` bigint NOT NULL COMMENT '收件id',
`create_time` timestamp NULL DEFAULT NULL COMMENT '创建时间',
`update_time` timestamp NULL DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
`read_time` timestamp NULL DEFAULT NULL COMMENT '读取时间',
`receive_time` timestamp NULL DEFAULT NULL COMMENT '接收时间',
`fk_appid` bigint NOT NULL COMMENT '应用appid',
`fk_client_id` bigint NOT NULL COMMENT '接收客户端id',
`fk_conversation_id` bigint NOT NULL COMMENT '会话id',
`fk_receive_msg_id` bigint DEFAULT NULL COMMENT '最后一条已接收的消息id',
`fk_read_msg_id` bigint DEFAULT NULL COMMENT '最后一条已读的消息id',
PRIMARY KEY (`id`) USING BTREE,
UNIQUE KEY `receiver_conversation` (`fk_client_id`,`fk_conversation_id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='消息最新已读寻址表';
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment