Commit 9f698d49 by zhangjw

1:完善MQ配置和逻辑

2:删除重发逻辑
parent 1a78e727
...@@ -67,7 +67,6 @@ public class MerchantServiceImpl extends BaseServiceImpl<MerchantMapper, Merchan ...@@ -67,7 +67,6 @@ public class MerchantServiceImpl extends BaseServiceImpl<MerchantMapper, Merchan
return baseMapper.selectList(new QueryWrapper<Merchant>().lambda() return baseMapper.selectList(new QueryWrapper<Merchant>().lambda()
.eq(Merchant::getState, 1) .eq(Merchant::getState, 1)
.eq(Merchant::getDeleted, 0) .eq(Merchant::getDeleted, 0)
.eq(Merchant::getAuditRegisterStatus, 1)
.orderByDesc(Merchant::getCreateTime)); .orderByDesc(Merchant::getCreateTime));
} }
......
...@@ -28,6 +28,13 @@ public class RabbitConfig { ...@@ -28,6 +28,13 @@ public class RabbitConfig {
private String exchangeName; private String exchangeName;
private String orderQueueName;
@Bean
public Queue orderQueue() {
return new Queue(orderQueueName);
}
@Bean @Bean
public Queue delayQueue() { public Queue delayQueue() {
return new Queue(delayQueueName); return new Queue(delayQueueName);
......
package com.ym.im.mq; package com.ym.im.mq;
import com.ym.im.config.RabbitConfig; import com.ym.im.config.RabbitConfig;
import com.ym.im.entity.MsgBody;
import com.ym.im.entity.StaffSocketInfo; import com.ym.im.entity.StaffSocketInfo;
import org.springframework.amqp.core.AmqpTemplate; import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
...@@ -21,19 +20,6 @@ public class Queue { ...@@ -21,19 +20,6 @@ public class Queue {
private RabbitConfig rabbitConfig; private RabbitConfig rabbitConfig;
/** /**
* 延迟队列 重发未回应消息
*
* @param msgBody
*/
public void delaysQueue(MsgBody msgBody) {
// 通过广播模式发布延时消息 延时3秒 消费后销毁 这里无需指定路由,会广播至每个绑定此交换机的队列
rabbitTemplate.convertAndSend(rabbitConfig.getExchangeName(), rabbitConfig.getDelayQueueName(), msgBody, message -> {
message.getMessageProperties().setDelay(3 * 1000); // 毫秒为单位,指定此消息的延时时长
return message;
});
}
/**
* 客服离线 队列 * 客服离线 队列
* *
* @param msgBody * @param msgBody
......
package com.ym.im.mq; package com.ym.im.mq;
import com.ym.im.config.RabbitConfig; import com.ym.im.entity.MsgBody;
import com.ym.im.entity.*; import com.ym.im.entity.StaffSocketInfo;
import com.ym.im.entity.Stroke;
import com.ym.im.entity.UserSocketInfo;
import com.ym.im.entity.base.NettyConstant; import com.ym.im.entity.base.NettyConstant;
import com.ym.im.entity.enums.RoleEnum;
import com.ym.im.entity.model.IdModel; import com.ym.im.entity.model.IdModel;
import com.ym.im.handler.ChannelGroupHandler; import com.ym.im.handler.ChannelGroupHandler;
import com.ym.im.service.StaffService; import com.ym.im.service.StaffService;
...@@ -16,9 +17,6 @@ import org.springframework.data.redis.core.RedisTemplate; ...@@ -16,9 +17,6 @@ import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Set; import java.util.Set;
/** /**
...@@ -41,23 +39,6 @@ public class Receiver { ...@@ -41,23 +39,6 @@ public class Receiver {
@Autowired @Autowired
private ChannelGroupHandler channelGroup; private ChannelGroupHandler channelGroup;
/**
* 禁用用户 队列名称
*/
public static final String USER_QUEUE_NAME = "disable.user";
/**
* 订单队列
*/
public static final String ORDER_QUEUE_NAME = "push.order";
@RabbitListener(queues = "#{delayQueue.name}")
public void delayAckHandler(MsgBody msgBody) throws IOException {
log.info("接收时间:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 消息内容:" + JsonUtils.obj2Json(msgBody));
retry(msgBody);
}
@RabbitListener(queues = "#{staffOfflineQueue.name}") @RabbitListener(queues = "#{staffOfflineQueue.name}")
public void offlineHandler(StaffSocketInfo staffSocketInfo) { public void offlineHandler(StaffSocketInfo staffSocketInfo) {
...@@ -82,22 +63,6 @@ public class Receiver { ...@@ -82,22 +63,6 @@ public class Receiver {
} }
/**
* 禁用用户后 关闭socket
*
* @param userId
* @throws IOException
*/
@RabbitListener(queues = USER_QUEUE_NAME)
public void disableUserHandler(String userId) {
final UserSocketInfo userSocketInfo = channelGroup.USER_GROUP.get(Long.valueOf(userId));
if (userSocketInfo != null) {
userSocketInfo.writeAndFlush(new MsgBody<>().setCode(MsgBody.LOGOUT));
userSocketInfo.close();
log.info("用户: " + userId + "被禁用");
}
}
/** /**
* 订单相关处理 * 订单相关处理
...@@ -105,7 +70,7 @@ public class Receiver { ...@@ -105,7 +70,7 @@ public class Receiver {
* @param orderModel * @param orderModel
*/ */
@SneakyThrows @SneakyThrows
@RabbitListener(queues = ORDER_QUEUE_NAME) @RabbitListener(queues = "#{orderQueue.name}")
public void orderHandler(String json) { public void orderHandler(String json) {
final Stroke stroke = JsonUtils.json2Obj(json, Stroke.class); final Stroke stroke = JsonUtils.json2Obj(json, Stroke.class);
final Long mcId = stroke.getMcId(); final Long mcId = stroke.getMcId();
...@@ -115,78 +80,12 @@ public class Receiver { ...@@ -115,78 +80,12 @@ public class Receiver {
return; return;
} }
final MsgBody<Stroke> orderInfo = new MsgBody<Stroke>().setCode(MsgBody.ORDER).setData(stroke); final MsgBody<Stroke> orderInfo = new MsgBody<Stroke>().setCode(MsgBody.ORDER).setData(stroke);
final StaffSocketInfo staffSocketInfo = channelGroup.getMerchantStaff(userSocketInfo.getStaffId(stroke.getMcId())) == null ? staffService.getIdleStaff(mcId, userId) : null; final StaffSocketInfo staffSocketInfo = channelGroup.getMerchantStaff(userSocketInfo.getStaffId(stroke.getMcId())) == null ? staffService.getIdleStaff(mcId, userId) : channelGroup.getMerchantStaff(userSocketInfo.getStaffId(stroke.getMcId()));
if (staffSocketInfo != null) { if (staffSocketInfo != null) {
staffSocketInfo.writeAndFlush(orderInfo); staffSocketInfo.writeAndFlush(orderInfo);
log.info("客服订单: " + "给客服(" + staffSocketInfo.getStaffId() + ")发送订单:" + orderInfo.toString()); log.info("客服订单: " + "给客服(" + staffSocketInfo.getStaffId() + ")发送订单:" + orderInfo.toString());
} }
// /**
// * 绑定客服不在线,给历史客服发送订单信息
// */
// final Long staffId = (Long) redisTemplate.opsForHash().get(NettyConstant.IM_USERS, userId);
// if (staffId != null) {
// log.info("客服订单: " + "尝试给历史客服(" + staffId + ")发送订单:" + orderInfo.toString());
// staffSocketInfo = channelGroup.getMerchantStaff(staffId);
// if (staffSocketInfo != null) {
// staffSocketInfo.writeAndFlush(orderInfo);
// log.info("客服订单: " + "给历史客服(" + staffId + ")发送订单:" + orderInfo.toString());
// }
// }
}
/**
* 重发未回执的消息
*
* @param msgBody
* @throws IOException
*/
public void retry(MsgBody<ChatRecord> msgBody) throws IOException {
final ChatRecord chatRecord = msgBody.getData();
final Long userId = Long.valueOf(chatRecord.getUserId());
final String recordId = String.valueOf(chatRecord.getId());
if (msgBody != null && chatRecord.getRetryCount().intValue() < NettyConstant.RETRY_COUNT.intValue()) {
UserSocketInfo userSocketInfo = channelGroup.USER_GROUP.get(userId);
if (userSocketInfo == null) {
return;
}
MsgBody<ChatRecord> msg = JsonUtils.json2Obj(String.valueOf(redisTemplate.opsForHash().get(NettyConstant.MSG_KEY + userId, recordId)), JsonUtils.getJavaType(MsgBody.class, ChatRecord.class));
if (msg != null && msg.getCode().equals(MsgBody.HAVE_READ)) {
redisTemplate.opsForHash().delete(NettyConstant.MSG_KEY + userId, recordId);
return;
}
final Integer sendReceive = chatRecord.getSendReceive();
switch (RoleEnum.get(sendReceive)) {
case APP:
Long staffId = userSocketInfo.getStaffId(chatRecord.getMerchantId());
StaffSocketInfo staffSocketInfo = channelGroup.getMerchantStaff(staffId);
if (staffSocketInfo != null) {
staffSocketInfo.writeAndFlush(msgBody);
}
break;
case merchant:
userSocketInfo.writeAndFlush(msgBody);
break;
default:
} }
//重发三次
chatRecord.setRetryCount(chatRecord.getRetryCount() + 1);
queue.delaysQueue(msgBody);
} else {
//移除失败消息
redisTemplate.opsForHash().delete(NettyConstant.MSG_KEY + userId, recordId);
}
}
} }
\ No newline at end of file
...@@ -85,8 +85,8 @@ public class MsgBodyServiceImpl implements MsgBodyService { ...@@ -85,8 +85,8 @@ public class MsgBodyServiceImpl implements MsgBodyService {
msgBody.setCode(SEND_MSG); msgBody.setCode(SEND_MSG);
// 先保存消息至Redis // 先保存消息至Redis
redisTemplate.opsForHash().put(NettyConstant.MSG_KEY + msgBody.getData().getUserId(), msgBody.getData().getId(), JsonUtils.obj2Json(msgBody)); redisTemplate.opsForHash().put(NettyConstant.MSG_KEY + msgBody.getData().getUserId(), msgBody.getData().getId(), JsonUtils.obj2Json(msgBody));
// 再默认以用户没有收到消息为前提,做循环、延迟通知 // // 再默认以用户没有收到消息为前提,做循环、延迟通知
queue.delaysQueue(msgBody); // queue.delaysQueue(msgBody);
// 最后发送聊天信息 // 最后发送聊天信息
channel.writeAndFlush(msgBody); channel.writeAndFlush(msgBody);
} }
......
...@@ -30,6 +30,7 @@ spring: ...@@ -30,6 +30,7 @@ spring:
password: root password: root
delay-queue-name: delay.ack.dev delay-queue-name: delay.ack.dev
staff-offline-Queue-Name: staff.offline.dev staff-offline-Queue-Name: staff.offline.dev
order-queue-name: push.order.dev
exchange-name: delay.exchange.dev exchange-name: delay.exchange.dev
listener: listener:
simple: simple:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment