Commit e23ba628 by zhangjw

细节调整

parent ba875c2a
......@@ -110,18 +110,19 @@
<artifactId>springfox-swagger-ui</artifactId>
</dependency>
<!-- <dependency>-->
<!-- <groupId>org.springframework.cloud</groupId>-->
<!-- <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>-->
<!-- </dependency>-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
</dependency>
</dependencies>
<build>
......
package com.ym.im.config;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;
import org.springframework.context.annotation.Bean;
......@@ -14,16 +13,6 @@ import org.springframework.context.annotation.Configuration;
@Configuration
public class NettyConfig {
public static final String ROOM = "/room";
public static final Integer MAX_THREADS = 1024;
public static final Integer MAX_FRAME_LENGTH = 65535;
public static final AttributeKey<Long> USER_ID = AttributeKey.valueOf("userId");
/**
* 负责TCP连接建立操作 绝对不能阻塞
*
......
......@@ -34,13 +34,11 @@ public class MessageDecoder extends MessageToMessageDecoder<TextWebSocketFrame>
protected void decode(ChannelHandlerContext ctx, TextWebSocketFrame textWebSocketFrame, List out) throws Exception {
final String jsonStr = textWebSocketFrame.text();
MsgBody<ChatRecord> msgBody = new MsgBody<>(MsgBody.ERROR);
MsgBody<ChatRecord> msgBody = new MsgBody<ChatRecord>().setStatus(MsgBody.ERROR);
try {
msgBody = JsonUtils.json2Obj(jsonStr, MsgBody.class, ChatRecord.class);
} catch (IOException e) {
log.error("Json转{}异常:\r\n" +
"Json原串:{}\r\n" +
"===异常栈信息===", "MsgBody.class", jsonStr, e);
log.error("Json转{}异常:\r\n" + "Json原串:{}\r\n" + "===异常栈信息===", "MsgBody.class", jsonStr, e);
msgBody.setStatus(MsgBody.ERROR);
msgBody.setMessage("错误的Json格式:" + jsonStr);
ctx.channel().writeAndFlush(msgBody);
......
package com.ym.im.core;
import com.ym.im.config.NettyConfig;
import com.ym.im.entity.NettyConstant;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
......@@ -42,7 +43,7 @@ public class NettyServer {
@PostConstruct
public void start() throws InterruptedException {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.option(ChannelOption.SO_BACKLOG, NettyConfig.MAX_THREADS);//设置线程数
serverBootstrap.option(ChannelOption.SO_BACKLOG, NettyConstant.MAX_THREADS);//设置线程数
serverBootstrap.group(bossGroup, workerGroup)//绑定线程池
.channel(NioServerSocketChannel.class)//NioServerSocketChannel基于TCP协议的数据处理
.localAddress(port) // 绑定监听端口
......@@ -51,7 +52,7 @@ public class NettyServer {
.childHandler(webSocketChannelInitializer); // 添加处理器
ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
if (channelFuture.isSuccess()) {
log.info("RTC_Endpoint 启动完毕,IP:{}", port);
log.info("客服服务 启动完毕,IP:{}", port);
}
}
......@@ -60,6 +61,6 @@ public class NettyServer {
public void destroy() {
bossGroup.shutdownGracefully().syncUninterruptibly();
workerGroup.shutdownGracefully().syncUninterruptibly();
log.info("RTC_Endpoint 已关闭,IP:{}", port);
log.info("客服服务 已关闭,IP:{}", port);
}
}
......@@ -2,16 +2,16 @@ package com.ym.im.core;
import com.ym.im.entity.NettyConstant;
import com.ym.im.handler.IndexHandler;
import com.ym.im.handler.SingleChatHandler;
import com.ym.im.handler.WebSocketHandshakerHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.util.concurrent.EventExecutorGroup;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
/**
......@@ -19,34 +19,34 @@ import org.springframework.stereotype.Component;
* @Date:2019-01-21
*/
@Component
public class WebSocketChannelInitializer extends ChannelInitializer<SocketChannel> {
public class WebSocketChannelInitializer extends ChannelInitializer<NioSocketChannel> {
@Autowired
private SingleChatHandler singleChatHandler;
private MessageEncoder messageEncoder;
@Autowired
private IndexHandler indexHandler;
private MessageDecoder messageDecoder;
@Autowired
private MessageEncoder messageEncoder;
@Qualifier("businessGroup")
private EventExecutorGroup businessGroup;
@Autowired
private MessageDecoder messageDecoder;
private SingleChatHandler singleChatHandler;
@Autowired
private WebSocketHandshakerHandler webSocketHandshakerHandler;
@Override
protected void initChannel(SocketChannel sc) throws Exception {
ChannelPipeline pipeline = sc.pipeline();
//websocket协议本身是基于http协议的,所以要使用http解编码器
pipeline.addLast(new HttpServerCodec());
pipeline.addLast(new ChunkedWriteHandler());
pipeline.addLast(new HttpObjectAggregator(NettyConstant.MAX_FRAME_LENGTH));
pipeline.addLast(indexHandler);
pipeline.addLast(new WebSocketServerProtocolHandler(NettyConstant.WS));
pipeline.addLast(messageDecoder);
pipeline.addLast(messageEncoder);
pipeline.addLast(singleChatHandler);
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
nioSocketChannel.pipeline()
.addLast(new HttpServerCodec(),
new HttpObjectAggregator(NettyConstant.MAX_FRAME_LENGTH),
webSocketHandshakerHandler,
new WebSocketServerProtocolHandler(NettyConstant.CS),
messageDecoder,
messageEncoder)
.addLast(businessGroup, singleChatHandler);//复杂业务绑定businessGroup
}
}
......@@ -2,6 +2,7 @@ package com.ym.im.entity;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.Data;
import lombok.experimental.Accessors;
import java.io.Serializable;
......@@ -10,6 +11,7 @@ import java.io.Serializable;
* @Date:2019-07-17
*/
@Data
@Accessors(chain = true)
public class BaseSocketInfo implements Serializable {
private static final long serialVersionUID = 1L;
......@@ -17,4 +19,8 @@ public class BaseSocketInfo implements Serializable {
public String token;
public NioSocketChannel channel;
public void writeAndFlush(Object object) {
channel.writeAndFlush(object);
}
}
package com.ym.im.entity;
/**
* @author 陈俊雄
* @date 2019/5/31
**/
public enum IMessageConstant {
MISSING_PARAMETERS_USER_ID(0, "成功");
private int code;
private String msg;
IMessageConstant(int code, String msg) {
this.code = code;
this.msg = msg;
}
public int getCode() {
return code;
}
public void setCode(int code) {
this.code = code;
}
public String getMsg() {
return msg;
}
public void setMsg(String msg) {
this.msg = msg;
}
}
......@@ -2,6 +2,7 @@ package com.ym.im.entity;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import lombok.experimental.Accessors;
import javax.validation.constraints.NotNull;
......@@ -10,6 +11,7 @@ import javax.validation.constraints.NotNull;
* @Date:2019-06-27
*/
@Data
@Accessors(chain = true)
public class IdModel {
@ApiModelProperty(value = "客服Id")
......@@ -20,8 +22,4 @@ public class IdModel {
@NotNull(message = "{error.userId_empty}")
private Long userId;
public IdModel(Long staffId, Long userId) {
this.staffId = staffId;
this.userId = userId;
}
}
......@@ -53,20 +53,4 @@ public class MsgBody<T> implements Serializable {
@ApiModelProperty(value = "消息描述")
private String message;
public MsgBody() {
}
public MsgBody(@NotNull(message = "{error.msg_body_status_empty}", groups = MsgBodyGroup.class) Integer status, @Valid @NotNull(message = "{error.msg_body_data_empty}", groups = MsgBodyGroup.class) T data) {
this.status = status;
this.data = data;
}
public MsgBody(@NotNull(message = "{error.msg_body_status_empty}", groups = MsgBodyGroup.class) Integer status) {
this.status = status;
}
public MsgBody(@NotNull(message = "{error.msg_body_status_empty}", groups = MsgBodyGroup.class) Integer status, String message) {
this.status = status;
this.message = message;
}
}
\ No newline at end of file
......@@ -2,9 +2,6 @@ package com.ym.im.entity;
import io.netty.util.AttributeKey;
import java.util.HashMap;
import java.util.Map;
/**
* @author: JJww
* @Date:2019-05-17
......@@ -50,9 +47,9 @@ public class NettyConstant {
public static final int CONNECT_TYPE_STAFF = 1;
/**
* ws标识
* websocket标识
*/
public static final String WS = "/ws";
public static final String CS = "/cs";
/**
* 用户 ws标识
......@@ -105,12 +102,6 @@ public class NettyConstant {
*/
public static final Integer RETRY_COUNT = 3;
public static final Map<String, Integer> CONNECT_TYPE = new HashMap<String, Integer>() {
{
put("app", 0);
put("web", 1);
}
};
}
......@@ -24,7 +24,4 @@ public class UserSocketInfo extends BaseSocketInfo {
private String col;
public UserSocketInfo() {
}
}
......@@ -3,8 +3,7 @@ package com.ym.im.entity.base;
/**
* 自定义请求状态码
*
* @author hjp
* @date 2019/5/16
* @author JJww
*/
public enum ResultStatus {
/**
......@@ -16,18 +15,8 @@ public enum ResultStatus {
THERE_IS_NO_ONLINE_CUSTOMER_SERVICE(10000, "There is no online customer service", "当前没有客服在线"),
MOBILE_NO_REGISTER(20000, "The mobile phone number is registered", "手机号未注册"),
MOBILE_REGISTER(20001, "Mobile phone number has been registered", "手机号已注册"),
VERIFICATION_CODE_FAILURE(20002, "The captcha is invalid or incorrect", "验证码失效或不正确"),
USER_INFO_NO_EXIST(20003, "User information does not exist", "用户信息不存在"),
WRONG_PASSWORD(20004, "wrong password", "密码错误"),
USER_FORBIDDEN(20005, "The user has been blocked", "手机号已被封禁"),
USER_NAME_REPEAT(20006, "The user has been exited", "用户账号名称已存在"),
REPEAT_COMMIT(20007, "your commit is repeat", "重复提交"),
HAS_BEEN_ENTERPRISE(20008, "Has been enterprise users", "已经是企业用户"),
NO_THIS_AIRLINE(20009,"This AirLine does not exist","该航班不存在"),
HAVE_ROUTE_CANNOT_DEL(30001, "Route binding cannot be delete", "有路线绑定无法删除"),
SERVICE_ROUTE_NO_EXIST(40001, "Service routing does not exist", "服务路线不存在"),
VEHICLE_TYPE_NO_EXIST(40002, "Vehicle type does not exist", "车辆类型不存在"),
SERVICE_ROUTE_VEHICLE_NO_EXIST(40003, "Service route does not exist with vehicle type association", "服务路线与车辆类型关联不存在"),
FORWARD_FAILURE(40004, "Failed to forward, customer service or user offline", "转发失败,客服或用户已下线"),
CHECK_FAILURE(40005, "This user has binding other staff", "此用户已经绑定其他客服"),
PARAM_ERROR(9999, "Parameters error", "参数有误"),
......
......@@ -32,13 +32,4 @@ public class OrderModel implements Serializable {
* 用户Id
*/
private String userId;
public OrderModel(String type, String orderId, String orderStatus, String userId) {
this.type = type;
this.orderId = orderId;
this.orderStatus = orderStatus;
this.userId = userId;
}
}
package com.ym.im.exception;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;
/**
* @author: JJww
* @Date:2020/6/3
*/
@Slf4j
public class HttpException extends RuntimeException {
public HttpException(ChannelHandlerContext ctx, FullHttpRequest request, FullHttpResponse response) {
super(response.toString());
sendHttpResponse(ctx, request, response);
log.error("HttpException: " + response.toString());
}
private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest request, FullHttpResponse response) {
if (response.status().code() != HttpResponseStatus.OK.code()) {
ByteBuf buf = Unpooled.copiedBuffer(response.status().toString(), CharsetUtil.UTF_8);
response.content().writeBytes(buf);
buf.release();
response.headers();
HttpUtil.setContentLength(response, response.content().readableBytes());
}
ChannelFuture f = ctx.channel().writeAndFlush(response);
if (!HttpUtil.isKeepAlive(request) || response.status().code() != HttpResponseStatus.OK.code()) {
f.addListener(ChannelFutureListener.CLOSE);
}
}
}
......@@ -53,4 +53,6 @@ public class SingleChatHandler extends BaseHandler<MsgBody<ChatRecord>> {
e.printStackTrace();
}
}
}
package com.ym.im.handler;
import com.ym.im.entity.BaseSocketInfo;
import com.ym.im.entity.MsgBody;
import com.ym.im.entity.NettyConstant;
import com.ym.im.entity.enums.PlatformEnum;
import com.ym.im.exception.HttpException;
import com.ym.im.factory.SingleChatFactory;
import com.ym.im.service.ChannelGroupService;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Optional;
import static io.netty.handler.codec.http.HttpResponseStatus.UNAUTHORIZED;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
/**
* @author: JJww
* @Date:2020/10/10
*/
@Slf4j
@Component
@ChannelHandler.Sharable
public class WebSocketHandshakerHandler extends BaseHandler<FullHttpRequest> {
@Autowired
private SingleChatFactory singleChatFactory;
public static final String AUTHORIZATION = "Authorization";
@Override
protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest fullHttpRequest) throws Exception {
final String token = fullHttpRequest.headers().get(AUTHORIZATION);
Optional.ofNullable(token).orElseThrow(() -> new HttpException(ctx, fullHttpRequest, new DefaultFullHttpResponse(HTTP_1_1, UNAUTHORIZED)));
Long userId = 0L;//待完善鉴权逻辑
Integer type = 1;
ctx.channel().attr(NettyConstant.ID).set(userId);
ctx.channel().attr(NettyConstant.TOKEN_INFO).set(token);
this.sso(token, userId, "tokenInfo[0]");
singleChatFactory.getService(type).init(ctx);
fullHttpRequest.setUri(NettyConstant.CS);
ctx.fireChannelRead(fullHttpRequest.retain());
log.info(userId + " 上线");
}
private void sso(String token, Long userId, String type) {
final BaseSocketInfo baseSocketInfo = PlatformEnum.app.getKey().equals(type) ? ChannelGroupService.USER_GROUP.get(userId) : ChannelGroupService.STAFF_GROUP.get(userId);
if (baseSocketInfo != null && !token.equals(baseSocketInfo.getToken())) {
baseSocketInfo.writeAndFlush(new MsgBody<>().setStatus(MsgBody.FORCEDOFFLINE));
baseSocketInfo.getChannel().close();
ChannelGroupService.USER_GROUP.remove(userId);
log.info("用户: " + userId + " 被迫下线");
}
}
}
......@@ -66,7 +66,7 @@ public class Receiver {
if (ChannelGroupService.USER_GROUP.get(userId) != null) {
final StaffSocketInfo idleStaff = staffService.getIdleStaff(userId);
if (idleStaff != null) {
idleStaff.getChannel().writeAndFlush(new MsgBody<>(MsgBody.DISTRIBUTION_STAFF, new IdModel(staffId, userId)));
idleStaff.writeAndFlush(new MsgBody<>().setStatus(MsgBody.DISTRIBUTION_STAFF).setData(new IdModel().setStaffId(staffId).setUserId(userId)));
}
}
});
......@@ -85,7 +85,7 @@ public class Receiver {
final UserSocketInfo userSocketInfo = ChannelGroupService.USER_GROUP.get(Long.valueOf(userId));
if (userSocketInfo != null) {
userSocketInfo.getChannel().writeAndFlush(new MsgBody<>(MsgBody.LOGOUT));
userSocketInfo.writeAndFlush(new MsgBody<>().setStatus(MsgBody.LOGOUT));
userSocketInfo.getChannel().close();
log.info("用户: " + userId + "被禁用");
}
......@@ -106,12 +106,12 @@ public class Receiver {
return;
}
StaffSocketInfo staffSocketInfo = ChannelGroupService.STAFF_GROUP.get(userSocketInfo.getStaffId());
final MsgBody<OrderModel> orderInfo = new MsgBody<>(MsgBody.ORDER, orderModel);
final MsgBody<OrderModel> orderInfo = new MsgBody<OrderModel>().setStatus(MsgBody.ORDER).setData(orderModel);
/**
* 绑定客服在线,发送订单信息
*/
if (staffSocketInfo != null) {
staffSocketInfo.getChannel().writeAndFlush(orderInfo);
staffSocketInfo.writeAndFlush(orderInfo);
log.info("客服订单: " + "给客服(" + staffSocketInfo.getStaffId() + ")发送订单:" + orderInfo.toString());
return;
}
......@@ -123,7 +123,7 @@ public class Receiver {
log.info("客服订单: " + "尝试给历史客服(" + staffId + ")发送订单:" + orderInfo.toString());
staffSocketInfo = ChannelGroupService.STAFF_GROUP.get(staffId);
if (staffSocketInfo != null) {
staffSocketInfo.getChannel().writeAndFlush(orderInfo);
staffSocketInfo.writeAndFlush(orderInfo);
log.info("客服订单: " + "给历史客服(" + staffId + ")发送订单:" + orderInfo.toString());
}
}
......@@ -164,12 +164,12 @@ public class Receiver {
Long staffId = userSocketInfo.getStaffId() == null ? chatRecord.getStaffId() : userSocketInfo.getStaffId();
StaffSocketInfo staffSocketInfo = ChannelGroupService.STAFF_GROUP.get(staffId);
if (staffSocketInfo != null) {
staffSocketInfo.getChannel().writeAndFlush(msgBody);
staffSocketInfo.writeAndFlush(msgBody);
}
break;
case NettyConstant.CONNECT_TYPE_STAFF:
userSocketInfo.getChannel().writeAndFlush(msgBody);
userSocketInfo.writeAndFlush(msgBody);
break;
default:
......
......@@ -65,13 +65,14 @@ public class ChatRecordServiceImpl implements ChatRecordService {
pull.setChatRecords(chatRecords);
pull.setPages(pageInfo.getPages());
pull.setTotal(pageInfo.getTotal());
return new MsgBody<>(ResultStatus.SUCCESS.getCode(), pull);
return new MsgBody<PullChatRecord>().setStatus(ResultStatus.SUCCESS.getCode()).setData(pull);
}
@Override
public MsgBody<List<ChatRecord>> updateReceiveTime(List<ChatRecord> chats) {
chatRecordMapper.updateBatchReceiveTimeById(chats, remainder(chats.get(0).getUserId()));
return new MsgBody<>(ResultStatus.SUCCESS.getCode(), chats);
return new MsgBody<List<ChatRecord>>().setStatus(ResultStatus.SUCCESS.getCode()).setData(chats);
}
/**
......
......@@ -50,7 +50,7 @@ public class StaffServiceImpl implements StaffService {
if (userSocketInfo != null) {
userSocketInfo.setStaffId(staffId);
//通知用户 新的客服
userSocketInfo.getChannel().writeAndFlush(new MsgBody<>(MsgBody.DISTRIBUTION_STAFF, new IdModel(staffId, userId)));
userSocketInfo.writeAndFlush(new MsgBody<>().setStatus(MsgBody.DISTRIBUTION_STAFF).setData(new IdModel().setStaffId(staffId).setUserId(userId)));
}
return staffSocketInfo;
}
......@@ -62,30 +62,29 @@ public class StaffServiceImpl implements StaffService {
final UserSocketInfo userSocketInfo = ChannelGroupService.USER_GROUP.get(idModel.getUserId());
final StaffSocketInfo staffSocketInfo = ChannelGroupService.STAFF_GROUP.get(idModel.getStaffId());
if (staffSocketInfo == null || userSocketInfo == null) {
return new MsgBody(ResultStatus.FORWARD_FAILURE.getCode(), ResultStatus.FORWARD_FAILURE.getMessage());
return new MsgBody<>().setStatus(MsgBody.BINDINGFAILURE).setMessage(ResultStatus.FORWARD_FAILURE.getMessage());
}
//移除原客服绑定
ChannelGroupService.STAFF_GROUP.get(userSocketInfo.getStaffId()).getUserIds().remove(idModel.getUserId());
//设置新的客服
staffSocketInfo.getUserIds().add(idModel.getUserId());
userSocketInfo.setStaffId(idModel.getStaffId());
final MsgBody<IdModel> msgBody = new MsgBody<>(MsgBody.DISTRIBUTION_STAFF, idModel);
final MsgBody<IdModel> msgBody = new MsgBody<IdModel>().setStatus(MsgBody.DISTRIBUTION_STAFF).setData(idModel);
//通知用户 新客服ID
userSocketInfo.getChannel().writeAndFlush(msgBody);
userSocketInfo.writeAndFlush(msgBody);
//通知新客服
staffSocketInfo.getChannel().writeAndFlush(msgBody);
return new MsgBody(ResultStatus.SUCCESS.getCode());
staffSocketInfo.writeAndFlush(msgBody);
return new MsgBody<>().setData(ResultStatus.SUCCESS.getCode());
}
@Override
public MsgBody getStaffList() {
List<StaffSocketInfo> staffs = new ArrayList();
List<StaffSocketInfo> staffs = new ArrayList<StaffSocketInfo>();
ChannelGroupService.STAFF_GROUP.forEach((k, v) -> {
staffs.add(v);
});
return new MsgBody(ResultStatus.SUCCESS.getCode(), staffs);
return new MsgBody<>().setStatus(ResultStatus.SUCCESS.getCode()).setData(staffs);
}
......
package com.ym.im.service.impl;
import com.baomidou.mybatisplus.core.toolkit.IdWorker;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.ym.im.entity.*;
import com.ym.im.entity.model.PushTokenAndTypeModel;
import com.ym.im.mq.Queue;
import com.ym.im.service.ChannelGroupService;
import com.ym.im.service.ChatRecordService;
......@@ -21,8 +21,6 @@ import org.springframework.validation.annotation.Validated;
import javax.annotation.Resource;
import javax.validation.Valid;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import static com.ym.im.entity.ChatRecord.RECEIVE;
......@@ -52,7 +50,6 @@ public class StaffSingleChatServiceImpl implements ChatService {
// private PushGatherService pushService;
@Override
public void init(ChannelHandlerContext ctx) {
......@@ -103,12 +100,12 @@ public class StaffSingleChatServiceImpl implements ChatService {
if (currentStaffId == null) {
//通知用户 新的客服
userSocketInfo.setStaffId(id);
userSocketInfo.getChannel().writeAndFlush(new MsgBody<>(MsgBody.DISTRIBUTION_STAFF, new IdModel(id, userId)));
userSocketInfo.writeAndFlush(new MsgBody<>().setStatus(MsgBody.DISTRIBUTION_STAFF).setData(new IdModel().setStaffId(id).setUserId(userId)));
ChannelGroupService.STAFF_GROUP.get(id).getUserIds().add(userId);
}
if (currentStaffId != null && !currentStaffId.equals(id)) {
//通知客服 绑定失败 当前用户已绑定客服
ChannelGroupService.STAFF_GROUP.get(id).getChannel().writeAndFlush(new MsgBody<>(MsgBody.BINDINGFAILURE, new IdModel(currentStaffId, userId)));
ChannelGroupService.STAFF_GROUP.get(id).writeAndFlush(new MsgBody<>().setStatus(MsgBody.BINDINGFAILURE).setData(new IdModel().setStaffId(currentStaffId).setUserId(userId)));
return null;
}
return userSocketInfo.getChannel();
......@@ -120,10 +117,10 @@ public class StaffSingleChatServiceImpl implements ChatService {
public void save(Long id, @Valid MsgBody<ChatRecord> msgBody) {
// 设置聊天基本信息
final ChatRecord record = msgBody.getData();
record.setId(null);
record.setSendReceive(RECEIVE);
record.setStaffId(id);
record.setCreateTime(new Date());
record.setId(IdWorker.getId())
.setSendReceive(RECEIVE)
.setStaffId(id)
.setCreateTime(new Date());
// 先保存至数据库,再发送消息(若颠倒顺序可能导致数据未保存,更新已读操作先执行导致消息一直是未读状态
chatRecordService.insertSelective(record);
log.info("客服 消息保存:" + record.getId());
......@@ -138,10 +135,8 @@ public class StaffSingleChatServiceImpl implements ChatService {
*/
@Override
public void send(NioSocketChannel channel, MsgBody<ChatRecord> msgBody) throws JsonProcessingException {
msgBodyService.sendAndAck(channel, msgBody);
pushNotifications(msgBody.getData().getUserId());
}
......@@ -159,7 +154,7 @@ public class StaffSingleChatServiceImpl implements ChatService {
chatRecordService.updateReceiveTime(record);
UserSocketInfo userSocketInfo = ChannelGroupService.USER_GROUP.get(record.getUserId());
if (userSocketInfo != null) {
userSocketInfo.getChannel().writeAndFlush(msgBody);
userSocketInfo.writeAndFlush(msgBody);
redisTemplate.opsForHash().put(NettyConstant.MSG_KEY + record.getUserId(), record.getId(), JsonUtils.obj2Json(msgBody));
}
log.info("客服 消息回执:" + record.getId());
......
......@@ -29,7 +29,7 @@ public class UserServiceImpl implements UserService {
if (usersId.size() > 0) {
redisTemplate.delete(NettyConstant.STAFF_USERIDS_KEY + staffId);
}
return new MsgBody(ResultStatus.SUCCESS.getCode(), usersId);
return new MsgBody<>().setStatus(ResultStatus.SUCCESS.getCode()).setData(usersId);
}
@Override
......@@ -37,14 +37,14 @@ public class UserServiceImpl implements UserService {
final StaffSocketInfo staffSocketInfo = ChannelGroupService.STAFF_GROUP.get(idModel.getStaffId());
if (staffSocketInfo == null) {
return new MsgBody(ResultStatus.REQUEST_ERROR.getCode(), ResultStatus.REQUEST_ERROR.getMessage());
return new MsgBody<>().setStatus(ResultStatus.REQUEST_ERROR.getCode()).setMessage(ResultStatus.REQUEST_ERROR.getMessage());
}
staffSocketInfo.getUserIds().remove(idModel.getUserId());
final UserSocketInfo userSocketInfo = ChannelGroupService.USER_GROUP.get(idModel.getUserId());
if (userSocketInfo != null && idModel.getStaffId().equals(userSocketInfo.getStaffId())) {
userSocketInfo.setStaffId(null);
}
return new MsgBody(ResultStatus.SUCCESS.getCode());
return new MsgBody().setStatus(ResultStatus.SUCCESS.getCode());
}
@Override
......@@ -57,10 +57,10 @@ public class UserServiceImpl implements UserService {
* 用户在线未绑定客服
*/
if (userSocketInfo == null || (userSocketInfo != null && idModel.getStaffId().equals(userSocketInfo.getStaffId()) || userSocketInfo.getStaffId() == null)) {
return new MsgBody(ResultStatus.SUCCESS.getCode());
return new MsgBody().setStatus(ResultStatus.SUCCESS.getCode());
}
return new MsgBody<>().setStatus(ResultStatus.CHECK_FAILURE.getCode()).setData(userSocketInfo.getStaffId());
return new MsgBody(ResultStatus.CHECK_FAILURE.getCode(), userSocketInfo.getStaffId());
}
}
\ No newline at end of file
package com.ym.im.service.impl;
import com.baomidou.mybatisplus.core.toolkit.IdWorker;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.ym.im.entity.*;
import com.ym.im.service.*;
......@@ -129,11 +130,11 @@ public class UserSingleChatServiceImpl implements ChatService {
public void save(Long id, @Valid MsgBody<ChatRecord> msgBody) {
// 设置聊天基本信息
final ChatRecord record = msgBody.getData();
record.setId(null);
record.setUserId(id);
record.setSendReceive(SEND);
record.setCreateTime(new Date());
final ChatRecord record = msgBody.getData()
.setId(IdWorker.getId())
.setUserId(id)
.setSendReceive(SEND)
.setCreateTime(new Date());
/*
* 设置客服Id:
* 情况1:当前用户无客服服务,且无客服可分配,ChatRecord.staffId = null;
......@@ -157,7 +158,6 @@ public class UserSingleChatServiceImpl implements ChatService {
*/
@Override
public void send(NioSocketChannel channel, MsgBody<ChatRecord> msgBody) throws JsonProcessingException {
msgBodyService.sendAndAck(channel, msgBody);
}
......@@ -173,7 +173,7 @@ public class UserSingleChatServiceImpl implements ChatService {
userSocketInfo.setStaffId(userSocketInfo.getStaffId() == null ? record.getStaffId() : userSocketInfo.getStaffId());
StaffSocketInfo staffSocketInfo = ChannelGroupService.STAFF_GROUP.get(userSocketInfo.getStaffId());
if (staffSocketInfo != null) {
staffSocketInfo.getChannel().writeAndFlush(msgBody);
staffSocketInfo.writeAndFlush(msgBody);
}
redisTemplate.opsForHash().put(NettyConstant.MSG_KEY + record.getUserId(), record.getId(), JsonUtils.obj2Json(msgBody));
}
......@@ -205,7 +205,7 @@ public class UserSingleChatServiceImpl implements ChatService {
final Long staffId = ChannelGroupService.USER_GROUP.get(userId).getStaffId();
ChannelGroupService.STAFF_GROUP.forEach((key, staffSocketInfo) -> {
staffSocketInfo.getUserIds().remove(userId);
staffSocketInfo.getChannel().writeAndFlush(new MsgBody<>(MsgBody.USERS_ONLINE, new IdModel(staffId, userId)));
staffSocketInfo.writeAndFlush(new MsgBody<>().setStatus(MsgBody.USERS_ONLINE).setData(new IdModel().setStaffId(staffId).setUserId(userId)));
});
}
}
#Generated by Maven
#Sat Oct 10 14:42:53 SGT 2020
version=2.0
groupId=io.geekidea.springbootplus
artifactId=customer-service
com/ym/im/entity/StaffSocketInfo.class
com/ym/im/config/RedisConfig.class
com/ym/im/service/impl/MsgBodyServiceImpl.class
com/ym/im/IMessageApplication.class
com/ym/im/config/LocaleConfig.class
com/ym/im/mq/Queue.class
com/ym/im/entity/UserSocketInfo.class
com/ym/im/service/ChannelGroupService.class
com/ym/im/util/JsonUtils.class
com/ym/im/config/JacksonConfig.class
com/ym/im/entity/LanguageEnum.class
com/ym/im/factory/SingleChatFactory.class
com/ym/im/util/MessageUtils.class
com/ym/im/mapper/ChatRecordMapper.class
com/ym/im/handler/IndexHandler.class
com/ym/im/controller/StaffController.class
com/ym/im/service/ChatService.class
com/ym/im/core/WebSocketChannelInitializer.class
com/ym/im/config/ExceptionConfig$RestResponseEntityExceptionHandler.class
com/ym/im/entity/IMessageConstant.class
com/ym/im/service/impl/UserServiceImpl.class
com/ym/im/entity/BaseSocketInfo.class
com/ym/im/validation/group/StaffSendGroup.class
com/ym/im/service/impl/UserSingleChatServiceImpl.class
com/ym/im/core/NettyServer.class
com/ym/im/entity/NettyConstant$1.class
com/ym/im/entity/IdModel.class
com/ym/im/validation/group/ChatRecordSaveGroup.class
com/ym/im/core/MessageEncoder.class
com/ym/im/util/Assert.class
com/ym/im/config/NettyConfig.class
com/ym/im/entity/base/ResultStatus.class
com/ym/im/validation/group/MsgBodyGroup.class
com/ym/im/entity/MsgBody.class
com/ym/im/service/impl/StaffServiceImpl$1.class
com/ym/im/core/MessageDecoder.class
com/ym/im/config/SwaggerConfig.class
com/ym/im/config/RabbitConfig.class
com/ym/im/entity/NettyConstant.class
com/ym/im/handler/BaseHandler.class
com/ym/im/exception/IMessageException.class
com/ym/im/entity/PullChatRecord.class
com/ym/im/controller/ChatRecordController.class
com/ym/im/service/MsgBodyService.class
com/ym/im/service/impl/ChatRecordServiceImpl.class
com/ym/im/validation/group/ChatRecordSendGroup.class
com/ym/im/validation/group/ChatRecordReceiveGroup.class
com/ym/im/entity/ChatRecord.class
com/ym/im/service/impl/StaffServiceImpl.class
com/ym/im/entity/model/OrderModel.class
com/ym/im/mq/Receiver.class
com/ym/im/service/impl/StaffSingleChatServiceImpl.class
com/ym/im/service/ChatRecordService.class
com/ym/im/service/UserService.class
com/ym/im/config/ExceptionConfig.class
com/ym/im/entity/enums/PlatformEnum.class
com/ym/im/controller/UserController.class
com/ym/im/handler/SingleChatHandler.class
com/ym/im/entity/model/PushTokenAndTypeModel.class
com/ym/im/service/StaffService.class
/Users/JJww/Documents/Workspaces/Jumeirah/customer-service/src/main/java/com/ym/im/entity/PullChatRecord.java
/Users/JJww/Documents/Workspaces/Jumeirah/customer-service/src/main/java/com/ym/im/core/NettyServer.java
/Users/JJww/Documents/Workspaces/Jumeirah/customer-service/src/main/java/com/ym/im/entity/model/PushTokenAndTypeModel.java
/Users/JJww/Documents/Workspaces/Jumeirah/customer-service/src/main/java/com/ym/im/service/ChatService.java
/Users/JJww/Documents/Workspaces/Jumeirah/customer-service/src/main/java/com/ym/im/util/Assert.java
/Users/JJww/Documents/Workspaces/Jumeirah/customer-service/src/main/java/com/ym/im/handler/SingleChatHandler.java
/Users/JJww/Documents/Workspaces/Jumeirah/customer-service/src/main/java/com/ym/im/service/impl/ChatRecordServiceImpl.java
/Users/JJww/Documents/Workspaces/Jumeirah/customer-service/src/main/java/com/ym/im/validation/group/StaffSendGroup.java
/Users/JJww/Documents/Workspaces/Jumeirah/customer-service/src/main/java/com/ym/im/util/MessageUtils.java
/Users/JJww/Documents/Workspaces/Jumeirah/customer-service/src/main/java/com/ym/im/config/RabbitConfig.java
/Users/JJww/Documents/Workspaces/Jumeirah/customer-service/src/main/java/com/ym/im/validation/group/MsgBodyGroup.java
/Users/JJww/Documents/Workspaces/Jumeirah/customer-service/src/main/java/com/ym/im/handler/IndexHandler.java
/Users/JJww/Documents/Workspaces/Jumeirah/customer-service/src/main/java/com/ym/im/config/ExceptionConfig.java
/Users/JJww/Documents/Workspaces/Jumeirah/customer-service/src/main/java/com/ym/im/IMessageApplication.java
/Users/JJww/Documents/Workspaces/Jumeirah/customer-service/src/main/java/com/ym/im/entity/IMessageConstant.java
/Users/JJww/Documents/Workspaces/Jumeirah/customer-service/src/main/java/com/ym/im/service/ChannelGroupService.java
/Users/JJww/Documents/Workspaces/Jumeirah/customer-service/src/main/java/com/ym/im/exception/IMessageException.java
/Users/JJww/Documents/Workspaces/Jumeirah/customer-service/src/main/java/com/ym/im/service/StaffService.java
/Users/JJww/Documents/Workspaces/Jumeirah/customer-service/src/main/java/com/ym/im/core/MessageDecoder.java
/Users/JJww/Documents/Workspaces/Jumeirah/customer-service/src/main/java/com/ym/im/service/impl/MsgBodyServiceImpl.java
/Users/JJww/Documents/Workspaces/Jumeirah/customer-service/src/main/java/com/ym/im/entity/UserSocketInfo.java
/Users/JJww/Documents/Workspaces/Jumeirah/customer-service/src/main/java/com/ym/im/mq/Receiver.java
/Users/JJww/Documents/Workspaces/Jumeirah/customer-service/src/main/java/com/ym/im/service/UserService.java
/Users/JJww/Documents/Workspaces/Jumeirah/customer-service/src/main/java/com/ym/im/core/WebSocketChannelInitializer.java
/Users/JJww/Documents/Workspaces/Jumeirah/customer-service/src/main/java/com/ym/im/service/impl/StaffServiceImpl.java
/Users/JJww/Documents/Workspaces/Jumeirah/customer-service/src/main/java/com/ym/im/entity/enums/LanguageEnum.java
/Users/JJww/Documents/Workspaces/Jumeirah/customer-service/src/main/java/com/ym/im/entity/MsgBody.java
/Users/JJww/Documents/Workspaces/Jumeirah/customer-service/src/main/java/com/ym/im/entity/model/OrderModel.java
/Users/JJww/Documents/Workspaces/Jumeirah/customer-service/src/main/java/com/ym/im/entity/ChatRecord.java
/Users/JJww/Documents/Workspaces/Jumeirah/customer-service/src/main/java/com/ym/im/service/MsgBodyService.java
/Users/JJww/Documents/Workspaces/Jumeirah/customer-service/src/main/java/com/ym/im/entity/enums/PlatformEnum.java
/Users/JJww/Documents/Workspaces/Jumeirah/customer-service/src/main/java/com/ym/im/core/MessageEncoder.java
/Users/JJww/Documents/Workspaces/Jumeirah/customer-service/src/main/java/com/ym/im/entity/IdModel.java
/Users/JJww/Documents/Workspaces/Jumeirah/customer-service/src/main/java/com/ym/im/util/MybatisGenerator.java
/Users/JJww/Documents/Workspaces/Jumeirah/customer-service/src/main/java/com/ym/im/controller/StaffController.java
/Users/JJww/Documents/Workspaces/Jumeirah/customer-service/src/main/java/com/ym/im/util/JsonUtils.java
/Users/JJww/Documents/Workspaces/Jumeirah/customer-service/src/main/java/com/ym/im/config/NettyConfig.java
/Users/JJww/Documents/Workspaces/Jumeirah/customer-service/src/main/java/com/ym/im/service/impl/UserSingleChatServiceImpl.java
/Users/JJww/Documents/Workspaces/Jumeirah/customer-service/src/main/java/com/ym/im/validation/group/ChatRecordSendGroup.java
/Users/JJww/Documents/Workspaces/Jumeirah/customer-service/src/main/java/com/ym/im/mapper/ChatRecordMapper.java
/Users/JJww/Documents/Workspaces/Jumeirah/customer-service/src/main/java/com/ym/im/config/SwaggerConfig.java
/Users/JJww/Documents/Workspaces/Jumeirah/customer-service/src/main/java/com/ym/im/entity/StaffSocketInfo.java
/Users/JJww/Documents/Workspaces/Jumeirah/customer-service/src/main/java/com/ym/im/controller/ChatRecordController.java
/Users/JJww/Documents/Workspaces/Jumeirah/customer-service/src/main/java/com/ym/im/controller/UserController.java
/Users/JJww/Documents/Workspaces/Jumeirah/customer-service/src/main/java/com/ym/im/mq/Queue.java
/Users/JJww/Documents/Workspaces/Jumeirah/customer-service/src/main/java/com/ym/im/entity/base/ResultStatus.java
/Users/JJww/Documents/Workspaces/Jumeirah/customer-service/src/main/java/com/ym/im/config/JacksonConfig.java
/Users/JJww/Documents/Workspaces/Jumeirah/customer-service/src/main/java/com/ym/im/entity/NettyConstant.java
/Users/JJww/Documents/Workspaces/Jumeirah/customer-service/src/main/java/com/ym/im/service/impl/StaffSingleChatServiceImpl.java
/Users/JJww/Documents/Workspaces/Jumeirah/customer-service/src/main/java/com/ym/im/config/RedisConfig.java
/Users/JJww/Documents/Workspaces/Jumeirah/customer-service/src/main/java/com/ym/im/entity/BaseSocketInfo.java
/Users/JJww/Documents/Workspaces/Jumeirah/customer-service/src/main/java/com/ym/im/handler/BaseHandler.java
/Users/JJww/Documents/Workspaces/Jumeirah/customer-service/src/main/java/com/ym/im/validation/group/ChatRecordReceiveGroup.java
/Users/JJww/Documents/Workspaces/Jumeirah/customer-service/src/main/java/com/ym/im/factory/SingleChatFactory.java
/Users/JJww/Documents/Workspaces/Jumeirah/customer-service/src/main/java/com/ym/im/service/impl/UserServiceImpl.java
/Users/JJww/Documents/Workspaces/Jumeirah/customer-service/src/main/java/com/ym/im/service/ChatRecordService.java
/Users/JJww/Documents/Workspaces/Jumeirah/customer-service/src/main/java/com/ym/im/config/LocaleConfig.java
/Users/JJww/Documents/Workspaces/Jumeirah/customer-service/src/main/java/com/ym/im/validation/group/ChatRecordSaveGroup.java
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment