Commit ddcf6b33 by zhangjw

1:完善MQ配置和逻辑

2:删除重发逻辑
parent 48b89eb2
...@@ -22,30 +22,22 @@ import java.util.Map; ...@@ -22,30 +22,22 @@ import java.util.Map;
@ConfigurationProperties(prefix = "spring.rabbitmq") @ConfigurationProperties(prefix = "spring.rabbitmq")
public class RabbitConfig { public class RabbitConfig {
private String delayQueueName;
private String staffOfflineQueueName;
private String exchangeName; private String exchangeName;
private String orderQueueName; private String orderQueueName;
private String staffOfflineQueueName;
@Bean @Bean
public Queue orderQueue() { public Queue orderQueue() {
return new Queue(orderQueueName); return new Queue(orderQueueName);
} }
@Bean @Bean
public Queue delayQueue() {
return new Queue(delayQueueName);
}
@Bean
public Queue staffOfflineQueue() { public Queue staffOfflineQueue() {
return new Queue(staffOfflineQueueName); return new Queue(staffOfflineQueueName);
} }
/** /**
* 配置默认的交换机 * 配置默认的交换机
*/ */
...@@ -60,14 +52,6 @@ public class RabbitConfig { ...@@ -60,14 +52,6 @@ public class RabbitConfig {
* 绑定队列到交换器 * 绑定队列到交换器
*/ */
@Bean @Bean
Binding bindingDelayQueue(Queue delayQueue, CustomExchange customExchange) {
return BindingBuilder.bind(delayQueue).to(customExchange).with(delayQueueName).noargs();
}
/**
* 绑定队列到交换器
*/
@Bean
Binding bindingStaffOfflineQueue(Queue staffOfflineQueue, CustomExchange customExchange) { Binding bindingStaffOfflineQueue(Queue staffOfflineQueue, CustomExchange customExchange) {
return BindingBuilder.bind(staffOfflineQueue).to(customExchange).with(staffOfflineQueueName).noargs(); return BindingBuilder.bind(staffOfflineQueue).to(customExchange).with(staffOfflineQueueName).noargs();
} }
......
package com.ym.im.entity; package com.ym.im.entity;
import com.ym.im.validation.group.ChatRecordSaveGroup;
import com.ym.im.validation.group.ChatRecordSendGroup;
import io.swagger.annotations.ApiModelProperty; import io.swagger.annotations.ApiModelProperty;
import lombok.Data; import lombok.Data;
import lombok.experimental.Accessors; import lombok.experimental.Accessors;
import javax.validation.constraints.NotNull;
import javax.validation.groups.Default;
import java.io.Serializable; import java.io.Serializable;
import java.math.BigDecimal; import java.math.BigDecimal;
import java.sql.Timestamp; import java.sql.Timestamp;
import java.util.Date;
/** /**
* 行程表 * 行程表
...@@ -139,4 +144,6 @@ public class Stroke implements Serializable { ...@@ -139,4 +144,6 @@ public class Stroke implements Serializable {
@ApiModelProperty("是否是优惠调机,0-否,1-是") @ApiModelProperty("是否是优惠调机,0-否,1-是")
private Boolean isDiscount; private Boolean isDiscount;
@ApiModelProperty(value = "发送时间")
private Date sendTime;
} }
...@@ -17,6 +17,7 @@ import org.springframework.data.redis.core.RedisTemplate; ...@@ -17,6 +17,7 @@ import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.util.Date;
import java.util.Set; import java.util.Set;
/** /**
...@@ -63,7 +64,6 @@ public class Receiver { ...@@ -63,7 +64,6 @@ public class Receiver {
} }
/** /**
* 订单相关处理 * 订单相关处理
* *
...@@ -73,17 +73,19 @@ public class Receiver { ...@@ -73,17 +73,19 @@ public class Receiver {
@RabbitListener(queues = "#{orderQueue.name}") @RabbitListener(queues = "#{orderQueue.name}")
public void orderHandler(String json) { public void orderHandler(String json) {
final Stroke stroke = JsonUtils.json2Obj(json, Stroke.class); final Stroke stroke = JsonUtils.json2Obj(json, Stroke.class);
log.info("订单信息: " + json);
final Long mcId = stroke.getMcId(); final Long mcId = stroke.getMcId();
final Long userId = stroke.getUserId(); final Long userId = stroke.getUserId();
final UserSocketInfo userSocketInfo = channelGroup.USER_GROUP.get(userId); final UserSocketInfo userSocketInfo = channelGroup.USER_GROUP.get(userId);
if (userSocketInfo == null) { if (userSocketInfo == null) {
return; return;
} }
stroke.setSendTime(new Date());
final MsgBody<Stroke> orderInfo = new MsgBody<Stroke>().setCode(MsgBody.ORDER).setData(stroke); final MsgBody<Stroke> orderInfo = new MsgBody<Stroke>().setCode(MsgBody.ORDER).setData(stroke);
final StaffSocketInfo staffSocketInfo = channelGroup.getMerchantStaff(userSocketInfo.getStaffId(stroke.getMcId())) == null ? staffService.getIdleStaff(mcId, userId) : channelGroup.getMerchantStaff(userSocketInfo.getStaffId(stroke.getMcId())); final StaffSocketInfo staffSocketInfo = channelGroup.getMerchantStaff(userSocketInfo.getStaffId(stroke.getMcId())) == null ? staffService.getIdleStaff(mcId, userId) : channelGroup.getMerchantStaff(userSocketInfo.getStaffId(stroke.getMcId()));
if (staffSocketInfo != null) { if (staffSocketInfo != null) {
staffSocketInfo.writeAndFlush(orderInfo); staffSocketInfo.writeAndFlush(orderInfo);
log.info("客服订单: " + "给客服(" + staffSocketInfo.getStaffId() + ")发送订单:" + orderInfo.toString()); log.info("客服订单: " + "给客服(" + staffSocketInfo.getStaffId() + ")发送订单:" + json);
} }
} }
......
...@@ -6,7 +6,6 @@ import com.ym.im.entity.MsgBody; ...@@ -6,7 +6,6 @@ import com.ym.im.entity.MsgBody;
import com.ym.im.entity.base.ChannelAttributeKey; import com.ym.im.entity.base.ChannelAttributeKey;
import com.ym.im.entity.base.NettyConstant; import com.ym.im.entity.base.NettyConstant;
import com.ym.im.factory.SingleChatFactory; import com.ym.im.factory.SingleChatFactory;
import com.ym.im.mq.Queue;
import com.ym.im.service.ChatService; import com.ym.im.service.ChatService;
import com.ym.im.service.MsgBodyService; import com.ym.im.service.MsgBodyService;
import com.ym.im.util.JsonUtils; import com.ym.im.util.JsonUtils;
...@@ -36,15 +35,11 @@ import static com.ym.im.entity.MsgBody.SEND_MSG; ...@@ -36,15 +35,11 @@ import static com.ym.im.entity.MsgBody.SEND_MSG;
@Validated({MsgBodyGroup.class}) @Validated({MsgBodyGroup.class})
public class MsgBodyServiceImpl implements MsgBodyService { public class MsgBodyServiceImpl implements MsgBodyService {
@Autowired
private Queue queue;
@Autowired
private SingleChatFactory singleChatFactory;
@Resource(name = "myRedisTemplate") @Resource(name = "myRedisTemplate")
private RedisTemplate redisTemplate; private RedisTemplate redisTemplate;
@Autowired
private SingleChatFactory singleChatFactory;
@Override @Override
public void msgBodyHandle(@NotNull ChannelHandlerContext ctx, @Valid MsgBody<ChatRecord> msgBody) throws JsonProcessingException { public void msgBodyHandle(@NotNull ChannelHandlerContext ctx, @Valid MsgBody<ChatRecord> msgBody) throws JsonProcessingException {
...@@ -85,8 +80,6 @@ public class MsgBodyServiceImpl implements MsgBodyService { ...@@ -85,8 +80,6 @@ public class MsgBodyServiceImpl implements MsgBodyService {
msgBody.setCode(SEND_MSG); msgBody.setCode(SEND_MSG);
// 先保存消息至Redis // 先保存消息至Redis
redisTemplate.opsForHash().put(NettyConstant.MSG_KEY + msgBody.getData().getUserId(), msgBody.getData().getId(), JsonUtils.obj2Json(msgBody)); redisTemplate.opsForHash().put(NettyConstant.MSG_KEY + msgBody.getData().getUserId(), msgBody.getData().getId(), JsonUtils.obj2Json(msgBody));
// // 再默认以用户没有收到消息为前提,做循环、延迟通知
// queue.delaysQueue(msgBody);
// 最后发送聊天信息 // 最后发送聊天信息
channel.writeAndFlush(msgBody); channel.writeAndFlush(msgBody);
} }
......
...@@ -28,7 +28,6 @@ spring: ...@@ -28,7 +28,6 @@ spring:
port: 5672 port: 5672
username: root username: root
password: root password: root
delay-queue-name: delay.ack.dev
staff-offline-Queue-Name: staff.offline.dev staff-offline-Queue-Name: staff.offline.dev
order-queue-name: push.order.dev order-queue-name: push.order.dev
exchange-name: delay.exchange.dev exchange-name: delay.exchange.dev
......
...@@ -29,7 +29,6 @@ spring: ...@@ -29,7 +29,6 @@ spring:
port: 5672 port: 5672
username: admin username: admin
password: Yum123456 password: Yum123456
delay-queue-name: delay.ack
staff-offline-Queue-Name: staff.offline staff-offline-Queue-Name: staff.offline
exchange-name: delayAck exchange-name: delayAck
listener: listener:
......
...@@ -29,7 +29,6 @@ spring: ...@@ -29,7 +29,6 @@ spring:
port: 5672 port: 5672
username: root username: root
password: root password: root
delay-queue-name: delay.ack.dev
staff-offline-Queue-Name: staff.offline.dev staff-offline-Queue-Name: staff.offline.dev
exchange-name: delay.exchange.dev exchange-name: delay.exchange.dev
listener: listener:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment