Commit 9271c171 by giaogiao

消息接收, 不同type中的json结构由客户方自定义 不做任何操作

parent 18f91149
package io.geekidea.springbootplus.test;
import cn.hutool.crypto.digest.MD5;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.wecloud.im.tillo.app_ws.model.request.ReceiveModel;
import java.util.Date;
......@@ -31,13 +35,34 @@ public class SignTest {
}
public static void main(String[] args) {
public static void main(String[] args) throws JsonProcessingException {
String clientId = "c1";
String appKey = "elLwpel1gWCHDqZy";
String appSecret = "68809bb5a9077a83631aeb0b17b5965d6b2302faf2ab3737";
String timestemp = String.valueOf(new Date().getTime());
getSign(timestemp, clientId, appKey, appSecret);
jsonTest();
}
private static void jsonTest() throws JsonProcessingException {
String data = "{\n" +
"\"reqId\":\"123123\",\n" +
"\"cmd\":1,\n" +
"\"data\":{\n" +
" \"type\":-1,\n" +
" \"text\":\"这是一个纯文本消息\",\n" +
" \"attrs\":{\n" +
" \"a\":\"attrs 是用来存储用户自定义的一些键值对\"}}\n" +
"}\n";
// json转换成对象
JsonMapper jsonMapper = new JsonMapper();
ReceiveModel receiveModel = jsonMapper.readValue(data, ReceiveModel.class);
// data对象转换成json
ObjectMapper objectMapper = new ObjectMapper();
String s = objectMapper.writeValueAsString(receiveModel.getData());
}
}
......@@ -58,6 +58,16 @@ public class ImMessageQueryVo implements Serializable {
@ApiModelProperty("发送状态, 0AVIMMessageStatusNone(未知) 1AVIMMessageStatusSending(发送中) 2AVIMMessageStatusSent(发送成功) 3AVIMMessageStatusReceipt(被接收) 4AVIMMessageStatusFailed(失败)")
private Integer sendStatus;
@ApiModelProperty("类型" +
"类型 文本消息-1" +
"图像消息-2" +
"音频消息-3" +
"视频消息-4" +
"位置消息-5" +
"文件消息-6" +
"以上类型均使用负数,所有正数留给自定义扩展类型使用,0 作为「没有类型」被保留起来。")
private Integer type;
@ApiModelProperty("会话id")
private Long fkConversationId;
}
\ No newline at end of file
package com.wecloud.im.tillo.app_ws;
import cn.hutool.core.lang.Snowflake;
import cn.hutool.core.thread.ThreadFactoryBuilder;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.wecloud.im.entity.ImMessage;
import com.wecloud.im.service.ImMessageService;
import com.wecloud.im.tillo.app_ws.model.WsConstants;
import com.wecloud.im.tillo.app_ws.receive.ReadWsData;
import com.wecloud.im.tillo.app_ws.service.MangerChannelService;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
......@@ -19,7 +13,6 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Date;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
......@@ -52,9 +45,7 @@ public class WsHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
* 属于io密集型业务
* io密集型任务配置尽可能多的线程数量
*/
private final static ExecutorService THREAD_POOL_EXECUTOR =
private final static ExecutorService TASK_THREAD_POOL_EXECUTOR =
new ThreadPoolExecutor(WsConstants.CPU_PROCESSORS * 120, WsConstants.CPU_PROCESSORS * 130 + 2,
1L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(), NAMED_THREAD_FACTORY, new ThreadPoolExecutor.CallerRunsPolicy());
......@@ -64,33 +55,33 @@ public class WsHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
String data = msg.text();
ImMessage imMessage = new ImMessage();
imMessage.setId(new Snowflake(1L, 1L).nextId());
imMessage.setCreateTime(new Date());
imMessage.setWithdrawTime(new Date());
imMessage.setUpdateDate(new Date());
imMessage.setFkAppid(0L);
imMessage.setSender(0L);
imMessage.setContent(data);
imMessage.setWithdraw(false);
imMessage.setEvent(false);
imMessage.setSystem(false);
imMessage.setAt("");
imMessage.setSendStatus(0);
imMessage.setFkConversationId(0L);
imMessageService.save(imMessage);
JsonMapper jsonMapper = new JsonMapper();
try {
JsonNode jsonNode = jsonMapper.readTree(data);
int i = 0;
} catch (JsonProcessingException e) {
e.printStackTrace();
}
ChannelFuture channelFuture = ctx.writeAndFlush(new TextWebSocketFrame(data));
// ImMessage imMessage = new ImMessage();
// imMessage.setId(new Snowflake(1L, 1L).nextId());
//
// imMessage.setCreateTime(new Date());
// imMessage.setWithdrawTime(new Date());
// imMessage.setUpdateDate(new Date());
// imMessage.setFkAppid(0L);
// imMessage.setSender(0L);
// imMessage.setContent(data);
// imMessage.setWithdraw(false);
// imMessage.setEvent(false);
// imMessage.setSystem(false);
// imMessage.setAt("");
// imMessage.setSendStatus(0);
// imMessage.setFkConversationId(0L);
//
// imMessageService.save(imMessage);
//
// JsonMapper jsonMapper = new JsonMapper();
//
// try {
// JsonNode jsonNode = jsonMapper.readTree(data);
// int i = 0;
// } catch (JsonProcessingException e) {
// e.printStackTrace();
// }
// ChannelFuture channelFuture = ctx.writeAndFlush(new TextWebSocketFrame(data));
try {
if (data.isEmpty()) {
......@@ -99,7 +90,7 @@ public class WsHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
/*
* 在此进入耗时业务线程池, 将不再阻塞netty的I/O线程,提高网络吞吐
*/
THREAD_POOL_EXECUTOR.execute(() ->
TASK_THREAD_POOL_EXECUTOR.execute(() ->
execute(ctx, data)
);
......@@ -116,8 +107,7 @@ public class WsHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
// log.debug("appWS收到" + userIdByChannel + ":" + data + ",channelId:" + ctx.channel().id().asLongText());
log.debug("WS收到" + data);
String language = ctx.channel().attr(MangerChannelService.LANGUAGE).get();
// readWsData.convertModel(data, userIdByChannel, language);
readWsData.convertModel(data, ctx);
}
......
......@@ -5,6 +5,7 @@ import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
import java.util.HashMap;
/**
* @Description null
......@@ -20,16 +21,17 @@ public class ReceiveModel implements Serializable {
* 枚举类UriPathEnum 请求uri的编码
* 由于websocket使用同一个通道发送数据,需要区分不同类型请求
*/
private Integer path;
private Integer cmd;
/**
* json数据
*/
private String data;
private HashMap data;
/**
* 请求id, 以判空是否请求成功, 服务端处理完成后 返回此id
* 由前端生成,可以用uuid,也可以用时间戳
*/
private String reqId;
}
package com.wecloud.im.tillo.app_ws.receive;
import com.alibaba.fastjson.JSON;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.wecloud.im.tillo.app_ws.enums.WsRequestCmdEnum;
import com.wecloud.im.tillo.app_ws.model.request.ReceiveModel;
import com.wecloud.im.tillo.app_ws.service.MangerChannelService;
import com.wecloud.im.tillo.app_ws.service.WriteDataService;
import com.wecloud.im.tillo.app_ws.strategy.AbstractReceiveStrategy;
import com.wecloud.im.tillo.app_ws.strategy.ReceiveStrategyContext;
import io.netty.channel.ChannelHandlerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
......@@ -20,11 +22,11 @@ import javax.annotation.Resource;
*/
@Service
public class ReadWsData {
private static Logger log = LoggerFactory.getLogger(ReadWsData.class);
private static final Logger log = LoggerFactory.getLogger(ReadWsData.class);
// idea此处报红 属于正常
// @SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
@Autowired
@Resource
private ReceiveStrategyContext receiveStrategyContext;
@Resource
......@@ -36,39 +38,50 @@ public class ReadWsData {
* 在此开始进入业务流程子线程,将不占netty的io线程
*
* @param data
* @param userId
* @param language
* @throws Exception
*/
public void convertModel(String data, Long userId, String language) {
public void convertModel(String data, ChannelHandlerContext ctx) {
// if (PING.equals(data)) {
// log.debug("收到心跳:" + userId);
// return;
// }
if (PING.equals(data)) {
log.debug("收到心跳:" + userId);
return;
}
String language = ctx.channel().attr(MangerChannelService.LANGUAGE).get();
String appKey = ctx.channel().attr(MangerChannelService.APP_KEY).get();
String clientId = ctx.channel().attr(MangerChannelService.CLIENT_ID).get();
// ReceiveModel requestModel = JSON.parseObject(data, ReceiveModel.class);
ReceiveModel requestModel = JSON.parseObject(data, ReceiveModel.class);
if (null == requestModel || null == requestModel.getPath()) {
JsonMapper jsonMapper = new JsonMapper();
ReceiveModel receiveModel = null;
try {
receiveModel = jsonMapper.readValue(data, ReceiveModel.class);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
if (null == receiveModel || null == receiveModel.getCmd()) {
return;
}
try {
// User user = userService.findById(userId);
this.doProcess(language, requestModel);
this.doProcess(language, receiveModel, ctx);
} catch (Exception e) {
log.error("系统繁忙:" + data + ",userId:" + userId, e);
log.error("系统繁忙:" + data + ",appKey:" + appKey + ",clientId:" + clientId, e);
// writeDataService.nullDataSuccess(requestModel, ResultStatus.SYS_BUSY, userId, language);
}
}
private void doProcess(String language, ReceiveModel requestModel) {
WsRequestCmdEnum wsRequestUriPathEnum = WsRequestCmdEnum.getByCode(requestModel.getPath());
private void doProcess(String language, ReceiveModel requestModel, ChannelHandlerContext ctx) {
WsRequestCmdEnum wsRequestUriPathEnum = WsRequestCmdEnum.getByCode(requestModel.getCmd());
// 使用策略模式, 根据不同类型请求调用不同实现类
AbstractReceiveStrategy receiveStrategy = receiveStrategyContext.getStrategy(wsRequestUriPathEnum);
receiveStrategy.process(requestModel, language);
receiveStrategy.process(requestModel, language, ctx);
}
......
......@@ -2,6 +2,7 @@ package com.wecloud.im.tillo.app_ws.strategy;
import com.wecloud.im.tillo.app_ws.model.request.ReceiveModel;
import io.netty.channel.ChannelHandlerContext;
/**
* @Description 接收netty不同类型请求
......@@ -18,5 +19,5 @@ public abstract class AbstractReceiveStrategy {
* @param language
* @throws Exception
*/
abstract public void process(ReceiveModel requestModel, String language);
abstract public void process(ReceiveModel requestModel, String language, ChannelHandlerContext ctx);
}
......@@ -4,6 +4,7 @@ import com.wecloud.im.tillo.app_ws.annotation.ReceiveTypeAnnotation;
import com.wecloud.im.tillo.app_ws.enums.WsRequestCmdEnum;
import com.wecloud.im.tillo.app_ws.model.request.ReceiveModel;
import com.wecloud.im.tillo.app_ws.strategy.AbstractReceiveStrategy;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
......@@ -16,7 +17,8 @@ import org.springframework.stereotype.Service;
public class SingleConcreteReceiveStrategy extends AbstractReceiveStrategy {
@Override
public void process(ReceiveModel requestModel, String language) {
public void process(ReceiveModel requestModel, String language, ChannelHandlerContext ctx) {
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment