Commit 652277cd by lixiaozhong

1、消息路由重新设计

2、增加platform概念
parent 0e37c019
......@@ -5,6 +5,7 @@ import cn.hutool.crypto.digest.MD5;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.wecloud.im.entity.ImApplication;
import com.wecloud.im.entity.ImClient;
import com.wecloud.im.enums.PlatformEunm;
import com.wecloud.im.service.ImApplicationService;
import com.wecloud.im.service.ImClientService;
import com.wecloud.im.ws.utils.RedisUtils;
......@@ -84,7 +85,7 @@ public class LoginTest {
}
// 生成token
String generateToken = JwtUtil.generateToken(clientId, appKey, imApplication.getAppSecret(), Duration.ofDays(99999));
String generateToken = JwtUtil.generateToken(clientId, appKey, imApplication.getAppSecret(), Duration.ofDays(99999), PlatformEunm.WEB.getCode());
// 保存redis
redisTemplate.opsForValue().set("client:" + imApplication.getAppKey() + ":" + clientId, generateToken);
......
......@@ -71,6 +71,7 @@ public interface CommonConstant {
*/
String CLIENT_ID = "clientId";
String APP_KEY = "appKey";
String PLATFORM = "platform";
/**
......
......@@ -11,6 +11,7 @@ import com.wecloud.im.service.ImApplicationService;
import com.wecloud.im.service.ImClientService;
import com.wecloud.im.vo.GetInfoListVo;
import com.wecloud.im.vo.ImOnlineStatusVo;
import com.wecloud.im.ws.cache.UserStateCacheManager;
import com.wecloud.im.ws.manager.ChannelManager;
import io.geekidea.springbootplus.framework.common.api.ApiResult;
import io.geekidea.springbootplus.framework.common.controller.BaseController;
......@@ -46,7 +47,7 @@ public class ImClientController extends BaseController {
private ImClientService imClientService;
@Autowired
private ChannelManager channelManager;
private UserStateCacheManager userStateCacheManager;
@Autowired
private ImApplicationService imApplicationService;
......@@ -123,7 +124,8 @@ public class ImClientController extends BaseController {
for (String clientId : getOnlineStatusParam.getClientIds()) {
ImOnlineStatusVo imOnlineStatusVo = new ImOnlineStatusVo();
imOnlineStatusVo.setStatus(channelManager.getOnlineStatus(imApplication.getAppKey(), clientId));
// todo 需要改成批量
imOnlineStatusVo.setStatus(userStateCacheManager.isOnline(imApplication.getAppKey(), clientId));
imOnlineStatusVo.setClientId(clientId);
imOnlineStatusVos.add(imOnlineStatusVo);
}
......
......@@ -35,7 +35,8 @@ public class SignController extends BaseController {
@ApiOperation(value = "获取sign(仅提供测试调试使用)", notes = "生成签名测试,在生产环境中,此步骤需要第三方应用的服务端进行生成")
public String get(@RequestBody GetSignParam getSignParam) throws Exception {
return new MD5().digestHex(getSignParam.getTimestamp() + getSignParam.getClientId() + getSignParam.getAppKey() + getSignParam.getAppSecret());
return new MD5().digestHex(getSignParam.getTimestamp() + getSignParam.getClientId()
+ getSignParam.getAppKey() + getSignParam.getAppSecret() + getSignParam.getPlatform());
}
}
......
package com.wecloud.im.enums;
import io.geekidea.springbootplus.framework.common.enums.BaseEnum;
public enum PlatformEunm implements BaseEnum {
WEB(1, "web端"),
ANDROID(2, "安卓端"),
IOS(3, "IOS端"),
WIN(4, "PC-windows端"),
MAC(5, "PC-macOs端");
private final Integer code;
private final String desc;
PlatformEunm(Integer code, String desc) {
this.code = code;
this.desc = desc;
}
@Override
public Integer getCode() {
return this.code;
}
@Override
public String getDesc() {
return this.desc;
}
}
......@@ -3,14 +3,12 @@ package com.wecloud.im.netty.core;
import com.wecloud.im.executor.BusinessThreadPool;
import com.wecloud.im.ws.manager.ChannelManager;
import com.wecloud.im.ws.strategy.AbstractImCmdStrategy;
import com.wecloud.rtc.service.RtcService;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.timeout.IdleStateEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
......@@ -28,8 +26,6 @@ public class WsReadHandler extends SimpleChannelInboundHandler<TextWebSocketFram
private static final String PING = "ping";
private static final String PONG = "pong";
@Autowired
private RtcService rtcService;
@Resource
private ChannelManager channelManager;
......@@ -81,7 +77,7 @@ public class WsReadHandler extends SimpleChannelInboundHandler<TextWebSocketFram
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
String clientId = ctx.channel().attr(ChannelManager.CLIENT_ID).get();
//读超时计时器
......@@ -134,10 +130,11 @@ public class WsReadHandler extends SimpleChannelInboundHandler<TextWebSocketFram
String appKey = ctx.channel().attr(ChannelManager.APP_KEY).get();
String clientId = ctx.channel().attr(ChannelManager.CLIENT_ID).get();
Integer platform = ctx.channel().attr(ChannelManager.PLATFORM).get();
log.info("uid is APP_KEY:{},CLIENT_ID:{}, handlerRemoved. channelId is {}", appKey, clientId, ctx.channel().id().asLongText());
// 关掉连接
channelManager.offline(appKey, clientId, ctx);
channelManager.offline(appKey, clientId, platform, ctx);
}
......
......@@ -127,15 +127,16 @@ public class NettyApiRequest {
ctx.fireChannelRead(httpRequest.retain());
// 设置属性值 userid - channel
ctx.channel().attr(ChannelManager.CLIENT_ID).set(clientId);
ctx.channel().attr(ChannelManager.APP_KEY).set(appKey);
ctx.channel().attr(ChannelManager.CLIENT_ID).set(clientId);
ctx.channel().attr(ChannelManager.PLATFORM).set(jwtToken.getPlatform());
ctx.channel().attr(ChannelManager.READ_IDLE_TIMES).set(0);// 读空闲的计数=0
// 添加长连接handler
ctx.pipeline().addLast("appImHandler", appImReadHandler);
// 保存用户上下文对象
appUserChannelsService.online(appKey, clientId, (NioSocketChannel) ctx.channel());
appUserChannelsService.online(appKey, clientId, jwtToken.getPlatform(), (NioSocketChannel) ctx.channel());
//移除当前api处理handler, 不再参与长连接处理
ctx.pipeline().remove("SingleHttpRequestHandler");
......
......@@ -26,7 +26,12 @@ public class GetSignParam implements Serializable {
@ApiModelProperty("appKey")
private String appKey;
@ApiModelProperty("客户端平台: 1 web, 2 安卓, 3 ios, 4 pc-win, 5 pc-macOs")
private Integer platform;
@ApiModelProperty("密钥")
private String appSecret;
}
......@@ -24,6 +24,9 @@ public class ImTokenVerify {
@ApiModelProperty("appkey,需与生成sign时的值一致")
private String appKey;
@ApiModelProperty("客户端平台: 1 web, 2 安卓, 3 ios, 4 pc-win, 5 pc-macOs, 需与生成sign时的值一致")
private Integer platform;
@ApiModelProperty("签名sign")
private String sign;
......
......@@ -7,9 +7,11 @@ public interface RouterSendService {
/**
* 通过rpc调用发送,解决channel不在本机时调用
* @param channelId
* @param appKey
* @param clientId
* @param platform
* @param msg
*/
void sendMsgRemote(String channelId, String msg);
void sendMsgRemote(String appKey, String clientId, Integer platform, String msg);
}
package com.wecloud.im.router;
import com.wecloud.im.ws.manager.ChannelManager;
import com.wecloud.im.ws.sender.ChannelSender;
import org.apache.dubbo.config.annotation.DubboService;
import org.springframework.beans.factory.annotation.Autowired;
......@@ -13,8 +14,8 @@ public class RouterSendServiceImpl implements RouterSendService {
private ChannelSender channelSender;
@Override
public void sendMsgRemote(String channelId, String msg) {
channelSender.sendMsgLocal(channelId, msg);
public void sendMsgRemote(String appKey, String clientId, Integer platform, String msg) {
channelSender.sendMsgLocal(appKey, clientId, platform, msg);
}
}
......@@ -68,9 +68,14 @@ public class ImClientLoginServiceImpl implements ImClientLoginService {
return ApiResult.result(ApiCode.FAIL, null);
}
if (imTokenVerify.getPlatform() == null) {
log.info("platform is null , clientId is: {}", imTokenVerify.getClientId());
return ApiResult.result(ApiCode.FAIL, null);
}
// 生成以数据库为准的签名
String secret = imApplication.getAppSecret();
String mySign = new MD5().digestHex(imTokenVerify.getTimestamp() + imTokenVerify.getClientId() + imApplication.getAppKey() + secret);
String mySign = new MD5().digestHex(imTokenVerify.getTimestamp() + imTokenVerify.getClientId() + imApplication.getAppKey() + secret + imTokenVerify.getPlatform());
// 验证签名
if (!mySign.equals(imTokenVerify.getSign())) {
......@@ -93,12 +98,12 @@ public class ImClientLoginServiceImpl implements ImClientLoginService {
}
// 生成token
String generateToken = JwtUtil.generateToken(imTokenVerify.getClientId(), imTokenVerify.getAppKey(), secret, Duration.ofDays(99999));
String generateToken = JwtUtil.generateToken(imTokenVerify.getClientId(), imTokenVerify.getAppKey(), secret, Duration.ofDays(99999), imTokenVerify.getPlatform());
// 保存redis
// redisTemplate.opsForValue().set("client:" + imApplication.getAppKey() + ":" + imTokenVerify.getClientId(), generateToken);
JwtToken jwtToken = JwtToken.build(generateToken, secret, jwtProperties.getExpireSecond(), imClient.getClientId(), imTokenVerify.getAppKey());
JwtToken jwtToken = JwtToken.build(generateToken, secret, jwtProperties.getExpireSecond(), imClient.getClientId(), imTokenVerify.getAppKey(), imTokenVerify.getPlatform());
appLoginRedisService.cacheLoginInfo(jwtToken);
TokenVo tokenVo = new TokenVo();
......
......@@ -26,12 +26,12 @@ public class GroupCacheManager extends UserStateListener {
private RedisUtils redisUtils;
@Override
public void onLineEvent(String appKey, String clientId, String longChannelId) {
public void onLineEvent(String appKey, String clientId, Integer platform, String longChannelId) {
}
@Override
public void offlineEvent(String appKey, String clientId, String longChannelId) {
public void offlineEvent(String appKey, String clientId, Integer platform, String longChannelId) {
}
......
......@@ -16,6 +16,7 @@ import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
/**
* @author hewei123@163.com
......@@ -27,37 +28,16 @@ import java.util.Set;
public class UserStateCacheManager extends UserStateListener {
/**
* 在线状态
*/
public static final Integer ONLINE = 1;
/**
* 离线状态
*/
public static final Integer OFFLINE = 0;
/**
* 维护client端的信息
* ip及平台类型
*/
private static final String CLIENT_INFO = "cli:";
/**
* 维护client的多端数据:
* 值为set集合,netty的channel的id
*/
private static final String CLIENTS = "cis:";
/**
* 用户在线状态fieldKey
*/
private static final String ONLINE_STATUS = "ost";
/**
* 设备类型 web,ios,android,ios,other
*/
private static final String DEVICE_TYPE = "ty";
/**
* 用户所连机器ip的fieldKey
*/
private static final String LAN_IP = "lip";
/**
* 用户公网ip,在公网部署集群需要用到
*/
private static final String PUBLIC_IP = "pip";
......@@ -69,66 +49,68 @@ public class UserStateCacheManager extends UserStateListener {
* 设备ID
*/
private static final String DEVICE_ID = "di";
@Autowired
private RedisUtils redisUtils;
@Override
public void onLineEvent(String appKey, String clientId, String longChannelId) {
log.info("ws用户上线保存redis连接ip:" + InitIp.lAN_IP, ",uid:" + longChannelId);
// 先进行历史数据清理
Set<String> members = redisUtils.getForSetMembers(CLIENTS + appKey + clientId);
if(members != null) {
members.forEach(channelId-> redisUtils.delKey(CLIENT_INFO + channelId));
public void onLineEvent(String appKey, String clientId, Integer platform, String longChannelId) {
log.info("ws用户上线保存redis连接ip: {}, uid: {}", InitIp.lAN_IP, longChannelId);
// 先删除旧的重复的platform
Set<String> platformAndIps = redisUtils.getForSetMembers(getUserStateCacheKey(appKey, clientId));
for(String platformAndIp : platformAndIps) {
String[] split = platformAndIp.split(RedisUtils.SPLIT);
String innerPlatform = split[0];
if(innerPlatform.equals(String.valueOf(platform))) {
redisUtils.removeForSet(getUserStateCacheKey(appKey, clientId), platformAndIp);
}
}
redisUtils.delKey(CLIENTS + appKey + clientId);
redisUtils.addForSet(CLIENTS + appKey + clientId, longChannelId);
redisUtils.hashset(CLIENT_INFO + longChannelId, LAN_IP, InitIp.lAN_IP);
redisUtils.hashset(CLIENT_INFO + longChannelId, ONLINE_STATUS, ONLINE.toString());
redisUtils.addForSet(getUserStateCacheKey(appKey, clientId), platform + RedisUtils.SPLIT + InitIp.lAN_IP, 10, TimeUnit.DAYS);
}
@Override
public void offlineEvent(String appKey, String clientId, String longChannelId) {
public void offlineEvent(String appKey, String clientId, Integer platform, String longChannelId) {
log.info("ws用户离线删除redis key,uid:" + longChannelId);
redisUtils.removeForSet(CLIENTS + appKey + clientId, longChannelId);
// redisUtils.hashset(CLIENT_INFO + appKey + clientId, ONLINE_STATUS_KEY, OFFLINE.toString());
redisUtils.delKey(CLIENT_INFO + longChannelId);
redisUtils.removeForSet(getUserStateCacheKey(appKey, clientId), platform + RedisUtils.SPLIT + InitIp.lAN_IP);
}
/**
* 根据clientId获取redis 的channel信息
* 根据clientId获取在线用户信息
* @param appKey
* @param clientId
* @return
*/
public List<ClientChannelInfo> getIpByClientIdAndOnline(String appKey, String clientId) {
public List<ClientChannelInfo> findOnlineInfosByClientId(String appKey, String clientId) {
// 获取所有 CLIENTS的 longChannelId
Set<String> longChannelIds = redisUtils.getForSetMembers(CLIENTS + appKey + clientId);
// 获取所有 CLIENTS的 <platform>:<ip>
Set<String> platformAndIs = redisUtils.getForSetMembers(getUserStateCacheKey(appKey, clientId));
ArrayList<ClientChannelInfo> clientChannelInfos = new ArrayList<>();
// 根据 longChannelId 查询信息
for (String longChannelId : longChannelIds) {
String onlineStatus = redisUtils.hashget(CLIENT_INFO + longChannelId, ONLINE_STATUS);
String lanIp = redisUtils.hashget(CLIENT_INFO + longChannelId, LAN_IP);
for(String platformAndIp : platformAndIs) {
String[] split = platformAndIp.split(RedisUtils.SPLIT);
ClientChannelInfo clientChannelInfo = new ClientChannelInfo();
clientChannelInfo.setChannelId(longChannelId);
clientChannelInfo.setLanIp(lanIp);
clientChannelInfo.setOnlineStatus(Integer.valueOf(onlineStatus));
clientChannelInfo.setPlatform(Integer.valueOf(split[0]));
clientChannelInfo.setLanIp(split[1]);
clientChannelInfos.add(clientChannelInfo);
// //判断是否在线
// if (StringUtils.isNotBlank(onlineStatus) && Integer.valueOf(onlineStatus).equals(ONLINE)) {
// }
}
return clientChannelInfos;
}
/**
* 判断用户是否在线
* @param appKey
* @param clientId
* @return true表示在线,false表示离线
*/
public boolean isOnline(String appKey, String clientId) {
Set<String> platformAndIs = redisUtils.getForSetMembers(getUserStateCacheKey(appKey, clientId));
return platformAndIs.size() > 0;
}
private String getUserStateCacheKey(String appKey, String clientId) {
return CLIENTS + appKey + RedisUtils.SPLIT + clientId;
}
}
......@@ -27,19 +27,19 @@ public abstract class UserStateListener {
listeners.add(listener);
}
public static void triggerOnlineEvent(String appKey, String clientId, String longChannelId) {
public static void triggerOnlineEvent(String appKey, String clientId, Integer platform, String longChannelId) {
for(UserStateListener listener: listeners) {
listener.onLineEvent(appKey, clientId, longChannelId);
listener.onLineEvent(appKey, clientId, platform, longChannelId);
}
}
public static void triggerOfflineEvent(String appKey, String clientId, String longChannelId) {
public static void triggerOfflineEvent(String appKey, String clientId, Integer platform, String longChannelId) {
for(UserStateListener listener: listeners) {
listener.offlineEvent(appKey, clientId, longChannelId);
listener.offlineEvent(appKey, clientId, platform, longChannelId);
}
}
public abstract void onLineEvent(String appKey, String clientId, String longChannelId);
public abstract void onLineEvent(String appKey, String clientId, Integer platform, String longChannelId);
public abstract void offlineEvent(String appKey, String clientId, String longChannelId);
public abstract void offlineEvent(String appKey, String clientId, Integer platform, String longChannelId);
}
......@@ -4,7 +4,6 @@ import com.wecloud.im.ws.cache.UserStateCacheManager;
import com.wecloud.im.ws.cache.UserStateListener;
import com.wecloud.im.ws.model.ClientInfo;
import com.wecloud.im.ws.model.redis.ClientChannelInfo;
import com.wecloud.rtc.service.RtcService;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.AttributeKey;
......@@ -12,10 +11,8 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
......@@ -27,12 +24,9 @@ import java.util.concurrent.ConcurrentHashMap;
public class ChannelManager {
/**
* 本地维护 uid 对应多个 channel的shortID
*/
public static final Map<String, Set<String>> CLIENTS_MAP = new ConcurrentHashMap<>();
/**
* channel的shortID对应client端数据
* client端对应channel的数据
* key: appId-clientId-platform
* value: channel对象
*/
public static final Map<String, ClientInfo> SESSION_INFO_MAP = new ConcurrentHashMap<>();
......@@ -59,26 +53,22 @@ public class ChannelManager {
AttributeKey<String> APP_VERSION = AttributeKey.valueOf("av");
AttributeKey<String> TOKEN = AttributeKey.valueOf("to");
AttributeKey<String> DEVICEID = AttributeKey.valueOf("dc");
AttributeKey<String> PLATFORM = AttributeKey.valueOf("pt");
@Autowired
private RtcService rtcService;
@Autowired
private UserStateCacheManager userStateCacheManager;
public static final AttributeKey<Integer> PLATFORM = AttributeKey.valueOf("pt");
/**
* client上线
* userID绑定channel
*
* @param appKey
* @param clientId
* @param platform
* @param channel
*/
public void online(String appKey, String clientId, NioSocketChannel channel) {
public void online(String appKey, String clientId, Integer platform, NioSocketChannel channel) {
String longChannelId = channel.id().asLongText();
this.putClientsMap(appKey, clientId, longChannelId);
this.putSessionInfoMap(longChannelId, channel);
this.putSessionInfoMap(appKey, clientId, platform, channel);
UserStateListener.triggerOnlineEvent(appKey, clientId, longChannelId);
UserStateListener.triggerOnlineEvent(appKey, clientId, platform, longChannelId);
}
/**
......@@ -86,9 +76,10 @@ public class ChannelManager {
*
* @param appKey
* @param clientId
* @param platform
* @param channelHandlerContext
*/
public void offline(String appKey, String clientId, ChannelHandlerContext channelHandlerContext) {
public void offline(String appKey, String clientId, Integer platform, ChannelHandlerContext channelHandlerContext) {
String longChannelId = channelHandlerContext.channel().id().asLongText();
......@@ -97,62 +88,26 @@ public class ChannelManager {
// 移除本地维护的channel
delSessionInfoMap(longChannelId);
delClientsMap(appKey, clientId, longChannelId);
delSessionInfoMap(appKey, clientId, platform);
UserStateListener.triggerOfflineEvent(appKey, clientId, longChannelId);
UserStateListener.triggerOfflineEvent(appKey, clientId, platform, longChannelId);
}
/**
* 获取用户在线状态
*
* @param toAppKey
* @param toClientId
* @return true:在线, false 不在线
*/
public boolean getOnlineStatus(String toAppKey, String toClientId) {
List<ClientChannelInfo> channelInfos = userStateCacheManager.getIpByClientIdAndOnline(toAppKey, toClientId);
boolean flag = false;
for (ClientChannelInfo channelInfo : channelInfos) {
if (channelInfo.getOnlineStatus().equals(1)) {
return true;
}
}
return flag;
public static String genKeyForSessionInfoMap(String appKey, String clientId, Integer platform) {
return new StringBuilder(appKey).append("-").append(clientId).append("-").append(platform).toString();
}
private void putSessionInfoMap(String longChannelId, NioSocketChannel channel) {
private void putSessionInfoMap(String appKey, String clientId, Integer platform, NioSocketChannel channel) {
ClientInfo clientInfo = new ClientInfo();
clientInfo.setDeviceId("");
clientInfo.setNioSocketChannel(channel);
clientInfo.setToken("");
ChannelManager.SESSION_INFO_MAP.put(longChannelId, clientInfo);
}
private void putClientsMap(String appKey, String clientId, String longChannelId) {
Set<String> set = ChannelManager.CLIENTS_MAP.get(appKey + ":" + clientId);
if (set == null || set.isEmpty()) {
HashSet<String> hashSet = new HashSet<>();
hashSet.add(longChannelId);
ChannelManager.CLIENTS_MAP.put(appKey + ":" + clientId, hashSet);
} else {
set.add(longChannelId);
}
ChannelManager.SESSION_INFO_MAP.put(genKeyForSessionInfoMap(appKey, clientId, platform), clientInfo);
}
private void delSessionInfoMap(String longChannelId) {
ChannelManager.SESSION_INFO_MAP.remove(longChannelId);
private void delSessionInfoMap(String appKey, String clientId, Integer platform) {
ChannelManager.SESSION_INFO_MAP.remove(genKeyForSessionInfoMap(appKey, clientId, platform));
}
private void delClientsMap(String appKey, String clientId, String longChannelId) {
Set<String> set = ChannelManager.CLIENTS_MAP.get(appKey + ":" + clientId);
if (set != null) {
set.remove(longChannelId);
}
}
}
......@@ -9,7 +9,10 @@ import java.io.Serializable;
*/
@Data
public class ClientChannelInfo extends ClientConnectionInfo implements Serializable {
private String channelId;
private Integer onlineStatus;
private String appkey;
private String clientId;
private Integer platform;
}
......@@ -105,19 +105,18 @@ public class ChannelSender {
String msgJson = JsonUtils.encodeJson(responseModel);
List<ClientChannelInfo> channelInfos = userStateCacheManager.getIpByClientIdAndOnline(toAppKey, toClientId);
List<ClientChannelInfo> channelInfos = userStateCacheManager.findOnlineInfosByClientId(toAppKey, toClientId);
// 先进行分类,key是ip地址,value是channel的列表
// 一个用户存在多端的情况,所以先进行分类,key是ip地址,value是channel的列表
Map<String, List<ClientChannelInfo>> ipChannels = channelInfos.stream().collect(Collectors.groupingBy(ClientChannelInfo::getLanIp));
for (Map.Entry<String, List<ClientChannelInfo>> channelInfoEntry : ipChannels.entrySet()) {
// 是否为当前机器的ip
if (InitIp.lAN_IP.equals(channelInfoEntry.getKey())) {
// 调用本地下发
for(ClientChannelInfo clientChannelInfo : channelInfoEntry.getValue()) {
this.sendMsgLocal(clientChannelInfo.getChannelId(), msgJson);
this.sendMsgLocal(toAppKey, toClientId, clientChannelInfo.getPlatform(), msgJson);
}
continue;
}
......@@ -127,7 +126,7 @@ public class ChannelSender {
for(ClientChannelInfo clientChannelInfo : channelInfoEntry.getValue()) {
Address address = new Address(clientChannelInfo.getLanIp(), 20881);
RpcContext.getContext().setObjectAttachment("address", address);
routerSendService.sendMsgRemote(clientChannelInfo.getChannelId(), msgJson);
routerSendService.sendMsgRemote(toAppKey, toClientId, clientChannelInfo.getPlatform(), msgJson);
}
}
......@@ -150,13 +149,16 @@ public class ChannelSender {
/**
* 向指定channelId下发数据,限定本机有的channel
*
* @param toChannelId
* @param appKey
* @param clientId
* @param platform
* @param msg
* @return
*/
public boolean sendMsgLocal(String toChannelId, String msg) {
public boolean sendMsgLocal(String appKey, String clientId, Integer platform, String msg) {
ClientInfo clientInfo = ChannelManager.SESSION_INFO_MAP.get(toChannelId);
String key = ChannelManager.genKeyForSessionInfoMap(appKey, clientId, platform);
ClientInfo clientInfo = ChannelManager.SESSION_INFO_MAP.get(key);
if (clientInfo == null) {
return false;
}
......
......@@ -33,7 +33,7 @@ import java.util.concurrent.atomic.AtomicLong;
*/
public class IosPush {
// private static Map<String, String> APNS_CACHE = new HashMap<>();
private final static EventLoopGroup EVENT_LOOP_GROUP = new NioEventLoopGroup(4);
private static final Logger logger = LoggerFactory.getLogger(IosPush.class);
/**
......@@ -288,11 +288,10 @@ public class IosPush {
public static ApnsClient getAPNSConnect(InputStream inputStream, Boolean productFlag, String certificatePassword) {
ApnsClient apnsXTClient = null;
try {
EventLoopGroup eventLoopGroup = new NioEventLoopGroup(4);
String environmentHost = productFlag ? ApnsClientBuilder.PRODUCTION_APNS_HOST : ApnsClientBuilder.DEVELOPMENT_APNS_HOST;
apnsXTClient = new ApnsClientBuilder().setApnsServer(environmentHost)
.setClientCredentials(inputStream, certificatePassword)
.setConcurrentConnections(4).setEventLoopGroup(eventLoopGroup).build();
.setConcurrentConnections(4).setEventLoopGroup(EVENT_LOOP_GROUP).build();
} catch (Exception e) {
logger.error("ios get push apns client failed!", e);
}
......@@ -310,12 +309,11 @@ public class IosPush {
public static ApnsClient getAPNSConnect(String apnsCertificatePath, Boolean productFlag, String certificatePassword) {
ApnsClient apnsXTClient = null;
try {
EventLoopGroup eventLoopGroup = new NioEventLoopGroup(4);
String environmentHost = productFlag ? ApnsClientBuilder.PRODUCTION_APNS_HOST : ApnsClientBuilder.DEVELOPMENT_APNS_HOST;
InputStream resourceAsStream = ClassLoader.getSystemClassLoader().getResourceAsStream(apnsCertificatePath);
apnsXTClient = new ApnsClientBuilder().setApnsServer(environmentHost)
.setClientCredentials(resourceAsStream, certificatePassword)
.setConcurrentConnections(4).setEventLoopGroup(eventLoopGroup).build();
.setConcurrentConnections(4).setEventLoopGroup(EVENT_LOOP_GROUP).build();
} catch (Exception e) {
logger.error("ios get push apns client failed!", e);
}
......
package com.wecloud.im.ws.utils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisOperations;
import org.springframework.data.redis.core.SessionCallback;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.sql.Time;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
/**
* @author hewei123@163.com
......@@ -22,6 +28,9 @@ public class RedisUtils {
@Autowired
private StringRedisTemplate redisTemplate;
public final static String SPLIT = ":";
public StringRedisTemplate redisTemplate() {
return redisTemplate;
}
......@@ -32,9 +41,10 @@ public class RedisUtils {
*
* @param key
* @param value
* @param timeout 超时
*/
public void addKey(String key, String value) {
redisTemplate.opsForValue().set(key, value);
public void addKey(String key, String value, Duration timeout) {
redisTemplate.opsForValue().set(key, value, timeout);
}
/**
......@@ -85,8 +95,16 @@ public class RedisUtils {
* @param field
* @param value
*/
public void hashset(String key, String field, String value) {
redisTemplate.opsForHash().put(key, field, value);
public void hashset(String key, String field, String value, long timeout, TimeUnit timeUnit) {
redisTemplate.execute(new SessionCallback<List<Object>>() {
@Override
public List<Object> execute(RedisOperations redisOperations) throws DataAccessException {
redisOperations.multi();
redisOperations.opsForHash().put(key, field, value);
redisOperations.expire(key, timeout, timeUnit);
return redisOperations.exec();
}
});
}
/**
......@@ -156,8 +174,16 @@ public class RedisUtils {
* @param key
* @param value
*/
public void addForSet(String key, String value) {
redisTemplate.opsForSet().add(key, value);
public void addForSet(String key, String value, long timeout, TimeUnit timeUnit) {
redisTemplate.execute(new SessionCallback<List<Object>>() {
@Override
public List<Object> execute(RedisOperations redisOperations) throws DataAccessException {
redisOperations.multi();
redisOperations.opsForSet().add(key, value);
redisOperations.expire(key, timeout, timeUnit);
return redisOperations.exec();
}
});
}
/**
......@@ -178,5 +204,4 @@ public class RedisUtils {
return redisTemplate.opsForSet().members(key);
}
}
......@@ -10,9 +10,11 @@ import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@Service
public class MangerRtcCacheServiceImpl implements MangerRtcCacheService {
......@@ -68,15 +70,15 @@ public class MangerRtcCacheServiceImpl implements MangerRtcCacheService {
// --- 保存频道信息
String channelKey = String.format(RtcRedisKey.RTC_CHANNEL_INFO, rtcChannelId);
redisUtils.addKey(channelKey, rtcChannelInfoJson);
redisUtils.addKey(channelKey, rtcChannelInfoJson, Duration.ofDays(10));
//用户当前在线的频道ID
String userJoinChannelKey = String.format(RtcRedisKey.USER_JOIN_CHANNEL, appKey + clientId);
redisUtils.addKey(userJoinChannelKey, rtcChannelId.toString());
redisUtils.addKey(userJoinChannelKey, rtcChannelId.toString(), Duration.ofDays(10));
//频道中存在的用户
String rtcChannelUsers = String.format(RtcRedisKey.RTC_CHANNEL_USERS, rtcChannelId);
redisUtils.addForSet(rtcChannelUsers, clientId);
redisUtils.addForSet(rtcChannelUsers, clientId, 10, TimeUnit.DAYS);
}
......@@ -85,11 +87,11 @@ public class MangerRtcCacheServiceImpl implements MangerRtcCacheService {
//用户当前在线的频道ID
String userJoinChannelKey = String.format(RtcRedisKey.USER_JOIN_CHANNEL, appKey + clientId);
redisUtils.addKey(userJoinChannelKey, rtcChannelId.toString());
redisUtils.addKey(userJoinChannelKey, rtcChannelId.toString(), Duration.ofDays(10));
//频道中存在的用户
String rtcChannelUsers = String.format(RtcRedisKey.RTC_CHANNEL_USERS, rtcChannelId);
redisUtils.addForSet(rtcChannelUsers, clientId);
redisUtils.addForSet(rtcChannelUsers, clientId, 10, TimeUnit.DAYS);
}
......
......@@ -12,6 +12,7 @@ import com.wecloud.im.param.rtc.RejectRtcChannelParam;
import com.wecloud.im.param.rtc.SdpForwardParam;
import com.wecloud.im.service.ImApplicationService;
import com.wecloud.im.service.ImClientService;
import com.wecloud.im.ws.cache.UserStateCacheManager;
import com.wecloud.im.ws.cache.UserStateListener;
import com.wecloud.im.ws.manager.ChannelManager;
import com.wecloud.rtc.entity.response.RtcCallResponse;
......@@ -51,15 +52,15 @@ public class RtcServiceImpl extends UserStateListener implements RtcService {
private ImClientService imClientService;
@Autowired
private ChannelManager channelManager;
private UserStateCacheManager userStateCacheManager;
@Override
public void onLineEvent(String appKey, String clientId, String longChannelId) {
public void onLineEvent(String appKey, String clientId, Integer platform, String longChannelId) {
// nothing need to do
}
@Override
public void offlineEvent(String appKey, String clientId, String longChannelId) {
public void offlineEvent(String appKey, String clientId, Integer platform, String longChannelId) {
// 根据appKey查询appid
ImApplication imApplication = imApplicationService.getOneByAppKey(appKey);
ImClient client = imClientService.getCacheImClient(imApplication.getId(), clientId);
......@@ -85,7 +86,7 @@ public class RtcServiceImpl extends UserStateListener implements RtcService {
ImApplication imApplication = imApplicationService.getById(client.getFkAppid());
// 判断发起方必须在线
boolean onlineStatus = channelManager.getOnlineStatus(imApplication.getAppKey(), client.getClientId());
boolean onlineStatus = userStateCacheManager.isOnline(imApplication.getAppKey(), client.getClientId());
if (!onlineStatus) {
log.info("发起方必须在线" + imApplication.getAppKey() + client.getClientId());
ApiResult.fail();
......@@ -217,7 +218,7 @@ public class RtcServiceImpl extends UserStateListener implements RtcService {
ImApplication imApplication = imApplicationService.getById(client.getFkAppid());
// 判断发起方必须在线
boolean onlineStatus = channelManager.getOnlineStatus(imApplication.getAppKey(), client.getClientId());
boolean onlineStatus = userStateCacheManager.isOnline(imApplication.getAppKey(), client.getClientId());
if (!onlineStatus) {
log.info("发起方必须在线" + imApplication.getAppKey() + client.getClientId());
ApiResult.fail();
......@@ -258,7 +259,7 @@ public class RtcServiceImpl extends UserStateListener implements RtcService {
ImApplication imApplication = imApplicationService.getById(client.getFkAppid());
// 判断发起方必须在线
boolean onlineStatus = channelManager.getOnlineStatus(imApplication.getAppKey(), client.getClientId());
boolean onlineStatus = userStateCacheManager.isOnline(imApplication.getAppKey(), client.getClientId());
if (!onlineStatus) {
log.info("发起方必须在线" + imApplication.getAppKey() + client.getClientId());
ApiResult.fail();
......
# wecloud-im 前端Websocket对接文档
# wecloud-im 前端Websocket对接文档
......@@ -147,7 +147,7 @@ ws://localhost:8899/ws?token=eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJhdWQiOiJ3ZW
| 字段名 | 字段类型 | 是否可空 | 说明 |
| -------- | -------- | -------- | ------------------------------------- |
| token | String | 否 | token |
| platform | int | 是 | 设备平台类型: 1 安卓; 2 ios; 3 web |
| platform | int | 是 | 设备平台类型: 1 web, 2 安卓, 3 ios, 4 pc-win, 5 pc-macOs |
#### 不同环境连接
......
......@@ -76,7 +76,7 @@ public class JwtFilter extends AuthenticatingFilter {
if (jwtToken == null) {
throw new AuthenticationException("Redis Token不存在,token:" + token);
}
return JwtToken.build(token, jwtToken.getSalt(), jwtToken.getExpireSecond(), jwtToken.getClientId(), jwtToken.getAppKey());
return JwtToken.build(token, jwtToken.getSalt(), jwtToken.getExpireSecond(), jwtToken.getClientId(), jwtToken.getAppKey(), jwtToken.getPlatform());
}
......
......@@ -77,7 +77,12 @@ public class JwtToken implements HostAuthenticationToken {
private String credentials;
public static JwtToken build(String token, String salt, long expireSecond, String clientId, String appKey) {
/**
* 1 web, 2 安卓, 3 ios, 4 pc-win, 5 pc-macOs
*/
private Integer platform;
public static JwtToken build(String token, String salt, long expireSecond, String clientId, String appKey, Integer platform) {
DecodedJWT decodedJwt = JwtUtil.getJwtInfo(token);
Date createDate = decodedJwt.getIssuedAt();
Date expireDate = decodedJwt.getExpiresAt();
......@@ -90,6 +95,7 @@ public class JwtToken implements HostAuthenticationToken {
jwtToken.setCreateDate(createDate);
jwtToken.setExpireSecond(expireSecond);
jwtToken.setExpireDate(expireDate);
jwtToken.setPlatform(platform);
return jwtToken;
}
......
......@@ -71,9 +71,10 @@ public class JwtUtil {
* @param clientId clientId
* @param salt 盐值
* @param expireDuration 过期时间和单位
* @param platform 平台
* @return token
*/
public static String generateToken(String clientId, String appKey, String salt, Duration expireDuration) {
public static String generateToken(String clientId, String appKey, String salt, Duration expireDuration, Integer platform) {
try {
if (StringUtils.isBlank(clientId)) {
log.error("username不能为空");
......@@ -106,6 +107,7 @@ public class JwtUtil {
.withClaim(CommonConstant.CLIENT_ID, clientId)
// APP_KEY
.withClaim(CommonConstant.APP_KEY, appKey)
.withClaim(CommonConstant.PLATFORM, platform)
// jwt唯一id
.withJWTId(UUIDUtil.getUuid())
// 签发人
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment