Commit 986b9d47 by lixiaozhong

1、完成dubbo集群负载模式:根据用户所在ip进行动态ip地址所在的服务调用,引入spi接口实现类,提交待测试

2、解决nacos与阿里的spring-context包冲突问题
parent dfcf66df
......@@ -34,7 +34,7 @@ import org.springframework.transaction.annotation.EnableTransactionManagement;
* 项目启动入口
*/
@EnableDiscoveryClient
@EnableDubbo(scanBasePackages = {"com.wecloud.im.service.impl", "com.wecloud.im.router"})
@EnableDubbo(scanBasePackages = {"com.wecloud.im.router"})
@EnableAsync
@EnableScheduling
@EnableTransactionManagement
......
package com.wecloud.im.router.dubbo;
import org.apache.commons.lang3.StringUtils;
import org.apache.dubbo.common.Version;
import org.apache.dubbo.common.utils.NetUtils;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.cluster.Directory;
import org.apache.dubbo.rpc.cluster.LoadBalance;
import org.apache.dubbo.rpc.cluster.support.AbstractClusterInvoker;
import org.apache.dubbo.rpc.support.RpcUtils;
import java.util.List;
import java.util.Map;
public class ChannelAbstractClusterInvoker<T> extends AbstractClusterInvoker<T> {
public ChannelAbstractClusterInvoker(Directory<T> directory) {
super(directory);
}
@Override
protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
//1.查看是否设置了指定ip
String ip = (String) RpcContext.getContext().get("ip");
if (StringUtils.isBlank(ip)) {
throw new RuntimeException("ip is blank ");
}
//2.检查是否有可用invoker
checkInvokers(invokers,invocation);
//3.根据指定ip获取对应invoker
Invoker<T> invoked = invokers.stream().filter(invoker -> invoker.getUrl().getHost().equals(ip))
.findFirst().orElse(null);
//4.检查是否有可用invoker
if(null == invoked) {
throw new RpcException(RpcException.NO_INVOKER_AVAILABLE_AFTER_FILTER,
"Failed to invoke the method " + invocation.getMethodName() + " in the service "
+ getInterface().getName() + ". No provider available for the service "
+ directory.getUrl().getServiceKey() + " from ip " + ip + " on the consumer "
+ NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion()
+ ". Please check if the providers have been started and registered.");
}
//5.发起远程调用,失败则抛出异常
try {
return invoked.invoke(invocation);
} catch (Throwable e) {
if (e instanceof RpcException && ((RpcException) e).isBiz()) { // biz exception.
throw (RpcException) e;
}
throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0,
"Fail invoke providers " + (invoked != null?invoked.getUrl():"")+ " " + loadbalance.getClass().getSimpleName()
+ " select from all providers " + invokers + " for service " + getInterface().getName()
+ " method " + invocation.getMethodName() + " on consumer " + NetUtils.getLocalHost()
+ " use dubbo version " + Version.getVersion()
+ ", but no luck to perform the invocation. Last error is: " + e.getMessage(),
e.getCause() != null ? e.getCause() : e);
}
}
/**
* 覆盖父类,将负载均衡去除
* @param invocation
* @return
* @throws RpcException
*/
@Override
public Result invoke(final Invocation invocation) throws RpcException {
this.checkWhetherDestroyed();
Map<String, Object> contextAttachments = RpcContext.getContext().getObjectAttachments();
if (contextAttachments != null && contextAttachments.size() != 0) {
((RpcInvocation)invocation).addObjectAttachments(contextAttachments);
}
List<Invoker<T>> invokers = this.list(invocation);
LoadBalance loadbalance = null; // this.initLoadBalance(invokers, invocation);
RpcUtils.attachInvocationIdIfAsync(this.getUrl(), invocation);
return this.doInvoke(invocation, invokers, loadbalance);
}
}
package com.wecloud.im.router.dubbo;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.cluster.Cluster;
import org.apache.dubbo.rpc.cluster.Directory;
public class ChannelRouterCluster implements Cluster {
@Override
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
return new ChannelAbstractClusterInvoker<>(directory);
}
}
......@@ -45,6 +45,8 @@ public class ChannelManager {
*/
public static final AttributeKey<String> APP_KEY = AttributeKey.valueOf("ak");
public static final AttributeKey<Integer> READ_IDLE_TIMES = AttributeKey.valueOf("readIdleTimes");
/**
* LANGUAGE
*/
......@@ -58,8 +60,6 @@ public class ChannelManager {
AttributeKey<String> DEVICEID = AttributeKey.valueOf("dc");
AttributeKey<String> PLATFORM = AttributeKey.valueOf("pt");
public static final AttributeKey<Integer> READ_IDLE_TIMES = AttributeKey.valueOf("readIdleTimes");
@Autowired
private RtcService rtcService;
......
......@@ -14,6 +14,8 @@ import io.geekidea.springbootplus.framework.common.api.ApiResult;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import lombok.extern.slf4j.Slf4j;
import org.apache.dubbo.config.annotation.DubboReference;
import org.apache.dubbo.rpc.RpcContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
......@@ -32,7 +34,7 @@ public class ChannelSender {
@Autowired
private UserCacheService userCacheService;
@Autowired
@DubboReference
private RouterSendService routerSendService;
/**
......@@ -107,10 +109,11 @@ public class ChannelSender {
// 是否为当前机器的ip
if (InitIp.lAN_IP.equals(channelInfo.getLanIp())) {
// 调用本地下发
this.sendMsgLocal(channelInfo.getChannelId(), msgJson);
} else {
// this.sendMsgLocal(channelInfo.getChannelId(), msgJson);
//
// } else {
// todo rpc调用下发
RpcContext.getContext().set("ip", channelInfo.getLanIp());
routerSendService.sendMsgRemote(channelInfo.getChannelId(), msgJson);
}
......
channelRouterCluster=com.wecloud.im.router.dubbo.ChannelRouterCluster
......@@ -333,33 +333,48 @@
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-spring-boot-starter</artifactId>
<version>${dubbo.version}</version>
</dependency>
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo</artifactId>
<version>${dubbo.version}</version>
<dependency>
<groupId>com.alibaba.spring</groupId>
<artifactId>spring-context-support</artifactId>
<version>1.0.11</version>
</dependency>
<!-- <dependency>-->
<!-- <groupId>org.apache.dubbo</groupId>-->
<!-- <artifactId>dubbo</artifactId>-->
<!-- <version>${dubbo.version}</version>-->
<!-- </dependency>-->
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-registry-nacos</artifactId>
<version>${dubbo.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
<version>2.2.5.RELEASE</version>
<exclusions>
<exclusion>
<groupId>com.alibaba.spring</groupId>
<artifactId>spring-context-support</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
<version>2.2.5.RELEASE</version>
<exclusions>
<exclusion>
<groupId>com.alibaba.spring</groupId>
<artifactId>spring-context-support</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- rabbitmq -->
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment