Commit c4f674bc by zhangjw

1:在原单商户的基础上 新增多商户客户逻辑 待完善和测试

parent fe2730a4
package com.ym.im.controller;
import com.ym.im.entity.model.IdModel;
import com.ym.im.entity.MsgBody;
import com.ym.im.entity.model.IdModel;
import com.ym.im.service.StaffService;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
......@@ -24,9 +24,9 @@ public class StaffController {
private StaffService staffService;
@GetMapping(value = "/getStaffList")
@ApiOperation(value = "获取所有客服信息")
public MsgBody getStaffList() {
return staffService.getStaffList();
@ApiOperation(value = "获取商户客服信息")
public MsgBody getStaffList(Long merchantId) {
return staffService.getMerchantStaffGroup(merchantId);
}
@PostMapping(value = "/forward")
......
......@@ -53,6 +53,11 @@ public class ChatRecord implements Serializable {
@ApiModelProperty(value = "员工(客服)Id")
private Long staffId;
@NotNull(message = "{error.merchant_id_empty}", groups = {Default.class})
@Positive(message = "{error.merchant_id_greater_than_zero}", groups = {Default.class})
@ApiModelProperty(value = "商户ID")
private Long merchantId;
@NotNull(message = "{error.chat_msg_type_empty}", groups = {Default.class, ChatRecordSaveGroup.class, ChatRecordSendGroup.class})
@ApiModelProperty(value = "消息类型:1、聊天信息,2、PDF")
private Integer msgType;
......
//package com.ym.im.entity;
//
//import lombok.Data;
//
//import java.io.Serializable;
//import java.util.Set;
//
///**
// * @author: JJww
// * 商户对应的客服集合
// * @Date:2020/10/13
// */
//@Data
//public class Merchant implements Serializable {
//
// private Long merchantId;
//
// private Set<StaffSocketInfo> staffSocketInfoSet;
//}
......@@ -23,6 +23,8 @@ public class StaffSocketInfo extends BaseSocketInfo {
private Set<Long> userIds;
private Long merchantId;
public Set<Long> getUserIds() {
return userIds != null ? userIds : new HashSet<Long>();
}
......
......@@ -15,6 +15,11 @@ public class ChannelAttributeKey {
/**
* 商户ID
*/
public static final AttributeKey<Long> MERCHANT_ID = AttributeKey.valueOf("merchant_Id");
/**
* 角色类型
*/
public static final AttributeKey<String> ROLE_TYPE = AttributeKey.valueOf("role_type");
......
package com.ym.im.handler;
import com.ym.im.entity.StaffSocketInfo;
import com.ym.im.entity.UserSocketInfo;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author: JJww
* @Date:2020/10/14
*/
@Component
public class ChannelGroupHandler {
/**
* 在线用户Group
*/
public final Map<Long, UserSocketInfo> USER_GROUP = new ConcurrentHashMap<>();
/**
* 在线客服Group
*/
public final Map<Long, Map<Long, StaffSocketInfo>> STAFF_GROUP = new ConcurrentHashMap<>();
/**
* 新增商户 客服
*
* @param merchantId
* @param staffSocketInfo
*/
public void putMerchantStaff(Long merchantId, StaffSocketInfo staffSocketInfo) {
final Map<Long, StaffSocketInfo> staffSocketInfoMap = STAFF_GROUP.get(merchantId) != null ? STAFF_GROUP.get(merchantId) : new HashMap<Long, StaffSocketInfo>();
staffSocketInfoMap.put(staffSocketInfo.getStaffId(), staffSocketInfo);
STAFF_GROUP.put(merchantId, staffSocketInfoMap);
}
/**
* 移除商户 客服
*
* @param merchantId
* @param staffId
*/
public void removeMerchantStaff(Long merchantId, Long staffId) {
final Map<Long, StaffSocketInfo> staffSocketInfoMap = STAFF_GROUP.get(merchantId);
staffSocketInfoMap.remove(staffId);
if (staffSocketInfoMap.isEmpty()) {
STAFF_GROUP.remove(merchantId);
}
}
public StaffSocketInfo getMerchantStaff(Long merchantId, Long staffId) {
return STAFF_GROUP.get(merchantId).get(staffId);
}
public StaffSocketInfo getMerchantStaff(Long staffId) {
StaffSocketInfo staffSocketInfo = null;
for (Map<Long, StaffSocketInfo> staffGroup : STAFF_GROUP.values()) {
for (StaffSocketInfo staffInfo : staffGroup.values()) {
if (staffInfo.getStaffId().equals(staffId)) {
staffInfo = staffSocketInfo;
}
}
}
return staffSocketInfo;
}
}
\ No newline at end of file
......@@ -7,7 +7,6 @@ import com.ym.im.entity.base.NettyConstant;
import com.ym.im.entity.enums.RoleEnum;
import com.ym.im.exception.HttpException;
import com.ym.im.factory.SingleChatFactory;
import com.ym.im.service.ChannelGroupService;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
......@@ -33,6 +32,9 @@ import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
public class WebSocketHandshakerHandler extends BaseHandler<FullHttpRequest> {
@Autowired
private ChannelGroupHandler channelGroup;
@Autowired
private SingleChatFactory singleChatFactory;
public static final String ROLE_TYPE = "roleType";
......@@ -54,6 +56,7 @@ public class WebSocketHandshakerHandler extends BaseHandler<FullHttpRequest> {
ctx.channel().attr(ChannelAttributeKey.ROLE_ID).set(userId);
ctx.channel().attr(ChannelAttributeKey.TOKEN_INFO).set(token);
ctx.channel().attr(ChannelAttributeKey.ROLE_TYPE).set(roleType);
ctx.channel().attr(ChannelAttributeKey.MERCHANT_ID).set(merchantId);
this.sso(token, userId, roleType);
singleChatFactory.getService(roleType).init(ctx);
fullHttpRequest.setUri(NettyConstant.CS);
......@@ -65,16 +68,16 @@ public class WebSocketHandshakerHandler extends BaseHandler<FullHttpRequest> {
BaseSocketInfo baseSocketInfo = null;
switch (RoleEnum.get(type)) {
case APP:
baseSocketInfo = ChannelGroupService.USER_GROUP.get(roleId);
baseSocketInfo = channelGroup.USER_GROUP.get(roleId);
break;
case merchant:
baseSocketInfo = ChannelGroupService.STAFF_GROUP.get(roleId);
baseSocketInfo = channelGroup.getMerchantStaff(roleId);
break;
}
if (baseSocketInfo != null && !token.equals(baseSocketInfo.getToken())) {
baseSocketInfo.writeAndFlush(new MsgBody<>().setStatus(MsgBody.FORCEDOFFLINE));
baseSocketInfo.close();
ChannelGroupService.USER_GROUP.remove(roleId);//待完善
channelGroup.USER_GROUP.remove(roleId);//待完善
log.info("用户: " + roleId + " 被迫下线");
}
}
......
......@@ -9,7 +9,7 @@ import com.ym.im.entity.base.NettyConstant;
import com.ym.im.entity.enums.RoleEnum;
import com.ym.im.entity.model.IdModel;
import com.ym.im.entity.model.OrderModel;
import com.ym.im.service.ChannelGroupService;
import com.ym.im.handler.ChannelGroupHandler;
import com.ym.im.service.StaffService;
import com.ym.im.util.JsonUtils;
import lombok.extern.slf4j.Slf4j;
......@@ -40,6 +40,9 @@ public class Receiver {
@Autowired
private StaffService staffService;
@Autowired
private ChannelGroupHandler channelGroup;
/**
* 禁用用户 队列名称
*/
......@@ -64,13 +67,13 @@ public class Receiver {
//移除用户列表
redisTemplate.delete(NettyConstant.STAFF_USERIDS_KEY + staffId);
//客服真离线后 才转发
if (ChannelGroupService.STAFF_GROUP.get(staffId) == null) {
if (channelGroup.getMerchantStaff(staffId) == null) {
final Set<Long> userIds = staffSocketInfo.getUserIds();
log.info("客服离线队列: " + "ID: " + "UserIds:" + userIds);
userIds.forEach((Long userId) -> {
//用户在线才重新分配和转发
if (ChannelGroupService.USER_GROUP.get(userId) != null) {
final StaffSocketInfo idleStaff = staffService.getIdleStaff(userId);
if (channelGroup.USER_GROUP.get(userId) != null) {
final StaffSocketInfo idleStaff = staffService.getIdleStaff(staffSocketInfo.getMerchantId(),userId);
if (idleStaff != null) {
idleStaff.writeAndFlush(new MsgBody<>().setStatus(MsgBody.DISTRIBUTION_STAFF).setData(new IdModel().setStaffId(staffId).setUserId(userId)));
}
......@@ -89,7 +92,7 @@ public class Receiver {
@RabbitListener(queues = USER_QUEUE_NAME)
public void disableUserHandler(String userId) {
final UserSocketInfo userSocketInfo = ChannelGroupService.USER_GROUP.get(Long.valueOf(userId));
final UserSocketInfo userSocketInfo = channelGroup.USER_GROUP.get(Long.valueOf(userId));
if (userSocketInfo != null) {
userSocketInfo.writeAndFlush(new MsgBody<>().setStatus(MsgBody.LOGOUT));
userSocketInfo.close();
......@@ -107,11 +110,11 @@ public class Receiver {
log.info("Constants.ORDER_QUEUE_NAME: " + JSON.toJSONString(orderModel));
final UserSocketInfo userSocketInfo = ChannelGroupService.USER_GROUP.get(Long.valueOf(orderModel.getUserId()));
final UserSocketInfo userSocketInfo = channelGroup.USER_GROUP.get(Long.valueOf(orderModel.getUserId()));
if (userSocketInfo == null) {
return;
}
StaffSocketInfo staffSocketInfo = ChannelGroupService.STAFF_GROUP.get(userSocketInfo.getStaffId());
StaffSocketInfo staffSocketInfo = channelGroup.getMerchantStaff(userSocketInfo.getStaffId());
final MsgBody<OrderModel> orderInfo = new MsgBody<OrderModel>().setStatus(MsgBody.ORDER).setData(orderModel);
/**
* 绑定客服在线,发送订单信息
......@@ -127,7 +130,7 @@ public class Receiver {
final Long staffId = (Long) redisTemplate.opsForHash().get(NettyConstant.IM_USERS, orderModel.getUserId());
if (staffId != null) {
log.info("客服订单: " + "尝试给历史客服(" + staffId + ")发送订单:" + orderInfo.toString());
staffSocketInfo = ChannelGroupService.STAFF_GROUP.get(staffId);
staffSocketInfo = channelGroup.getMerchantStaff(staffId);
if (staffSocketInfo != null) {
staffSocketInfo.writeAndFlush(orderInfo);
log.info("客服订单: " + "给历史客服(" + staffId + ")发送订单:" + orderInfo.toString());
......@@ -150,7 +153,7 @@ public class Receiver {
if (msgBody != null && chatRecord.getRetryCount().intValue() < NettyConstant.RETRY_COUNT.intValue()) {
UserSocketInfo userSocketInfo = ChannelGroupService.USER_GROUP.get(userId);
UserSocketInfo userSocketInfo = channelGroup.USER_GROUP.get(userId);
if (userSocketInfo == null) {
return;
}
......@@ -165,7 +168,7 @@ public class Receiver {
case APP:
Long staffId = userSocketInfo.getStaffId() == null ? chatRecord.getStaffId() : userSocketInfo.getStaffId();
StaffSocketInfo staffSocketInfo = ChannelGroupService.STAFF_GROUP.get(staffId);
StaffSocketInfo staffSocketInfo = channelGroup.getMerchantStaff(staffId);
if (staffSocketInfo != null) {
staffSocketInfo.writeAndFlush(msgBody);
}
......
package com.ym.im.service;
import com.ym.im.entity.StaffSocketInfo;
import com.ym.im.entity.UserSocketInfo;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author: JJww
* Channe管理
* @Date:2019-05-21
*/
public interface ChannelGroupService {
/**
* 在线用户Group
*/
Map<Long, UserSocketInfo> USER_GROUP = new ConcurrentHashMap<>();
/**
* 在线客服Group
*/
Map<Long, StaffSocketInfo> STAFF_GROUP = new ConcurrentHashMap<>();
}
......@@ -17,7 +17,7 @@ public interface StaffService {
* @param userId
* @return
*/
StaffSocketInfo getIdleStaff(Long userId);
StaffSocketInfo getIdleStaff(Long merchantId, Long userId);
/**
......@@ -29,10 +29,10 @@ public interface StaffService {
MsgBody forward(IdModel idModel);
/**
* 获取所有在线客服
* 获取商户所有在线客服
*
* @return
*/
MsgBody getStaffList();
MsgBody getMerchantStaffGroup(Long merchantId);
}
package com.ym.im.service.impl;
import com.ym.im.entity.model.IdModel;
import com.ym.im.entity.MsgBody;
import com.ym.im.entity.StaffSocketInfo;
import com.ym.im.entity.UserSocketInfo;
import com.ym.im.entity.enums.ResultStatus;
import com.ym.im.service.ChannelGroupService;
import com.ym.im.entity.model.IdModel;
import com.ym.im.handler.ChannelGroupHandler;
import com.ym.im.service.StaffService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
......@@ -24,10 +25,13 @@ import static java.util.stream.Collectors.toMap;
@Service
public class StaffServiceImpl implements StaffService {
@Autowired
private ChannelGroupHandler channelGroup;
@Override
public StaffSocketInfo getIdleStaff(Long userId) {
public StaffSocketInfo getIdleStaff(Long merchantId,Long userId) {
final LinkedHashMap<Long, StaffSocketInfo> collect = ChannelGroupService.STAFF_GROUP
final LinkedHashMap<Long, StaffSocketInfo> collect = channelGroup.STAFF_GROUP.get(merchantId)
.entrySet()
.stream()
.sorted(comparingByValue(new Comparator<StaffSocketInfo>() {
......@@ -38,7 +42,7 @@ public class StaffServiceImpl implements StaffService {
})).collect(toMap(e -> e.getKey(), e -> e.getValue(), (e1, e2) -> e2, LinkedHashMap::new));
if (collect.size() == 0) {
ChannelGroupService.USER_GROUP.get(userId).setStaffId(null);
channelGroup.USER_GROUP.get(userId).setStaffId(null);
return null;
}
//客服和用户绑定
......@@ -46,7 +50,7 @@ public class StaffServiceImpl implements StaffService {
staffSocketInfo.getUserIds().add(userId);
Long staffId = staffSocketInfo.getStaffId();
//用户和客服绑定
final UserSocketInfo userSocketInfo = ChannelGroupService.USER_GROUP.get(userId);
final UserSocketInfo userSocketInfo = channelGroup.USER_GROUP.get(userId);
if (userSocketInfo != null) {
userSocketInfo.setStaffId(staffId);
//通知用户 新的客服
......@@ -59,13 +63,13 @@ public class StaffServiceImpl implements StaffService {
@Override
public MsgBody forward(IdModel idModel) {
final UserSocketInfo userSocketInfo = ChannelGroupService.USER_GROUP.get(idModel.getUserId());
final StaffSocketInfo staffSocketInfo = ChannelGroupService.STAFF_GROUP.get(idModel.getStaffId());
final UserSocketInfo userSocketInfo = channelGroup.USER_GROUP.get(idModel.getUserId());
final StaffSocketInfo staffSocketInfo =channelGroup.getMerchantStaff(idModel.getStaffId());
if (staffSocketInfo == null || userSocketInfo == null) {
return new MsgBody<>().setStatus(MsgBody.BINDINGFAILURE).setMessage(ResultStatus.FORWARD_FAILURE.getMessage());
}
//移除原客服绑定
ChannelGroupService.STAFF_GROUP.get(userSocketInfo.getStaffId()).getUserIds().remove(idModel.getUserId());
channelGroup.getMerchantStaff(userSocketInfo.getStaffId()).getUserIds().remove(idModel.getUserId());
//设置新的客服
staffSocketInfo.getUserIds().add(idModel.getUserId());
userSocketInfo.setStaffId(idModel.getStaffId());
......@@ -79,9 +83,9 @@ public class StaffServiceImpl implements StaffService {
}
@Override
public MsgBody getStaffList() {
public MsgBody getMerchantStaffGroup(Long merchantId) {
List<StaffSocketInfo> staffs = new ArrayList<StaffSocketInfo>();
ChannelGroupService.STAFF_GROUP.forEach((k, v) -> {
channelGroup.STAFF_GROUP.get(merchantId).forEach((k, v) -> {
staffs.add(v);
});
return new MsgBody<>().setStatus(ResultStatus.SUCCESS.getCode()).setData(staffs);
......
......@@ -9,11 +9,9 @@ import com.ym.im.entity.UserSocketInfo;
import com.ym.im.entity.base.ChannelAttributeKey;
import com.ym.im.entity.base.NettyConstant;
import com.ym.im.entity.model.IdModel;
import com.ym.im.handler.ChannelGroupHandler;
import com.ym.im.mq.Queue;
import com.ym.im.service.ChannelGroupService;
import com.ym.im.service.ChatRecordService;
import com.ym.im.service.ChatService;
import com.ym.im.service.MsgBodyService;
import com.ym.im.service.*;
import com.ym.im.util.JsonUtils;
import com.ym.im.validation.group.*;
import io.netty.channel.ChannelHandlerContext;
......@@ -52,6 +50,10 @@ public class StaffSingleChatServiceImpl implements ChatService {
@Autowired
private MsgBodyService msgBodyService;
@Autowired
private ChannelGroupHandler channelGroup;
// @Autowired
// private PushGatherService pushService;
......@@ -60,12 +62,13 @@ public class StaffSingleChatServiceImpl implements ChatService {
public void init(ChannelHandlerContext ctx) {
final Long staffId = ctx.channel().attr(ChannelAttributeKey.ROLE_ID).get();
final Long merchantId = ctx.channel().attr(ChannelAttributeKey.MERCHANT_ID).get();
StaffSocketInfo staffSocketInfo = new StaffSocketInfo();
staffSocketInfo.setStaffId(staffId);
staffSocketInfo.setToken(ctx.channel().attr(ChannelAttributeKey.TOKEN_INFO).get());
staffSocketInfo.setChannel((NioSocketChannel) ctx.channel());
staffSocketInfo.setUserIds(redisTemplate.opsForSet().members(NettyConstant.STAFF_USERIDS_KEY + staffId));
ChannelGroupService.STAFF_GROUP.put(staffId, staffSocketInfo);
channelGroup.putMerchantStaff(merchantId, staffSocketInfo);
log.info("客服: " + staffId + " 上线:");
}
......@@ -73,7 +76,8 @@ public class StaffSingleChatServiceImpl implements ChatService {
public void offline(ChannelHandlerContext ctx) {
final Long staffId = ctx.channel().attr(ChannelAttributeKey.ROLE_ID).get();
final StaffSocketInfo staffSocketInfo = ChannelGroupService.STAFF_GROUP.get(staffId);
final Long merchantId = ctx.channel().attr(ChannelAttributeKey.MERCHANT_ID).get();
final StaffSocketInfo staffSocketInfo = channelGroup.getMerchantStaff(merchantId,staffId);
if (ctx.channel().attr(ChannelAttributeKey.TOKEN_INFO).get().equals(staffSocketInfo.getToken())) {
final Set<Long> userIds = staffSocketInfo.getUserIds();
if (userIds.size() != 0) {
......@@ -82,7 +86,7 @@ public class StaffSingleChatServiceImpl implements ChatService {
redisTemplate.opsForSet().add(userListKey, userIds.toArray(new Long[userIds.size()]));
queue.staffOfflineQueue(new StaffSocketInfo(staffId, userIds)); //NioSocketChannel无法序列化 所以new StaffSocketInfo
}
ChannelGroupService.STAFF_GROUP.remove(staffId);
channelGroup.removeMerchantStaff(merchantId,staffId);
ctx.close();
log.info("客服: " + staffId + " 下线:");
}
......@@ -93,7 +97,7 @@ public class StaffSingleChatServiceImpl implements ChatService {
public NioSocketChannel distribution(Long id, @Valid MsgBody<ChatRecord> msgBody) {
final Long userId = msgBody.getData().getUserId();
final UserSocketInfo userSocketInfo = ChannelGroupService.USER_GROUP.get(userId);
final UserSocketInfo userSocketInfo = channelGroup.USER_GROUP.get(userId);
if (userSocketInfo == null) {
//用户不在线,保存最后发送消息的客服ID
redisTemplate.opsForHash().put(NettyConstant.IM_USERS, userId, id);
......@@ -106,11 +110,11 @@ public class StaffSingleChatServiceImpl implements ChatService {
//通知用户 新的客服
userSocketInfo.setStaffId(id);
userSocketInfo.writeAndFlush(new MsgBody<>().setStatus(MsgBody.DISTRIBUTION_STAFF).setData(new IdModel().setStaffId(id).setUserId(userId)));
ChannelGroupService.STAFF_GROUP.get(id).getUserIds().add(userId);
channelGroup.getMerchantStaff(id).getUserIds().add(userId);
}
if (currentStaffId != null && !currentStaffId.equals(id)) {
//通知客服 绑定失败 当前用户已绑定客服
ChannelGroupService.STAFF_GROUP.get(id).writeAndFlush(new MsgBody<>().setStatus(MsgBody.BINDINGFAILURE).setData(new IdModel().setStaffId(currentStaffId).setUserId(userId)));
channelGroup.getMerchantStaff(id).writeAndFlush(new MsgBody<>().setStatus(MsgBody.BINDINGFAILURE).setData(new IdModel().setStaffId(currentStaffId).setUserId(userId)));
return null;
}
return userSocketInfo.getChannel();
......@@ -157,7 +161,7 @@ public class StaffSingleChatServiceImpl implements ChatService {
final ChatRecord record = msgBody.getData();
record.setModifyTime(new Date());
chatRecordService.updateReceiveTime(record);
UserSocketInfo userSocketInfo = ChannelGroupService.USER_GROUP.get(record.getUserId());
UserSocketInfo userSocketInfo = channelGroup.USER_GROUP.get(record.getUserId());
if (userSocketInfo != null) {
userSocketInfo.writeAndFlush(msgBody);
redisTemplate.opsForHash().put(NettyConstant.MSG_KEY + record.getUserId(), record.getId(), JsonUtils.obj2Json(msgBody));
......
package com.ym.im.service.impl;
import com.ym.im.entity.*;
import com.ym.im.entity.MsgBody;
import com.ym.im.entity.StaffSocketInfo;
import com.ym.im.entity.UserSocketInfo;
import com.ym.im.entity.base.NettyConstant;
import com.ym.im.entity.enums.ResultStatus;
import com.ym.im.entity.model.IdModel;
import com.ym.im.service.ChannelGroupService;
import com.ym.im.handler.ChannelGroupHandler;
import com.ym.im.service.UserService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
......@@ -24,6 +27,9 @@ public class UserServiceImpl implements UserService {
@Resource(name = "myRedisTemplate")
private RedisTemplate redisTemplate;
@Autowired
private ChannelGroupHandler channelGroup;
@Override
public MsgBody getUserList(Long staffId) {
......@@ -37,12 +43,12 @@ public class UserServiceImpl implements UserService {
@Override
public MsgBody deleteUserFromList(IdModel idModel) {
final StaffSocketInfo staffSocketInfo = ChannelGroupService.STAFF_GROUP.get(idModel.getStaffId());
final StaffSocketInfo staffSocketInfo = channelGroup.getMerchantStaff(idModel.getStaffId());
if (staffSocketInfo == null) {
return new MsgBody<>().setStatus(ResultStatus.REQUEST_ERROR.getCode()).setMessage(ResultStatus.REQUEST_ERROR.getMessage());
}
staffSocketInfo.getUserIds().remove(idModel.getUserId());
final UserSocketInfo userSocketInfo = ChannelGroupService.USER_GROUP.get(idModel.getUserId());
final UserSocketInfo userSocketInfo = channelGroup.USER_GROUP.get(idModel.getUserId());
if (userSocketInfo != null && idModel.getStaffId().equals(userSocketInfo.getStaffId())) {
userSocketInfo.setStaffId(null);
}
......@@ -52,7 +58,7 @@ public class UserServiceImpl implements UserService {
@Override
public MsgBody checkBinding(IdModel idModel) {
final UserSocketInfo userSocketInfo = ChannelGroupService.USER_GROUP.get(idModel.getUserId());
final UserSocketInfo userSocketInfo = channelGroup.USER_GROUP.get(idModel.getUserId());
/**
* 用户不在线 不校验绑定关系
* 用户在线,只有绑定的客服才能发送消息
......
......@@ -9,6 +9,7 @@ import com.ym.im.entity.UserSocketInfo;
import com.ym.im.entity.base.ChannelAttributeKey;
import com.ym.im.entity.base.NettyConstant;
import com.ym.im.entity.model.IdModel;
import com.ym.im.handler.ChannelGroupHandler;
import com.ym.im.service.*;
import com.ym.im.util.JsonUtils;
import com.ym.im.validation.group.ChatRecordReceiveGroup;
......@@ -51,6 +52,9 @@ public class UserSingleChatServiceImpl implements ChatService {
@Autowired
private ChatRecordService chatRecordService;
@Autowired
private ChannelGroupHandler channelGroup;
@Override
public void init(ChannelHandlerContext ctx) {
......@@ -61,7 +65,7 @@ public class UserSingleChatServiceImpl implements ChatService {
userSocketInfo.setCol(ctx.channel().attr(ChannelAttributeKey.COL_INFO).get());
userSocketInfo.setToken(ctx.channel().attr(ChannelAttributeKey.TOKEN_INFO).get());
userSocketInfo.setPushToken(null);
ChannelGroupService.USER_GROUP.put(userId, userSocketInfo);
channelGroup.USER_GROUP.put(userId, userSocketInfo);
//恢复历史绑定关系
restoreBindingRelationship(userId);
//通知客服 用户上线
......@@ -75,13 +79,13 @@ public class UserSingleChatServiceImpl implements ChatService {
public void offline(ChannelHandlerContext ctx) {
final Long userId = ctx.channel().attr(ChannelAttributeKey.ROLE_ID).get();
final UserSocketInfo userSocketInfo = ChannelGroupService.USER_GROUP.get(userId);
final UserSocketInfo userSocketInfo = channelGroup.USER_GROUP.get(userId);
if (userSocketInfo != null && ctx.channel().attr(ChannelAttributeKey.TOKEN_INFO).get().equals(userSocketInfo.getToken())) {
final Long staffId = userSocketInfo.getStaffId();
ChannelGroupService.USER_GROUP.remove(userId);
channelGroup.USER_GROUP.remove(userId);
ctx.close();
if (staffId != null && ChannelGroupService.STAFF_GROUP.get(staffId) != null) {
if (staffId != null && channelGroup.getMerchantStaff(staffId) != null) {
// 保存最后的客服
redisTemplate.opsForHash().put(NettyConstant.IM_USERS, userId, staffId);
}
......@@ -94,19 +98,20 @@ public class UserSingleChatServiceImpl implements ChatService {
@Validated({MsgBodyGroup.class, ChatRecordSendGroup.class})
public NioSocketChannel distribution(Long id, @Valid MsgBody<ChatRecord> msgBody) {
final Long merchantId = msgBody.getData().getMerchantId();
// 获取服务用户的客服Id
Long staffId = ChannelGroupService.USER_GROUP.get(id).getStaffId();
Long staffId = channelGroup.USER_GROUP.get(id).getStaffId();
// 客服SocketInfo对象
StaffSocketInfo staffSocketInfo;
// 若客服Id为空,分配客服,不为空则获取客服SocketInfo
if (staffId == null) {
staffSocketInfo = staffService.getIdleStaff(id);
staffSocketInfo = staffService.getIdleStaff(merchantId,id);
if (staffSocketInfo == null) {
return null;
}
} else {
// 根据客服id获取SocketInfo
staffSocketInfo = ChannelGroupService.STAFF_GROUP.get(staffId);
staffSocketInfo = channelGroup.getMerchantStaff(staffId);
// 服务用户的客服不在线
if (staffSocketInfo == null) {
// Redis是否存在当前客服服务用户Set对象(TTL=60秒)
......@@ -114,7 +119,7 @@ public class UserSingleChatServiceImpl implements ChatService {
// 客服下线超过60秒,当前客服服务用户Set对象已过期,重新分配客服
// 过期后set对象不会为空而是大小为0
if (members.size() == 0) {
staffSocketInfo = staffService.getIdleStaff(id);
staffSocketInfo = staffService.getIdleStaff(merchantId,id);
}
return null;
} else {
......@@ -149,7 +154,7 @@ public class UserSingleChatServiceImpl implements ChatService {
* 情况4:当前用户有客服服务,且客服已下线并超过60秒,重新分配客服且有客服可分配,ChatRecord.staffId = StaffSocketInfo.id;
* 情况5(特殊:需求未定暂时这样处理):当前用户有客服服务,且客服已下线并超过60秒,重新分配客服且无客服可分配,ChatRecord.staffId = staffId;
*/
record.setStaffId(ChannelGroupService.USER_GROUP.get(id).getStaffId());
record.setStaffId(channelGroup.USER_GROUP.get(id).getStaffId());
// 先保存至数据库,再发送消息(若颠倒顺序可能导致数据未保存,更新已读操作先执行导致消息一直是未读状态)
chatRecordService.insertSelective(record);
log.info("用户 消息保存:" + record.getId());
......@@ -173,10 +178,10 @@ public class UserSingleChatServiceImpl implements ChatService {
final ChatRecord record = msgBody.getData();
record.setModifyTime(new Date());
chatRecordService.updateReceiveTime(record);
UserSocketInfo userSocketInfo = ChannelGroupService.USER_GROUP.get(record.getUserId());
UserSocketInfo userSocketInfo = channelGroup.USER_GROUP.get(record.getUserId());
if (userSocketInfo != null) {
userSocketInfo.setStaffId(userSocketInfo.getStaffId() == null ? record.getStaffId() : userSocketInfo.getStaffId());
StaffSocketInfo staffSocketInfo = ChannelGroupService.STAFF_GROUP.get(userSocketInfo.getStaffId());
StaffSocketInfo staffSocketInfo = channelGroup.getMerchantStaff(userSocketInfo.getStaffId());
if (staffSocketInfo != null) {
staffSocketInfo.writeAndFlush(msgBody);
}
......@@ -193,9 +198,9 @@ public class UserSingleChatServiceImpl implements ChatService {
private void restoreBindingRelationship(Long userId) {
if (redisTemplate.opsForHash().hasKey(NettyConstant.IM_USERS, userId)) {
final Long staffId = (Long) redisTemplate.opsForHash().get(NettyConstant.IM_USERS, userId);
final StaffSocketInfo staffSocketInfo = ChannelGroupService.STAFF_GROUP.get(staffId);
final StaffSocketInfo staffSocketInfo = channelGroup.getMerchantStaff(staffId);
if (staffSocketInfo != null) {
ChannelGroupService.USER_GROUP.get(userId).setStaffId(staffId);
channelGroup.USER_GROUP.get(userId).setStaffId(staffId);
staffSocketInfo.getUserIds().add(userId);
}
}
......@@ -207,11 +212,13 @@ public class UserSingleChatServiceImpl implements ChatService {
* @param userId
*/
private void broadcastUserOnline(Long userId) {
final Long staffId = ChannelGroupService.USER_GROUP.get(userId).getStaffId();
ChannelGroupService.STAFF_GROUP.forEach((key, staffSocketInfo) -> {
final Long staffId = channelGroup.USER_GROUP.get(userId).getStaffId();
channelGroup.STAFF_GROUP.forEach((key, staffGroup) -> {
staffGroup.values().forEach(staffSocketInfo -> {
staffSocketInfo.getUserIds().remove(userId);
staffSocketInfo.writeAndFlush(new MsgBody<>().setStatus(MsgBody.USERS_ONLINE).setData(new IdModel().setStaffId(staffId).setUserId(userId)));
});
});
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment