Commit a27a28d0 by 罗长华

完善消息路由功能

parent 8a172b8f
package com.wecloud.im.event; package com.wecloud.im.event;
import lombok.Getter;
import org.springframework.context.ApplicationEvent; import org.springframework.context.ApplicationEvent;
import com.wecloud.im.entity.ImMessage;
/** /**
* 消息发送事件 * 消息发送事件
* @Author luozh * @Author luozh
* @Date 2022年04月22日 09:40 * @Date 2022年04月22日 09:40
* @Version 1.0 * @Version 1.0
*/ */
@Getter
public class ClientSendMessageEvent extends ApplicationEvent { public class ClientSendMessageEvent extends ApplicationEvent {
private Long applicationId;
private ImMessage message;
/** /**
* Create a new {@code ApplicationEvent}. * Create a new {@code ApplicationEvent}.
...@@ -18,5 +26,13 @@ public class ClientSendMessageEvent extends ApplicationEvent { ...@@ -18,5 +26,13 @@ public class ClientSendMessageEvent extends ApplicationEvent {
*/ */
public ClientSendMessageEvent(Object source) { public ClientSendMessageEvent(Object source) {
super(source); super(source);
} }
public ClientSendMessageEvent(Long applicationId, ImMessage message) {
super(message);
this.applicationId = applicationId;
this.message = message;
}
} }
package com.wecloud.im.event.listener; package com.wecloud.im.event.listener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener; import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import com.wecloud.im.entity.ImMessage;
import com.wecloud.im.event.ClientSendMessageEvent; import com.wecloud.im.event.ClientSendMessageEvent;
import com.wecloud.im.service.ImCallbackService;
/** /**
* *
...@@ -13,8 +16,14 @@ import com.wecloud.im.event.ClientSendMessageEvent; ...@@ -13,8 +16,14 @@ import com.wecloud.im.event.ClientSendMessageEvent;
*/ */
@Component @Component
public class ClientSendMessageListener implements ApplicationListener<ClientSendMessageEvent> { public class ClientSendMessageListener implements ApplicationListener<ClientSendMessageEvent> {
@Autowired
private ImCallbackService callbackService;
@Override @Override
public void onApplicationEvent(ClientSendMessageEvent event) { public void onApplicationEvent(ClientSendMessageEvent event) {
Long applicationId = event.getApplicationId();
ImMessage message = event.getMessage();
callbackService.fullMessageRouting(applicationId, message);
} }
} }
package com.wecloud.im.service; package com.wecloud.im.service;
import com.wecloud.im.entity.ImMessage;
/** /**
* sdk 回调服务 * sdk 回调服务
* @Author luozh * @Author luozh
...@@ -15,7 +17,7 @@ public interface ImCallbackService { ...@@ -15,7 +17,7 @@ public interface ImCallbackService {
* @param * @param
* @Return 路由结果 * @Return 路由结果
*/ */
Boolean fullMessageRouting(); Boolean fullMessageRouting(Long applicationId, ImMessage message);
/** /**
* 用户在线状态变化 * 用户在线状态变化
...@@ -26,4 +28,5 @@ public interface ImCallbackService { ...@@ -26,4 +28,5 @@ public interface ImCallbackService {
*/ */
Boolean clientOnlineStatusChange(Long applicationId, Long clientId, Integer status, Boolean clientOnlineStatusChange(Long applicationId, Long clientId, Integer status,
Integer deviceType, Long time, String clientIp); Integer deviceType, Long time, String clientIp);
} }
...@@ -3,6 +3,8 @@ package com.wecloud.im.service.impl; ...@@ -3,6 +3,8 @@ package com.wecloud.im.service.impl;
import io.geekidea.springbootplus.framework.shiro.signature.SignUtils; import io.geekidea.springbootplus.framework.shiro.signature.SignUtils;
import java.util.Date; import java.util.Date;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
...@@ -14,12 +16,21 @@ import org.springframework.web.client.RestTemplate; ...@@ -14,12 +16,21 @@ import org.springframework.web.client.RestTemplate;
import cn.hutool.core.date.DateUtil; import cn.hutool.core.date.DateUtil;
import cn.hutool.core.lang.id.NanoId; import cn.hutool.core.lang.id.NanoId;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.wecloud.im.entity.ImApplication; import com.wecloud.im.entity.ImApplication;
import com.wecloud.im.entity.ImClient; import com.wecloud.im.entity.ImClient;
import com.wecloud.im.entity.ImConversation;
import com.wecloud.im.entity.ImConversationMembers;
import com.wecloud.im.entity.ImMessage;
import com.wecloud.im.param.ClientOnlineStatusChangeDto; import com.wecloud.im.param.ClientOnlineStatusChangeDto;
import com.wecloud.im.sdk.enums.ChatTypeEnum;
import com.wecloud.im.service.ImApplicationService; import com.wecloud.im.service.ImApplicationService;
import com.wecloud.im.service.ImCallbackService; import com.wecloud.im.service.ImCallbackService;
import com.wecloud.im.service.ImClientService; import com.wecloud.im.service.ImClientService;
import com.wecloud.im.service.ImConversationMembersService;
import com.wecloud.im.service.ImConversationService;
import com.wecloud.im.ws.enums.MsgTypeEnum;
/** /**
* 回调服务实现类 * 回调服务实现类
...@@ -39,6 +50,12 @@ public class ImCallbackServiceImpl implements ImCallbackService { ...@@ -39,6 +50,12 @@ public class ImCallbackServiceImpl implements ImCallbackService {
@Autowired @Autowired
private ImClientService imClientService; private ImClientService imClientService;
@Autowired
private ImConversationService conversationService;
@Autowired
private ImConversationMembersService conversationMembersService;
/** /**
* 全量消息路由 * 全量消息路由
* @Author luozh * @Author luozh
...@@ -47,8 +64,38 @@ public class ImCallbackServiceImpl implements ImCallbackService { ...@@ -47,8 +64,38 @@ public class ImCallbackServiceImpl implements ImCallbackService {
* @Return * @Return
*/ */
@Override @Override
public Boolean fullMessageRouting() { public Boolean fullMessageRouting(Long applicationId, ImMessage message) {
return null;
ImApplication application = applicationService.getById(applicationId);
if (application == null) {
return false;
}
ImConversation conversation = conversationService.getById(message.getFkConversationId());
if (!isNeedSync(conversation, message)) {
return false;
}
String subscribeUrl = application.getFullMessageSubscribeUrl();
if (StringUtils.isNotBlank(subscribeUrl)) {
String appKey = application.getAppKey();
String appSecret = application.getAppSecret();
String callbackUrl = buildCallbackUrl(subscribeUrl, appKey, appSecret);
try {
ResponseEntity<Object> response = restTemplate.postForEntity(callbackUrl, buildMsgBody(conversation,
message),
Object.class);
// 同步在线状态时需要接收服务提供应答,只要有 HTTP 应答码 200 即认为状态已经同步
if (response.getStatusCode().equals(HttpStatus.OK)) {
// do nothing
} else {
// 如果应答超时 5 秒,会再尝试推送 2 次,如果仍然失败,将不再同步此条状态。如短时间内有大面积超时,将暂停推送,1 分钟后会继续推送。
}
} catch (Exception e) {
// do nothing is ok
}
}
return true;
} }
/** /**
...@@ -119,4 +166,50 @@ public class ImCallbackServiceImpl implements ImCallbackService { ...@@ -119,4 +166,50 @@ public class ImCallbackServiceImpl implements ImCallbackService {
subscribeUrl + "?appKey=" + appKey + "&nonce=" + nonce + "&date=" + date + "&signature=" + signature; subscribeUrl + "?appKey=" + appKey + "&nonce=" + nonce + "&date=" + date + "&signature=" + signature;
return finalUrl; return finalUrl;
} }
private Boolean isNeedSync(ImConversation conversation, ImMessage message) {
int chatType = conversation.getChatType();
if (!(ChatTypeEnum.SINGLE.getCode().equals(chatType) || ChatTypeEnum.NORMAL_GROUP.getCode().equals(chatType))) {
return false;
}
int msgType = message.getMsgType();
// 只需要同步 文本消息 图像消息 音频消息 视频消息 文件消息
if (MsgTypeEnum.MEDIA_TYPE_TEXT.getUriCode() == msgType
|| MsgTypeEnum.MEDIA_TYPE_IMAGE.getUriCode() == msgType
|| MsgTypeEnum.MEDIA_TYPE_AUDIO.getUriCode() == msgType
|| MsgTypeEnum.MEDIA_TYPE_VIDEO.getUriCode() == msgType
|| MsgTypeEnum.MEDIA_TYPE_FILE.getUriCode() == msgType) {
return true;
}
return false;
}
private JSONObject buildMsgBody(ImConversation conversation, ImMessage message) {
ImClient sender = imClientService.getById(message.getSender());
JSONObject body = new JSONObject();
body.put("id", message.getId());
body.put("fromUserId", sender.getClientId());
if (Objects.equals(ChatTypeEnum.SINGLE.getCode(), conversation.getChatType())) {
// 查找出目标id
ImConversationMembers anotherMember =
conversationMembersService.list(Wrappers.<ImConversationMembers>lambdaQuery()
.eq(ImConversationMembers::getFkConversationId, conversation.getId()))
.stream()
.filter(member -> !member.getClientId().equals(sender.getClientId())).collect(Collectors.toList()).get(0);
body.put("targetId", anotherMember.getClientId());
body.put("chatType", ChatTypeEnum.SINGLE);
} else {
body.put("chatType", ChatTypeEnum.NORMAL_GROUP);
body.put("targetId", conversation.getId());
}
int msgType = message.getMsgType();
body.put("msgType", MsgTypeEnum.getByCode(msgType));
body.put("content", message.getContent());
body.put("msgSendTime", DateUtil.formatDateTime(message.getCreateTime()));
body.put("withdrawTime", DateUtil.formatDateTime(message.getWithdrawTime()));
return body;
}
} }
...@@ -24,6 +24,7 @@ import java.util.stream.Collectors; ...@@ -24,6 +24,7 @@ import java.util.stream.Collectors;
import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.CollectionUtils;
import org.springframework.beans.BeanUtils; import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
...@@ -45,6 +46,7 @@ import com.wecloud.im.entity.ImConversationMembers; ...@@ -45,6 +46,7 @@ import com.wecloud.im.entity.ImConversationMembers;
import com.wecloud.im.entity.ImInbox; import com.wecloud.im.entity.ImInbox;
import com.wecloud.im.entity.ImMessage; import com.wecloud.im.entity.ImMessage;
import com.wecloud.im.entity.ImMessageOnlineSend; import com.wecloud.im.entity.ImMessageOnlineSend;
import com.wecloud.im.event.ClientSendMessageEvent;
import com.wecloud.im.mapper.ImMessageMapper; import com.wecloud.im.mapper.ImMessageMapper;
import com.wecloud.im.mq.MqSender; import com.wecloud.im.mq.MqSender;
import com.wecloud.im.param.ChatContentVo; import com.wecloud.im.param.ChatContentVo;
...@@ -127,6 +129,9 @@ public class ImMessageServiceImpl extends BaseServiceImpl<ImMessageMapper, ImMes ...@@ -127,6 +129,9 @@ public class ImMessageServiceImpl extends BaseServiceImpl<ImMessageMapper, ImMes
@Autowired @Autowired
private ImInboxService imInboxService; private ImInboxService imInboxService;
@Autowired
private ApplicationEventPublisher eventPublisher;
@Override @Override
@Transactional(rollbackFor = Exception.class) @Transactional(rollbackFor = Exception.class)
public ApiResult<Boolean> restApiImMessageSend(ImMsgSendToOnlineClient imMsgSendToOnlineClient, ImApplication imApplication) { public ApiResult<Boolean> restApiImMessageSend(ImMsgSendToOnlineClient imMsgSendToOnlineClient, ImApplication imApplication) {
...@@ -202,6 +207,9 @@ public class ImMessageServiceImpl extends BaseServiceImpl<ImMessageMapper, ImMes ...@@ -202,6 +207,9 @@ public class ImMessageServiceImpl extends BaseServiceImpl<ImMessageMapper, ImMes
imMessage.setMsgType(sysParam.getType()); imMessage.setMsgType(sysParam.getType());
imMessage.setFkConversationId(sysParam.getToConversation()); imMessage.setFkConversationId(sysParam.getToConversation());
this.save(imMessage); this.save(imMessage);
eventPublisher.publishEvent(new ClientSendMessageEvent(imApplication.getId(), imMessage));
return imMessage; return imMessage;
} }
...@@ -227,6 +235,9 @@ public class ImMessageServiceImpl extends BaseServiceImpl<ImMessageMapper, ImMes ...@@ -227,6 +235,9 @@ public class ImMessageServiceImpl extends BaseServiceImpl<ImMessageMapper, ImMes
imMessage.setAt(data.getAt()); imMessage.setAt(data.getAt());
imMessage.setFkConversationId(data.getToConversation()); imMessage.setFkConversationId(data.getToConversation());
this.save(imMessage); this.save(imMessage);
eventPublisher.publishEvent(new ClientSendMessageEvent(client.getFkAppid(), imMessage));
return imMessage; return imMessage;
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment