Commit 85e6557e by Future

rocketmq 接入

parent 6ce86067
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
package io.geekidea.springbootplus; package io.geekidea.springbootplus;
import com.wecloud.im.annotation.EnableRocketMq;
import io.geekidea.springbootplus.framework.util.PrintApplicationInfo; import io.geekidea.springbootplus.framework.util.PrintApplicationInfo;
import org.apache.dubbo.config.spring.context.annotation.EnableDubbo; import org.apache.dubbo.config.spring.context.annotation.EnableDubbo;
import org.mybatis.spring.annotation.MapperScan; import org.mybatis.spring.annotation.MapperScan;
...@@ -41,6 +42,7 @@ import org.springframework.transaction.annotation.EnableTransactionManagement; ...@@ -41,6 +42,7 @@ import org.springframework.transaction.annotation.EnableTransactionManagement;
@EnableConfigurationProperties @EnableConfigurationProperties
@ServletComponentScan @ServletComponentScan
@EnableCaching @EnableCaching
@EnableRocketMq
@MapperScan({"io.geekidea.springbootplus.**.mapper", "com.wecloud.**.mapper"}) @MapperScan({"io.geekidea.springbootplus.**.mapper", "com.wecloud.**.mapper"})
@SpringBootApplication(scanBasePackages = {"io.geekidea.springbootplus", "com.wecloud"}) @SpringBootApplication(scanBasePackages = {"io.geekidea.springbootplus", "com.wecloud"})
public class SpringBootPlusApplication { public class SpringBootPlusApplication {
......
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.wecloud.imserver</groupId>
<artifactId>client</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>client</name>
<description>对外接口提供模块</description>
<packaging>jar</packaging>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
</properties>
<!--依赖-->
<dependencies>
<!-- lombok-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.16</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>2.11.3</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<version>3.2.0</version>
<executions>
<execution>
<id>attach-sources</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
<distributionManagement>
<repository>
<id>releases</id>
<name>Nexus Release Repository</name>
<url>http://192.168.1.187:8081/repository/maven-releases/</url>
</repository>
<snapshotRepository>
<id>snapshots</id>
<name>Nexus Snapshot Repository</name>
<url>http://192.168.1.187:8081/repository/maven-snapshots/</url>
</snapshotRepository>
</distributionManagement>
</project>
package com.wecloud.imserver.client.api;
import com.wecloud.imserver.client.model.ao.IosApnsQueryAO;
import com.wecloud.imserver.client.model.dto.ImIosApnsDTO;
import com.wecloud.imserver.client.model.dto.Result;
/**
* @Author wenzhida
* @Date 2022/1/27 16:35
* @Description apns信息获取rpc接口
*/
public interface ImIosApnsFacade {
/**
* 获取ios apns
*
* @param queryAO 入参ao
* @return 出参
*/
Result<ImIosApnsDTO> getImIosApns(IosApnsQueryAO queryAO);
}
package com.wecloud.imserver.client.model.ao;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import java.io.Serializable;
/**
* @Author wenzhida
* @Date 2022/1/27 17:15
* @Description 获取
*/
@Getter
@Setter
@ToString
public class IosApnsQueryAO implements Serializable {
private static final long serialVersionUID = -6378371794582236581L;
/**
* appId
*/
private Long appId;
}
package com.wecloud.imserver.client.model.dto;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import java.io.Serializable;
/**
* @Author wenzhida
* @Date 2022/1/27 11:16
* @Description apns配置表
*/
@Getter
@Setter
@ToString
public class ImIosApnsDTO implements Serializable {
private static final long serialVersionUID = -6104258897097495881L;
/**
* 应用appid
*/
private Long fkAppId;
/**
* Base64(apns.p12)
*/
private String apnsFileValue;
/**
* 环境: 正式-1,测试-0
*/
private Integer env;
/**
* bundle_id
*/
private String bundleId;
/**
* 证书密码
*/
private String pwd;
}
package com.wecloud.imserver.client.model.dto;
import com.fasterxml.jackson.annotation.JsonIgnore;
import java.io.Serializable;
/**
* @Author wenzhida
* @Date 2022/1/27 16:11
* @Description rpc接口外层Result
*/
public class Result<T> implements Serializable {
private static final long serialVersionUID = -8466892006788842306L;
/**
* 成功
*/
private static final Integer SUCCESS_STATUS = 0;
/**
* 业务异常,必填字段为空,年龄为负数等
*/
private static final Integer BUSINESS_ERROR_STATUS = -1;
/**
* 服务异常,数据库连接超时,空指针等
*/
private static final Integer SERVICE_ERROR_STATUS = -2;
/**
* 请求参数非法
*/
private static final Integer PARAM_ERROR_STATUS = -3;
/**
* 服务状态码,为integer类型, 0 为成功,-1为业务校验错误(手机号格式不对,会员不存在等),-2为服务不可用异常(sql语句异常,空指针等)。-3为请求参数非法<br/>
*/
private Integer status = SUCCESS_STATUS;
/**
* 描述信息。用于返回业务校验错误信息和服务不可用异常信息,不可用于设置日志堆栈信息。举例:业务信息: msg = "手机号格式不对" ,服务不可用异常信息:msg="数据库连接池中没有空闲连接".
*/
private String msg;
/**
* 业务语义状态码,用于业务语义路由,通常和msg搭配使用。当status为-1时,用该code指明具体业务校验错误
*/
private String code;
/**
* 业务正常返回的消息体,每个服务使用专用data对象,不可复用。属性命名规则同数据库命名。支持数据结构包装,例如:List<data>,Set<data>。
*/
private T data;
/**
* 异常堆栈信息
*/
private String trace;
public Result() {
}
public Result(Integer status, String msg, String code, T data, String trace) {
this.status = status;
this.msg = msg;
this.code = code;
this.data = data;
this.trace = trace;
}
public Result(Integer status, String msg, String code, T data) {
this(status, msg, code, data, null);
}
public Result(Integer status, String msg, String code) {
this(status, msg, code, null, null);
}
/**
* 请求是否成功,判断status值是否为{@link #SUCCESS_STATUS}
*
* @return
*/
@JsonIgnore
public boolean isSuccess() {
return SUCCESS_STATUS.equals(getStatus());
}
/**
* 得到服务异常 Result.msg字段值为"服务器繁忙,请稍候重试!"
*/
public static <T> Result<T> getServiceError() {
return getServiceError("服务器繁忙,请稍候重试!", null);
}
/**
* 得到服务异常Result.
*
* @param message 用于设置Result.msg字段
*/
public static <T> Result<T> getServiceError(String message) {
return getServiceError(message, null);
}
/**
* 得到服务异常Result.
*
* @param message 用于设置Result.msg字段
* @param trace 用于设置Result.trace字段
*/
public static <T> Result<T> getServiceError(String message, String trace) {
Result<T> result = new Result<>(SERVICE_ERROR_STATUS, message, null, null, trace);
return result;
}
/**
* 得到正常结果.
*
* @param re 正常结果
*/
public static <T> Result<T> getSuccessResult(T re) {
Result<T> result = new Result<>(SUCCESS_STATUS, null, null, re);
return result;
}
/**
* 得到业务异常Result.
*
* @param msg 用于设置Result.msg字段
* @param code 用于设置Result.code字段
*/
public static <T> Result<T> getBusinessException(String msg, String code) {
Result<T> result = new Result<>(BUSINESS_ERROR_STATUS, msg, code, null);
return result;
}
/**
* 得到业务异常Result.
*
* @param code 用于设置Result.code字段
*/
public static <T> Result<T> getBusinessException(String code) {
return getBusinessException(null, code);
}
/**
* 得到请求参数非法异常Result.
*
* @param code 用于设置Result.code字段
*/
public static <T> Result<T> getParamException(String code) {
return getParamException(code, null);
}
/**
* 得到请求参数非法异常Result.
*
* @param code 用于设置Result.code字段
*/
public static <T> Result<T> getParamException(String code, String msg) {
return new Result<>(PARAM_ERROR_STATUS, msg, code);
}
public int getStatus() {
return status;
}
public String getMsg() {
return msg;
}
public T getData() {
return data;
}
public String getCode() {
return code;
}
public String getTrace() {
return trace;
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder("Result{");
sb.append("status=").append(status);
sb.append(", msg='").append(msg).append('\'');
sb.append(", code='").append(code).append('\'');
sb.append(", data=").append(data);
sb.append('}');
return sb.toString();
}
}
package com.wecloud.imserver.client.model.enums;
/**
* @Author wenzhida
* @Date 2022/1/26 16:58
* @Description 设备是否想接收到推送提醒美剧
*/
public enum DeviceTypeEnum {
/**
* 1 - ios
*/
IOS(1, "ios"),
/**
* 2 - android
*/
ANDROID(2, "android");
DeviceTypeEnum(int code, String value) {
this.code = code;
this.value = value;
}
private int code;
private String value;
/**
* 获取对应的中文名称
*
* @param code 枚举值
* @return 枚举对应的中文名称
*/
public static String getValue(Integer code) {
if (code == null) {
return null;
}
for (DeviceTypeEnum c : DeviceTypeEnum.values()) {
if (c.code == code) {
return c.value;
}
}
return null;
}
public String getValue() {
return this.value;
}
public int getCode() {
return this.code;
}
}
...@@ -55,3 +55,9 @@ mybatis-plus: ...@@ -55,3 +55,9 @@ mybatis-plus:
load-blance: load-blance:
# 服务器运营商local,aws,huawei # 服务器运营商local,aws,huawei
server-type: local server-type: local
# NameServer地址 用;作为地址的分隔符
rocketmq:
namesrvAddr: 127.0.0.1:9876
# 生产者的组名
producerId: im-server
\ No newline at end of file
...@@ -130,6 +130,17 @@ ...@@ -130,6 +130,17 @@
</dependency> </dependency>
<!-- wecloud短信 end--> <!-- wecloud短信 end-->
<dependency>
<groupId>com.wecloud</groupId>
<artifactId>pushserver-client</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.wecloud.imserver</groupId>
<artifactId>client</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
</dependencies> </dependencies>
<!-- <build>--> <!-- <build>-->
<!-- <resources>--> <!-- <resources>-->
......
...@@ -4,16 +4,30 @@ import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; ...@@ -4,16 +4,30 @@ import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.wecloud.dispatch.annotation.ActionMapping; import com.wecloud.dispatch.annotation.ActionMapping;
import com.wecloud.dispatch.common.BaseRequest; import com.wecloud.dispatch.common.BaseRequest;
import com.wecloud.dispatch.extend.ActionRequest; import com.wecloud.dispatch.extend.ActionRequest;
import com.wecloud.im.entity.*; import com.wecloud.im.entity.ImApplication;
import com.wecloud.im.entity.ImClient;
import com.wecloud.im.entity.ImConversationMembers;
import com.wecloud.im.entity.ImInbox;
import com.wecloud.im.entity.ImMessage;
import com.wecloud.im.entity.ImMessageOnlineSend;
import com.wecloud.im.mq.MqSender;
import com.wecloud.im.param.ChatContentVo; import com.wecloud.im.param.ChatContentVo;
import com.wecloud.im.param.ImClientSimpleDto; import com.wecloud.im.param.ImClientSimpleDto;
import com.wecloud.im.param.ImConversationQueryVo; import com.wecloud.im.param.ImConversationQueryVo;
import com.wecloud.im.param.MsgVo; import com.wecloud.im.param.MsgVo;
import com.wecloud.im.service.*; import com.wecloud.im.service.ImApplicationService;
import com.wecloud.im.service.ImClientBlacklistService;
import com.wecloud.im.service.ImClientService;
import com.wecloud.im.service.ImConversationMembersService;
import com.wecloud.im.service.ImConversationService;
import com.wecloud.im.service.ImInboxService;
import com.wecloud.im.service.ImMessageService;
import com.wecloud.im.ws.enums.WsResponseCmdEnum; import com.wecloud.im.ws.enums.WsResponseCmdEnum;
import com.wecloud.im.ws.model.WsResponse; import com.wecloud.im.ws.model.WsResponse;
import com.wecloud.im.ws.sender.AsyncPush; import com.wecloud.im.ws.sender.AsyncPush;
import com.wecloud.im.ws.sender.ChannelSender; import com.wecloud.im.ws.sender.ChannelSender;
import com.wecloud.pushserver.client.model.constant.MqConstant;
import com.wecloud.pushserver.client.model.dto.PushDTO;
import com.wecloud.utils.JsonUtils; import com.wecloud.utils.JsonUtils;
import com.wecloud.utils.SnowflakeUtil; import com.wecloud.utils.SnowflakeUtil;
import io.geekidea.springbootplus.framework.common.api.ApiCode; import io.geekidea.springbootplus.framework.common.api.ApiCode;
...@@ -58,6 +72,8 @@ public class NormalChatAction { ...@@ -58,6 +72,8 @@ public class NormalChatAction {
private ImClientBlacklistService imClientBlacklistService; private ImClientBlacklistService imClientBlacklistService;
@Autowired @Autowired
private ImInboxService imInboxService; private ImInboxService imInboxService;
@Autowired
private MqSender mqSender;
@ActionMapping("/normal/send") @ActionMapping("/normal/send")
public void sendMsg(ActionRequest request, ChatContentVo data, String reqId) { public void sendMsg(ActionRequest request, ChatContentVo data, String reqId) {
...@@ -128,7 +144,9 @@ public class NormalChatAction { ...@@ -128,7 +144,9 @@ public class NormalChatAction {
sendMsgForOnline(imClientReceiver.getId(), imMessageOnlineSend); sendMsgForOnline(imClientReceiver.getId(), imMessageOnlineSend);
// 异步推送系统通知消息 // 异步推送系统通知消息
systemPush.push(data.getPush(), imClientReceiver, imApplication); // systemPush.push(data.getPush(), imClientReceiver, imApplication);
PushDTO pushDTO = mqSender.buildPushDto(data.getPush(), imClientReceiver, imApplication);
mqSender.synSend(MqConstant.Topic.IM_MSG_TOPIC, MqConstant.Tag.IM_MSG_TAG, pushDTO);
} }
// 响应发送方消息id等信息 // 响应发送方消息id等信息
......
package com.wecloud.im.annotation;
import com.wecloud.im.config.RocketMqFactoryBeanConfig;
import com.wecloud.im.config.RocketMqProperties;
import com.wecloud.im.mq.RocketMqProducerService;
import org.springframework.context.annotation.Import;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* @Author wenzhida
* @Date 2022/1/25 15:25
* @Description 启用RocketMq 注解
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import({
RocketMqProperties.class,
RocketMqProducerService.class,
RocketMqFactoryBeanConfig.class
})
public @interface EnableRocketMq {
}
\ No newline at end of file
package com.wecloud.im.config;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.springframework.context.annotation.Bean;
/**
* @Author wenzhida
* @Date 2022/1/25 15:32
* @Description 创建生产者的工厂bean
*/
public class RocketMqFactoryBeanConfig {
@Bean
public DefaultMQProducer defaultProducer(RocketMqProperties configuration) throws Exception {
if (configuration.getNamesrvAddr() == null) {
throw new IllegalArgumentException("rocketmq.nameSrvAddress 是必须的参数");
}
DefaultMQProducer producer = new DefaultMQProducer(configuration.getProducerId());
producer.setNamesrvAddr(configuration.getNamesrvAddr());
producer.setInstanceName(System.currentTimeMillis() + "");
producer.start();
return producer;
}
}
package com.wecloud.im.config;
import lombok.Data;
import org.apache.commons.lang3.StringUtils;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.core.annotation.Order;
/**
* @Author wenzhida
* @Date 2022/1/25 15:36
* @Description Rocketmq配置文件
*/
@Data
@ConfigurationProperties(prefix = "rocketmq")
@Order(-1)
public class RocketMqProperties {
/**
* rocketmq集群名称服务地址,用;作为地址的分隔符
*/
private String namesrvAddr;
/**
* mq集群生产者id 如果不填使用实例名称
*/
private String producerId;
public String getProducerId() {
if (StringUtils.isBlank(producerId)) {
throw new IllegalArgumentException("rocketmq.producerId 是必须的");
}
return producerId;
}
}
package com.wecloud.im.exception;
/**
* @Author wenzhida
* @Date 2022/1/25 15:52
* @Description 客户端异常
*/
public class MqClientException extends RuntimeException {
private static final long serialVersionUID = 6591266214847410791L;
public MqClientException() {
}
public MqClientException(String message) {
super(message);
}
public MqClientException(Throwable cause) {
super(cause);
}
public MqClientException(String message, Throwable cause) {
super(message, cause);
}
}
\ No newline at end of file
package com.wecloud.im.exception;
/**
* @Author wenzhida
* @Date 2022/1/25 15:54
* @Description
*/
public class MqContextException extends Throwable {
private String messageId;
private String topic;
private MqClientException exception;
public MqContextException() {
}
public String getMessageId() {
return this.messageId;
}
public void setMessageId(String messageId) {
this.messageId = messageId;
}
public String getTopic() {
return this.topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public MqClientException getException() {
return this.exception;
}
public void setException(MqClientException exception) {
this.exception = exception;
}
}
package com.wecloud.im.exception;
/**
* @Author wenzhida
* @Date 2022/1/25 15:55
* @Description 消息发送异常类
*/
public class MqSendException extends RuntimeException {
private static final long serialVersionUID = 4056679555332877494L;
public MqSendException() {
super();
}
public MqSendException(String message) {
super(message);
}
public MqSendException(String message, Throwable cause) {
super(message, cause);
}
public MqSendException(Throwable cause) {
super(cause);
}
}
...@@ -5,12 +5,15 @@ import com.wecloud.im.entity.ImClient; ...@@ -5,12 +5,15 @@ import com.wecloud.im.entity.ImClient;
import com.wecloud.im.enums.EventResponseSubCmdEnum; import com.wecloud.im.enums.EventResponseSubCmdEnum;
import com.wecloud.im.friend.param.FriendApplyEventDto; import com.wecloud.im.friend.param.FriendApplyEventDto;
import com.wecloud.im.friend.param.FriendApproveEventDto; import com.wecloud.im.friend.param.FriendApproveEventDto;
import com.wecloud.im.mq.MqSender;
import com.wecloud.im.service.ImApplicationService; import com.wecloud.im.service.ImApplicationService;
import com.wecloud.im.ws.enums.WsResponseCmdEnum; import com.wecloud.im.ws.enums.WsResponseCmdEnum;
import com.wecloud.im.ws.model.WsResponse; import com.wecloud.im.ws.model.WsResponse;
import com.wecloud.im.ws.model.request.PushVO; import com.wecloud.im.ws.model.request.PushVO;
import com.wecloud.im.ws.sender.AsyncPush; import com.wecloud.im.ws.sender.AsyncPush;
import com.wecloud.im.ws.sender.ChannelSender; import com.wecloud.im.ws.sender.ChannelSender;
import com.wecloud.pushserver.client.model.constant.MqConstant;
import com.wecloud.pushserver.client.model.dto.PushDTO;
import io.geekidea.springbootplus.framework.common.api.ApiCode; import io.geekidea.springbootplus.framework.common.api.ApiCode;
import io.geekidea.springbootplus.framework.common.api.ApiResult; import io.geekidea.springbootplus.framework.common.api.ApiResult;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
...@@ -33,6 +36,9 @@ public class FriendEventSender { ...@@ -33,6 +36,9 @@ public class FriendEventSender {
@Autowired @Autowired
private ImApplicationService imApplicationService; private ImApplicationService imApplicationService;
@Autowired
private MqSender mqSender;
private final static String FRIEND_APPLY_TITLE = "好友申请"; private final static String FRIEND_APPLY_TITLE = "好友申请";
private final static String FRIEND_APPLY_TITLE_SUB = "您有好友申请待验证"; private final static String FRIEND_APPLY_TITLE_SUB = "您有好友申请待验证";
private final static String FRIEND_APPROVE_TITLE = "好友验证结果"; private final static String FRIEND_APPROVE_TITLE = "好友验证结果";
...@@ -62,7 +68,9 @@ public class FriendEventSender { ...@@ -62,7 +68,9 @@ public class FriendEventSender {
PushVO pushVO = new PushVO(); PushVO pushVO = new PushVO();
pushVO.setTitle(FRIEND_APPLY_TITLE); pushVO.setTitle(FRIEND_APPLY_TITLE);
pushVO.setSubTitle(FRIEND_APPLY_TITLE_SUB); pushVO.setSubTitle(FRIEND_APPLY_TITLE_SUB);
systemPush.push(pushVO, receiveClient, app); // systemPush.push(pushVO, receiveClient, app);
PushDTO pushDTO = mqSender.buildPushDto(pushVO, receiveClient, app);
mqSender.synSend(MqConstant.Topic.IM_MSG_TOPIC, MqConstant.Tag.IM_MSG_TAG, pushDTO);
} }
public void sendFriendApproveEventMsg(ImClient receiveClient, boolean isAgree, String rejectRemark) { public void sendFriendApproveEventMsg(ImClient receiveClient, boolean isAgree, String rejectRemark) {
......
package com.wecloud.im.mq;
import com.alibaba.fastjson.JSON;
import com.google.common.collect.Lists;
import com.wecloud.im.entity.ImApplication;
import com.wecloud.im.entity.ImClient;
import com.wecloud.im.service.ImInboxService;
import com.wecloud.im.ws.model.request.PushVO;
import com.wecloud.pushserver.client.model.constant.MqConstant;
import com.wecloud.pushserver.client.model.dto.ImApplicationDTO;
import com.wecloud.pushserver.client.model.dto.ImClientDTO;
import com.wecloud.pushserver.client.model.dto.MessageDTO;
import com.wecloud.pushserver.client.model.dto.PushDTO;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @Author wenzhida
* @Date 2022/1/28 11:47
* @Description mq推送服务
*/
@Slf4j
@Component
public class MqSender {
@Autowired
private ImInboxService imInboxService;
@Autowired
private RocketMqProducerService rocketMqProducerService;
/**
* mq同步推送方法
* @param topic
* @param tag
* @param pushDTO
* @return
*/
public SendResult synSend(String topic, String tag, PushDTO pushDTO) {
log.info("mq同步推送topic: {} tag: {} 推送内容: {}", topic, tag, JSON.toJSONString(pushDTO));
SendResult sendResult = rocketMqProducerService.synSend(topic, tag, JSON.toJSONString(pushDTO));
log.info("mq同步推送topic: {} tag: {} 返回结果: {}", topic, tag, JSON.toJSONString(sendResult));
return sendResult;
}
public PushDTO buildPushDto(PushVO pushVO, ImClient imClientReceiver, ImApplication imApplication) {
PushDTO pushDTO = new PushDTO();
MessageDTO messageDTO = new MessageDTO();
messageDTO.setTitle(pushVO.getTitle());
messageDTO.setSubTitle(pushVO.getSubTitle());
messageDTO.setData(pushVO.getData());
// 统计未读消息数量
int badge = imInboxService.countMyNotReadCount(imClientReceiver.getId());
messageDTO.setBadge(badge);
ImClientDTO imClientDTO = new ImClientDTO();
imClientDTO.setClientId(imClientReceiver.getClientId());
imClientDTO.setValid(imClientReceiver.getValid());
imClientDTO.setDeviceType(imClientReceiver.getDeviceType());
imClientDTO.setDeviceToken(imClientReceiver.getDeviceToken());
messageDTO.setImClientDTO(imClientDTO);
ImApplicationDTO imApplicationDTO = new ImApplicationDTO();
imApplicationDTO.setId(imApplication.getId());
imApplicationDTO.setAppKey(imApplication.getAppKey());
imApplicationDTO.setAppSecret(imApplication.getAppSecret());
imApplicationDTO.setAppName(imApplication.getAppName());
imApplicationDTO.setIosPushChannel(imApplication.getIosPushChannel());
imApplicationDTO.setAndroidPushChannel(imApplication.getAndroidPushChannel());
imApplicationDTO.setUmengKey(imApplication.getUmengKey());
imApplicationDTO.setUmengSecret(imApplication.getUmengSecret());
imApplicationDTO.setFirebaseSecret(imApplication.getFirebaseSecret());
messageDTO.setImApplicationDTO(imApplicationDTO);
pushDTO.setMessageDTOList(Lists.newArrayList(messageDTO));
return pushDTO;
}
}
package com.wecloud.im.mq;
import com.wecloud.im.exception.MqContextException;
import com.wecloud.im.exception.MqSendException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import java.util.List;
import java.util.Objects;
/**
* @Author wenzhida
* @Date 2022/1/25 16:29
* @Description mq消息生产
*/
public class RocketMqProducerService implements SendCallback {
private static final Logger LOGGER = LoggerFactory.getLogger(RocketMqProducerService.class);
@Autowired
@Qualifier("defaultProducer")
private DefaultMQProducer rocketProducer;
private RocketSendCallback rocketSendCallback = new RocketSendCallback();
/**
* 单边发送
*
* @param topic topic
* @param tag tag
* @param content 字符串消息体
*/
public void sendOneway(String topic, String tag, String content) {
this.sendOneway(topic, tag, "", content);
}
/**
* 单边发送
*/
public void sendOneway(String topic, String tag, String keys, String content) {
try {
Message msg = getMessage(topic, tag, keys, content);
rocketProducer.sendOneway(msg);
this.logMsg(msg);
} catch (Exception e) {
LOGGER.error("单边发送消息失败", e);
throw new MqSendException(e);
}
}
/**
* 异步发送 默认回调函数
*
* @param topic topic
* @param tag tag
* @param content 字符串消息体
*/
public void sendAsyncDefault(String topic, String tag, String content) {
this.sendAsyncDefault(topic, tag, "", content);
}
/**
* 异步发送 默认回调函数
*/
public void sendAsyncDefault(String topic, String tag, String keys, String content) {
Message msg = getMessage(topic, tag, keys, content);
try {
rocketProducer.send(msg, rocketSendCallback);
} catch (Exception e) {
LOGGER.error("异步发送消息失败", e);
throw new MqSendException(e);
}
this.logMsg(msg);
}
/**
* 异步发送
*
* @param topic topic
* @param tag tag
* @param content 字符串消息体
*/
public void sendAsync(String topic, String tag, String content, SendCallback sendCallback) {
this.sendAsync(topic, tag, content, "", sendCallback);
}
/**
* 异步发送
*/
public void sendAsync(String topic, String tag, String content, String keys,
SendCallback sendCallback) {
Message msg = getMessage(topic, tag, keys, content);
try {
rocketProducer.send(msg, sendCallback);
} catch (Exception e) {
LOGGER.error("异步发送消息失败", e);
throw new MqSendException(e);
}
this.logMsg(msg);
}
/**
* 同步发送
*
* @param topic topic
* @param tag tag
* @param content 字符串消息体
* @return 可能返回null
*/
public SendResult synSend(String topic, String tag, String content) {
return this.synSend(topic, tag, "", content);
}
/**
* 同步发送
*
* @param topic topic
* @param tag tag
* @param content 字符串消息体
* @return 可能返回null
*/
public SendResult synSend(String topic, String tag, String keys, String content) {
Message msg = getMessage(topic, tag, keys, content);
try {
SendResult sendResult = rocketProducer.send(msg);
this.logMsg(msg, sendResult);
return sendResult;
} catch (Exception e) {
LOGGER.error("同步发送消息失败", e);
throw new MqSendException(e);
}
}
/**
* 有顺序发送
*
* @param orderId 相同的orderId 的消息会被有顺序的消费
*/
public SendResult orderSend(String topic, String tag, String content, int orderId) {
return this.orderSend(topic, tag, "", content, orderId);
}
/**
* 有顺序发送
*/
public SendResult orderSend(String topic, String tag, String keys, String content,
int orderId) {
Message msg = getMessage(topic, tag, keys, content);
try {
SendResult sendResult = rocketProducer
.send(msg, (List<MessageQueue> mqs, Message message, Object arg) -> {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
, orderId);
this.logMsg(msg, sendResult);
return sendResult;
} catch (Exception e) {
LOGGER.error("有顺序发送消息失败", e);
throw new MqSendException(e);
}
}
/**
* 构造message
*/
public Message getMessage(String topic, String tag, String keys, String content) {
return new Message(topic, tag, keys, content.getBytes());
}
@Override
public void onSuccess(final SendResult sendResult) {
// 消费发送成功
LOGGER.info("send message success. topic=" + sendResult.getMessageQueue().getTopic() +
", msgId=" + sendResult.getMsgId());
}
/**
* 打印消息topic等参数方便后续查找问题
*/
private void logMsg(Message message) {
LOGGER.info("消息队列发送完成:topic={},tag={},msgId={}", message.getTopic(), message.getTags(),
message.getKeys());
}
/**
* 打印消息topic等参数方便后续查找问题
*/
private void logMsg(Message message, SendResult sendResult) {
LOGGER.info("消息队列发送完成:topic={},tag={},msgId={},sendResult={}", message.getTopic(),
message.getTags(), message.getKeys(), Objects.nonNull(sendResult) ? sendResult : " result is null");
}
class RocketSendCallback implements SendCallback {
@Override
public void onSuccess(SendResult sendResult) {
LOGGER.info("send message success. topic=" + sendResult.getMessageQueue().getTopic()
+ ", msgId=" + sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
LOGGER.error("send message failed.", e);
}
}
@Override
public void onException(Throwable e) {
if (e instanceof MqContextException) {
MqContextException context = (MqContextException) e;
LOGGER.error("send message failed. topic=" + context.getTopic() + ", msgId=" + context.getMessageId());
} else {
LOGGER.error("send message failed. =" + e);
}
}
}
package com.wecloud.im.router;
import com.alibaba.fastjson.JSON;
import com.wecloud.im.entity.ImIosApns;
import com.wecloud.im.service.ImIosApnsService;
import com.wecloud.imserver.client.api.ImIosApnsFacade;
import com.wecloud.imserver.client.model.ao.IosApnsQueryAO;
import com.wecloud.imserver.client.model.dto.ImIosApnsDTO;
import com.wecloud.imserver.client.model.dto.Result;
import lombok.extern.slf4j.Slf4j;
import org.apache.dubbo.config.annotation.DubboService;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* @Author wenzhida
* @Date 2022/1/27 17:01
* @Description ios apns信息rpc获取
*/
@Slf4j
@Service
@DubboService
public class ImIosApnsFacadeImpl implements ImIosApnsFacade {
@Autowired
private ImIosApnsService imIosApnsService;
/**
* 获取ios apns
* @param queryAO 入参ao
* @return 出参
*/
@Override
public Result<ImIosApnsDTO> getImIosApns(IosApnsQueryAO queryAO) {
log.info("获取ios apns入参信息: {}", JSON.toJSONString(queryAO));
ImIosApns apns = imIosApnsService.getImIosApnsByAppId(queryAO.getAppId());
if (apns == null) {
return Result.getBusinessException("查无apns证件信息", null);
}
ImIosApnsDTO iosApnsDTO = new ImIosApnsDTO();
BeanUtils.copyProperties(apns, iosApnsDTO);
return Result.getSuccessResult(iosApnsDTO);
}
}
...@@ -51,6 +51,7 @@ ...@@ -51,6 +51,7 @@
<pagehelper.versiong>1.2.3</pagehelper.versiong> <pagehelper.versiong>1.2.3</pagehelper.versiong>
<mybatis-spring-boot-starter.versiong>2.0.1</mybatis-spring-boot-starter.versiong> <mybatis-spring-boot-starter.versiong>2.0.1</mybatis-spring-boot-starter.versiong>
<druid.version>1.1.10</druid.version> <druid.version>1.1.10</druid.version>
<rocket.version>4.3.2</rocket.version>
<maven-compiler-plugin.version>3.8.1</maven-compiler-plugin.version> <maven-compiler-plugin.version>3.8.1</maven-compiler-plugin.version>
<maven-resources-plugin.version>3.1.0</maven-resources-plugin.version> <maven-resources-plugin.version>3.1.0</maven-resources-plugin.version>
...@@ -66,6 +67,7 @@ ...@@ -66,6 +67,7 @@
<module>framework</module> <module>framework</module>
<module>generator</module> <module>generator</module>
<module>core</module> <module>core</module>
<module>client</module>
<!-- <module>api-app</module>--> <!-- <module>api-app</module>-->
<!-- <module>distribution</module>--> <!-- <module>distribution</module>-->
<!-- <module>admin</module>--> <!-- <module>admin</module>-->
...@@ -294,6 +296,11 @@ ...@@ -294,6 +296,11 @@
<artifactId>core</artifactId> <artifactId>core</artifactId>
<version>${project.version}</version> <version>${project.version}</version>
</dependency> </dependency>
<dependency>
<groupId>io.geekidea.springbootplus</groupId>
<artifactId>client</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<!-- <dependency>--> <!-- <dependency>-->
<!-- <groupId>io.geekidea.springbootplus</groupId>--> <!-- <groupId>io.geekidea.springbootplus</groupId>-->
...@@ -323,6 +330,14 @@ ...@@ -323,6 +330,14 @@
</dependency> </dependency>
<!-- 项目模块end --> <!-- 项目模块end -->
<!-- RocketMq start -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>${rocket.version}</version>
</dependency>
<!-- RocketMq start -->
</dependencies> </dependencies>
</dependencyManagement> </dependencyManagement>
...@@ -480,6 +495,12 @@ ...@@ -480,6 +495,12 @@
<groupId>org.apache.commons</groupId> <groupId>org.apache.commons</groupId>
<artifactId>commons-text</artifactId> <artifactId>commons-text</artifactId>
</dependency> </dependency>
<!-- RocketMq start -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
</dependency>
<!-- RocketMq start -->
</dependencies> </dependencies>
<build> <build>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment