Commit 82727ac2 by zhangjw

1:完善多商户客服逻辑(待测试)

2:新增sessionList 表和逻辑 增删改查会话列表
3:修改chat_record表和相关逻辑 支持多商户
4:其他细节调整
parent 2b1d00db
......@@ -30,7 +30,6 @@ public class ChatRecordController {
@GetMapping("/pull")
@ApiOperation(value = "获取聊天记录")
public MsgBody<PullChatRecord> pull(@Valid PullChatRecord pull) {
// return pull.setChatRecords(chatRecordService.getChatRecord(pull));
return chatRecordService.getChatRecord(pull);
}
......
package com.ym.im.controller;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.ym.im.entity.MsgBody;
import com.ym.im.entity.Session;
import com.ym.im.entity.enums.ResultStatus;
import com.ym.im.service.SessionListService;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;
/**
* @author: JJww
* @Date:2020-10-21
*/
@RestController
@RequestMapping("/session")
@Api(description = "会话列表相关")
public class SessionController {
@Autowired
private SessionListService sessionListService;
@GetMapping("/list")
@ApiOperation(value = "获取用户会话列表")
public MsgBody<List<Session>> getSessionList(Long userId) {
return new MsgBody<List<Session>>()
.setStatus(ResultStatus.SUCCESS.getCode())
.setData(sessionListService.list(new QueryWrapper<Session>().lambda().eq(Session::getUserId, userId)));
}
@DeleteMapping("/list")
@ApiOperation(value = "删除商户会话")
public MsgBody deleteSessionList(Long merchantId) {
sessionListService.remove(new QueryWrapper<Session>().lambda().eq(Session::getMerchantId, merchantId));
return new MsgBody<>().setStatus(ResultStatus.SUCCESS.getCode());
}
}
......@@ -48,11 +48,6 @@ public class ChatRecord implements Serializable {
@ApiModelProperty(value = "用户Id")
private Long userId;
@NotNull(message = "{error.staff_id_empty}", groups = {Default.class})
@Positive(message = "{error.staff_id_greater_than_zero}", groups = {Default.class})
@ApiModelProperty(value = "员工(客服)Id")
private Long staffId;
@NotNull(message = "{error.merchant_id_empty}", groups = {Default.class})
@Positive(message = "{error.merchant_id_greater_than_zero}", groups = {Default.class})
@ApiModelProperty(value = "商户ID")
......@@ -70,11 +65,6 @@ public class ChatRecord implements Serializable {
@ApiModelProperty(value = "收或发(以用户为第一人称,0:用户 1:客服)")
private Integer sendReceive;
@Min(value = 0, message = "{error.chat_scope}", groups = {Default.class, ChatRecordSaveGroup.class})
@Max(value = 2, message = "{error.chat_scope}", groups = {Default.class, ChatRecordSaveGroup.class})
@ApiModelProperty(value = "内容范围: 1、机票,2、酒店等")
private Integer scope = 0;
@ApiModelProperty(value = "重试次数")
private Integer retryCount = 0;
......
//package com.ym.im.entity;
//
//import lombok.Data;
//
//import java.io.Serializable;
//import java.util.Set;
//
///**
// * @author: JJww
// * 商户对应的客服集合
// * @Date:2020/10/13
// */
//@Data
//public class Merchant implements Serializable {
//
// private Long merchantId;
//
// private Set<StaffSocketInfo> staffSocketInfoSet;
//}
......@@ -15,8 +15,8 @@ import java.io.Serializable;
* @Date:2019-05-17
*/
@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
@EqualsAndHashCode(callSuper = false)
public class MsgBody<T> implements Serializable {
public static final int LOGOUT = -3;
......
......@@ -26,6 +26,11 @@ public class PullChatRecord implements Serializable {
@ApiModelProperty(value = "用户Id")
private Long userId;
@NotNull(message = "{error.merchant_id_empty}")
@Positive(message = "{error.merchant_id_greater_than_zero}")
@ApiModelProperty(value = "商户Id")
private Long merchantId;
@Positive(message = "{error.chat_id_greater_than_zero}")
@ApiModelProperty(value = "聊天记录Id,为获取坐标")
private Long chatRecordId;
......
package com.ym.im.entity;
import com.baomidou.mybatisplus.annotation.TableName;
import com.ym.im.validation.group.ChatRecordSaveGroup;
import com.ym.im.validation.group.StaffSendGroup;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.experimental.Accessors;
import javax.validation.constraints.NotNull;
import javax.validation.constraints.Positive;
import javax.validation.groups.Default;
import java.io.Serializable;
import java.util.Date;
/**
* @author: JJww
* @Date:2020/10/21
*/
@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
@TableName("session_list")
public class Session implements Serializable {
@NotNull(message = "{error.chat_id_empty}", groups = {Default.class, ChatRecordSaveGroup.class})
@Positive(message = "{error.chat_id_greater_than_zero}", groups = {Default.class, ChatRecordSaveGroup.class})
private Long id;
@NotNull(message = "{error.user_id_empty}", groups = {Default.class, ChatRecordSaveGroup.class, StaffSendGroup.class})
@Positive(message = "{error.user_id_greater_than_zero}", groups = {Default.class, ChatRecordSaveGroup.class, StaffSendGroup.class})
@ApiModelProperty(value = "用户Id")
private Long userId;
@NotNull(message = "{error.merchant_id_empty}", groups = {Default.class})
@Positive(message = "{error.merchant_id_greater_than_zero}", groups = {Default.class})
@ApiModelProperty(value = "商户ID")
private Long merchantId;
@NotNull(message = "{error.chat_create_time_empty}", groups = {Default.class, ChatRecordSaveGroup.class})
@ApiModelProperty(value = "创建时间(服务端设置)")
private Date createTime;
@NotNull(message = "{error.chat_modify_time_empty}", groups = {Default.class})
@ApiModelProperty(value = "修改时间(服务端设置)")
private Date modifyTime;
}
......@@ -15,8 +15,8 @@ import java.util.Set;
* @Date:2019-05-21
*/
@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
@EqualsAndHashCode(callSuper = false)
public class StaffSocketInfo extends BaseSocketInfo {
private Long staffId;
......
......@@ -6,23 +6,36 @@ import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.experimental.Accessors;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
/**
* @author: JJww
* @Date:2019-05-21
*/
@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
@EqualsAndHashCode(callSuper = false)
public class UserSocketInfo extends BaseSocketInfo {
private Long userId;
private Long staffId;
private Map<Long, Long> staffIds = new HashMap<>();
private Integer scope;
private Set<Long> sessionList;
private PushToken pushToken;
private String col;
public Long getStaffId(Long merchantId) {
return staffIds.get(merchantId);
}
public void setStaff(Long merchantId, Long staffId) {
staffIds.put(merchantId, staffId);
}
}
......@@ -22,4 +22,8 @@ public class IdModel {
@NotNull(message = "{error.userId_empty}")
private Long userId;
@ApiModelProperty(value = "商户Id")
@NotNull(message = "{error.merchantId_empty}")
private Long merchantId;
}
......@@ -32,4 +32,6 @@ public class OrderModel implements Serializable {
* 用户Id
*/
private String userId;
private Long merchantId;
}
package com.ym.im.handler;
import com.ym.im.entity.base.ChannelAttributeKey;
import com.ym.im.entity.base.NettyConstant;
import com.ym.im.factory.SingleChatFactory;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @author: JJww
* @Date:2019/11/14
*/
@Component
@ChannelHandler.Sharable
public abstract class BaseHandler<T> extends SimpleChannelInboundHandler<T> {
@Autowired
......@@ -18,12 +21,21 @@ public abstract class BaseHandler<T> extends SimpleChannelInboundHandler<T> {
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
try {
singleChatFactory.getService(ctx.channel().attr(ChannelAttributeKey.ROLE_TYPE).get()).offline(ctx);
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
try {
singleChatFactory.getService(ctx.channel().attr(ChannelAttributeKey.ROLE_TYPE).get()).offline(ctx);
} catch (Exception e) {
e.printStackTrace();
}
}
}
......@@ -44,26 +44,29 @@ public class ChannelGroupHandler {
* @param merchantId
* @param staffId
*/
public void removeMerchantStaff(Long merchantId, Long staffId) {
final Map<Long, StaffSocketInfo> staffSocketInfoMap = STAFF_GROUP.get(merchantId);
staffSocketInfoMap.remove(staffId);
if (staffSocketInfoMap.isEmpty()) {
STAFF_GROUP.remove(merchantId);
public void removeMerchantStaff(Long staffId) {
StaffSocketInfo staffSocketInfo = null;
for (Map<Long, StaffSocketInfo> staffGroup : STAFF_GROUP.values()) {
StaffSocketInfo staffInfo = staffGroup.get(staffId);
if (staffInfo != null) {
staffGroup.remove(staffId);
break;
}
}
}
public StaffSocketInfo getMerchantStaff(Long merchantId, Long staffId) {
return STAFF_GROUP.get(merchantId).get(staffId);
return STAFF_GROUP.get(merchantId) != null ? STAFF_GROUP.get(merchantId).get(staffId) : null;
}
public StaffSocketInfo getMerchantStaff(Long staffId) {
StaffSocketInfo staffSocketInfo = null;
for (Map<Long, StaffSocketInfo> staffGroup : STAFF_GROUP.values()) {
for (StaffSocketInfo staffInfo : staffGroup.values()) {
if (staffInfo.getStaffId().equals(staffId)) {
staffInfo = staffSocketInfo;
}
StaffSocketInfo staffInfo = staffGroup.get(staffId);
if (staffInfo != null) {
staffSocketInfo = staffInfo;
break;
}
}
return staffSocketInfo;
......
......@@ -37,9 +37,9 @@ public class WebSocketHandshakerHandler extends BaseHandler<FullHttpRequest> {
@Autowired
private SingleChatFactory singleChatFactory;
public static final String ROLE_TYPE = "roleType";
public static final String ROLE_TYPE = "RoleType";
public static final String MERCHANT_ID = "merchantId";
public static final String MERCHANT_ID = "MerchantId";
public static final String AUTHORIZATION = "Authorization";
......@@ -50,13 +50,15 @@ public class WebSocketHandshakerHandler extends BaseHandler<FullHttpRequest> {
Optional.ofNullable(token).orElseThrow(() -> new HttpException(ctx, fullHttpRequest, new DefaultFullHttpResponse(HTTP_1_1, UNAUTHORIZED)));
final String roleType = fullHttpRequest.headers().get(ROLE_TYPE, null);
Optional.ofNullable(roleType).orElseThrow(() -> new HttpException(ctx, fullHttpRequest, new DefaultFullHttpResponse(HTTP_1_1, BAD_REQUEST)));
final Long merchantId = Long.valueOf(fullHttpRequest.headers().get(MERCHANT_ID, null));
if (RoleEnum.merchant.getType().equals(roleType)) {
final Object merchantId = fullHttpRequest.headers().get(MERCHANT_ID);
Optional.ofNullable(merchantId).orElseThrow(() -> new HttpException(ctx, fullHttpRequest, new DefaultFullHttpResponse(HTTP_1_1, BAD_REQUEST)));
final Long userId = 123L;//待完善鉴权逻辑
ctx.channel().attr(ChannelAttributeKey.MERCHANT_ID).set(Long.valueOf(merchantId.toString()));
}
final Long userId = Long.valueOf(fullHttpRequest.headers().get(MERCHANT_ID)); //待完善鉴权逻辑
ctx.channel().attr(ChannelAttributeKey.ROLE_ID).set(userId);
ctx.channel().attr(ChannelAttributeKey.TOKEN_INFO).set(token);
ctx.channel().attr(ChannelAttributeKey.ROLE_TYPE).set(roleType);
ctx.channel().attr(ChannelAttributeKey.MERCHANT_ID).set(merchantId);
this.sso(token, userId, roleType);
singleChatFactory.getService(roleType).init(ctx);
fullHttpRequest.setUri(NettyConstant.CS);
......@@ -69,15 +71,17 @@ public class WebSocketHandshakerHandler extends BaseHandler<FullHttpRequest> {
switch (RoleEnum.get(type)) {
case APP:
baseSocketInfo = channelGroup.USER_GROUP.get(roleId);
channelGroup.USER_GROUP.remove(roleId);
break;
case merchant:
baseSocketInfo = channelGroup.getMerchantStaff(roleId);
channelGroup.removeMerchantStaff(roleId);
break;
default:
}
if (baseSocketInfo != null && !token.equals(baseSocketInfo.getToken())) {
baseSocketInfo.writeAndFlush(new MsgBody<>().setStatus(MsgBody.FORCEDOFFLINE));
baseSocketInfo.close();
channelGroup.USER_GROUP.remove(roleId);//待完善
log.info("用户: " + roleId + " 被迫下线");
}
}
......
package com.ym.im.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.ym.im.entity.Session;
/**
* @author: JJww
* @Date:2020/10/21
*/
public interface SessionListMapper extends BaseMapper<Session> {
}
......@@ -73,7 +73,7 @@ public class Receiver {
userIds.forEach((Long userId) -> {
//用户在线才重新分配和转发
if (channelGroup.USER_GROUP.get(userId) != null) {
final StaffSocketInfo idleStaff = staffService.getIdleStaff(staffSocketInfo.getMerchantId(),userId);
final StaffSocketInfo idleStaff = staffService.getIdleStaff(staffSocketInfo.getMerchantId(), userId);
if (idleStaff != null) {
idleStaff.writeAndFlush(new MsgBody<>().setStatus(MsgBody.DISTRIBUTION_STAFF).setData(new IdModel().setStaffId(staffId).setUserId(userId)));
}
......@@ -114,7 +114,7 @@ public class Receiver {
if (userSocketInfo == null) {
return;
}
StaffSocketInfo staffSocketInfo = channelGroup.getMerchantStaff(userSocketInfo.getStaffId());
StaffSocketInfo staffSocketInfo = channelGroup.getMerchantStaff(userSocketInfo.getStaffId(orderModel.getMerchantId()));
final MsgBody<OrderModel> orderInfo = new MsgBody<OrderModel>().setStatus(MsgBody.ORDER).setData(orderModel);
/**
* 绑定客服在线,发送订单信息
......@@ -167,7 +167,7 @@ public class Receiver {
switch (RoleEnum.get(sendReceive)) {
case APP:
Long staffId = userSocketInfo.getStaffId() == null ? chatRecord.getStaffId() : userSocketInfo.getStaffId();
Long staffId = userSocketInfo.getStaffId(chatRecord.getMerchantId());
StaffSocketInfo staffSocketInfo = channelGroup.getMerchantStaff(staffId);
if (staffSocketInfo != null) {
staffSocketInfo.writeAndFlush(msgBody);
......
package com.ym.im.service;
import com.baomidou.mybatisplus.extension.service.IService;
import com.ym.im.entity.Session;
/**
* @author: JJww
* @Date:2020/10/21
*/
public interface SessionListService extends IService<Session> {
}
package com.ym.im.service.impl;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.ym.im.entity.Session;
import com.ym.im.mapper.SessionListMapper;
import com.ym.im.service.SessionListService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
/**
* @author: JJww
* @Date:2020/10/21
*/
@Slf4j
@Service
public class SessionListServiceImpl extends ServiceImpl<SessionListMapper, Session> implements SessionListService {
}
......@@ -10,10 +10,7 @@ import com.ym.im.service.StaffService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.*;
import static java.util.Map.Entry.comparingByValue;
import static java.util.stream.Collectors.toMap;
......@@ -29,9 +26,13 @@ public class StaffServiceImpl implements StaffService {
private ChannelGroupHandler channelGroup;
@Override
public StaffSocketInfo getIdleStaff(Long merchantId,Long userId) {
public StaffSocketInfo getIdleStaff(Long merchantId, Long userId) {
final LinkedHashMap<Long, StaffSocketInfo> collect = channelGroup.STAFF_GROUP.get(merchantId)
final Map<Long, StaffSocketInfo> socketInfoMap = channelGroup.STAFF_GROUP.get(merchantId);
if (socketInfoMap == null) {
return null;
}
final LinkedHashMap<Long, StaffSocketInfo> collect = socketInfoMap
.entrySet()
.stream()
.sorted(comparingByValue(new Comparator<StaffSocketInfo>() {
......@@ -42,17 +43,16 @@ public class StaffServiceImpl implements StaffService {
})).collect(toMap(e -> e.getKey(), e -> e.getValue(), (e1, e2) -> e2, LinkedHashMap::new));
if (collect.size() == 0) {
channelGroup.USER_GROUP.get(userId).setStaffId(null);
return null;
}
//客服和用户绑定
StaffSocketInfo staffSocketInfo = collect.entrySet().iterator().next().getValue();
staffSocketInfo.getUserIds().add(userId);
Long staffId = staffSocketInfo.getStaffId();
//用户和客服绑定
final UserSocketInfo userSocketInfo = channelGroup.USER_GROUP.get(userId);
if (userSocketInfo != null) {
userSocketInfo.setStaffId(staffId);
final Long staffId = staffSocketInfo.getStaffId();
userSocketInfo.setStaff(merchantId, staffId);
//通知用户 新的客服
userSocketInfo.writeAndFlush(new MsgBody<>().setStatus(MsgBody.DISTRIBUTION_STAFF).setData(new IdModel().setStaffId(staffId).setUserId(userId)));
}
......@@ -63,16 +63,19 @@ public class StaffServiceImpl implements StaffService {
@Override
public MsgBody forward(IdModel idModel) {
final UserSocketInfo userSocketInfo = channelGroup.USER_GROUP.get(idModel.getUserId());
final StaffSocketInfo staffSocketInfo =channelGroup.getMerchantStaff(idModel.getStaffId());
final Long userId = idModel.getUserId();
final Long staffId = idModel.getStaffId();
final Long merchantId = idModel.getMerchantId();
final UserSocketInfo userSocketInfo = channelGroup.USER_GROUP.get(userId);
final StaffSocketInfo staffSocketInfo = channelGroup.getMerchantStaff(staffId);
if (staffSocketInfo == null || userSocketInfo == null) {
return new MsgBody<>().setStatus(MsgBody.BINDINGFAILURE).setMessage(ResultStatus.FORWARD_FAILURE.getMessage());
}
//移除原客服绑定
channelGroup.getMerchantStaff(userSocketInfo.getStaffId()).getUserIds().remove(idModel.getUserId());
channelGroup.getMerchantStaff(userSocketInfo.getStaffId(merchantId)).getUserIds().remove(userId);
//设置新的客服
staffSocketInfo.getUserIds().add(idModel.getUserId());
userSocketInfo.setStaffId(idModel.getStaffId());
staffSocketInfo.getUserIds().add(userId);
userSocketInfo.setStaff(merchantId, staffId);
final MsgBody<IdModel> msgBody = new MsgBody<IdModel>().setStatus(MsgBody.DISTRIBUTION_STAFF).setData(idModel);
//通知用户 新客服ID
userSocketInfo.writeAndFlush(msgBody);
......@@ -84,7 +87,7 @@ public class StaffServiceImpl implements StaffService {
@Override
public MsgBody getMerchantStaffGroup(Long merchantId) {
List<StaffSocketInfo> staffs = new ArrayList<StaffSocketInfo>();
final List<StaffSocketInfo> staffs = new ArrayList<StaffSocketInfo>();
channelGroup.STAFF_GROUP.get(merchantId).forEach((k, v) -> {
staffs.add(v);
});
......
......@@ -11,7 +11,9 @@ import com.ym.im.entity.base.NettyConstant;
import com.ym.im.entity.model.IdModel;
import com.ym.im.handler.ChannelGroupHandler;
import com.ym.im.mq.Queue;
import com.ym.im.service.*;
import com.ym.im.service.ChatRecordService;
import com.ym.im.service.ChatService;
import com.ym.im.service.MsgBodyService;
import com.ym.im.util.JsonUtils;
import com.ym.im.validation.group.*;
import io.netty.channel.ChannelHandlerContext;
......@@ -65,6 +67,7 @@ public class StaffSingleChatServiceImpl implements ChatService {
final Long merchantId = ctx.channel().attr(ChannelAttributeKey.MERCHANT_ID).get();
StaffSocketInfo staffSocketInfo = new StaffSocketInfo();
staffSocketInfo.setStaffId(staffId);
staffSocketInfo.setMerchantId(merchantId);
staffSocketInfo.setToken(ctx.channel().attr(ChannelAttributeKey.TOKEN_INFO).get());
staffSocketInfo.setChannel((NioSocketChannel) ctx.channel());
staffSocketInfo.setUserIds(redisTemplate.opsForSet().members(NettyConstant.STAFF_USERIDS_KEY + staffId));
......@@ -77,7 +80,7 @@ public class StaffSingleChatServiceImpl implements ChatService {
final Long staffId = ctx.channel().attr(ChannelAttributeKey.ROLE_ID).get();
final Long merchantId = ctx.channel().attr(ChannelAttributeKey.MERCHANT_ID).get();
final StaffSocketInfo staffSocketInfo = channelGroup.getMerchantStaff(merchantId,staffId);
final StaffSocketInfo staffSocketInfo = channelGroup.getMerchantStaff(merchantId, staffId);
if (ctx.channel().attr(ChannelAttributeKey.TOKEN_INFO).get().equals(staffSocketInfo.getToken())) {
final Set<Long> userIds = staffSocketInfo.getUserIds();
if (userIds.size() != 0) {
......@@ -86,7 +89,7 @@ public class StaffSingleChatServiceImpl implements ChatService {
redisTemplate.opsForSet().add(userListKey, userIds.toArray(new Long[userIds.size()]));
queue.staffOfflineQueue(new StaffSocketInfo(staffId, userIds)); //NioSocketChannel无法序列化 所以new StaffSocketInfo
}
channelGroup.removeMerchantStaff(merchantId,staffId);
channelGroup.removeMerchantStaff(staffId);
ctx.close();
log.info("客服: " + staffId + " 下线:");
}
......@@ -95,8 +98,9 @@ public class StaffSingleChatServiceImpl implements ChatService {
@Override
@Validated({MsgBodyGroup.class, ChatRecordSendGroup.class, StaffSendGroup.class})
public NioSocketChannel distribution(Long id, @Valid MsgBody<ChatRecord> msgBody) {
final Long userId = msgBody.getData().getUserId();
final ChatRecord chatRecord = msgBody.getData();
final Long userId = chatRecord.getUserId();
final Long merchantId = channelGroup.getMerchantStaff(id).getMerchantId();
final UserSocketInfo userSocketInfo = channelGroup.USER_GROUP.get(userId);
if (userSocketInfo == null) {
//用户不在线,保存最后发送消息的客服ID
......@@ -105,16 +109,16 @@ public class StaffSingleChatServiceImpl implements ChatService {
pushNotifications(userId);
return null;
}
final Long currentStaffId = userSocketInfo.getStaffId();
final Long currentStaffId = userSocketInfo.getStaffId(merchantId);
if (currentStaffId == null) {
//通知用户 新的客服
userSocketInfo.setStaffId(id);
userSocketInfo.setStaff(merchantId, id);
userSocketInfo.writeAndFlush(new MsgBody<>().setStatus(MsgBody.DISTRIBUTION_STAFF).setData(new IdModel().setStaffId(id).setUserId(userId)));
channelGroup.getMerchantStaff(id).getUserIds().add(userId);
}
if (currentStaffId != null && !currentStaffId.equals(id)) {
//通知客服 绑定失败 当前用户已绑定客服
channelGroup.getMerchantStaff(id).writeAndFlush(new MsgBody<>().setStatus(MsgBody.BINDINGFAILURE).setData(new IdModel().setStaffId(currentStaffId).setUserId(userId)));
channelGroup.getMerchantStaff(id).writeAndFlush(new MsgBody<>().setStatus(MsgBody.BINDINGFAILURE).setData(new IdModel().setStaffId(currentStaffId).setUserId(userId).setMerchantId(merchantId)));
return null;
}
return userSocketInfo.getChannel();
......@@ -127,8 +131,8 @@ public class StaffSingleChatServiceImpl implements ChatService {
// 设置聊天基本信息
final ChatRecord record = msgBody.getData();
record.setId(IdWorker.getId())
.setMerchantId(channelGroup.getMerchantStaff(id).getMerchantId())
.setSendReceive(RECEIVE)
.setStaffId(id)
.setCreateTime(new Date());
// 先保存至数据库,再发送消息(若颠倒顺序可能导致数据未保存,更新已读操作先执行导致消息一直是未读状态
chatRecordService.insertSelective(record);
......
......@@ -33,7 +33,7 @@ public class UserServiceImpl implements UserService {
@Override
public MsgBody getUserList(Long staffId) {
final Set usersId = redisTemplate.opsForSet().members(NettyConstant.STAFF_USERIDS_KEY + staffId);
final Set<Long> usersId = redisTemplate.opsForSet().members(NettyConstant.STAFF_USERIDS_KEY + staffId);
if (usersId.size() > 0) {
redisTemplate.delete(NettyConstant.STAFF_USERIDS_KEY + staffId);
}
......@@ -49,25 +49,25 @@ public class UserServiceImpl implements UserService {
}
staffSocketInfo.getUserIds().remove(idModel.getUserId());
final UserSocketInfo userSocketInfo = channelGroup.USER_GROUP.get(idModel.getUserId());
if (userSocketInfo != null && idModel.getStaffId().equals(userSocketInfo.getStaffId())) {
userSocketInfo.setStaffId(null);
if (userSocketInfo != null && idModel.getStaffId().equals(userSocketInfo.getStaffId(idModel.getMerchantId()))) {
userSocketInfo.getStaffIds().remove(idModel.getMerchantId());
}
return new MsgBody().setStatus(ResultStatus.SUCCESS.getCode());
return new MsgBody<>().setStatus(ResultStatus.SUCCESS.getCode());
}
@Override
public MsgBody checkBinding(IdModel idModel) {
final Long merchantId = idModel.getMerchantId();
final UserSocketInfo userSocketInfo = channelGroup.USER_GROUP.get(idModel.getUserId());
/**
* 用户不在线 不校验绑定关系
* 用户在线,只有绑定的客服才能发送消息
* 用户在线未绑定客服
*/
if (userSocketInfo == null || (userSocketInfo != null && idModel.getStaffId().equals(userSocketInfo.getStaffId()) || userSocketInfo.getStaffId() == null)) {
return new MsgBody().setStatus(ResultStatus.SUCCESS.getCode());
if (userSocketInfo == null || (userSocketInfo != null && idModel.getStaffId().equals(userSocketInfo.getStaffId(merchantId)) || userSocketInfo.getStaffId(merchantId) == null)) {
return new MsgBody<>().setStatus(ResultStatus.SUCCESS.getCode());
}
return new MsgBody<>().setStatus(ResultStatus.CHECK_FAILURE.getCode()).setData(userSocketInfo.getStaffId());
return new MsgBody<>().setStatus(ResultStatus.CHECK_FAILURE.getCode()).setData(userSocketInfo.getStaffId(merchantId));
}
......
package com.ym.im.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.IdWorker;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.ym.im.entity.ChatRecord;
import com.ym.im.entity.MsgBody;
import com.ym.im.entity.StaffSocketInfo;
import com.ym.im.entity.UserSocketInfo;
import com.ym.im.entity.*;
import com.ym.im.entity.base.ChannelAttributeKey;
import com.ym.im.entity.base.NettyConstant;
import com.ym.im.entity.model.IdModel;
......@@ -26,8 +24,8 @@ import org.springframework.validation.annotation.Validated;
import javax.annotation.Resource;
import javax.validation.Valid;
import java.util.Date;
import java.util.Set;
import java.util.*;
import java.util.stream.Collectors;
import static com.ym.im.entity.ChatRecord.SEND;
......@@ -40,6 +38,9 @@ import static com.ym.im.entity.ChatRecord.SEND;
@Validated
public class UserSingleChatServiceImpl implements ChatService {
@Autowired
private ChannelGroupHandler channelGroup;
@Resource(name = "myRedisTemplate")
private RedisTemplate redisTemplate;
......@@ -53,7 +54,8 @@ public class UserSingleChatServiceImpl implements ChatService {
private ChatRecordService chatRecordService;
@Autowired
private ChannelGroupHandler channelGroup;
private SessionListService sessiontListService;
@Override
public void init(ChannelHandlerContext ctx) {
......@@ -65,30 +67,25 @@ public class UserSingleChatServiceImpl implements ChatService {
userSocketInfo.setCol(ctx.channel().attr(ChannelAttributeKey.COL_INFO).get());
userSocketInfo.setToken(ctx.channel().attr(ChannelAttributeKey.TOKEN_INFO).get());
userSocketInfo.setPushToken(null);
userSocketInfo.setSessionList(this.getSessionList(userId));
channelGroup.USER_GROUP.put(userId, userSocketInfo);
//恢复历史绑定关系
restoreBindingRelationship(userId);
//通知客服 用户上线
broadcastUserOnline(userId);
this.restoreBindingRelationship(userId); //恢复历史绑定关系
this.broadcastUserOnline(userId); //通知客服 用户上线
log.info("用户: " + userId + " 上线");
}
@Override
public void offline(ChannelHandlerContext ctx) {
final Long userId = ctx.channel().attr(ChannelAttributeKey.ROLE_ID).get();
final UserSocketInfo userSocketInfo = channelGroup.USER_GROUP.get(userId);
if (userSocketInfo != null && ctx.channel().attr(ChannelAttributeKey.TOKEN_INFO).get().equals(userSocketInfo.getToken())) {
final Long staffId = userSocketInfo.getStaffId();
channelGroup.USER_GROUP.remove(userId);
ctx.close();
if (staffId != null && channelGroup.getMerchantStaff(staffId) != null) {
// 保存最后的客服
redisTemplate.opsForHash().put(NettyConstant.IM_USERS, userId, staffId);
final Map<Long, Long> staffIds = userSocketInfo.getStaffIds();
if (staffIds.size() != 0) {
redisTemplate.opsForHash().put(NettyConstant.IM_USERS, userId, staffIds); // 保存最后的客服
}
ctx.close();
channelGroup.USER_GROUP.remove(userId);
log.info("用户: " + userId + " 下线");
}
}
......@@ -100,12 +97,12 @@ public class UserSingleChatServiceImpl implements ChatService {
final Long merchantId = msgBody.getData().getMerchantId();
// 获取服务用户的客服Id
Long staffId = channelGroup.USER_GROUP.get(id).getStaffId();
final Long staffId = channelGroup.USER_GROUP.get(id).getStaffId(merchantId);
// 客服SocketInfo对象
StaffSocketInfo staffSocketInfo;
// 若客服Id为空,分配客服,不为空则获取客服SocketInfo
if (staffId == null) {
staffSocketInfo = staffService.getIdleStaff(merchantId,id);
staffSocketInfo = staffService.getIdleStaff(merchantId, id);
if (staffSocketInfo == null) {
return null;
}
......@@ -119,7 +116,7 @@ public class UserSingleChatServiceImpl implements ChatService {
// 客服下线超过60秒,当前客服服务用户Set对象已过期,重新分配客服
// 过期后set对象不会为空而是大小为0
if (members.size() == 0) {
staffSocketInfo = staffService.getIdleStaff(merchantId,id);
staffSocketInfo = staffService.getIdleStaff(merchantId, id);
}
return null;
} else {
......@@ -154,9 +151,10 @@ public class UserSingleChatServiceImpl implements ChatService {
* 情况4:当前用户有客服服务,且客服已下线并超过60秒,重新分配客服且有客服可分配,ChatRecord.staffId = StaffSocketInfo.id;
* 情况5(特殊:需求未定暂时这样处理):当前用户有客服服务,且客服已下线并超过60秒,重新分配客服且无客服可分配,ChatRecord.staffId = staffId;
*/
record.setStaffId(channelGroup.USER_GROUP.get(id).getStaffId());
// record.setStaffId(channelGroup.USER_GROUP.get(id).getStaffId(record.getMerchantId()));
// 先保存至数据库,再发送消息(若颠倒顺序可能导致数据未保存,更新已读操作先执行导致消息一直是未读状态)
chatRecordService.insertSelective(record);
this.updateMerchantList(msgBody);
log.info("用户 消息保存:" + record.getId());
}
......@@ -180,8 +178,9 @@ public class UserSingleChatServiceImpl implements ChatService {
chatRecordService.updateReceiveTime(record);
UserSocketInfo userSocketInfo = channelGroup.USER_GROUP.get(record.getUserId());
if (userSocketInfo != null) {
userSocketInfo.setStaffId(userSocketInfo.getStaffId() == null ? record.getStaffId() : userSocketInfo.getStaffId());
StaffSocketInfo staffSocketInfo = channelGroup.getMerchantStaff(userSocketInfo.getStaffId());
final Long merchantId = record.getMerchantId();
final Long staffId = userSocketInfo.getStaffId(merchantId);
final StaffSocketInfo staffSocketInfo = channelGroup.getMerchantStaff(merchantId, staffId);
if (staffSocketInfo != null) {
staffSocketInfo.writeAndFlush(msgBody);
}
......@@ -197,12 +196,15 @@ public class UserSingleChatServiceImpl implements ChatService {
*/
private void restoreBindingRelationship(Long userId) {
if (redisTemplate.opsForHash().hasKey(NettyConstant.IM_USERS, userId)) {
final Long staffId = (Long) redisTemplate.opsForHash().get(NettyConstant.IM_USERS, userId);
final StaffSocketInfo staffSocketInfo = channelGroup.getMerchantStaff(staffId);
final HashMap staffs = (HashMap) redisTemplate.opsForHash().get(NettyConstant.IM_USERS, userId);
staffs.values().forEach(staffId -> {
final StaffSocketInfo staffSocketInfo = channelGroup.getMerchantStaff(Long.valueOf(staffId.toString()));
if (staffSocketInfo != null) {
channelGroup.USER_GROUP.get(userId).setStaffId(staffId);
staffSocketInfo.getUserIds().add(userId);
}
});
channelGroup.USER_GROUP.get(userId).setStaffIds(staffs);
redisTemplate.opsForHash().delete(NettyConstant.IM_USERS, userId);
}
}
......@@ -212,13 +214,29 @@ public class UserSingleChatServiceImpl implements ChatService {
* @param userId
*/
private void broadcastUserOnline(Long userId) {
final Long staffId = channelGroup.USER_GROUP.get(userId).getStaffId();
channelGroup.STAFF_GROUP.forEach((key, staffGroup) -> {
staffGroup.values().forEach(staffSocketInfo -> {
channelGroup.USER_GROUP.get(userId).getStaffIds().forEach((merchantId, staffId) -> {
channelGroup.STAFF_GROUP.get(merchantId).values().forEach(staffSocketInfo -> {
staffSocketInfo.getUserIds().remove(userId);
staffSocketInfo.writeAndFlush(new MsgBody<>().setStatus(MsgBody.USERS_ONLINE).setData(new IdModel().setStaffId(staffId).setUserId(userId)));
});
});
}
private Set<Long> getSessionList(Long userId) {
final List<Session> merchantLists = sessiontListService.list(new QueryWrapper<Session>().lambda().eq(Session::getUserId, userId));
return merchantLists.stream().map(Session::getMerchantId).collect(Collectors.toSet());
}
private void updateMerchantList(MsgBody<ChatRecord> msgBody) {
final Date now = new Date();
final ChatRecord chatRecord = msgBody.getData();
final Long userId = chatRecord.getUserId();
final Long merchantId = chatRecord.getMerchantId();
final Set<Long> merchantList = channelGroup.USER_GROUP.get(userId).getSessionList();
if (!merchantList.contains(merchantId)) {
sessiontListService.save(new Session().setId(IdWorker.getId()).setUserId(userId).setMerchantId(merchantId).setCreateTime(now).setModifyTime(now));
merchantList.add(merchantId);
}
}
}
......@@ -16,9 +16,9 @@ spring:
max-wait: 60000
# 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位毫秒
time-between-eviction-runs-millis: 60000
url: jdbc:mysql://47.99.47.225:3306/pathfinder_im?useUnicode=true&useSSL=false&serverTimezone=Asia/Shanghai
url: jdbc:mysql://127.0.0.1/customer_service?useUnicode=true&useSSL=false&serverTimezone=Asia/Shanghai
username: root
password: temple123456
password: 101020
driver-class-name: com.mysql.cj.jdbc.Driver
jackson:
default-property-inclusion: non_null
......
......@@ -10,7 +10,7 @@ spring:
add-mappings: false
server:
port: 20002
port: 9090
# 关闭tomcat自带/error页面
error:
whitelabel:
......
......@@ -4,11 +4,10 @@
<resultMap id="BaseResultMap" type="com.ym.im.entity.ChatRecord">
<id column="id" jdbcType="BIGINT" property="id"/>
<result column="user_id" jdbcType="BIGINT" property="userId"/>
<result column="staff_id" jdbcType="BIGINT" property="staffId"/>
<result column="merchant_id" jdbcType="BIGINT" property="merchantId"/>
<result column="msg_type" jdbcType="TINYINT" property="msgType"/>
<result column="msg_info" jdbcType="VARCHAR" property="msgInfo"/>
<result column="send_receive" jdbcType="TINYINT" property="sendReceive"/>
<result column="scope" jdbcType="TINYINT" property="scope"/>
<result column="send_time" jdbcType="TIMESTAMP" property="sendTime"/>
<result column="receive_time" jdbcType="TIMESTAMP" property="receiveTime"/>
<result column="create_time" jdbcType="TIMESTAMP" property="createTime"/>
......@@ -16,7 +15,7 @@
</resultMap>
<sql id="Base_Column_List">
id, user_id, staff_id, msg_type, msg_info, send_receive, scope, send_time, receive_time, create_time, modify_time
id, user_id, merchant_id,msg_type, msg_info, send_receive, send_time, receive_time, create_time, modify_time
</sql>
<insert id="insert">
......@@ -24,11 +23,10 @@
VALUES (
#{chat.id},
#{chat.userId},
#{chat.staffId},
#{chat.merchant_id},
#{chat.msgType},
#{chat.msgInfo},
#{chat.sendReceive},
#{chat.scope},
#{chat.sendTime},
#{chat.receiveTime},
#{chat.createTime},
......@@ -44,8 +42,8 @@
<if test="chat.userId != null">
user_id,
</if>
<if test="chat.staffId != null">
staff_id,
<if test="chat.merchantId != null">
merchant_id,
</if>
<if test="chat.msgType != null">
msg_type,
......@@ -56,9 +54,6 @@
<if test="chat.sendReceive != null">
send_receive,
</if>
<if test="chat.scope != null">
scope,
</if>
<if test="chat.sendTime != null">
send_time,
</if>
......@@ -79,8 +74,9 @@
<if test="chat.userId != null">
#{chat.userId},
</if>
<if test="chat.staffId != null">
#{chat.staffId},
<if test="chat.merchantId != null">
#{chat.merchantId},
</if>
<if test="chat.msgType != null">
#{chat.msgType},
......@@ -91,9 +87,6 @@
<if test="chat.sendReceive != null">
#{chat.sendReceive},
</if>
<if test="chat.scope != null">
#{chat.scope},
</if>
<if test="chat.sendTime != null">
#{chat.sendTime},
</if>
......@@ -112,11 +105,10 @@
<update id="updateById">
UPDATE chat_record_${index}
SET user_id = #{chat.userId},
staff_id = #{chat.staffId},
merchant_id = #{chat.merchantId},
msg_type = #{chat.msgType},
msg_info = #{chat.msgInfo},
send_receive = #{chat.sendReceive},
scope = #{chat.scope},
send_time = #{chat.sendTime},
receive_time = #{chat.receiveTime},
create_time = #{chat.createTime},
......@@ -133,8 +125,8 @@
<if test="chat.userId != null">
#{chat.userId}
</if>
<if test="chat.staffId != null">
#{chat.staffId}
<if test="chat.merchantId != null">
#{chat.merchantId}
</if>
<if test="chat.msgType != null">
#{chat.msgType}
......@@ -145,9 +137,6 @@
<if test="chat.sendReceive != null">
#{chat.sendReceive}
</if>
<if test="chat.scope != null">
#{chat.scope}
</if>
<if test="chat.sendTime != null">
#{chat.sendTime}
</if>
......
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.ym.im.mapper.SessionListMapper">
</mapper>
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment