Commit c9bbd69c by giaogiao

该消息不是该客户端发送也能修改;

优化多线程下发消息
parent 1fd92d46
...@@ -86,7 +86,9 @@ public class ImClientServiceImpl extends BaseServiceImpl<ImClientMapper, ImClien ...@@ -86,7 +86,9 @@ public class ImClientServiceImpl extends BaseServiceImpl<ImClientMapper, ImClien
// 根据appKey查询appid // 根据appKey查询appid
ImApplication imApplication = imApplicationService.getOne( ImApplication imApplication = imApplicationService.getOne(
new QueryWrapper<ImApplication>().lambda().eq(ImApplication::getAppKey, curentJwtToken.getAppKey()) new QueryWrapper<ImApplication>().lambda()
.select(ImApplication::getId)
.eq(ImApplication::getAppKey, curentJwtToken.getAppKey())
); );
return this.getOne(new QueryWrapper<ImClient>().lambda() return this.getOne(new QueryWrapper<ImClient>().lambda()
......
...@@ -79,10 +79,11 @@ public class ImConversationServiceImpl extends BaseServiceImpl<ImConversationMap ...@@ -79,10 +79,11 @@ public class ImConversationServiceImpl extends BaseServiceImpl<ImConversationMap
// 成员不存在,不能创建会话 // 成员不存在,不能创建会话
for (String id : imConversationCreate.getClientIds()) { for (String id : imConversationCreate.getClientIds()) {
ImClient client2 = imClientService.getOne(new QueryWrapper<ImClient>().lambda() ImClient client2 = imClientService.getOne(new QueryWrapper<ImClient>().lambda()
.select(ImClient::getId)
.eq(ImClient::getFkAppid, client.getFkAppid()) .eq(ImClient::getFkAppid, client.getFkAppid())
.eq(ImClient::getClientId, id)); .eq(ImClient::getClientId, id));
if (client2 == null) { if (client2 == null) {
log.info(" 成员不存在,不能创建会话 client2 == null"); log.info("成员不存在,不能创建会话 client2 == null");
return ApiResult.result(ApiCode.CLIENT_NOT_FOUNT, null); return ApiResult.result(ApiCode.CLIENT_NOT_FOUNT, null);
} }
} }
......
...@@ -54,17 +54,16 @@ public class ImMessageServiceImpl extends BaseServiceImpl<ImMessageMapper, ImMes ...@@ -54,17 +54,16 @@ public class ImMessageServiceImpl extends BaseServiceImpl<ImMessageMapper, ImMes
ImClient client = imClientService.getClient(); ImClient client = imClientService.getClient();
/* // 判断该消息是否是该客户端发送
ImMessage messageById = this.getById(imMsgUpdate.getId()); ImMessage messageById = this.getById(imMsgUpdate.getId());
// 判断该消息是否是该客户端发送
if (!messageById.getSender().equals(client.getId())) { if (!messageById.getSender().equals(client.getId())) {
log.error("判断该消息是否是该客户端发送");
return ApiResult.fail(); return ApiResult.fail();
} }*/
ImMessage imMessage = new ImMessage(); ImMessage imMessage = new ImMessage();
imMessage.setId(imMsgUpdate.getId()); imMessage.setId(imMsgUpdate.getId());
JsonMapper jsonMapper = new JsonMapper(); JsonMapper jsonMapper = new JsonMapper();
try { try {
...@@ -72,6 +71,7 @@ public class ImMessageServiceImpl extends BaseServiceImpl<ImMessageMapper, ImMes ...@@ -72,6 +71,7 @@ public class ImMessageServiceImpl extends BaseServiceImpl<ImMessageMapper, ImMes
imMessage.setContent(content); imMessage.setContent(content);
} catch (JsonProcessingException e) { } catch (JsonProcessingException e) {
e.printStackTrace(); e.printStackTrace();
return ApiResult.fail();
} }
boolean isOK = this.updateById(imMessage); boolean isOK = this.updateById(imMessage);
......
...@@ -35,14 +35,14 @@ public class WsHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> { ...@@ -35,14 +35,14 @@ public class WsHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
private MangerChannelService mangerChannelService; private MangerChannelService mangerChannelService;
private final static ThreadFactory NAMED_THREAD_FACTORY = new ThreadFactoryBuilder() private final static ThreadFactory NAMED_THREAD_FACTORY = new ThreadFactoryBuilder()
.setNamePrefix("business-").build(); .setNamePrefix("WS-business-").build();
/** /**
* 核心业务处理线程池 * 耗时核心业务处理线程池
* 属于io密集型业务 * 属于io密集型业务
* io密集型任务配置尽可能多的线程数量 * io密集型任务配置尽可能多的线程数量
*/ */
private final static ExecutorService TASK_THREAD_POOL_EXECUTOR = private final static ExecutorService TASK_THREAD_POOL_EXECUTOR =
new ThreadPoolExecutor(WsConstants.CPU_PROCESSORS * 120, WsConstants.CPU_PROCESSORS * 130 + 2, new ThreadPoolExecutor(WsConstants.CPU_PROCESSORS * 10, WsConstants.CPU_PROCESSORS * 500,
1L, TimeUnit.MILLISECONDS, 1L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(), NAMED_THREAD_FACTORY, new ThreadPoolExecutor.CallerRunsPolicy()); new LinkedBlockingQueue<Runnable>(), NAMED_THREAD_FACTORY, new ThreadPoolExecutor.CallerRunsPolicy());
......
package com.wecloud.im.tillo.app_ws.service.impl; package com.wecloud.im.tillo.app_ws.service.impl;
import cn.hutool.core.thread.ThreadFactoryBuilder;
import com.wecloud.im.tillo.app_ws.cache.UserCache; import com.wecloud.im.tillo.app_ws.cache.UserCache;
import com.wecloud.im.tillo.app_ws.model.WsConstants;
import com.wecloud.im.tillo.app_ws.service.MangerChannelService; import com.wecloud.im.tillo.app_ws.service.MangerChannelService;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
...@@ -14,12 +12,6 @@ import lombok.extern.slf4j.Slf4j; ...@@ -14,12 +12,6 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/** /**
* @Description 维护netty用户channel对象 * @Description 维护netty用户channel对象
* @Author hewei hwei1233@163.com * @Author hewei hwei1233@163.com
...@@ -29,8 +21,8 @@ import java.util.concurrent.TimeUnit; ...@@ -29,8 +21,8 @@ import java.util.concurrent.TimeUnit;
@Slf4j @Slf4j
public class MangerChannelServiceImpl implements MangerChannelService { public class MangerChannelServiceImpl implements MangerChannelService {
private final static ThreadFactory NAMED_THREAD_FACTORY = new ThreadFactoryBuilder() // private final static ThreadFactory NAMED_THREAD_FACTORY = new ThreadFactoryBuilder()
.setNamePrefix("rpcWrite-").build(); // .setNamePrefix("rpcWrite-").build();
/** /**
* 远程调用ws下发数据线程池 * 远程调用ws下发数据线程池
* 属于IO密集型任务 * 属于IO密集型任务
...@@ -47,11 +39,11 @@ public class MangerChannelServiceImpl implements MangerChannelService { ...@@ -47,11 +39,11 @@ public class MangerChannelServiceImpl implements MangerChannelService {
* <p> * <p>
* 后续优化待完善:消息发送投递至MQ, 消费方从MQ队列获取下发任务 本地队列不宜缓存太多(机器死机则会全丢失) 堆积的请求处理队列可能会耗费非常大的内存甚至死机 * 后续优化待完善:消息发送投递至MQ, 消费方从MQ队列获取下发任务 本地队列不宜缓存太多(机器死机则会全丢失) 堆积的请求处理队列可能会耗费非常大的内存甚至死机
*/ */
private final static ExecutorService THREAD_POOL_RPC_WRITE_EXECUTOR = new ThreadPoolExecutor( // private final static ExecutorService THREAD_POOL_RPC_WRITE_EXECUTOR = new ThreadPoolExecutor(
WsConstants.CPU_PROCESSORS * 100, // WsConstants.CPU_PROCESSORS * 100,
WsConstants.CPU_PROCESSORS * 100, // WsConstants.CPU_PROCESSORS * 100,
1L, TimeUnit.MILLISECONDS, // 1L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(), NAMED_THREAD_FACTORY, new ThreadPoolExecutor.CallerRunsPolicy()); // new LinkedBlockingQueue<Runnable>(), NAMED_THREAD_FACTORY, new ThreadPoolExecutor.CallerRunsPolicy());
@Autowired @Autowired
...@@ -285,17 +277,21 @@ public class MangerChannelServiceImpl implements MangerChannelService { ...@@ -285,17 +277,21 @@ public class MangerChannelServiceImpl implements MangerChannelService {
NioSocketChannel nioSocketChannel = get(toAppKey, toClientId); NioSocketChannel nioSocketChannel = get(toAppKey, toClientId);
if (null == nioSocketChannel) { if (null == nioSocketChannel) {
// userCache.offline(toAppKey + toClientId); // userCache.offline(toAppKey + toClientId);
log.debug("writeData连接为空:" + toAppKey + toClientId + "," + msg); if (log.isDebugEnabled()) {
log.debug("writeData连接为空:" + toAppKey + toClientId + "," + msg);
}
return false; return false;
} }
// 判断连接是否断开 // 判断连接是否断开
if (nioSocketChannel.isShutdown()) { if (nioSocketChannel.isShutdown()) {
log.debug("writeData连接断开:" + toAppKey + toClientId + "," + msg + ",\nchannelId:" + nioSocketChannel.id().asLongText()); if (log.isDebugEnabled()) {
log.debug("writeData连接断开:" + toAppKey + toClientId + "," + msg + ",\nchannelId:" + nioSocketChannel.id().asLongText());
}
return false; return false;
} }
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("writeData:" + toAppKey + toClientId + "," + msg + ",\nchannelId:" + nioSocketChannel.id().asLongText()); log.debug("writeData:" + toAppKey + "," + toClientId + "," + msg + ",\nchannelId:" + nioSocketChannel.id().asLongText());
} }
ChannelFuture channelFuture = nioSocketChannel.writeAndFlush(new TextWebSocketFrame(msg)); ChannelFuture channelFuture = nioSocketChannel.writeAndFlush(new TextWebSocketFrame(msg));
......
package com.wecloud.im.tillo.app_ws.service.impl; package com.wecloud.im.tillo.app_ws.service.impl;
import cn.hutool.core.thread.ThreadFactoryBuilder;
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.json.JsonMapper; import com.fasterxml.jackson.databind.json.JsonMapper;
import com.wecloud.im.tillo.app_ws.model.ResponseModel; import com.wecloud.im.tillo.app_ws.model.ResponseModel;
import com.wecloud.im.tillo.app_ws.model.WsConstants;
import com.wecloud.im.tillo.app_ws.model.request.ReceiveModel; import com.wecloud.im.tillo.app_ws.model.request.ReceiveModel;
import com.wecloud.im.tillo.app_ws.service.MangerChannelService; import com.wecloud.im.tillo.app_ws.service.MangerChannelService;
import com.wecloud.im.tillo.app_ws.service.WriteDataService; import com.wecloud.im.tillo.app_ws.service.WriteDataService;
import io.geekidea.springbootplus.framework.common.api.ApiCode; import io.geekidea.springbootplus.framework.common.api.ApiCode;
import io.geekidea.springbootplus.framework.common.api.ApiResult; import io.geekidea.springbootplus.framework.common.api.ApiResult;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.HashMap; import java.util.HashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/** /**
* @Description 下发数据 * @Description 下发数据
...@@ -22,6 +28,18 @@ import java.util.HashMap; ...@@ -22,6 +28,18 @@ import java.util.HashMap;
@Component @Component
public class WriteDataServiceImpl implements WriteDataService { public class WriteDataServiceImpl implements WriteDataService {
private final static ThreadFactory WRITE_NAMED_THREAD_FACTORY = new ThreadFactoryBuilder()
.setNamePrefix("ws-WRITE-").build();
/**
* 耗时核心业务处理线程池
* 属于io密集型业务
* io密集型任务配置尽可能多的线程数量
*/
private final static ExecutorService WRITE_TASK_THREAD_POOL_EXECUTOR =
new ThreadPoolExecutor(WsConstants.CPU_PROCESSORS * 10, WsConstants.CPU_PROCESSORS * 500,
1L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(), WRITE_NAMED_THREAD_FACTORY, new ThreadPoolExecutor.CallerRunsPolicy());
@Autowired @Autowired
private MangerChannelService mangerChannelService; private MangerChannelService mangerChannelService;
...@@ -54,18 +72,24 @@ public class WriteDataServiceImpl implements WriteDataService { ...@@ -54,18 +72,24 @@ public class WriteDataServiceImpl implements WriteDataService {
} }
@Override @Override
@Async
public void write(ResponseModel responseModel, String toAppKey, String toClientId) { public void write(ResponseModel responseModel, String toAppKey, String toClientId) {
JsonMapper jsonMapper = new JsonMapper(); WRITE_TASK_THREAD_POOL_EXECUTOR.execute(
String json = null; () -> {
JsonMapper jsonMapper = new JsonMapper();
String json = null;
try {
json = jsonMapper.writeValueAsString(responseModel);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
mangerChannelService.writeData(json, toAppKey, toClientId);
}
);
try {
json = jsonMapper.writeValueAsString(responseModel);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
mangerChannelService.writeData(json, toAppKey, toClientId);
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment