Commit 61ed7094 by hweeeeeei

优化业务线程池: 去掉write时的业务线程池,减少IO切换. 减少业务线程池数量,在高并发时不超过CPU核心数效果最佳

parent 7b2511a7
package com.wecloud.im.executor;
import cn.hutool.core.thread.ThreadFactoryBuilder;
import com.wecloud.im.ws.model.WsConstants;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* 业务处理线程池
*/
public class BusinessThreadPool {
private final static ThreadFactory BUSINESS_THREAD_FACTORY = new ThreadFactoryBuilder()
.setNamePrefix("-business-").build();
/**
* 业务处理线程池
*/
public final static ExecutorService BUSINESS_TASK_THREAD_POOL_EXECUTOR =
new ThreadPoolExecutor(WsConstants.CPU_PROCESSORS, WsConstants.CPU_PROCESSORS * 2,
60L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(1024 * 2), BUSINESS_THREAD_FACTORY, new ThreadPoolExecutor.CallerRunsPolicy());
}
package com.wecloud.im.netty.core; package com.wecloud.im.netty.core;
import cn.hutool.core.thread.ThreadFactoryBuilder; import com.wecloud.im.executor.BusinessThreadPool;
import com.wecloud.im.ws.model.WsConstants;
import com.wecloud.im.ws.receive.ReadWsData; import com.wecloud.im.ws.receive.ReadWsData;
import com.wecloud.im.ws.service.MangerChannelService; import com.wecloud.im.ws.service.MangerChannelService;
import com.wecloud.rtc.service.RtcService; import com.wecloud.rtc.service.RtcService;
...@@ -15,11 +14,6 @@ import org.springframework.beans.factory.annotation.Autowired; ...@@ -15,11 +14,6 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/** /**
* @Description app端 长连接事件处理 * @Description app端 长连接事件处理
...@@ -43,17 +37,6 @@ public class WsReadHandler extends SimpleChannelInboundHandler<TextWebSocketFram ...@@ -43,17 +37,6 @@ public class WsReadHandler extends SimpleChannelInboundHandler<TextWebSocketFram
@Resource @Resource
private MangerChannelService mangerChannelService; private MangerChannelService mangerChannelService;
private final static ThreadFactory NAMED_THREAD_FACTORY = new ThreadFactoryBuilder()
.setNamePrefix("WS-business-").build();
/**
* 耗时核心业务处理线程池
* 属于io密集型业务
* io密集型任务配置尽可能多的线程数量
*/
private final static ExecutorService TASK_THREAD_POOL_EXECUTOR =
new ThreadPoolExecutor(WsConstants.CPU_PROCESSORS * 5, WsConstants.CPU_PROCESSORS * 10,
10L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(10), NAMED_THREAD_FACTORY, new ThreadPoolExecutor.CallerRunsPolicy());
@Override @Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) { protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) {
...@@ -67,7 +50,7 @@ public class WsReadHandler extends SimpleChannelInboundHandler<TextWebSocketFram ...@@ -67,7 +50,7 @@ public class WsReadHandler extends SimpleChannelInboundHandler<TextWebSocketFram
/* /*
* 在此进入耗时业务线程池, 将不再阻塞netty的I/O线程,提高网络吞吐 * 在此进入耗时业务线程池, 将不再阻塞netty的I/O线程,提高网络吞吐
*/ */
TASK_THREAD_POOL_EXECUTOR.execute(() -> BusinessThreadPool.BUSINESS_TASK_THREAD_POOL_EXECUTOR.execute(() ->
execute(ctx, data) execute(ctx, data)
); );
......
package com.wecloud.im.ws.service.impl; package com.wecloud.im.ws.service.impl;
import cn.hutool.core.thread.ThreadFactoryBuilder;
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.json.JsonMapper; import com.fasterxml.jackson.databind.json.JsonMapper;
import com.wecloud.im.ws.model.WsResponseModel; import com.wecloud.im.ws.model.WsResponseModel;
import com.wecloud.im.ws.model.WsConstants;
import com.wecloud.im.ws.model.request.ReceiveModel; import com.wecloud.im.ws.model.request.ReceiveModel;
import com.wecloud.im.ws.service.MangerChannelService; import com.wecloud.im.ws.service.MangerChannelService;
import com.wecloud.im.ws.service.WriteDataService; import com.wecloud.im.ws.service.WriteDataService;
...@@ -14,11 +12,6 @@ import org.springframework.beans.factory.annotation.Autowired; ...@@ -14,11 +12,6 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.HashMap; import java.util.HashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/** /**
* @Description 下发数据 * @Description 下发数据
...@@ -28,17 +21,17 @@ import java.util.concurrent.TimeUnit; ...@@ -28,17 +21,17 @@ import java.util.concurrent.TimeUnit;
@Component @Component
public class WriteDataServiceImpl implements WriteDataService { public class WriteDataServiceImpl implements WriteDataService {
private final static ThreadFactory WRITE_NAMED_THREAD_FACTORY = new ThreadFactoryBuilder() // private final static ThreadFactory WRITE_NAMED_THREAD_FACTORY = new ThreadFactoryBuilder()
.setNamePrefix("ws-WRITE-").build(); // .setNamePrefix("ws-WRITE-").build();
/** // /**
* 耗时核心业务处理线程池 // * 耗时核心业务处理线程池
* 属于io密集型业务 // * 属于io密集型业务
* io密集型任务配置尽可能多的线程数量 // * io密集型任务配置尽可能多的线程数量
*/ // */
private final static ExecutorService WRITE_TASK_THREAD_POOL_EXECUTOR = // private final static ExecutorService WRITE_TASK_THREAD_POOL_EXECUTOR =
new ThreadPoolExecutor(WsConstants.CPU_PROCESSORS * 2, WsConstants.CPU_PROCESSORS * 3, // new ThreadPoolExecutor(WsConstants.CPU_PROCESSORS * 2, WsConstants.CPU_PROCESSORS * 3,
1L, TimeUnit.MILLISECONDS, // 1L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(), WRITE_NAMED_THREAD_FACTORY, new ThreadPoolExecutor.CallerRunsPolicy()); // new LinkedBlockingQueue<Runnable>(), WRITE_NAMED_THREAD_FACTORY, new ThreadPoolExecutor.CallerRunsPolicy());
@Autowired @Autowired
private MangerChannelService mangerChannelService; private MangerChannelService mangerChannelService;
...@@ -74,23 +67,23 @@ public class WriteDataServiceImpl implements WriteDataService { ...@@ -74,23 +67,23 @@ public class WriteDataServiceImpl implements WriteDataService {
@Override @Override
public void write(WsResponseModel responseModel, String toAppKey, String toClientId) { public void write(WsResponseModel responseModel, String toAppKey, String toClientId) {
WRITE_TASK_THREAD_POOL_EXECUTOR.execute( // WRITE_TASK_THREAD_POOL_EXECUTOR.execute(
() -> { // () -> {
JsonMapper jsonMapper = new JsonMapper(); JsonMapper jsonMapper = new JsonMapper();
String json = null; String json = null;
try { try {
json = jsonMapper.writeValueAsString(responseModel); json = jsonMapper.writeValueAsString(responseModel);
} catch (JsonProcessingException e) { } catch (JsonProcessingException e) {
e.printStackTrace(); e.printStackTrace();
} }
mangerChannelService.writeData(json, toAppKey, toClientId); mangerChannelService.writeData(json, toAppKey, toClientId);
} }
); // );
} //}
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment