Commit 49239792 by hewei

Merge branch 'feature-cluster-wei' into 'feature-cluster'

分支合并

See merge request !14
parents ec675e7c 56a61889
package io.geekidea.springbootplus.test;
import com.wecloud.utils.SnowflakeUtil;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
/**
* Id生成 单元测试
*/
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest
public class IdTest {
@Test
public void test() {
Long id = SnowflakeUtil.getId();
}
}
...@@ -4,7 +4,7 @@ import com.wecloud.im.entity.ImConversation; ...@@ -4,7 +4,7 @@ import com.wecloud.im.entity.ImConversation;
import com.wecloud.im.entity.ImConversationMembers; import com.wecloud.im.entity.ImConversationMembers;
import com.wecloud.im.service.ImConversationMembersService; import com.wecloud.im.service.ImConversationMembersService;
import com.wecloud.im.service.ImConversationService; import com.wecloud.im.service.ImConversationService;
import io.geekidea.springbootplus.framework.shiro.util.SnowflakeUtil; import com.wecloud.utils.SnowflakeUtil;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
......
...@@ -4,7 +4,7 @@ import cn.hutool.core.codec.Base64; ...@@ -4,7 +4,7 @@ import cn.hutool.core.codec.Base64;
import com.wecloud.im.entity.ImIosApns; import com.wecloud.im.entity.ImIosApns;
import com.wecloud.im.service.ImIosApnsService; import com.wecloud.im.service.ImIosApnsService;
import com.wecloud.im.ws.sender.IosPush; import com.wecloud.im.ws.sender.IosPush;
import io.geekidea.springbootplus.framework.shiro.util.SnowflakeUtil; import com.wecloud.utils.SnowflakeUtil;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
......
...@@ -46,6 +46,10 @@ public class ImMessage extends BaseEntity { ...@@ -46,6 +46,10 @@ public class ImMessage extends BaseEntity {
@ApiModelProperty("发送者客户端id") @ApiModelProperty("发送者客户端id")
private Long sender; private Long sender;
/**
* 数据库字段类型为JSON格式
* 因mysql关系型数据库非MongoDB文档类型数据库,第三方应用拓展的自定义参数名和值需使用json格式落库
*/
@ApiModelProperty("内容") @ApiModelProperty("内容")
private String content; private String content;
...@@ -69,4 +73,7 @@ public class ImMessage extends BaseEntity { ...@@ -69,4 +73,7 @@ public class ImMessage extends BaseEntity {
@ApiModelProperty("会话id") @ApiModelProperty("会话id")
private Long fkConversationId; private Long fkConversationId;
@ApiModelProperty("消息类型")
private Integer msgType;
} }
...@@ -24,6 +24,9 @@ import java.util.HashMap; ...@@ -24,6 +24,9 @@ import java.util.HashMap;
public class ImMessageOnlineSend extends BaseEntity { public class ImMessageOnlineSend extends BaseEntity {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
@ApiModelProperty("消息类型")
private Integer type;
@NotNull(message = "消息id不能为空") @NotNull(message = "消息id不能为空")
@ApiModelProperty("消息id") @ApiModelProperty("消息id")
private Long msgId; private Long msgId;
......
...@@ -15,7 +15,7 @@ import java.util.concurrent.TimeUnit; ...@@ -15,7 +15,7 @@ import java.util.concurrent.TimeUnit;
public class BusinessThreadPool { public class BusinessThreadPool {
private static final ThreadFactory BUSINESS_THREAD_FACTORY = new ThreadFactoryBuilder() private static final ThreadFactory BUSINESS_THREAD_FACTORY = new ThreadFactoryBuilder()
.setNamePrefix("-business-").build(); .setNamePrefix("-bs-").build();
/** /**
* 业务处理线程池 * 业务处理线程池
......
...@@ -10,12 +10,12 @@ import java.util.concurrent.ThreadPoolExecutor; ...@@ -10,12 +10,12 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
/** /**
* 业务处理线程池 * ws消息下发 线程池
*/ */
public class SendMsgThreadPool { public class SendMsgThreadPool {
private static final ThreadFactory SEND_MSG_THREAD_FACTORY = new ThreadFactoryBuilder() private static final ThreadFactory SEND_MSG_THREAD_FACTORY = new ThreadFactoryBuilder()
.setNamePrefix("-sendMsg-").build(); .setNamePrefix("-sendM-").build();
/** /**
* 业务处理线程池 * 业务处理线程池
......
...@@ -19,9 +19,8 @@ import com.wecloud.im.service.ImClientService; ...@@ -19,9 +19,8 @@ import com.wecloud.im.service.ImClientService;
import io.geekidea.springbootplus.framework.common.service.impl.BaseServiceImpl; import io.geekidea.springbootplus.framework.common.service.impl.BaseServiceImpl;
import io.geekidea.springbootplus.framework.core.pagination.PageInfo; import io.geekidea.springbootplus.framework.core.pagination.PageInfo;
import io.geekidea.springbootplus.framework.core.pagination.Paging; import io.geekidea.springbootplus.framework.core.pagination.Paging;
import io.geekidea.springbootplus.framework.shiro.util.SnowflakeUtil; import com.wecloud.utils.SnowflakeUtil;
import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
......
...@@ -14,27 +14,36 @@ import org.springframework.stereotype.Component; ...@@ -14,27 +14,36 @@ import org.springframework.stereotype.Component;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static com.wecloud.im.ws.ImConstant.READER_IDLE_TIME;
@Component @Component
public class NettyChannelInitializer extends ChannelInitializer<SocketChannel> { public class NettyChannelInitializer extends ChannelInitializer<SocketChannel> {
@Resource @Resource
private ChannelInboundHandler channelInboundHandler; private ChannelInboundHandler channelInboundHandler;
public static final String SINGLE_HTTP_REQUEST_HANDLER = "SingleHttpRequestHandler";
public static final String APP_WEB_SOCKET_SERVEROTOCOL_HANDLER = "appWebSocketServerotocolHandler";
public static final int MAX_CONTENT_LENGTH = 2048576000;
@Override @Override
protected void initChannel(SocketChannel ch) { protected void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline(); ChannelPipeline pipeline = ch.pipeline();
// http // http
pipeline.addLast(new HttpServerCodec()); pipeline.addLast(new HttpServerCodec());
pipeline.addLast(new ChunkedWriteHandler()); pipeline.addLast(new ChunkedWriteHandler());
pipeline.addLast(new HttpObjectAggregator(2048576000)); pipeline.addLast(new HttpObjectAggregator(MAX_CONTENT_LENGTH));
// 服务端api接口 // 服务端api接口
pipeline.addLast("SingleHttpRequestHandler", channelInboundHandler); pipeline.addLast(SINGLE_HTTP_REQUEST_HANDLER, channelInboundHandler);
// 连接超时管理 (判断通道是否有数据写入) // 连接超时管理 (判断通道是否有数据写入)
// pipeline.addLast("readTimeoutHandler", new ReadTimeoutHandler(60)); // pipeline.addLast("readTimeoutHandler", new ReadTimeoutHandler(60));
// "/appws"路径 升级长连接 // "/appws"路径 升级长连接
pipeline.addLast("appWebSocketServerotocolHandler", new WebSocketServerProtocolHandler(WsConstants.WS_URL)); pipeline.addLast(APP_WEB_SOCKET_SERVEROTOCOL_HANDLER, new WebSocketServerProtocolHandler(WsConstants.WS_URL));
/* /*
* 心跳机制 * 心跳机制
...@@ -44,7 +53,7 @@ public class NettyChannelInitializer extends ChannelInitializer<SocketChannel> { ...@@ -44,7 +53,7 @@ public class NettyChannelInitializer extends ChannelInitializer<SocketChannel> {
* allIdleTime—状态为IdleState的IdleStateEvent。在一定时间内不读不写会触发ALL_IDLE。指定0禁用。 * allIdleTime—状态为IdleState的IdleStateEvent。在一定时间内不读不写会触发ALL_IDLE。指定0禁用。
* unit—readerIdleTime、writeIdleTime和allIdleTime的时间单位 * unit—readerIdleTime、writeIdleTime和allIdleTime的时间单位
*/ */
pipeline.addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS)); pipeline.addLast(new IdleStateHandler(READER_IDLE_TIME, 0, 0, TimeUnit.SECONDS));
} }
} }
...@@ -13,6 +13,10 @@ import org.springframework.stereotype.Component; ...@@ -13,6 +13,10 @@ import org.springframework.stereotype.Component;
import javax.annotation.Resource; import javax.annotation.Resource;
import static com.wecloud.im.ws.ImConstant.PING;
import static com.wecloud.im.ws.ImConstant.PONG;
import static com.wecloud.im.ws.ImConstant.READ_IDLE_CLOSE_COUNT;
/** /**
* @Description app端 长连接事件处理 * @Description app端 长连接事件处理
* @Author hewei hwei1233@163.com * @Author hewei hwei1233@163.com
...@@ -23,15 +27,13 @@ import javax.annotation.Resource; ...@@ -23,15 +27,13 @@ import javax.annotation.Resource;
@Slf4j @Slf4j
public class WsReadHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> { public class WsReadHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
private static final String PING = "ping";
private static final String PONG = "pong";
@Resource @Resource
private ChannelManager channelManager; private ChannelManager channelManager;
@Override @Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) { protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) {
ctx.channel().attr(ChannelManager.READ_IDLE_TIMES).set(0);// 读空闲的计数清零 // 读空闲的计数清零
ctx.channel().attr(ChannelManager.READ_IDLE_TIMES).set(0);
String data = msg.text(); String data = msg.text();
try { try {
...@@ -86,11 +88,11 @@ public class WsReadHandler extends SimpleChannelInboundHandler<TextWebSocketFram ...@@ -86,11 +88,11 @@ public class WsReadHandler extends SimpleChannelInboundHandler<TextWebSocketFram
String eventType = null; String eventType = null;
switch (event.state()) { switch (event.state()) {
case READER_IDLE: case READER_IDLE:
eventType = "读空闲:readIdleTimes=" + readIdleTimes; eventType = "读空闲:readIdleTimes=" + readIdleTimes;
ctx.channel().attr(ChannelManager.READ_IDLE_TIMES).set(readIdleTimes + 1);// 读空闲的计数加1 // 读空闲的计数加1
ctx.channel().attr(ChannelManager.READ_IDLE_TIMES).set(readIdleTimes + 1);
// 发ping // 发ping
ctx.channel().writeAndFlush(new TextWebSocketFrame(PING)); ctx.channel().writeAndFlush(new TextWebSocketFrame(PING));
...@@ -103,9 +105,10 @@ public class WsReadHandler extends SimpleChannelInboundHandler<TextWebSocketFram ...@@ -103,9 +105,10 @@ public class WsReadHandler extends SimpleChannelInboundHandler<TextWebSocketFram
eventType = "读写空闲"; eventType = "读写空闲";
// 不处理 // 不处理
break; break;
default:
} }
log.info(clientId + "超时事件:" + eventType); log.info(clientId + "超时事件:" + eventType);
if (readIdleTimes >= 5) { if (readIdleTimes >= READ_IDLE_CLOSE_COUNT) {
log.info(clientId + ".读空闲超过5次关闭连接"); log.info(clientId + ".读空闲超过5次关闭连接");
ctx.channel().close(); ctx.channel().close();
} }
......
...@@ -95,13 +95,13 @@ public interface ImConversationService extends BaseService<ImConversation> { ...@@ -95,13 +95,13 @@ public interface ImConversationService extends BaseService<ImConversation> {
boolean updateDisplayConversation(ImConversationDisplayUpdate imConversationDisplayUpdate); boolean updateDisplayConversation(ImConversationDisplayUpdate imConversationDisplayUpdate);
/** /**
* 根据ID获取查询对象 * 根据ID获取查询对象 (redis缓存)
* *
* @param id * @param id
* @return * @return
* @throws Exception * @throws Exception
*/ */
ImConversationQueryVo getImConversationById(Long id); ImConversationQueryVo getCacheImConversationById(Long id);
/** /**
* 删除redis中该会话的缓存 * 删除redis中该会话的缓存
......
...@@ -10,6 +10,7 @@ import com.wecloud.im.param.add.ImMsgUpdate; ...@@ -10,6 +10,7 @@ import com.wecloud.im.param.add.ImMsgUpdate;
import com.wecloud.im.vo.ImMessageOfflineListVo; import com.wecloud.im.vo.ImMessageOfflineListVo;
import com.wecloud.im.vo.OfflineMsgDto; import com.wecloud.im.vo.OfflineMsgDto;
import com.wecloud.im.ws.model.request.ReceiveDataVO; import com.wecloud.im.ws.model.request.ReceiveDataVO;
import com.wecloud.im.ws.model.request.ReceiveVO;
import io.geekidea.springbootplus.framework.common.api.ApiResult; import io.geekidea.springbootplus.framework.common.api.ApiResult;
import io.geekidea.springbootplus.framework.common.service.BaseService; import io.geekidea.springbootplus.framework.common.service.BaseService;
import io.geekidea.springbootplus.framework.core.pagination.Paging; import io.geekidea.springbootplus.framework.core.pagination.Paging;
...@@ -34,7 +35,7 @@ public interface ImMessageService extends BaseService<ImMessage> { ...@@ -34,7 +35,7 @@ public interface ImMessageService extends BaseService<ImMessage> {
ApiResult<Boolean> restApiImMessageSend(ImMsgSendToOnlineClient imMsgSendToOnlineClient, ImApplication imApplication); ApiResult<Boolean> restApiImMessageSend(ImMsgSendToOnlineClient imMsgSendToOnlineClient, ImApplication imApplication);
ImMessage saveImMessage(ImApplication imApplication, ImClient imClientSender, Long toConversationId, long messageId, ReceiveDataVO content); ImMessage saveImMessage(ImApplication imApplication, ImClient imClientSender, long messageId, ReceiveVO receiveVO, ReceiveDataVO sysParam);
/** /**
......
...@@ -16,7 +16,7 @@ import io.geekidea.springbootplus.framework.common.api.ApiResult; ...@@ -16,7 +16,7 @@ import io.geekidea.springbootplus.framework.common.api.ApiResult;
import io.geekidea.springbootplus.framework.common.service.impl.BaseServiceImpl; import io.geekidea.springbootplus.framework.common.service.impl.BaseServiceImpl;
import io.geekidea.springbootplus.framework.core.pagination.PageInfo; import io.geekidea.springbootplus.framework.core.pagination.PageInfo;
import io.geekidea.springbootplus.framework.core.pagination.Paging; import io.geekidea.springbootplus.framework.core.pagination.Paging;
import io.geekidea.springbootplus.framework.shiro.util.SnowflakeUtil; import com.wecloud.utils.SnowflakeUtil;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
......
...@@ -20,6 +20,7 @@ import com.wecloud.im.service.ImApplicationService; ...@@ -20,6 +20,7 @@ import com.wecloud.im.service.ImApplicationService;
import com.wecloud.im.service.ImClientService; import com.wecloud.im.service.ImClientService;
import com.wecloud.im.vo.GetInfoListVo; import com.wecloud.im.vo.GetInfoListVo;
import io.geekidea.springbootplus.framework.common.api.ApiResult; import io.geekidea.springbootplus.framework.common.api.ApiResult;
import io.geekidea.springbootplus.framework.common.exception.BusinessException;
import io.geekidea.springbootplus.framework.common.service.impl.BaseServiceImpl; import io.geekidea.springbootplus.framework.common.service.impl.BaseServiceImpl;
import io.geekidea.springbootplus.framework.core.pagination.PageInfo; import io.geekidea.springbootplus.framework.core.pagination.PageInfo;
import io.geekidea.springbootplus.framework.core.pagination.Paging; import io.geekidea.springbootplus.framework.core.pagination.Paging;
...@@ -97,6 +98,10 @@ public class ImClientServiceImpl extends BaseServiceImpl<ImClientMapper, ImClien ...@@ -97,6 +98,10 @@ public class ImClientServiceImpl extends BaseServiceImpl<ImClientMapper, ImClien
public ApiResult<List<GetInfoListVo>> getInfoList(GetClientInfoParam getClientInfoParam) throws Exception { public ApiResult<List<GetInfoListVo>> getInfoList(GetClientInfoParam getClientInfoParam) throws Exception {
ImClient curentClient = getCurentClient(); ImClient curentClient = getCurentClient();
if (getClientInfoParam.getClientIds() == null || getClientInfoParam.getClientIds().isEmpty()) {
throw new BusinessException("getClientInfoParam.getClientIds() == null");
}
// List<ImClient> imClients = this.list(new QueryWrapper<ImClient>().lambda() // List<ImClient> imClients = this.list(new QueryWrapper<ImClient>().lambda()
// .eq(ImClient::getFkAppid, curentClient.getFkAppid()) // .eq(ImClient::getFkAppid, curentClient.getFkAppid())
// .in(ImClient::getClientId, getClientInfoParam.getClientId()) // .in(ImClient::getClientId, getClientInfoParam.getClientId())
......
...@@ -42,7 +42,7 @@ import io.geekidea.springbootplus.framework.core.pagination.PageInfo; ...@@ -42,7 +42,7 @@ import io.geekidea.springbootplus.framework.core.pagination.PageInfo;
import io.geekidea.springbootplus.framework.core.pagination.Paging; import io.geekidea.springbootplus.framework.core.pagination.Paging;
import io.geekidea.springbootplus.framework.shiro.jwt.JwtToken; import io.geekidea.springbootplus.framework.shiro.jwt.JwtToken;
import io.geekidea.springbootplus.framework.shiro.util.JwtUtil; import io.geekidea.springbootplus.framework.shiro.util.JwtUtil;
import io.geekidea.springbootplus.framework.shiro.util.SnowflakeUtil; import com.wecloud.utils.SnowflakeUtil;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cache.annotation.CacheConfig; import org.springframework.cache.annotation.CacheConfig;
...@@ -797,7 +797,7 @@ public class ImConversationServiceImpl extends BaseServiceImpl<ImConversationMap ...@@ -797,7 +797,7 @@ public class ImConversationServiceImpl extends BaseServiceImpl<ImConversationMap
@Override @Override
@Cacheable(key = "#p0") @Cacheable(key = "#p0")
public ImConversationQueryVo getImConversationById(Long id) { public ImConversationQueryVo getCacheImConversationById(Long id) {
return imConversationMapper.getImConversationById(id); return imConversationMapper.getImConversationById(id);
} }
......
...@@ -29,6 +29,7 @@ import com.wecloud.im.ws.enums.WsResponseCmdEnum; ...@@ -29,6 +29,7 @@ import com.wecloud.im.ws.enums.WsResponseCmdEnum;
import com.wecloud.im.ws.model.WsResponse; import com.wecloud.im.ws.model.WsResponse;
import com.wecloud.im.ws.model.request.PushVO; import com.wecloud.im.ws.model.request.PushVO;
import com.wecloud.im.ws.model.request.ReceiveDataVO; import com.wecloud.im.ws.model.request.ReceiveDataVO;
import com.wecloud.im.ws.model.request.ReceiveVO;
import com.wecloud.im.ws.sender.AsyncPush; import com.wecloud.im.ws.sender.AsyncPush;
import com.wecloud.im.ws.sender.ChannelSender; import com.wecloud.im.ws.sender.ChannelSender;
import com.wecloud.utils.JsonUtils; import com.wecloud.utils.JsonUtils;
...@@ -99,7 +100,6 @@ public class ImMessageServiceImpl extends BaseServiceImpl<ImMessageMapper, ImMes ...@@ -99,7 +100,6 @@ public class ImMessageServiceImpl extends BaseServiceImpl<ImMessageMapper, ImMes
} }
ImApiMessageOnlineSend imApiMessageOnlineSend = new ImApiMessageOnlineSend(); ImApiMessageOnlineSend imApiMessageOnlineSend = new ImApiMessageOnlineSend();
imApiMessageOnlineSend.setCreateTime(new Date()); imApiMessageOnlineSend.setCreateTime(new Date());
...@@ -138,20 +138,26 @@ public class ImMessageServiceImpl extends BaseServiceImpl<ImMessageMapper, ImMes ...@@ -138,20 +138,26 @@ public class ImMessageServiceImpl extends BaseServiceImpl<ImMessageMapper, ImMes
} }
@Override @Override
public ImMessage saveImMessage(ImApplication imApplication, ImClient imClientSender, Long toConversationId, long messageId, ReceiveDataVO content) { @Transactional(rollbackFor = Exception.class)
public ImMessage saveImMessage(ImApplication imApplication, ImClient imClientSender, long messageId, ReceiveVO receiveVO, ReceiveDataVO sysParam) {
ImMessage imMessage = new ImMessage(); ImMessage imMessage = new ImMessage();
// 数据库字段类型为JSON格式
// 因mysql关系型数据库非MongoDB文档类型数据库,第三方应用拓展的自定义参数名和值需使用json格式落库
String contentJsonString = JsonUtils.encodeJson(receiveVO.getData());
imMessage.setContent(contentJsonString);
imMessage.setId(messageId); imMessage.setId(messageId);
imMessage.setCreateTime(new Date()); imMessage.setCreateTime(new Date());
imMessage.setFkAppid(imApplication.getId()); imMessage.setFkAppid(imApplication.getId());
imMessage.setSender(imClientSender.getId()); imMessage.setSender(imClientSender.getId());
String contentStr = JsonUtils.encodeJson(content);
imMessage.setContent(contentStr);
imMessage.setWithdraw(false); imMessage.setWithdraw(false);
imMessage.setEvent(false); imMessage.setEvent(false);
imMessage.setSystemFlag(false); imMessage.setSystemFlag(false);
imMessage.setSendStatus(2); imMessage.setSendStatus(2);
imMessage.setFkConversationId(toConversationId); imMessage.setMsgType(sysParam.getType());
imMessage.setFkConversationId(sysParam.getToConversation());
this.save(imMessage); this.save(imMessage);
return imMessage; return imMessage;
} }
......
...@@ -21,7 +21,7 @@ import com.wecloud.im.ws.sender.ChannelSender; ...@@ -21,7 +21,7 @@ import com.wecloud.im.ws.sender.ChannelSender;
import io.geekidea.springbootplus.framework.common.api.ApiCode; import io.geekidea.springbootplus.framework.common.api.ApiCode;
import io.geekidea.springbootplus.framework.common.api.ApiResult; import io.geekidea.springbootplus.framework.common.api.ApiResult;
import io.geekidea.springbootplus.framework.common.service.impl.BaseServiceImpl; import io.geekidea.springbootplus.framework.common.service.impl.BaseServiceImpl;
import io.geekidea.springbootplus.framework.shiro.util.SnowflakeUtil; import com.wecloud.utils.SnowflakeUtil;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.BooleanUtils; import org.apache.commons.lang3.BooleanUtils;
......
...@@ -16,6 +16,9 @@ import java.util.Date; ...@@ -16,6 +16,9 @@ import java.util.Date;
@ApiModel(value = "OfflineMsgDto") @ApiModel(value = "OfflineMsgDto")
public class OfflineMsgDto implements Serializable { public class OfflineMsgDto implements Serializable {
@ApiModelProperty("消息类型")
private Integer type;
@ApiModelProperty("消息id") @ApiModelProperty("消息id")
private Long msgId; private Long msgId;
......
package com.wecloud.im.ws;
import lombok.Data;
import java.io.Serializable;
/**
* IM相关常量
*/
@Data
public class ImConstant implements Serializable {
/**
* channel超时n次后断开连接
*/
public static final int READ_IDLE_CLOSE_COUNT = 5;
/**
* channel 读取时间 心跳检查间隔实际 (秒)
*/
public static final int READER_IDLE_TIME = 5;
/**
* 心跳
*/
public static final String PING = "ping";
/**
* 心跳
*/
public static final String PONG = "pong";
/**
* 推送
*/
public static final String PUSH_KEY = "push";
public static final String TITLE = "title";
public static final String SUB_TITLE = "subTitle";
/**
* 数据
*/
public static final String DATA = "data";
/**
* 您收到一条新消息
*/
public static final String PUSH_TITLE = "You have received a new message";
/**
* 点击查看
*/
public static final String PUSH_BODY = "Click to view";
/**
* 会话id的key名
*/
public static final String TO_CONVERSATION_KEY = "toConversation";
/**
* 消息类型
*/
public static final String MSG_TYPE = "type";
}
...@@ -3,6 +3,7 @@ package com.wecloud.im.ws.model.request; ...@@ -3,6 +3,7 @@ package com.wecloud.im.ws.model.request;
import lombok.Data; import lombok.Data;
import java.io.Serializable; import java.io.Serializable;
import java.util.HashMap;
/** /**
* @Description 推送model * @Description 推送model
...@@ -22,4 +23,10 @@ public class PushVO implements Serializable { ...@@ -22,4 +23,10 @@ public class PushVO implements Serializable {
*/ */
private String subTitle; private String subTitle;
/**
* 自定义系统推送内容
*/
private HashMap data;
} }
...@@ -3,36 +3,26 @@ package com.wecloud.im.ws.model.request; ...@@ -3,36 +3,26 @@ package com.wecloud.im.ws.model.request;
import lombok.Data; import lombok.Data;
import java.io.Serializable; import java.io.Serializable;
import java.util.HashMap;
/**
* 内部固定参数
*/
@Data @Data
public class ReceiveDataVO implements Serializable { public class ReceiveDataVO implements Serializable {
/** /**
* push 对象 * push 对象
*/ */
private PushVO push; private PushVO pushVO;
/** /**
* 自定义的值
*/
private String diyAbcd;
/**
* 发送的会话id * 发送的会话id
*/ */
private Long toConversation; private Long toConversation;
/** /**
* 发送的type * 消息的type
*/ */
private Integer type; private Integer type;
/**
* 发送的内容文本
*/
private String text;
/**
* 自定义可扩展的发送字段和值
*/
private HashMap attrs;
} }
...@@ -25,9 +25,10 @@ public class ReceiveVO implements Serializable { ...@@ -25,9 +25,10 @@ public class ReceiveVO implements Serializable {
private Integer cmd; private Integer cmd;
/** /**
* json数据 * 转换保存json数据
* 因集成版无法预先知道第三方应用所拓展的数据格式,所以用Object(hashmap)来保存应用上行参数,不能使用固定的成员变量或实体来接收
*/ */
private ReceiveDataVO data; private HashMap data;
/** /**
* 请求id, 以判空是否请求成功, 服务端处理完成后 返回此id * 请求id, 以判空是否请求成功, 服务端处理完成后 返回此id
......
...@@ -26,6 +26,11 @@ import java.net.URL; ...@@ -26,6 +26,11 @@ import java.net.URL;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import static com.wecloud.im.ws.ImConstant.PUSH_BODY;
import static com.wecloud.im.ws.ImConstant.PUSH_TITLE;
import static com.wecloud.im.ws.ImConstant.SUB_TITLE;
import static com.wecloud.im.ws.ImConstant.TITLE;
/** /**
* 异步系统推送 * 异步系统推送
*/ */
...@@ -37,16 +42,16 @@ public class AsyncPush { ...@@ -37,16 +42,16 @@ public class AsyncPush {
* 谷歌推送地址 * 谷歌推送地址
*/ */
private static final String API_URL_FCM = "https://fcm.googleapis.com/fcm/send"; private static final String API_URL_FCM = "https://fcm.googleapis.com/fcm/send";
/** // /**
* 您收到一条新消息 // * 您收到一条新消息
*/ // */
private static final String PUSH_TITLE = "You have received a new message"; // private static final String PUSH_TITLE = "You have received a new message";
/** // /**
* 点击查看 // * 点击查看
*/ // */
private static final String PUSH_BODY = "Click to view"; // private static final String PUSH_BODY = "Click to view";
private static final String TITLE = "title"; // private static final String TITLE = "title";
private static final String SUB_TITLE = "subTitle"; // private static final String SUB_TITLE = "subTitle";
@Autowired @Autowired
private ImIosApnsService imIosApnsService; private ImIosApnsService imIosApnsService;
@Autowired @Autowired
...@@ -59,7 +64,7 @@ public class AsyncPush { ...@@ -59,7 +64,7 @@ public class AsyncPush {
*/ */
@Async @Async
public void push(PushVO pushVO, ImClient imClientReceiver, ImApplication imApplication) { public void push(PushVO pushVO, ImClient imClientReceiver, ImApplication imApplication) {
log.info("push:" + imClientReceiver.getClientId()); log.info("push: {}", imClientReceiver.getClientId());
if (pushVO == null) { if (pushVO == null) {
pushVO = new PushVO(); pushVO = new PushVO();
...@@ -70,7 +75,7 @@ public class AsyncPush { ...@@ -70,7 +75,7 @@ public class AsyncPush {
// 校验参数 // 校验参数
if (imClientReceiver.getValid() == null || imClientReceiver.getDeviceToken() == null || imClientReceiver.getDeviceType() == null) { if (imClientReceiver.getValid() == null || imClientReceiver.getDeviceToken() == null || imClientReceiver.getDeviceType() == null) {
log.info("push参数错误"); log.info("{} 应用未配置push,或client无DeviceToken {}", imApplication.getId(), imClientReceiver.getClientId());
return; return;
} }
// 设备不想收到推送提醒, 1想, 0不想 // 设备不想收到推送提醒, 1想, 0不想
......
package com.wecloud.im.ws.strategy; package com.wecloud.im.ws.strategy;
import com.wecloud.im.entity.ImApplication; import com.wecloud.im.entity.ImApplication;
import com.wecloud.im.entity.ImClient; import com.wecloud.im.entity.ImClient;
import com.wecloud.im.enums.ChatTypeEnum; import com.wecloud.im.enums.ChatTypeEnum;
...@@ -9,13 +8,25 @@ import com.wecloud.im.service.ImApplicationService; ...@@ -9,13 +8,25 @@ import com.wecloud.im.service.ImApplicationService;
import com.wecloud.im.service.ImClientService; import com.wecloud.im.service.ImClientService;
import com.wecloud.im.service.ImConversationService; import com.wecloud.im.service.ImConversationService;
import com.wecloud.im.ws.enums.WsRequestCmdEnum; import com.wecloud.im.ws.enums.WsRequestCmdEnum;
import com.wecloud.im.ws.model.request.PushVO;
import com.wecloud.im.ws.model.request.ReceiveDataVO;
import com.wecloud.im.ws.model.request.ReceiveVO; import com.wecloud.im.ws.model.request.ReceiveVO;
import com.wecloud.im.ws.utils.SpringBeanUtils; import com.wecloud.im.ws.utils.SpringBeanUtils;
import com.wecloud.utils.JsonUtils; import com.wecloud.utils.JsonUtils;
import io.geekidea.springbootplus.framework.common.exception.BusinessException;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import java.util.HashMap;
import static com.wecloud.im.ws.ImConstant.DATA;
import static com.wecloud.im.ws.ImConstant.MSG_TYPE;
import static com.wecloud.im.ws.ImConstant.PUSH_BODY;
import static com.wecloud.im.ws.ImConstant.PUSH_KEY;
import static com.wecloud.im.ws.ImConstant.PUSH_TITLE;
import static com.wecloud.im.ws.ImConstant.SUB_TITLE;
import static com.wecloud.im.ws.ImConstant.TITLE;
import static com.wecloud.im.ws.ImConstant.TO_CONVERSATION_KEY;
/** /**
* @Description 处理Cmd请求 * @Description 处理Cmd请求
* 抽象类 策略设计模式 * 抽象类 策略设计模式
...@@ -25,20 +36,20 @@ import lombok.extern.slf4j.Slf4j; ...@@ -25,20 +36,20 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j @Slf4j
public abstract class AbstractImCmdStrategy { public abstract class AbstractImCmdStrategy {
private static ImCmdStrategyFactory imCmdStrategyFactory; private static volatile ImCmdStrategyFactory imCmdStrategyFactory;
private static ImApplicationService imApplicationService; private static ImApplicationService imApplicationService;
private static ImClientService imClientService; private static ImClientService imClientService;
private static ImConversationService imConversationService; private static ImConversationService imConversationService;
public static void process(Long senderClientId, ChannelHandlerContext ctx, String data) { public static void process(Long senderClientId, ChannelHandlerContext ctx, String data) {
if(log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("appWS收到data: {}\n senderClientId:{}, channelId:{}", data, senderClientId, ctx.channel().id().asLongText()); log.debug("appWS收到data: {}\n senderClientId:{}, channelId:{}", data, senderClientId, ctx.channel().id().asLongText());
} }
if(imCmdStrategyFactory == null) { if (imCmdStrategyFactory == null) {
synchronized (AbstractImCmdStrategy.class) { synchronized (AbstractImCmdStrategy.class) {
// 双空判断,懒汉模式下的绝对线程安全 // 双空判断,懒汉模式下的绝对线程安全
if(imCmdStrategyFactory == null) { if (imCmdStrategyFactory == null) {
imCmdStrategyFactory = SpringBeanUtils.getBean(ImCmdStrategyFactory.class); imCmdStrategyFactory = SpringBeanUtils.getBean(ImCmdStrategyFactory.class);
imApplicationService = SpringBeanUtils.getBean(ImApplicationService.class); imApplicationService = SpringBeanUtils.getBean(ImApplicationService.class);
imClientService = SpringBeanUtils.getBean(ImClientService.class); imClientService = SpringBeanUtils.getBean(ImClientService.class);
...@@ -47,31 +58,41 @@ public abstract class AbstractImCmdStrategy { ...@@ -47,31 +58,41 @@ public abstract class AbstractImCmdStrategy {
} }
} }
// 解析jsonO // 解析json
ReceiveVO receiveVO = JsonUtils.decodeJson(data, ReceiveVO.class); ReceiveVO receiveVO = JsonUtils.decodeJson(data, ReceiveVO.class);
// 参数判空校验
if (null == receiveVO || null == receiveVO.getCmd()) { if (paramCheck(receiveVO)) {
throw new BusinessException("null == receiveVO || null == receiveVO.getCmd()");
}
// 获取会话id
if (receiveVO.getData() == null || receiveVO.getData().getToConversation() == null) {
log.warn("会话消息reqId: {} 不合法,缺少会话id!", receiveVO.getReqId());
return; return;
} }
WsRequestCmdEnum wsRequestPathEnum = WsRequestCmdEnum.getByCode(receiveVO.getCmd()); Long toConversationId = Long.valueOf(receiveVO.getData().get(TO_CONVERSATION_KEY).toString());
Integer type = Integer.valueOf(receiveVO.getData().get(MSG_TYPE).toString());
// 移除后端固定的字段,其余字段都为第三方应用定义的,需要作为可拓展的json传入数据库
// 因mysql关系型数据库非MongoDB文档类型数据库,第三方应用拓展的参数值需要使用json格式落库
receiveVO.getData().remove(MSG_TYPE);
receiveVO.getData().remove(TO_CONVERSATION_KEY);
// 获取自定义推送字段 (可选)
HashMap<String, Object> pushMap = getPushMap(receiveVO);
// 内部固定参数model
ReceiveDataVO sysParam = new ReceiveDataVO();
sysParam.setPushVO(getPushModel(pushMap));
sysParam.setToConversation(toConversationId);
sysParam.setType(type);
//查看接收的群属性,是否万人群 //查看接收的群属性,是否万人群
ImConversationQueryVo conversation = imConversationService.getImConversationById(receiveVO.getData().getToConversation()); ImConversationQueryVo conversation = imConversationService.getCacheImConversationById(toConversationId);
if(conversation == null) { if (conversation == null) {
log.warn("会话消息reqId: {} 会话id不合法!", receiveVO.getReqId()); log.warn("会reqId: {} ,会话id: {}db中不存在", receiveVO.getReqId(), toConversationId);
return; return;
} }
if(ChatTypeEnum.THOUSAND_GROUP.getCode().equals(conversation.getChatType()) && WsRequestCmdEnum.NORMAL_CHAT == wsRequestPathEnum) { WsRequestCmdEnum wsRequestPathEnum = WsRequestCmdEnum.getByCode(receiveVO.getCmd());
if (ChatTypeEnum.THOUSAND_GROUP.getCode().equals(conversation.getChatType()) && WsRequestCmdEnum.NORMAL_CHAT == wsRequestPathEnum) {
// 普通群升级为万人群 // 普通群升级为万人群
wsRequestPathEnum = WsRequestCmdEnum.THROUSAND_CHAT; wsRequestPathEnum = WsRequestCmdEnum.THROUSAND_CHAT;
} }
...@@ -92,8 +113,80 @@ public abstract class AbstractImCmdStrategy { ...@@ -92,8 +113,80 @@ public abstract class AbstractImCmdStrategy {
return; return;
} }
cmdStrategy.process(imApplication, imClientSender, ctx, receiveVO); cmdStrategy.process(imApplication, imClientSender, ctx, receiveVO, sysParam);
}
/**
* 封装推送实体
*
* @param pushMap
* @return
*/
private static PushVO getPushModel(HashMap<String, Object> pushMap) {
PushVO pushModel = new PushVO();
if (pushMap == null || pushMap.isEmpty()) {
pushModel.setTitle(PUSH_TITLE);
pushModel.setSubTitle(PUSH_BODY);
pushModel.setData(new HashMap<>(1));
} else {
pushModel.setTitle((String) pushMap.get(TITLE));
pushModel.setSubTitle((String) pushMap.get(SUB_TITLE));
// 自定义推送内容
HashMap hashMap = (HashMap) pushMap.get(DATA);
pushModel.setData(hashMap);
}
return pushModel;
}
/**
* 获取自定义推送字段
*
* @param receiveVO
* @return
*/
private static HashMap<String, Object> getPushMap(ReceiveVO receiveVO) {
HashMap<String, Object> pushMap = null;
if (receiveVO.getData().get(PUSH_KEY) != null) {
pushMap = (HashMap<String, Object>) receiveVO.getData().get(PUSH_KEY);
receiveVO.getData().remove(PUSH_KEY);
}
return pushMap;
}
/**
* 参数判空校验
*
* @param receiveVO
* @return
*/
private static boolean paramCheck(ReceiveVO receiveVO) {
if (receiveVO == null) {
log.warn("receiveVO == null 不合法 ");
return true;
}
if (receiveVO.getReqId() == null) {
log.warn("receiveVO.getReqId() == null 不合法");
return true;
}
if (receiveVO.getData() == null || receiveVO.getData().isEmpty()) {
log.warn("receiveVO.getData() null 不合法 reqId: {}", receiveVO.getReqId());
return true;
}
// 获取会话id(必传)
if (receiveVO.getData().get(TO_CONVERSATION_KEY) == null) {
log.warn("reqId: {} 不合法,缺少会话id!", receiveVO.getReqId());
return true;
}
// 获取消息type(必传)
if (receiveVO.getData().get(MSG_TYPE) == null) {
log.warn("reqId: {} 不合法,缺少消息type!", receiveVO.getReqId());
return true;
}
return false;
} }
/** /**
...@@ -105,5 +198,5 @@ public abstract class AbstractImCmdStrategy { ...@@ -105,5 +198,5 @@ public abstract class AbstractImCmdStrategy {
* @param receiveVO * @param receiveVO
* @throws Exception * @throws Exception
*/ */
public abstract void process(ImApplication imApplication, ImClient imSender, ChannelHandlerContext ctx, ReceiveVO receiveVO); public abstract void process(ImApplication imApplication, ImClient imSender, ChannelHandlerContext ctx, ReceiveVO receiveVO, ReceiveDataVO receiveDataVO);
} }
...@@ -16,6 +16,7 @@ import com.wecloud.im.ws.annotation.ImCmdType; ...@@ -16,6 +16,7 @@ import com.wecloud.im.ws.annotation.ImCmdType;
import com.wecloud.im.ws.enums.WsRequestCmdEnum; import com.wecloud.im.ws.enums.WsRequestCmdEnum;
import com.wecloud.im.ws.enums.WsResponseCmdEnum; import com.wecloud.im.ws.enums.WsResponseCmdEnum;
import com.wecloud.im.ws.model.WsResponse; import com.wecloud.im.ws.model.WsResponse;
import com.wecloud.im.ws.model.request.ReceiveDataVO;
import com.wecloud.im.ws.model.request.ReceiveVO; import com.wecloud.im.ws.model.request.ReceiveVO;
import com.wecloud.im.ws.sender.AsyncPush; import com.wecloud.im.ws.sender.AsyncPush;
import com.wecloud.im.ws.sender.ChannelSender; import com.wecloud.im.ws.sender.ChannelSender;
...@@ -23,7 +24,7 @@ import com.wecloud.im.ws.strategy.AbstractImCmdStrategy; ...@@ -23,7 +24,7 @@ import com.wecloud.im.ws.strategy.AbstractImCmdStrategy;
import com.wecloud.utils.JsonUtils; import com.wecloud.utils.JsonUtils;
import io.geekidea.springbootplus.framework.common.api.ApiCode; import io.geekidea.springbootplus.framework.common.api.ApiCode;
import io.geekidea.springbootplus.framework.common.api.ApiResult; import io.geekidea.springbootplus.framework.common.api.ApiResult;
import io.geekidea.springbootplus.framework.shiro.util.SnowflakeUtil; import com.wecloud.utils.SnowflakeUtil;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
...@@ -65,20 +66,19 @@ public class NormalChatStrategy extends AbstractImCmdStrategy { ...@@ -65,20 +66,19 @@ public class NormalChatStrategy extends AbstractImCmdStrategy {
@Autowired @Autowired
private AsyncPush systemPush; private AsyncPush systemPush;
@Override @Override
public void process(ImApplication imApplication, ImClient imSender, ChannelHandlerContext ctx, ReceiveVO receiveVO) { public void process(ImApplication imApplication, ImClient imSender, ChannelHandlerContext ctx, ReceiveVO receiveVO, ReceiveDataVO sysParam) {
NioSocketChannel channel = (NioSocketChannel) ctx.channel(); NioSocketChannel channel = (NioSocketChannel) ctx.channel();
Long toConversationId = receiveVO.getData().getToConversation();
// 查询该会话所有成员 // 查询该会话所有成员
List<ImConversationMembers> membersList = imConversationMembersService.list( List<ImConversationMembers> membersList = imConversationMembersService.list(
new QueryWrapper<ImConversationMembers>().lambda() new QueryWrapper<ImConversationMembers>().lambda()
.eq(ImConversationMembers::getFkConversationId, toConversationId) .eq(ImConversationMembers::getFkConversationId, sysParam.getToConversation())
.notIn(ImConversationMembers::getFkClientId, imSender.getId()) .notIn(ImConversationMembers::getFkClientId, imSender.getId())
); );
if (membersList.isEmpty()) { if (membersList.isEmpty()) {
log.info("查询会话所有成员返回空,会话ID: {}", toConversationId); log.info("查询会话所有成员返回空,会话ID: {}", sysParam.getToConversation());
return; return;
} }
...@@ -93,7 +93,7 @@ public class NormalChatStrategy extends AbstractImCmdStrategy { ...@@ -93,7 +93,7 @@ public class NormalChatStrategy extends AbstractImCmdStrategy {
// 生成消息id // 生成消息id
long messageId = SnowflakeUtil.getId(); long messageId = SnowflakeUtil.getId();
// 入库 保存消息至消息表 // 入库 保存消息至消息表
ImMessage imMessage = imMessageService.saveImMessage(imApplication, imSender, toConversationId, messageId, receiveVO.getData()); ImMessage imMessage = imMessageService.saveImMessage(imApplication, imSender, messageId, receiveVO, sysParam);
// 封装响应的实体 // 封装响应的实体
ImMessageOnlineSend imMessageOnlineSend = new ImMessageOnlineSend(); ImMessageOnlineSend imMessageOnlineSend = new ImMessageOnlineSend();
...@@ -101,11 +101,12 @@ public class NormalChatStrategy extends AbstractImCmdStrategy { ...@@ -101,11 +101,12 @@ public class NormalChatStrategy extends AbstractImCmdStrategy {
imMessageOnlineSend.setSender(imSender.getClientId()); imMessageOnlineSend.setSender(imSender.getClientId());
HashMap<String, Object> content = JsonUtils.decodeJson2Map(JsonUtils.encodeJson(receiveVO.getData())); HashMap<String, Object> content = JsonUtils.decodeJson2Map(JsonUtils.encodeJson(receiveVO.getData()));
imMessageOnlineSend.setContent(content); imMessageOnlineSend.setContent(content);
imMessageOnlineSend.setConversationId(toConversationId); imMessageOnlineSend.setConversationId(sysParam.getToConversation());
imMessageOnlineSend.setCreateTime(imMessage.getCreateTime()); imMessageOnlineSend.setCreateTime(imMessage.getCreateTime());
imMessageOnlineSend.setWithdrawTime(imMessage.getWithdrawTime()); imMessageOnlineSend.setWithdrawTime(imMessage.getWithdrawTime());
imMessageOnlineSend.setWithdraw(imMessage.getWithdraw()); imMessageOnlineSend.setWithdraw(imMessage.getWithdraw());
imMessageOnlineSend.setEvent(imMessage.getEvent()); imMessageOnlineSend.setEvent(imMessage.getEvent());
imMessageOnlineSend.setType(sysParam.getType());
imMessageOnlineSend.setSystemFlag(imMessage.getSystemFlag()); imMessageOnlineSend.setSystemFlag(imMessage.getSystemFlag());
imMessageOnlineSend.setAt(imMessage.getAt()); imMessageOnlineSend.setAt(imMessage.getAt());
...@@ -113,7 +114,7 @@ public class NormalChatStrategy extends AbstractImCmdStrategy { ...@@ -113,7 +114,7 @@ public class NormalChatStrategy extends AbstractImCmdStrategy {
for (ImConversationMembers conversationMembers : membersList) { for (ImConversationMembers conversationMembers : membersList) {
// 入库 保存收件箱 // 入库 保存收件箱
long imInboxId = SnowflakeUtil.getId(); long imInboxId = SnowflakeUtil.getId();
saveImInbox(imApplication, toConversationId, messageId, conversationMembers, imInboxId); saveImInbox(imApplication, sysParam.getToConversation(), messageId, conversationMembers, imInboxId);
// 查询接收方 // 查询接收方
ImClient imClientReceiver = imClientService.getOne(new QueryWrapper<ImClient>().lambda() ImClient imClientReceiver = imClientService.getOne(new QueryWrapper<ImClient>().lambda()
...@@ -134,7 +135,7 @@ public class NormalChatStrategy extends AbstractImCmdStrategy { ...@@ -134,7 +135,7 @@ public class NormalChatStrategy extends AbstractImCmdStrategy {
channelSender.sendMsg(responseModel, imClientReceiver.getId()); channelSender.sendMsg(responseModel, imClientReceiver.getId());
// 异步推送系统通知消息 // 异步推送系统通知消息
systemPush.push(receiveVO.getData().getPush(), imClientReceiver, imApplication); systemPush.push(sysParam.getPushVO(), imClientReceiver, imApplication);
} }
// 响应发送方消息id等信息 // 响应发送方消息id等信息
...@@ -142,6 +143,7 @@ public class NormalChatStrategy extends AbstractImCmdStrategy { ...@@ -142,6 +143,7 @@ public class NormalChatStrategy extends AbstractImCmdStrategy {
} }
/** /**
* 入库 保存收件箱 * 入库 保存收件箱
* *
......
...@@ -17,6 +17,7 @@ import com.wecloud.im.ws.annotation.ImCmdType; ...@@ -17,6 +17,7 @@ import com.wecloud.im.ws.annotation.ImCmdType;
import com.wecloud.im.ws.enums.WsRequestCmdEnum; import com.wecloud.im.ws.enums.WsRequestCmdEnum;
import com.wecloud.im.ws.enums.WsResponseCmdEnum; import com.wecloud.im.ws.enums.WsResponseCmdEnum;
import com.wecloud.im.ws.model.WsResponse; import com.wecloud.im.ws.model.WsResponse;
import com.wecloud.im.ws.model.request.ReceiveDataVO;
import com.wecloud.im.ws.model.request.ReceiveVO; import com.wecloud.im.ws.model.request.ReceiveVO;
import com.wecloud.im.ws.sender.AsyncPush; import com.wecloud.im.ws.sender.AsyncPush;
import com.wecloud.im.ws.sender.ChannelSender; import com.wecloud.im.ws.sender.ChannelSender;
...@@ -24,7 +25,7 @@ import com.wecloud.im.ws.strategy.AbstractImCmdStrategy; ...@@ -24,7 +25,7 @@ import com.wecloud.im.ws.strategy.AbstractImCmdStrategy;
import com.wecloud.utils.JsonUtils; import com.wecloud.utils.JsonUtils;
import io.geekidea.springbootplus.framework.common.api.ApiCode; import io.geekidea.springbootplus.framework.common.api.ApiCode;
import io.geekidea.springbootplus.framework.common.api.ApiResult; import io.geekidea.springbootplus.framework.common.api.ApiResult;
import io.geekidea.springbootplus.framework.shiro.util.SnowflakeUtil; import com.wecloud.utils.SnowflakeUtil;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
...@@ -70,21 +71,19 @@ public class ThousandChatStrategy extends AbstractImCmdStrategy { ...@@ -70,21 +71,19 @@ public class ThousandChatStrategy extends AbstractImCmdStrategy {
private ThousandChatCacheManager thousandChatCacheManager; private ThousandChatCacheManager thousandChatCacheManager;
@Override @Override
public void process(ImApplication imApplication, ImClient imSender, ChannelHandlerContext ctx, ReceiveVO receiveVO) { public void process(ImApplication imApplication, ImClient imSender, ChannelHandlerContext ctx, ReceiveVO receiveVO, ReceiveDataVO sysParam) {
NioSocketChannel channel = (NioSocketChannel) ctx.channel(); NioSocketChannel channel = (NioSocketChannel) ctx.channel();
Long toConversationId = receiveVO.getData().getToConversation();
// 查询该会话所有成员 // 查询该会话所有成员
List<ImConversationMembers> membersList = imConversationMembersService.list( List<ImConversationMembers> membersList = imConversationMembersService.list(
new QueryWrapper<ImConversationMembers>().lambda() new QueryWrapper<ImConversationMembers>().lambda()
.eq(ImConversationMembers::getFkConversationId, toConversationId) .eq(ImConversationMembers::getFkConversationId, sysParam.getToConversation())
); );
// 生成消息id // 生成消息id
long messageId = SnowflakeUtil.getId(); long messageId = SnowflakeUtil.getId();
// 入库 保存消息至消息表 // 入库 保存消息至消息表
ImMessage imMessage = imMessageService.saveImMessage(imApplication, imSender, toConversationId, messageId, receiveVO.getData()); ImMessage imMessage = imMessageService.saveImMessage(imApplication, imSender, messageId, receiveVO, sysParam);
// 封装响应的实体 // 封装响应的实体
ImMessageOnlineSend imMessageOnlineSend = new ImMessageOnlineSend(); ImMessageOnlineSend imMessageOnlineSend = new ImMessageOnlineSend();
...@@ -92,17 +91,18 @@ public class ThousandChatStrategy extends AbstractImCmdStrategy { ...@@ -92,17 +91,18 @@ public class ThousandChatStrategy extends AbstractImCmdStrategy {
imMessageOnlineSend.setSender(imSender.getClientId()); imMessageOnlineSend.setSender(imSender.getClientId());
HashMap<String, Object> content = JsonUtils.decodeJson2Map(JsonUtils.encodeJson(receiveVO.getData())); HashMap<String, Object> content = JsonUtils.decodeJson2Map(JsonUtils.encodeJson(receiveVO.getData()));
imMessageOnlineSend.setContent(content); imMessageOnlineSend.setContent(content);
imMessageOnlineSend.setConversationId(toConversationId); imMessageOnlineSend.setConversationId(sysParam.getToConversation());
imMessageOnlineSend.setCreateTime(imMessage.getCreateTime()); imMessageOnlineSend.setCreateTime(imMessage.getCreateTime());
imMessageOnlineSend.setWithdrawTime(imMessage.getWithdrawTime()); imMessageOnlineSend.setWithdrawTime(imMessage.getWithdrawTime());
imMessageOnlineSend.setWithdraw(imMessage.getWithdraw()); imMessageOnlineSend.setWithdraw(imMessage.getWithdraw());
imMessageOnlineSend.setEvent(imMessage.getEvent()); imMessageOnlineSend.setEvent(imMessage.getEvent());
imMessageOnlineSend.setSystemFlag(imMessage.getSystemFlag()); imMessageOnlineSend.setSystemFlag(imMessage.getSystemFlag());
imMessageOnlineSend.setType(sysParam.getType());
imMessageOnlineSend.setAt(imMessage.getAt()); imMessageOnlineSend.setAt(imMessage.getAt());
// 在线用户直接发消息 // 在线用户直接发消息
Map<String /** ip **/, List<String /** client的主键ID:platform **/>> onlineIpClientMap = Map<String /** ip **/, List<String /** client的主键ID:platform **/>> onlineIpClientMap =
thousandChatCacheManager.findOnlineHostsByThousandGroupId(toConversationId); thousandChatCacheManager.findOnlineHostsByThousandGroupId(sysParam.getToConversation());
// 封装要推给接收方的消息 // 封装要推给接收方的消息
WsResponse<ImMessageOnlineSend> responseModel = new WsResponse<>(); WsResponse<ImMessageOnlineSend> responseModel = new WsResponse<>();
responseModel.setCmd(WsResponseCmdEnum.ONLINE_MSG.getCmdCode()); responseModel.setCmd(WsResponseCmdEnum.ONLINE_MSG.getCmdCode());
...@@ -131,7 +131,7 @@ public class ThousandChatStrategy extends AbstractImCmdStrategy { ...@@ -131,7 +131,7 @@ public class ThousandChatStrategy extends AbstractImCmdStrategy {
continue; continue;
} }
// 异步推送系统通知消息 // 异步推送系统通知消息
systemPush.push(receiveVO.getData().getPush(), imClientReceiver, imApplication); systemPush.push(sysParam.getPushVO(), imClientReceiver, imApplication);
} }
// 响应发送方消息id等信息 // 响应发送方消息id等信息
......
...@@ -9,7 +9,6 @@ import org.springframework.data.redis.core.StringRedisTemplate; ...@@ -9,7 +9,6 @@ import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
import java.sql.Time;
import java.time.Duration; import java.time.Duration;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
...@@ -47,6 +46,20 @@ public class RedisUtils { ...@@ -47,6 +46,20 @@ public class RedisUtils {
redisTemplate.opsForValue().set(key, value, timeout); redisTemplate.opsForValue().set(key, value, timeout);
} }
/**
* 递增
*
* @param key 键
* @delta 要增加几(大于0)
*/
public long incr(String key, long delta) {
if (delta < 0) {
throw new RuntimeException("递增因子必须大于0");
}
return redisTemplate.opsForValue().increment(key, delta);
}
/** /**
* 删除Key * 删除Key
* *
......
...@@ -24,7 +24,7 @@ import com.wecloud.rtc.service.MangerRtcCacheService; ...@@ -24,7 +24,7 @@ import com.wecloud.rtc.service.MangerRtcCacheService;
import com.wecloud.rtc.service.RtcService; import com.wecloud.rtc.service.RtcService;
import com.wecloud.rtc.service.WsRtcWrite; import com.wecloud.rtc.service.WsRtcWrite;
import io.geekidea.springbootplus.framework.common.api.ApiResult; import io.geekidea.springbootplus.framework.common.api.ApiResult;
import io.geekidea.springbootplus.framework.shiro.util.SnowflakeUtil; import com.wecloud.utils.SnowflakeUtil;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
......
package com.wecloud.utils;
import cn.hutool.core.lang.Snowflake;
import com.wecloud.im.ws.utils.RedisUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
* 雪花算法 获取id工具类
*
* @author
*/
@Component
public class SnowflakeUtil {
private static SnowflakeUtil snowflakeUtil;
/**
* workerId, dataCenterId动态获取
* 12位序列号部分,支持同一毫秒内同一个节点可以生成4096个ID, 在目前一段不用做成动态获取服务器ID
*/
private static volatile Snowflake SNOWFLAKE = null;
@Autowired
private RedisUtils redisUtils;
/**
* 多线程中加synchronized 保证不会获取重复id
*
* @return
*/
public static Long getId() {
if (SNOWFLAKE == null) {
synchronized (SnowflakeUtil.class) {
if (SNOWFLAKE == null) {
// workerId通过redis获取
long workerId = snowflakeUtil.redisUtils.incr("workerId", 1);
// redisUtils不需要用了,释放回收掉
snowflakeUtil.redisUtils = null;
SNOWFLAKE = new Snowflake(workerId, 1L);
}
}
}
return SNOWFLAKE.nextId();
}
/**
* 静态方法里调用spring注入的方法
*/
@PostConstruct
public void init() {
snowflakeUtil = this;
snowflakeUtil.redisUtils = this.redisUtils;
}
}
...@@ -5,7 +5,7 @@ ...@@ -5,7 +5,7 @@
<!-- 通用查询结果列 --> <!-- 通用查询结果列 -->
<sql id="Base_Column_List"> <sql id="Base_Column_List">
id id
, create_time, withdraw_time, update_date, fk_appid, sender, content, withdraw, event, system_flag, at, send_status, fk_conversation_id , create_time, withdraw_time, update_date, fk_appid, sender, content, withdraw, event, msg_type, system_flag, at, send_status, fk_conversation_id
</sql> </sql>
<select id="getImMessageById" resultType="com.wecloud.im.param.ImMessageQueryVo"> <select id="getImMessageById" resultType="com.wecloud.im.param.ImMessageQueryVo">
...@@ -33,7 +33,10 @@ ...@@ -33,7 +33,10 @@
im_message.`at`, im_message.`at`,
im_message.send_status, im_message.send_status,
im_message.fk_conversation_id, im_message.fk_conversation_id,
(SELECT COUNT(id) FROM im_inbox WHERE fk_msg_id = msgId AND read_msg_status = 0) AS not_read_count, im_message.`msg_type` AS 'type', (SELECT COUNT(id)
FROM im_inbox
WHERE fk_msg_id = msgId
AND read_msg_status = 0) AS not_read_count,
(SELECT COUNT(id) (SELECT COUNT(id)
FROM im_inbox FROM im_inbox
WHERE fk_msg_id = msgId WHERE fk_msg_id = msgId
...@@ -58,7 +61,7 @@ ...@@ -58,7 +61,7 @@
im_message.system_flag, im_message.system_flag,
im_message.`at`, im_message.`at`,
im_message.send_status, im_message.send_status,
im_message.fk_conversation_id, im_message.`msg_type` AS 'type', im_message.fk_conversation_id,
(SELECT COUNT(id) FROM im_inbox WHERE fk_msg_id = msgId AND read_msg_status = 0) AS not_read_count, (SELECT COUNT(id) FROM im_inbox WHERE fk_msg_id = msgId AND read_msg_status = 0) AS not_read_count,
(SELECT COUNT(id) (SELECT COUNT(id)
FROM im_inbox FROM im_inbox
...@@ -81,7 +84,7 @@ ...@@ -81,7 +84,7 @@
im_message.`event`, im_message.`event`,
im_message.system_flag, im_message.system_flag,
im_message.`at`, im_message.`at`,
im_message.send_status, im_message.`msg_type` AS 'type', im_message.send_status,
im_message.fk_conversation_id im_message.fk_conversation_id
FROM im_inbox FROM im_inbox
INNER JOIN im_message ON im_message.id = im_inbox.fk_msg_id INNER JOIN im_message ON im_message.id = im_inbox.fk_msg_id
...@@ -102,7 +105,7 @@ ...@@ -102,7 +105,7 @@
im_message.system_flag, im_message.system_flag,
im_message.`at`, im_message.`at`,
im_message.send_status, im_message.send_status,
im_message.fk_conversation_id im_message.`msg_type` AS 'type', im_message.fk_conversation_id
FROM im_message FROM im_message
INNER JOIN im_client AS im_client ON im_client.id = im_message.sender INNER JOIN im_client AS im_client ON im_client.id = im_message.sender
WHERE im_message.fk_conversation_id = #{conversationId} WHERE im_message.fk_conversation_id = #{conversationId}
......
-- 在feature-cluster 2021年12月22日之后,需要执行的的sql增量脚本
-- 在feature-cluster 2021年12月22日之后,需要执行的的sql增量脚本
ALTER TABLE `im_conversation` CHANGE COLUMN `system``system_flag` tinyint(1) NULL DEFAULT NULL COMMENT '可选 对话类型标志,是否是系统对话,后面会说明。' AFTER `attributes`;
ALTER TABLE `im_message` CHANGE COLUMN `system``system_flag` tinyint(1) NULL DEFAULT 0 COMMENT '0非系统通知; 1为系统通知' AFTER `event`;
-- 在feature-cluster 2022年1月4日之后,需要执行的sql增量脚本
ALTER TABLE im_conversation
ADD COLUMN`member_count` int NULL COMMENT '群成员数' AFTER`last_message`;
ALTER TABLE im_conversation
ADD COLUMN`chat_type` tinyint NULL COMMENT '是否万人群' AFTER`member_count`;
ALTER TABLE `im_conversation_members`
ADD INDEX `fk_client_id`(`fk_client_id`);
-- 在feature-cluster 2022年1月10日之后,需要执行的sql增量脚本
-- 消息最新已读寻址表
CREATE TABLE `im_msg_read_lastest`
(
`id` bigint NOT NULL COMMENT '收件id',
`create_time` timestamp NULL DEFAULT NULL COMMENT '创建时间',
`update_time` timestamp NULL DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
`read_time` timestamp NULL DEFAULT NULL COMMENT '读取时间',
`receive_time` timestamp NULL DEFAULT NULL COMMENT '接收时间',
`fk_appid` bigint NOT NULL COMMENT '应用appid',
`fk_client_id` bigint NOT NULL COMMENT '接收客户端id',
`fk_conversation_id` bigint NOT NULL COMMENT '会话id',
`fk_receive_msg_id` bigint DEFAULT NULL COMMENT '最后一条已接收的消息id',
`fk_read_msg_id` bigint DEFAULT NULL COMMENT '最后一条已读的消息id',
PRIMARY KEY (`id`) USING BTREE,
UNIQUE KEY `receiver_conversation`(`fk_client_id`,`fk_conversation_id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='消息最新已读寻址表';
CREATE TABLE `im_friend`
(
`id` bigint NOT NULL COMMENT '主键id',
`fk_client_id` bigint NOT NULL COMMENT '自己id',
`fk_client_id_friend` bigint NOT NULL COMMENT '好友id',
`fk_client_id_claimer` bigint NOT NULL COMMENT '好友申请者',
`friend_name` varchar(255) DEFAULT NULL COMMENT '好友名称备注',
`reject_remark` varchar(255) DEFAULT NULL COMMENT '拒绝说明',
`request_remark` varchar(255) DEFAULT NULL COMMENT '好友请求说明',
`state` tinyint NOT NULL DEFAULT '0' COMMENT '好友状态,1:待确定,2:已确认,3:已拒绝,4:已删除',
`create_time` timestamp NULL DEFAULT NULL COMMENT '创建时间',
`update_time` timestamp NULL DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
PRIMARY KEY (`id`) USING BTREE,
UNIQUE KEY `friend_id`(`fk_client_id`,`fk_client_id_friend`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='好友表';
CREATE TABLE `im_friend_recommend`
(
`id` bigint(20) NOT NULL COMMENT '主键id',
`fk_client_id` bigint(20) NOT NULL COMMENT '自己id',
`fk_client_id_friend` bigint(20) NOT NULL COMMENT '好友id',
`source` tinyint DEFAULT NULL COMMENT '推荐来源',
`del_flag` bit(1) NOT NULL default 0 COMMENT '删除标识',
`create_time` timestamp NULL DEFAULT NULL COMMENT '创建时间',
`update_time` timestamp NULL DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
PRIMARY KEY (`id`) USING BTREE,
UNIQUE KEY `friend_id`(fk_client_id, fk_client_id_friend) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='好友表';
-- 2022年01月21日17:31:16 何威 将type参数上移一层
ALTER TABLE `im_message`
ADD COLUMN`msg_type` int(2) NULL DEFAULT NULL COMMENT '消息类型' AFTER`send_status`;
\ No newline at end of file
...@@ -213,57 +213,3 @@ CREATE TABLE `im_rtc_log` ...@@ -213,57 +213,3 @@ CREATE TABLE `im_rtc_log`
SET SET
FOREIGN_KEY_CHECKS = 1; FOREIGN_KEY_CHECKS = 1;
-- 在feature-cluster 2021年12月22日之后,需要执行的的sql增量脚本
ALTER TABLE `im_conversation` CHANGE COLUMN `system` `system_flag` tinyint(1) NULL DEFAULT NULL COMMENT '可选 对话类型标志,是否是系统对话,后面会说明。' AFTER `attributes`;
ALTER TABLE `im_message` CHANGE COLUMN `system` `system_flag` tinyint(1) NULL DEFAULT 0 COMMENT '0非系统通知; 1为系统通知' AFTER `event`;
-- 在feature-cluster 2022年1月4日之后,需要执行的sql增量脚本
ALTER TABLE im_conversation`
ADD COLUMN `member_count` int NULL COMMENT '群成员数' AFTER `last_message`,
ADD COLUMN `chat_type` tinyint NULL COMMENT '是否万人群' AFTER `member_count`;
ALTER TABLE `im_conversation_members` ADD INDEX `fk_client_id`(`fk_client_id`);
-- 在feature-cluster 2022年1月10日之后,需要执行的sql增量脚本
-- 消息最新已读寻址表
CREATE TABLE `im_msg_read_lastest` (
`id` bigint NOT NULL COMMENT '收件id',
`create_time` timestamp NULL DEFAULT NULL COMMENT '创建时间',
`update_time` timestamp NULL DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
`read_time` timestamp NULL DEFAULT NULL COMMENT '读取时间',
`receive_time` timestamp NULL DEFAULT NULL COMMENT '接收时间',
`fk_appid` bigint NOT NULL COMMENT '应用appid',
`fk_client_id` bigint NOT NULL COMMENT '接收客户端id',
`fk_conversation_id` bigint NOT NULL COMMENT '会话id',
`fk_receive_msg_id` bigint DEFAULT NULL COMMENT '最后一条已接收的消息id',
`fk_read_msg_id` bigint DEFAULT NULL COMMENT '最后一条已读的消息id',
PRIMARY KEY (`id`) USING BTREE,
UNIQUE KEY `receiver_conversation` (`fk_client_id`,`fk_conversation_id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='消息最新已读寻址表';
CREATE TABLE `im_friend` (
`id` bigint NOT NULL COMMENT '主键id',
`fk_client_id` bigint NOT NULL COMMENT '自己id',
`fk_client_id_friend` bigint NOT NULL COMMENT '好友id',
`fk_client_id_claimer` bigint NOT NULL COMMENT '好友申请者',
`friend_name` varchar(255) DEFAULT NULL COMMENT '好友名称备注',
`reject_remark` varchar(255) DEFAULT NULL COMMENT '拒绝说明',
`request_remark` varchar(255) DEFAULT NULL COMMENT '好友请求说明',
`state` tinyint NOT NULL DEFAULT '0' COMMENT '好友状态,1:待确定,2:已确认,3:已拒绝,4:已删除',
`create_time` timestamp NULL DEFAULT NULL COMMENT '创建时间',
`update_time` timestamp NULL DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
PRIMARY KEY (`id`) USING BTREE,
UNIQUE KEY `friend_id` (`fk_client_id`,`fk_client_id_friend`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='好友表';
CREATE TABLE `im_friend_recommend` (
`id` bigint(20) NOT NULL COMMENT '主键id',
`fk_client_id` bigint(20) NOT NULL COMMENT '自己id',
`fk_client_id_friend` bigint(20) NOT NULL COMMENT '好友id',
`source` tinyint DEFAULT NULL COMMENT '推荐来源',
`del_flag` bit(1) NOT NULL default 0 COMMENT '删除标识',
`create_time` timestamp NULL DEFAULT NULL COMMENT '创建时间',
`update_time` timestamp NULL DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
PRIMARY KEY (`id`) USING BTREE,
UNIQUE KEY `friend_id` (fk_client_id, fk_client_id_friend) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='好友表';
package io.geekidea.springbootplus.framework.shiro.util;
import cn.hutool.core.lang.Snowflake;
public class SnowflakeUtil {
public static Long getId() {
return new Snowflake(1L, 1L).nextId();
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment