Commit 824eb260 by zhangjw

1:类拆分 细节调整 完善

parent 66908976
......@@ -23,4 +23,8 @@ public class BaseSocketInfo implements Serializable {
public void writeAndFlush(Object object) {
channel.writeAndFlush(object);
}
public void close() {
channel.close();
}
}
package com.ym.im.entity.base;
import io.netty.util.AttributeKey;
/**
* @author: JJww
* @Date:2020/10/13
*/
public class ChannelAttributeKey {
/**
* 角色 用户ID
*/
public static final AttributeKey<Long> ROLE_ID = AttributeKey.valueOf("role_Id");
/**
* 角色类型
*/
public static final AttributeKey<String> ROLE_TYPE = AttributeKey.valueOf("role_type");
/**
* 当前token
*/
public static final AttributeKey<String> TOKEN_INFO = AttributeKey.valueOf("token");
/**
* 客户端当前语言
*/
public static final AttributeKey<String> COL_INFO = AttributeKey.valueOf("col");
}
package com.ym.im.entity.base;
import io.netty.util.AttributeKey;
/**
* @author: JJww
* @Date:2019-05-17
*/
public class NettyConstant {
/**
* ID
*/
public static final AttributeKey<Long> ID = AttributeKey.valueOf("id");
/**
* 用户(0) or 客服(1)
*/
public static final AttributeKey<Integer> TYPE = AttributeKey.valueOf("type");
/**
* 客服ID
*/
public static final AttributeKey<Long> STAFF_ID = AttributeKey.valueOf("staff");
/**
* 当前token
*/
public static final AttributeKey<String> TOKEN_INFO = AttributeKey.valueOf("token");
/**
* 客户端当前语言
*/
public static final AttributeKey<String> COL_INFO = AttributeKey.valueOf("col");
/**
* 连接类型:用户
*/
public static final int CONNECT_TYPE_USER = 0;
/**
* 连接类型:客服
*/
public static final int CONNECT_TYPE_STAFF = 1;
/**
* websocket标识
*/
public static final String CS = "/cs";
/**
* 用户 ws标识
*/
public static final String USER_WS = "app";
/**
* 客服 ws标识
*/
public static final String STAFF_WS = "web";
/**
* Token
*/
public static final String TOKEN = "token";
/**
* 当前语言
*/
public static final String COL = "col";
/**
* 最大线程量
*/
public static final Integer MAX_THREADS = 1024;
......
package com.ym.im.entity;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* 语言类型枚举类
*
......@@ -6,30 +10,16 @@ package com.ym.im.entity;
* @Description
* @date 2019/5/16
*/
@Getter
@AllArgsConstructor
public enum LanguageEnum {
en("en", "英文"),
zh("zh", "中文");
private String key;
private String remark;
LanguageEnum(String key, String remark) {
this.key = key;
this.remark = remark;
}
public String getKey() {
return key;
}
private final String key;
public void setKey(String key) {
this.key = key;
}
private final String remark;
public String getRemark() {
return remark;
}
public void setRemark(String remark) {
this.remark = remark;
}
}
package com.ym.im.entity.enums;
public enum PlatformEnum {
web("web", "后台用户登陆标识"),
app("app", "app用户标识");
private String key;
private String remark;
PlatformEnum(String key, String remark) {
this.key = key;
this.remark = remark;
}
public String getKey() {
return key;
}
public void setKey(String key) {
this.key = key;
}
public String getRemark() {
return remark;
}
public void setRemark(String remark) {
this.remark = remark;
}
}
package com.ym.im.entity.enums;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* @author JJww
*/
@Getter
@AllArgsConstructor
public enum RoleEnum {
APP("app", 0),
merchant("mer", 1),
system("sys", 2);
private final String type;
private final Integer key;
public static RoleEnum get(String type) {
for (RoleEnum roleEnum : values()) {
if (roleEnum.getType().equals(type)) {
//获取指定的枚举
return roleEnum;
}
}
return null;
}
public static RoleEnum get(Integer key) {
for (RoleEnum roleEnum : values()) {
if (roleEnum.getKey().equals(key)) {
//获取指定的枚举
return roleEnum;
}
}
return null;
}
}
package com.ym.im.factory;
import com.ym.im.entity.base.NettyConstant;
import com.ym.im.entity.enums.RoleEnum;
import com.ym.im.service.ChatService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
......@@ -24,23 +24,17 @@ public class SingleChatFactory {
* @param type
* @return
*/
public ChatService getService(Integer type) {
public ChatService getService(String type) {
if (type == null) {
return null;
}
switch (type) {
case NettyConstant.CONNECT_TYPE_USER:
switch (RoleEnum.get(type)) {
case APP:
return userSingleChatServiceImpl;
case NettyConstant.CONNECT_TYPE_STAFF:
case merchant:
return staffSingleChatServiceImpl;
default:
return null;
}
return null;
}
}
package com.ym.im.handler;
import com.ym.im.entity.base.ChannelAttributeKey;
import com.ym.im.entity.base.NettyConstant;
import com.ym.im.factory.SingleChatFactory;
import io.netty.channel.ChannelHandlerContext;
......@@ -17,12 +18,12 @@ public abstract class BaseHandler<T> extends SimpleChannelInboundHandler<T> {
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
singleChatFactory.getService(ctx.channel().attr(NettyConstant.TYPE).get()).offline(ctx);
singleChatFactory.getService(ctx.channel().attr(ChannelAttributeKey.ROLE_TYPE).get()).offline(ctx);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
singleChatFactory.getService(ctx.channel().attr(NettyConstant.TYPE).get()).offline(ctx);
singleChatFactory.getService(ctx.channel().attr(ChannelAttributeKey.ROLE_TYPE).get()).offline(ctx);
}
}
package com.ym.im.handler;
import com.ym.im.entity.base.BaseSocketInfo;
import com.ym.im.entity.MsgBody;
import com.ym.im.entity.base.BaseSocketInfo;
import com.ym.im.entity.base.ChannelAttributeKey;
import com.ym.im.entity.base.NettyConstant;
import com.ym.im.entity.enums.PlatformEnum;
import com.ym.im.entity.enums.RoleEnum;
import com.ym.im.exception.HttpException;
import com.ym.im.factory.SingleChatFactory;
import com.ym.im.service.ChannelGroupService;
......@@ -17,6 +18,7 @@ import org.springframework.stereotype.Component;
import java.util.Optional;
import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
import static io.netty.handler.codec.http.HttpResponseStatus.UNAUTHORIZED;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
......@@ -33,6 +35,10 @@ public class WebSocketHandshakerHandler extends BaseHandler<FullHttpRequest> {
@Autowired
private SingleChatFactory singleChatFactory;
public static final String ROLE_TYPE = "roleType";
public static final String MERCHANT_ID = "merchantId";
public static final String AUTHORIZATION = "Authorization";
@Override
......@@ -40,25 +46,36 @@ public class WebSocketHandshakerHandler extends BaseHandler<FullHttpRequest> {
final String token = fullHttpRequest.headers().get(AUTHORIZATION);
Optional.ofNullable(token).orElseThrow(() -> new HttpException(ctx, fullHttpRequest, new DefaultFullHttpResponse(HTTP_1_1, UNAUTHORIZED)));
Long userId = 0L;//待完善鉴权逻辑
Integer type = 1;
ctx.channel().attr(NettyConstant.ID).set(userId);
ctx.channel().attr(NettyConstant.TOKEN_INFO).set(token);
this.sso(token, userId, "tokenInfo[0]");
singleChatFactory.getService(type).init(ctx);
final String roleType = fullHttpRequest.headers().get(ROLE_TYPE, null);
Optional.ofNullable(roleType).orElseThrow(() -> new HttpException(ctx, fullHttpRequest, new DefaultFullHttpResponse(HTTP_1_1, BAD_REQUEST)));
final Long merchantId = Long.valueOf(fullHttpRequest.headers().get(MERCHANT_ID, null));
Optional.ofNullable(merchantId).orElseThrow(() -> new HttpException(ctx, fullHttpRequest, new DefaultFullHttpResponse(HTTP_1_1, BAD_REQUEST)));
final Long userId = 123L;//待完善鉴权逻辑
ctx.channel().attr(ChannelAttributeKey.ROLE_ID).set(userId);
ctx.channel().attr(ChannelAttributeKey.TOKEN_INFO).set(token);
ctx.channel().attr(ChannelAttributeKey.ROLE_TYPE).set(roleType);
this.sso(token, userId, roleType);
singleChatFactory.getService(roleType).init(ctx);
fullHttpRequest.setUri(NettyConstant.CS);
ctx.fireChannelRead(fullHttpRequest.retain());
log.info(userId + " 上线");
}
private void sso(String token, Long userId, String type) {
final BaseSocketInfo baseSocketInfo = PlatformEnum.app.getKey().equals(type) ? ChannelGroupService.USER_GROUP.get(userId) : ChannelGroupService.STAFF_GROUP.get(userId);
private void sso(String token, Long roleId, String type) {
BaseSocketInfo baseSocketInfo = null;
switch (RoleEnum.get(type)) {
case APP:
baseSocketInfo = ChannelGroupService.USER_GROUP.get(roleId);
break;
case merchant:
baseSocketInfo = ChannelGroupService.STAFF_GROUP.get(roleId);
break;
}
if (baseSocketInfo != null && !token.equals(baseSocketInfo.getToken())) {
baseSocketInfo.writeAndFlush(new MsgBody<>().setStatus(MsgBody.FORCEDOFFLINE));
baseSocketInfo.getChannel().close();
ChannelGroupService.USER_GROUP.remove(userId);
log.info("用户: " + userId + " 被迫下线");
baseSocketInfo.close();
ChannelGroupService.USER_GROUP.remove(roleId);//待完善
log.info("用户: " + roleId + " 被迫下线");
}
}
}
package com.ym.im.mq;
import com.alibaba.fastjson.JSON;
import com.ym.im.entity.*;
import com.ym.im.entity.ChatRecord;
import com.ym.im.entity.MsgBody;
import com.ym.im.entity.StaffSocketInfo;
import com.ym.im.entity.UserSocketInfo;
import com.ym.im.entity.base.NettyConstant;
import com.ym.im.entity.enums.RoleEnum;
import com.ym.im.entity.model.IdModel;
import com.ym.im.entity.model.OrderModel;
import com.ym.im.service.ChannelGroupService;
......@@ -88,7 +92,7 @@ public class Receiver {
final UserSocketInfo userSocketInfo = ChannelGroupService.USER_GROUP.get(Long.valueOf(userId));
if (userSocketInfo != null) {
userSocketInfo.writeAndFlush(new MsgBody<>().setStatus(MsgBody.LOGOUT));
userSocketInfo.getChannel().close();
userSocketInfo.close();
log.info("用户: " + userId + "被禁用");
}
}
......@@ -140,9 +144,9 @@ public class Receiver {
*/
public void retry(MsgBody<ChatRecord> msgBody) throws IOException {
ChatRecord chatRecord = msgBody.getData();
Long userId = Long.valueOf(chatRecord.getUserId());
String recordId = String.valueOf(chatRecord.getId());
final ChatRecord chatRecord = msgBody.getData();
final Long userId = Long.valueOf(chatRecord.getUserId());
final String recordId = String.valueOf(chatRecord.getId());
if (msgBody != null && chatRecord.getRetryCount().intValue() < NettyConstant.RETRY_COUNT.intValue()) {
......@@ -151,18 +155,15 @@ public class Receiver {
return;
}
MsgBody<ChatRecord> msg = JsonUtils.json2Obj(
String.valueOf(redisTemplate.opsForHash().get(NettyConstant.MSG_KEY + userId, recordId)),
JsonUtils.getJavaType(MsgBody.class, ChatRecord.class));
MsgBody<ChatRecord> msg = JsonUtils.json2Obj(String.valueOf(redisTemplate.opsForHash().get(NettyConstant.MSG_KEY + userId, recordId)), JsonUtils.getJavaType(MsgBody.class, ChatRecord.class));
if (msg != null && msg.getStatus().equals(MsgBody.HAVE_READ)) {
redisTemplate.opsForHash().delete(NettyConstant.MSG_KEY + userId, recordId);
return;
}
final Integer sendReceive = chatRecord.getSendReceive();
switch (RoleEnum.get(sendReceive)) {
switch (chatRecord.getSendReceive()) {
case NettyConstant.CONNECT_TYPE_USER:
case APP:
Long staffId = userSocketInfo.getStaffId() == null ? chatRecord.getStaffId() : userSocketInfo.getStaffId();
StaffSocketInfo staffSocketInfo = ChannelGroupService.STAFF_GROUP.get(staffId);
if (staffSocketInfo != null) {
......@@ -170,7 +171,7 @@ public class Receiver {
}
break;
case NettyConstant.CONNECT_TYPE_STAFF:
case merchant:
userSocketInfo.writeAndFlush(msgBody);
break;
......
......@@ -3,6 +3,7 @@ package com.ym.im.service.impl;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.ym.im.entity.ChatRecord;
import com.ym.im.entity.MsgBody;
import com.ym.im.entity.base.ChannelAttributeKey;
import com.ym.im.entity.base.NettyConstant;
import com.ym.im.factory.SingleChatFactory;
import com.ym.im.mq.Queue;
......@@ -48,13 +49,13 @@ public class MsgBodyServiceImpl implements MsgBodyService {
@Override
public void msgBodyHandle(@NotNull ChannelHandlerContext ctx, @Valid MsgBody<ChatRecord> msgBody) throws JsonProcessingException {
final ChatService chatService = singleChatFactory.getService(ctx.channel().attr(NettyConstant.TYPE).get());
final ChatService chatService = singleChatFactory.getService(ctx.channel().attr(ChannelAttributeKey.ROLE_TYPE).get());
switch (msgBody.getStatus()) {
case SEND_MSG:
// 获取用户、客服Id
final Long id = ctx.channel().attr(NettyConstant.ID).get();
final Long id = ctx.channel().attr(ChannelAttributeKey.ROLE_ID).get();
// 先保存聊天消息
chatService.save(id, msgBody);
// 再发送回执
......
......@@ -2,7 +2,11 @@ package com.ym.im.service.impl;
import com.baomidou.mybatisplus.core.toolkit.IdWorker;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.ym.im.entity.*;
import com.ym.im.entity.ChatRecord;
import com.ym.im.entity.MsgBody;
import com.ym.im.entity.StaffSocketInfo;
import com.ym.im.entity.UserSocketInfo;
import com.ym.im.entity.base.ChannelAttributeKey;
import com.ym.im.entity.base.NettyConstant;
import com.ym.im.entity.model.IdModel;
import com.ym.im.mq.Queue;
......@@ -55,11 +59,10 @@ public class StaffSingleChatServiceImpl implements ChatService {
@Override
public void init(ChannelHandlerContext ctx) {
ctx.channel().attr(NettyConstant.TYPE).set(NettyConstant.CONNECT_TYPE_STAFF);
final Long staffId = ctx.channel().attr(NettyConstant.ID).get();
final Long staffId = ctx.channel().attr(ChannelAttributeKey.ROLE_ID).get();
StaffSocketInfo staffSocketInfo = new StaffSocketInfo();
staffSocketInfo.setStaffId(staffId);
staffSocketInfo.setToken(ctx.channel().attr(NettyConstant.TOKEN_INFO).get());
staffSocketInfo.setToken(ctx.channel().attr(ChannelAttributeKey.TOKEN_INFO).get());
staffSocketInfo.setChannel((NioSocketChannel) ctx.channel());
staffSocketInfo.setUserIds(redisTemplate.opsForSet().members(NettyConstant.STAFF_USERIDS_KEY + staffId));
ChannelGroupService.STAFF_GROUP.put(staffId, staffSocketInfo);
......@@ -69,9 +72,9 @@ public class StaffSingleChatServiceImpl implements ChatService {
@Override
public void offline(ChannelHandlerContext ctx) {
final Long staffId = ctx.channel().attr(NettyConstant.ID).get();
final Long staffId = ctx.channel().attr(ChannelAttributeKey.ROLE_ID).get();
final StaffSocketInfo staffSocketInfo = ChannelGroupService.STAFF_GROUP.get(staffId);
if (ctx.channel().attr(NettyConstant.TOKEN_INFO).get().equals(staffSocketInfo.getToken())) {
if (ctx.channel().attr(ChannelAttributeKey.TOKEN_INFO).get().equals(staffSocketInfo.getToken())) {
final Set<Long> userIds = staffSocketInfo.getUserIds();
if (userIds.size() != 0) {
final String userListKey = NettyConstant.STAFF_USERIDS_KEY + staffId;
......@@ -80,7 +83,7 @@ public class StaffSingleChatServiceImpl implements ChatService {
queue.staffOfflineQueue(new StaffSocketInfo(staffId, userIds)); //NioSocketChannel无法序列化 所以new StaffSocketInfo
}
ChannelGroupService.STAFF_GROUP.remove(staffId);
ctx.channel().close();
ctx.close();
log.info("客服: " + staffId + " 下线:");
}
}
......
......@@ -2,7 +2,11 @@ package com.ym.im.service.impl;
import com.baomidou.mybatisplus.core.toolkit.IdWorker;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.ym.im.entity.*;
import com.ym.im.entity.ChatRecord;
import com.ym.im.entity.MsgBody;
import com.ym.im.entity.StaffSocketInfo;
import com.ym.im.entity.UserSocketInfo;
import com.ym.im.entity.base.ChannelAttributeKey;
import com.ym.im.entity.base.NettyConstant;
import com.ym.im.entity.model.IdModel;
import com.ym.im.service.*;
......@@ -42,21 +46,20 @@ public class UserSingleChatServiceImpl implements ChatService {
private StaffService staffService;
@Autowired
private ChatRecordService chatRecordService;
private MsgBodyService msgBodyService;
@Autowired
private MsgBodyService msgBodyService;
private ChatRecordService chatRecordService;
@Override
public void init(ChannelHandlerContext ctx) {
ctx.channel().attr(NettyConstant.TYPE).set(NettyConstant.CONNECT_TYPE_USER);
final Long userId = ctx.channel().attr(NettyConstant.ID).get();
final Long userId = ctx.channel().attr(ChannelAttributeKey.ROLE_ID).get();
UserSocketInfo userSocketInfo = new UserSocketInfo();
userSocketInfo.setUserId(userId);
userSocketInfo.setChannel((NioSocketChannel) ctx.channel());
userSocketInfo.setCol(ctx.channel().attr(NettyConstant.COL_INFO).get());
userSocketInfo.setToken(ctx.channel().attr(NettyConstant.TOKEN_INFO).get());
userSocketInfo.setCol(ctx.channel().attr(ChannelAttributeKey.COL_INFO).get());
userSocketInfo.setToken(ctx.channel().attr(ChannelAttributeKey.TOKEN_INFO).get());
userSocketInfo.setPushToken(null);
ChannelGroupService.USER_GROUP.put(userId, userSocketInfo);
//恢复历史绑定关系
......@@ -71,13 +74,13 @@ public class UserSingleChatServiceImpl implements ChatService {
@Override
public void offline(ChannelHandlerContext ctx) {
final Long userId = ctx.channel().attr(NettyConstant.ID).get();
final Long userId = ctx.channel().attr(ChannelAttributeKey.ROLE_ID).get();
final UserSocketInfo userSocketInfo = ChannelGroupService.USER_GROUP.get(userId);
if (userSocketInfo != null && ctx.channel().attr(NettyConstant.TOKEN_INFO).get().equals(userSocketInfo.getToken())) {
if (userSocketInfo != null && ctx.channel().attr(ChannelAttributeKey.TOKEN_INFO).get().equals(userSocketInfo.getToken())) {
final Long staffId = userSocketInfo.getStaffId();
ChannelGroupService.USER_GROUP.remove(userId);
ctx.channel().close();
ctx.close();
if (staffId != null && ChannelGroupService.STAFF_GROUP.get(staffId) != null) {
// 保存最后的客服
redisTemplate.opsForHash().put(NettyConstant.IM_USERS, userId, staffId);
......@@ -210,4 +213,5 @@ public class UserSingleChatServiceImpl implements ChatService {
staffSocketInfo.writeAndFlush(new MsgBody<>().setStatus(MsgBody.USERS_ONLINE).setData(new IdModel().setStaffId(staffId).setUserId(userId)));
});
}
}
......@@ -2,7 +2,7 @@ spring:
profiles:
active: dev
application:
name: pathfinder-im
name: customer-service
# 404抛出异常
mvc:
throw-exception-if-no-handler-found: true
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment