Commit fdb7b594 by hweeeeeei

多端登陆同步redis

parent d9ac2d80
...@@ -4,14 +4,12 @@ import cn.hutool.core.thread.ThreadFactoryBuilder; ...@@ -4,14 +4,12 @@ import cn.hutool.core.thread.ThreadFactoryBuilder;
import com.wecloud.im.ws.model.WsConstants; import com.wecloud.im.ws.model.WsConstants;
import com.wecloud.im.ws.receive.ReadWsData; import com.wecloud.im.ws.receive.ReadWsData;
import com.wecloud.im.ws.service.MangerChannelService; import com.wecloud.im.ws.service.MangerChannelService;
import com.wecloud.rtc.service.RtcService;
import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.timeout.IdleStateEvent; import io.netty.handler.timeout.IdleStateEvent;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.Resource; import javax.annotation.Resource;
...@@ -37,9 +35,6 @@ public class WsReadHandler extends SimpleChannelInboundHandler<TextWebSocketFram ...@@ -37,9 +35,6 @@ public class WsReadHandler extends SimpleChannelInboundHandler<TextWebSocketFram
@Resource @Resource
private ReadWsData readWsData; private ReadWsData readWsData;
@Autowired
private RtcService rtcService;
@Resource @Resource
private MangerChannelService mangerChannelService; private MangerChannelService mangerChannelService;
...@@ -153,7 +148,7 @@ public class WsReadHandler extends SimpleChannelInboundHandler<TextWebSocketFram ...@@ -153,7 +148,7 @@ public class WsReadHandler extends SimpleChannelInboundHandler<TextWebSocketFram
@Override @Override
public void handlerAdded(ChannelHandlerContext ctx) { public void handlerAdded(ChannelHandlerContext ctx) {
String userIdByChannel = mangerChannelService.getInfoByChannel(ctx); String userIdByChannel = mangerChannelService.getStringInfoByChannel(ctx);
log.info("连接WS成功handlerAdded,uid:" + userIdByChannel + "," + ",channelId:" + ctx.channel().id().asLongText()); log.info("连接WS成功handlerAdded,uid:" + userIdByChannel + "," + ",channelId:" + ctx.channel().id().asLongText());
} }
...@@ -176,15 +171,8 @@ public class WsReadHandler extends SimpleChannelInboundHandler<TextWebSocketFram ...@@ -176,15 +171,8 @@ public class WsReadHandler extends SimpleChannelInboundHandler<TextWebSocketFram
@Override @Override
public void handlerRemoved(ChannelHandlerContext ctx) { public void handlerRemoved(ChannelHandlerContext ctx) {
String appKey = ctx.channel().attr(MangerChannelService.APP_KEY).get();
String clientId = ctx.channel().attr(MangerChannelService.CLIENT_ID).get();
String userIdByChannel = mangerChannelService.getInfoByChannel(ctx);
log.info("uid:" + userIdByChannel + "," + "handlerRemoved" + ",channelId:" + ctx.channel().id().asLongText());
// 关掉连接 // 关掉连接
ctx.close(); mangerChannelService.offline(ctx);
// rtc清空缓存
rtcService.clientOffline(appKey, clientId);
} }
} }
...@@ -134,7 +134,7 @@ public class NettyApiRequest { ...@@ -134,7 +134,7 @@ public class NettyApiRequest {
ctx.pipeline().addLast("appImHandler", appImReadHandler); ctx.pipeline().addLast("appImHandler", appImReadHandler);
// 保存用户上下文对象 // 保存用户上下文对象
appUserChannelsService.put(appKey, clientId, (NioSocketChannel) ctx.channel()); appUserChannelsService.online(appKey, clientId, (NioSocketChannel) ctx.channel());
//移除当前api处理handler, 不再参与长连接处理 //移除当前api处理handler, 不再参与长连接处理
ctx.pipeline().remove("SingleHttpRequestHandler"); ctx.pipeline().remove("SingleHttpRequestHandler");
......
package com.wecloud.im.ws.cache; package com.wecloud.im.ws.cache;
import com.wecloud.im.ws.model.redis.ClientConnectionInfo;
import com.wecloud.im.ws.utils.InitIp; import com.wecloud.im.ws.utils.InitIp;
import com.wecloud.im.ws.utils.RedisUtils;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.util.Map;
/** /**
* @author hewei123@163.com * @author hewei123@163.com
* @Description 用户与redis绑定 * @Description 用户与redis绑定
...@@ -13,81 +18,98 @@ import org.springframework.stereotype.Service; ...@@ -13,81 +18,98 @@ import org.springframework.stereotype.Service;
@Slf4j @Slf4j
public class UserCache { public class UserCache {
// /**
// * 在线状态
// */
// public static final Integer ONLINE = 1;
// /**
// * 离线状态
// */
// public static final Integer OFFLINE = 0;
/** /**
* key name * 在线状态
* websocket user info
*/ */
private static final String KEY_BASE = "wui:"; public static final Integer ONLINE = 1;
/**
// /** * 离线状态
// * 用户在线状态fieldKey */
// */ public static final Integer OFFLINE = 0;
// private static final String ONLINE_STATUS_KEY = "ost"; /**
* 维护client端的信息
* ip及平台类型
*/
private static final String CLIENT_INFO = "cli:";
/**
* 维护client的多端数据:
* 值为set集合,netty的channel的id
*/
private static final String CLIENTS = "cis:";
/**
* 用户在线状态fieldKey
*/
private static final String ONLINE_STATUS_KEY = "ost";
/**
* 设备类型 web,ios,android,ios,other
*/
private static final String DEVICE_TYPE = "ty";
/** /**
* 用户所连机器ip的fieldKey * 用户所连机器ip的fieldKey
*/ */
private static final String LAN_IP = "lip"; private static final String LAN_IP = "lip";
/** /**
* 用户公网ip,在公网部署集群需要用到 * 用户公网ip,在公网部署集群需要用到
*/ */
private static final String PUBLIC_IP = "pip"; private static final String PUBLIC_IP = "pip";
private static final String type = "ty"; /**
private static final String deviceToken = "dt"; * 推送token
*/
private static final String DEVICE_TOKEN = "dt";
/**
* 设备ID
*/
private static final String DEVICE_ID = "di";
@Autowired
private RedisUtils redisUtils;
/** /**
* 用户上线绑定机器ip * 用户上线绑定机器ip
* *
* @param id * @param longChannelId
*/ */
public void online(String id) { public void online(String appKey, String clientId, String longChannelId) {
log.info("ws用户上线保存redis连接ip:" + InitIp.lAN_IP, ",uid:"); log.info("ws用户上线保存redis连接ip:" + InitIp.lAN_IP, ",uid:" + longChannelId);
// redisUtils.hset(KEY_BASE + id, PRIVATE_IP_KEY, lAN_IP); redisUtils.addForSet(CLIENTS, longChannelId);
// redisUtils.hset(KEY_BASE + id, ONLINE_STATUS_KEY, String.valueOf(ONLINE));
redisUtils.hashset(CLIENT_INFO + appKey + clientId, LAN_IP, InitIp.lAN_IP);
redisUtils.hashset(CLIENT_INFO + appKey + clientId, ONLINE_STATUS_KEY, ONLINE.toString());
} }
/** /**
* 用户下线删除绑定机器ip * 用户下线删除绑定机器ip
* *
* @param id * @param longChannelId
*/
public void offline(String appKey, String clientId, String longChannelId) {
log.info("ws用户离线删除redis key,uid:" + longChannelId);
redisUtils.removeForSet(CLIENTS, longChannelId);
// redisUtils.hashset(CLIENT_INFO + appKey + clientId, ONLINE_STATUS_KEY, OFFLINE.toString());
redisUtils.delKey(CLIENT_INFO + appKey + clientId);
}
/**
* 根据用户id获取存在redis中的数据 例如绑定的服务器ip地址
*
* @param clientId
* @return
*/ */
public void offline(String id) { public ClientConnectionInfo getById(String clientId) {
log.info("ws用户离线删除redis key,uid:" + id);
Map<String, String> hgetll = redisUtils.hashgetll(CLIENT_INFO + clientId);
if (hgetll.isEmpty()) {
return null;
}
ClientConnectionInfo appHashValueModel = new ClientConnectionInfo();
appHashValueModel.setLanIp(hgetll.get(LAN_IP));
// appHashValueModel.(Integer.parseInt(hgetll.get(ONLINE_STATUS_KEY)));
return appHashValueModel;
// redisUtils.kdel(KEY_BASE + id);
} }
// /**
// * 根据用户id获取存在redis中的数据 例如绑定的服务器ip地址
// *
// * @param id
// * @return
// */
// public AppHashValueModel getById(String id) {
//
// Map<String, String> hgetll = redisUtils.hgetll(KEY_BASE + id);
// if (hgetll.isEmpty()) {
// return null;
// }
// AppHashValueModel appHashValueModel = new AppHashValueModel();
// appHashValueModel.setLanIp(hgetll.get(PRIVATE_IP_KEY));
// appHashValueModel.setOnlineStatus(Integer.parseInt(hgetll.get(ONLINE_STATUS_KEY)));
// return appHashValueModel;
//
// }
//
} }
...@@ -19,15 +19,15 @@ public class ClientInfo implements Serializable { ...@@ -19,15 +19,15 @@ public class ClientInfo implements Serializable {
*/ */
private String deviceId; private String deviceId;
/** // /**
* 设备类型 // * 设备类型
*/ // */
private String deviceType; // private String deviceType;
//
/** // /**
* 设备推送token // * 设备推送token
*/ // */
private String deviceToken; // private String deviceToken;
/** /**
* 登陆token * 登陆token
......
...@@ -8,7 +8,7 @@ import java.io.Serializable; ...@@ -8,7 +8,7 @@ import java.io.Serializable;
* 维护client多端的ip及平台类型: * 维护client多端的ip及平台类型:
*/ */
@Data @Data
public class UserConnectionInfo implements Serializable { public class ClientConnectionInfo implements Serializable {
private String pubIp; private String pubIp;
private String lanIp; private String lanIp;
private String type; private String type;
......
...@@ -24,12 +24,12 @@ public interface MangerChannelService { ...@@ -24,12 +24,12 @@ public interface MangerChannelService {
Map<String, NioSocketChannel> CHANNEL_MAP = new ConcurrentHashMap<>(); Map<String, NioSocketChannel> CHANNEL_MAP = new ConcurrentHashMap<>();
/** /**
* 本地维护 uid 对应 channel_shortID * 本地维护 uid 对应多个 channel的shortID
*/ */
Map<String, Set<String>> CLIENTS_MAP = new ConcurrentHashMap<>(); Map<String, Set<String>> CLIENTS_MAP = new ConcurrentHashMap<>();
/** /**
* channel_shortID对应client端数据 * channelshortID对应client端数据
*/ */
Map<String, ClientInfo> SESSION_INFO_MAP = new ConcurrentHashMap<>(); Map<String, ClientInfo> SESSION_INFO_MAP = new ConcurrentHashMap<>();
...@@ -66,17 +66,19 @@ public interface MangerChannelService { ...@@ -66,17 +66,19 @@ public interface MangerChannelService {
NioSocketChannel get(String appKey, String clientId); NioSocketChannel get(String appKey, String clientId);
/** /**
* client上线
* userID绑定channel * userID绑定channel
* *
* @param channel * @param channel
*/ */
void put(String appKey, String clientId, NioSocketChannel channel); void online(String appKey, String clientId, NioSocketChannel channel);
/** /**
* 移除channel * 下线移除channel
*
* @param channelHandlerContext * @param channelHandlerContext
*/ */
void remove(ChannelHandlerContext channelHandlerContext); void offline(ChannelHandlerContext channelHandlerContext);
/** /**
* 根据channel返回客户端key和id * 根据channel返回客户端key和id
...@@ -84,7 +86,7 @@ public interface MangerChannelService { ...@@ -84,7 +86,7 @@ public interface MangerChannelService {
* @param channelHandlerContext * @param channelHandlerContext
* @return * @return
*/ */
String getInfoByChannel(ChannelHandlerContext channelHandlerContext); String getStringInfoByChannel(ChannelHandlerContext channelHandlerContext);
/** /**
* 下发数据 * 下发数据
......
...@@ -3,7 +3,7 @@ package com.wecloud.im.ws.service.impl; ...@@ -3,7 +3,7 @@ package com.wecloud.im.ws.service.impl;
import com.wecloud.im.ws.cache.UserCache; import com.wecloud.im.ws.cache.UserCache;
import com.wecloud.im.ws.model.ClientInfo; import com.wecloud.im.ws.model.ClientInfo;
import com.wecloud.im.ws.service.MangerChannelService; import com.wecloud.im.ws.service.MangerChannelService;
import io.netty.channel.Channel; import com.wecloud.rtc.service.RtcService;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
...@@ -25,6 +25,8 @@ import java.util.Set; ...@@ -25,6 +25,8 @@ import java.util.Set;
@Slf4j @Slf4j
public class MangerChannelServiceImpl implements MangerChannelService { public class MangerChannelServiceImpl implements MangerChannelService {
@Autowired
private RtcService rtcService;
// private final static ThreadFactory NAMED_THREAD_FACTORY = new ThreadFactoryBuilder() // private final static ThreadFactory NAMED_THREAD_FACTORY = new ThreadFactoryBuilder()
// .setNamePrefix("rpcWrite-").build(); // .setNamePrefix("rpcWrite-").build();
...@@ -37,7 +39,7 @@ public class MangerChannelServiceImpl implements MangerChannelService { ...@@ -37,7 +39,7 @@ public class MangerChannelServiceImpl implements MangerChannelService {
* <p> * <p>
* 由于来自远程端调用下发数据 如果是群聊1000人群则调用1000次 为不要占用太多资源 需要排队下发 * 由于来自远程端调用下发数据 如果是群聊1000人群则调用1000次 为不要占用太多资源 需要排队下发
* 经过并发测试 200并发1000人群消息 需要调用200x1000=20w次 考虑单机cpu性能还要顾及本机api业务 设置阻塞队列 * 经过并发测试 200并发1000人群消息 需要调用200x1000=20w次 考虑单机cpu性能还要顾及本机api业务 设置阻塞队列
* 为避免过多占用本地io线程导致response慢,设置LinkedBlockingQueue数量多可以避免抢占,TODO (队列数量需要测试调试到最优数量 ) * 为避免过多占用本地io线程导致response慢,设置LinkedBlockingQueue数量多可以避免抢占, (队列数量需要测试调试到最优数量 )
* 最大线程数量不要设置太多 数量、优先级一定要比本地io线程低级 * 最大线程数量不要设置太多 数量、优先级一定要比本地io线程低级
* *
* *
...@@ -62,7 +64,7 @@ public class MangerChannelServiceImpl implements MangerChannelService { ...@@ -62,7 +64,7 @@ public class MangerChannelServiceImpl implements MangerChannelService {
} }
@Override @Override
public void put(String appKey, String clientId, NioSocketChannel channel) { public void online(String appKey, String clientId, NioSocketChannel channel) {
// 断掉旧链接 // 断掉旧链接
NioSocketChannel nioSocketChannel = get(appKey, clientId); NioSocketChannel nioSocketChannel = get(appKey, clientId);
...@@ -73,54 +75,70 @@ public class MangerChannelServiceImpl implements MangerChannelService { ...@@ -73,54 +75,70 @@ public class MangerChannelServiceImpl implements MangerChannelService {
this.CHANNEL_MAP.put(appKey + clientId, channel); this.CHANNEL_MAP.put(appKey + clientId, channel);
String ShortChannelId = channel.id().asShortText(); String longChannelId = channel.id().asLongText();
this.putClientsMap(appKey, clientId, ShortChannelId); this.putClientsMap(appKey, clientId, longChannelId);
this.putSessionInfoMap(ShortChannelId, channel); this.putSessionInfoMap(longChannelId, channel);
userCache.online(appKey, clientId, longChannelId);
} }
void putSessionInfoMap(String ShortChannelId, NioSocketChannel channel) { void putSessionInfoMap(String longChannelId, NioSocketChannel channel) {
ClientInfo clientInfo = new ClientInfo(); ClientInfo clientInfo = new ClientInfo();
clientInfo.setDeviceId(""); clientInfo.setDeviceId("");
clientInfo.setDeviceType("");
clientInfo.setDeviceToken("");
clientInfo.setNioSocketChannel(channel); clientInfo.setNioSocketChannel(channel);
clientInfo.setToken(""); clientInfo.setToken("");
this.SESSION_INFO_MAP.put(ShortChannelId, clientInfo); this.SESSION_INFO_MAP.put(longChannelId, clientInfo);
} }
void putClientsMap(String appKey, String clientId, String ShortChannelId) { void putClientsMap(String appKey, String clientId, String longChannelId) {
Set<String> set = this.CLIENTS_MAP.get(appKey + ":" + clientId); Set<String> set = this.CLIENTS_MAP.get(appKey + ":" + clientId);
if (set.isEmpty()) { if (set == null || set.isEmpty()) {
HashSet<String> shortChannelId = new HashSet<>(); HashSet<String> hashSet = new HashSet<>();
shortChannelId.add(ShortChannelId); hashSet.add(longChannelId);
this.CLIENTS_MAP.put(appKey + ":" + clientId, shortChannelId); this.CLIENTS_MAP.put(appKey + ":" + clientId, hashSet);
} else { } else {
set.add(ShortChannelId); set.add(longChannelId);
} }
} }
@Override void delSessionInfoMap(String longChannelId) {
public void remove(ChannelHandlerContext channelHandlerContext) { this.SESSION_INFO_MAP.remove(longChannelId);
Channel channel = channelHandlerContext.channel();
/*
channel != null : 通道不能为空
!channel.isActive() 通道不能是活跃状态的
!channel.isOpen() 通道不能是打开状态的
// !channel.isWritable() 通道是否可写数据
*/
if (channel != null && !channel.isActive() && !channel.isOpen()) {
String userId = String.valueOf(this.getInfoByChannel(channelHandlerContext));
userCache.offline(userId);
log.info("不活跃remove,uid:" + userId);
this.CHANNEL_MAP.remove(userId);
channelHandlerContext.channel().close();
} }
void delClientsMap(String appKey, String clientId, String longChannelId) {
Set<String> set = this.CLIENTS_MAP.get(appKey + ":" + clientId);
if (set != null) {
set.remove(longChannelId);
}
}
@Override
public void offline(ChannelHandlerContext channelHandlerContext) {
String appKey = channelHandlerContext.channel().attr(MangerChannelService.APP_KEY).get();
String clientId = channelHandlerContext.channel().attr(MangerChannelService.CLIENT_ID).get();
String userIdByChannelString = this.getStringInfoByChannel(channelHandlerContext);
String longChannelId = channelHandlerContext.channel().id().asLongText();
log.info("uid:" + userIdByChannelString + "," + "handlerRemoved" + ",channelId:" + longChannelId);
// 关掉连接
channelHandlerContext.close();
// rtc清空缓存
rtcService.clientOffline(appKey, clientId);
// 移除本地维护的channel
delSessionInfoMap(longChannelId);
delClientsMap(appKey, clientId, longChannelId);
userCache.offline(appKey, clientId, longChannelId);
} }
@Override @Override
public String getInfoByChannel(ChannelHandlerContext channelHandlerContext) { public String getStringInfoByChannel(ChannelHandlerContext channelHandlerContext) {
return "APP_KEY:" + channelHandlerContext.channel().attr(MangerChannelService.APP_KEY).get() return "APP_KEY:" + channelHandlerContext.channel().attr(MangerChannelService.APP_KEY).get()
+ ",CLIENT_ID:" + channelHandlerContext.channel().attr(MangerChannelService.CLIENT_ID).get(); + ",CLIENT_ID:" + channelHandlerContext.channel().attr(MangerChannelService.CLIENT_ID).get();
} }
...@@ -129,9 +147,8 @@ public class MangerChannelServiceImpl implements MangerChannelService { ...@@ -129,9 +147,8 @@ public class MangerChannelServiceImpl implements MangerChannelService {
// public Boolean isOnLocal(Long userId) { // public Boolean isOnLocal(Long userId) {
// NioSocketChannel nioSocketChannel = this.get(String.valueOf(userId)); // NioSocketChannel nioSocketChannel = this.get(String.valueOf(userId));
// return null != nioSocketChannel; // return null != nioSocketChannel;
//
// } // }
//
/** /**
* 获取用户在线状态 * 获取用户在线状态
* *
......
package com.wecloud.im.ws.strategy.concrete; //package com.wecloud.im.ws.strategy.concrete;
//
import com.fasterxml.jackson.core.JsonProcessingException; //import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.json.JsonMapper; //import com.fasterxml.jackson.databind.json.JsonMapper;
import com.wecloud.im.service.ImApplicationService; //import com.wecloud.im.service.ImApplicationService;
import com.wecloud.im.service.ImClientBlacklistService; //import com.wecloud.im.service.ImClientBlacklistService;
import com.wecloud.im.service.ImClientService; //import com.wecloud.im.service.ImClientService;
import com.wecloud.im.service.ImConversationMembersService; //import com.wecloud.im.service.ImConversationMembersService;
import com.wecloud.im.service.ImInboxService; //import com.wecloud.im.service.ImInboxService;
import com.wecloud.im.service.ImMessageService; //import com.wecloud.im.service.ImMessageService;
import com.wecloud.im.ws.annotation.CmdTypeAnnotation; //import com.wecloud.im.ws.annotation.CmdTypeAnnotation;
import com.wecloud.im.ws.enums.WsRequestCmdEnum; //import com.wecloud.im.ws.enums.WsRequestCmdEnum;
import com.wecloud.im.ws.model.request.ReceiveModel; //import com.wecloud.im.ws.model.request.ReceiveModel;
import com.wecloud.im.ws.sender.PushTask; //import com.wecloud.im.ws.sender.PushTask;
import com.wecloud.im.ws.service.WriteDataService; //import com.wecloud.im.ws.service.WriteDataService;
import com.wecloud.im.ws.strategy.ImCmdAbstract; //import com.wecloud.im.ws.strategy.ImCmdAbstract;
import com.wecloud.rtc.entity.RtcSubCmd; //import com.wecloud.rtc.entity.RtcSubCmd;
import io.netty.channel.ChannelHandlerContext; //import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j; //import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; //import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; //import org.springframework.stereotype.Service;
//
/** ///**
* 处理RTC信令消息 // * 处理RTC信令消息
*/ // */
@CmdTypeAnnotation(type = WsRequestCmdEnum.SINGLE_RTC) //@CmdTypeAnnotation(type = WsRequestCmdEnum.SINGLE_RTC)
@Service //@Service
@Slf4j //@Slf4j
public class SingleRtcConcrete extends ImCmdAbstract { //public class SingleRtcConcrete extends ImCmdAbstract {
//
//
private static final JsonMapper JSON_MAPPER = new JsonMapper(); // private static final JsonMapper JSON_MAPPER = new JsonMapper();
//
@Autowired // @Autowired
private ImClientBlacklistService imClientBlacklistService; // private ImClientBlacklistService imClientBlacklistService;
//
@Autowired // @Autowired
private WriteDataService writeDataService; // private WriteDataService writeDataService;
//
@Autowired // @Autowired
private ImMessageService imMessageService; // private ImMessageService imMessageService;
//
@Autowired // @Autowired
private ImInboxService imInboxService; // private ImInboxService imInboxService;
//
@Autowired // @Autowired
private ImApplicationService imApplicationService; // private ImApplicationService imApplicationService;
//
@Autowired // @Autowired
private ImConversationMembersService imConversationMembersService; // private ImConversationMembersService imConversationMembersService;
//
@Autowired // @Autowired
private ImClientService imClientService; // private ImClientService imClientService;
//
@Autowired // @Autowired
private PushTask systemPush; // private PushTask systemPush;
//
@Override // @Override
public void process(ReceiveModel receiveModel, ChannelHandlerContext ctx, String data, String appKey, String clientId) throws JsonProcessingException { // public void process(ReceiveModel receiveModel, ChannelHandlerContext ctx, String data, String appKey, String clientId) throws JsonProcessingException {
//
// 指令判空 // // 指令判空
if (receiveModel.getData().get(RtcSubCmd.SUB_CMD) == null) { // if (receiveModel.getData().get(RtcSubCmd.SUB_CMD) == null) {
return; // return;
} // }
//
String cmd = receiveModel.getData().get(RtcSubCmd.SUB_CMD).toString(); // String cmd = receiveModel.getData().get(RtcSubCmd.SUB_CMD).toString();
switch (cmd) { // switch (cmd) {
// //创建频道 //// //创建频道
// case RtcSubCmd.CREATE: //// case RtcSubCmd.CREATE:
// break; //// break;
// ////
// //加入频道 //// //加入频道
// case RtcSubCmd.JOIN: //// case RtcSubCmd.JOIN:
// break; //// break;
// ////
// //拒绝加入频道 //// //拒绝加入频道
// case RtcSubCmd.REJECT: //// case RtcSubCmd.REJECT:
// break; //// break;
// ////
// //SDP数据转发 //// //SDP数据转发
// case RtcSubCmd.SDP: //// case RtcSubCmd.SDP:
// break; //// break;
// ////
// //主动挂断(离开频道) //// //主动挂断(离开频道)
// case RtcSubCmd.LEAVE: //// case RtcSubCmd.LEAVE:
// break; //// break;
//
} // }
//
} // }
//
} //}
...@@ -61,7 +61,7 @@ public class InitIp { ...@@ -61,7 +61,7 @@ public class InitIp {
/** /**
* 获取本机地址 * 获取本机地址
*/ */
public static String getLocalIpAddress() { private static String getLocalIpAddress() {
try { try {
Enumeration<NetworkInterface> networkInterfaces = NetworkInterface.getNetworkInterfaces(); Enumeration<NetworkInterface> networkInterfaces = NetworkInterface.getNetworkInterfaces();
while (networkInterfaces.hasMoreElements()) { while (networkInterfaces.hasMoreElements()) {
......
...@@ -24,8 +24,6 @@ public class RedisUtils { ...@@ -24,8 +24,6 @@ public class RedisUtils {
public StringRedisTemplate redisTemplate() { public StringRedisTemplate redisTemplate() {
return redisTemplate; return redisTemplate;
} }
...@@ -35,7 +33,7 @@ public class RedisUtils { ...@@ -35,7 +33,7 @@ public class RedisUtils {
* @param key * @param key
* @param value * @param value
*/ */
public void setKey(String key, String value) { public void addKey(String key, String value) {
redisTemplate.opsForValue().set(key, value); redisTemplate.opsForValue().set(key, value);
} }
...@@ -89,7 +87,6 @@ public class RedisUtils { ...@@ -89,7 +87,6 @@ public class RedisUtils {
*/ */
public void hashset(String key, String field, String value) { public void hashset(String key, String field, String value) {
redisTemplate.opsForHash().put(key, field, value); redisTemplate.opsForHash().put(key, field, value);
} }
/** /**
......
...@@ -70,11 +70,11 @@ public class MangerRtcCacheServiceImpl implements MangerRtcCacheService { ...@@ -70,11 +70,11 @@ public class MangerRtcCacheServiceImpl implements MangerRtcCacheService {
// --- 保存频道信息 // --- 保存频道信息
String channelKey = String.format(RtcRedisKey.RTC_CHANNEL_INFO, rtcChannelId); String channelKey = String.format(RtcRedisKey.RTC_CHANNEL_INFO, rtcChannelId);
redisUtils.setKey(channelKey, rtcChannelInfoJson); redisUtils.addKey(channelKey, rtcChannelInfoJson);
//用户当前在线的频道ID //用户当前在线的频道ID
String userJoinChannelKey = String.format(RtcRedisKey.USER_JOIN_CHANNEL, appKey + clientId); String userJoinChannelKey = String.format(RtcRedisKey.USER_JOIN_CHANNEL, appKey + clientId);
redisUtils.setKey(userJoinChannelKey, rtcChannelId.toString()); redisUtils.addKey(userJoinChannelKey, rtcChannelId.toString());
//频道中存在的用户 //频道中存在的用户
String rtcChannelUsers = String.format(RtcRedisKey.RTC_CHANNEL_USERS, rtcChannelId); String rtcChannelUsers = String.format(RtcRedisKey.RTC_CHANNEL_USERS, rtcChannelId);
...@@ -87,7 +87,7 @@ public class MangerRtcCacheServiceImpl implements MangerRtcCacheService { ...@@ -87,7 +87,7 @@ public class MangerRtcCacheServiceImpl implements MangerRtcCacheService {
//用户当前在线的频道ID //用户当前在线的频道ID
String userJoinChannelKey = String.format(RtcRedisKey.USER_JOIN_CHANNEL, appKey + clientId); String userJoinChannelKey = String.format(RtcRedisKey.USER_JOIN_CHANNEL, appKey + clientId);
redisUtils.setKey(userJoinChannelKey, rtcChannelId.toString()); redisUtils.addKey(userJoinChannelKey, rtcChannelId.toString());
//频道中存在的用户 //频道中存在的用户
String rtcChannelUsers = String.format(RtcRedisKey.RTC_CHANNEL_USERS, rtcChannelId); String rtcChannelUsers = String.format(RtcRedisKey.RTC_CHANNEL_USERS, rtcChannelId);
......
...@@ -36,6 +36,7 @@ spring: ...@@ -36,6 +36,7 @@ spring:
discovery: discovery:
server-addr: localhost:8848 server-addr: localhost:8848
# 打印SQL语句和结果集,本地开发环境可开启,线上注释掉 # 打印SQL语句和结果集,本地开发环境可开启,线上注释掉
mybatis-plus: mybatis-plus:
configuration: configuration:
......
...@@ -326,6 +326,12 @@ ...@@ -326,6 +326,12 @@
</dependencyManagement> </dependencyManagement>
<dependencies> <dependencies>
<!-- rabbitmq -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency> <dependency>
<groupId>net.logstash.logback</groupId> <groupId>net.logstash.logback</groupId>
<artifactId>logstash-logback-encoder</artifactId> <artifactId>logstash-logback-encoder</artifactId>
......
## 本地redis ## 本地redis
...@@ -9,6 +9,7 @@ docker run -p 3306:3306 --name mysql57 -v $PWD/dockerData/mysql57/data:/var/lib/ ...@@ -9,6 +9,7 @@ docker run -p 3306:3306 --name mysql57 -v $PWD/dockerData/mysql57/data:/var/lib/
CREATE DATABASE IF NOT EXISTS wecloud_im DEFAULT CHARSET utf8mb4 COLLATE utf8mb4_unicode_ci; CREATE DATABASE IF NOT EXISTS wecloud_im DEFAULT CHARSET utf8mb4 COLLATE utf8mb4_unicode_ci;
## nacos ## nacos
docker run --name nacos-quick -e MODE=standalone -p 8848:8848 -d nacos/nacos-server:2.0.3
## RabbitMQ
RabbitMQ docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.9-management
\ No newline at end of file
本地:
String appKey = "QNtP3EjtLw26ekt0";
String appSecret = "a5e619003868258e0f7c5b5821ea00fb6b2302faf2ab3737";
--
clientA1
--
aaaaa2
--
aaaaa3
--
# 国内测试外网api文档 # 国内测试外网api文档
...@@ -23,46 +7,6 @@ https://imapitest.wecloud.cn/api//doc.html#/home ...@@ -23,46 +7,6 @@ https://imapitest.wecloud.cn/api//doc.html#/home
# 国内测试外网web示例 # 国内测试外网web示例
https://imwebtest.wecloud.cn/ https://imwebtest.wecloud.cn/
## 会话
1邀请2
1447818154184151040
--群聊发送文本-------
{
"reqId":"1231223123",
"cmd":1,
"data":{
"diyAbcd":"aaaa自已定2义字段的值",
"toConversation":1447818154184151040,
"type":-1,
"text":"发给12312123213这是一123个纯文本消息,发给12312123213这是一123个纯文本消息发给12312123213这是一123个纯文本消息",
"attrs":{
"a":"attrs 阿道夫123123是用来213存储用户自定义的一些键值对,ttrs 阿道夫123123是用来213存储用户自定义的一些键值对",
"b":"attrs 阿道夫123123是用来213存储用户自定义的一些键值对,ttrs 阿道夫123123是用来213存储用户自定义的一些键值对"
}
}
}
{
"reqId":"123123123",
"cmd":1,
"data":{
"push":{
"title":"收到一条新消息",
"subTitle":"发给12312123213这是一123个纯文本消息,发给12312123213这是一123个纯文本消息发给12312123213这是一123个纯文本消息"
},
"diyAbcd":"aaaa自已定义字段的值",
"toConversation":1427910060675305472,
"type":-1,
"text":"发给12312123213这是一123个纯文本消息,发给12312123213这是一123个纯文本消息发给12312123213这是一123个纯文本消息",
"attrs":{
"a":"attrs 阿道夫123123是用来213存储用户自定义的一些键值对,ttrs 阿道夫123123是用来213存储用户自定义的一些键值对",
"b":"attrs 阿道夫123123是用来213存储用户自定义的一些键值对,ttrs 阿道夫123123是用来213存储用户自定义的一些键值对"
}
}
}
## 集群配置 ## 集群配置
### AWS服务器内部获取公网IP地址 等元数据 ### AWS服务器内部获取公网IP地址 等元数据
curl http://instance-data/latest/meta-data/public-ipv4 curl http://instance-data/latest/meta-data/public-ipv4
......
本地:
本地:
String appKey = "QNtP3EjtLw26ekt0";
String appSecret = "a5e619003868258e0f7c5b5821ea00fb6b2302faf2ab3737";
--
clientA1
eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJhdWQiOiJ3ZWIiLCJjbGllbnRJZCI6ImNsaWVudEExIiwiaXNzIjoid2VjbG91ZF9pbSIsImFwcEtleSI6IlFOdFAzRWp0THcyNmVrdDAiLCJleHAiOjE2ODYzMzQzOTUsImlhdCI6MTYzNjM1NTM4NywianRpIjoiOTY2NTAxMGYyZTIwNDM4NmE1ZTg3NTQzZDQ3NDEwZGMifQ.SvAt4Qphup_Vind26Eu_pgMUKvU9lnv89YUdkOIrgsU
--
clientA2
--
clientA3
--
## 会话
1邀请2
--发送文本-------
{
"reqId":"123123123",
"cmd":1,
"data":{
"push":{
"title":"收到一条新消息",
"subTitle":"发给12312123213这是一123个纯文本消息,发给12312123213这是一123个纯文本消息发给12312123213这是一123个纯文本消息"
},
"diyAbcd":"aaaa自已定义字段的值",
"toConversation":1427910060675305472,
"type":-1,
"text":"发给12312123213这是一123个纯文本消息,发给12312123213这是一123个纯文本消息发给12312123213这是一123个纯文本消息",
"attrs":{
"a":"attrs 阿道夫123123是用来213存储用户自定义的一些键值对,ttrs 阿道夫123123是用来213存储用户自定义的一些键值对",
"b":"attrs 阿道夫123123是用来213存储用户自定义的一些键值对,ttrs 阿道夫123123是用来213存储用户自定义的一些键值对"
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment