Commit bc357dba by giaogiao

增加用户redis与ws的相关绑定

parent c2cc8404
package com.wecloud.im.ws.cache; package com.wecloud.im.ws.cache;
import com.wecloud.im.ws.utils.InitIp;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.util.Enumeration;
/** /**
* @author hewei123@163.com * @author hewei123@163.com
* @Description 用户与redis绑定 * @Description 用户与redis绑定
...@@ -18,118 +13,39 @@ import java.util.Enumeration; ...@@ -18,118 +13,39 @@ import java.util.Enumeration;
@Slf4j @Slf4j
public class UserCache { public class UserCache {
// @Autowired
// private RedisUtils redisUtils;
/**
* 在线状态
*/
public static final Integer ONLINE = 1;
/**
* 离线状态
*/
public static final Integer OFFLINE = 0;
/**
* key name
*/
private static final String KEY_BASE = "wsu:";
/** // /**
* 用户在线状态fieldKey // * 在线状态
*/ // */
private static final String ONLINE_STATUS_KEY = "st"; // public static final Integer ONLINE = 1;
/** // /**
* 用户所连机器ip的fieldKey // * 离线状态
*/ // */
private static final String PRIVATE_IP_KEY = "lip"; // public static final Integer OFFLINE = 0;
// 用户语言暂时没用到
// private static final String LANGUAGE = "la";
// 用户公网ip,在公网部署集群需要用到
// private static final String PUBLIC_IP = "iip";
/**
* 排除无效的mac地址
*/
private final static byte[][] INVALID_MACS = {
{0x00, 0x05, 0x69}, // VMWare
{0x00, 0x1C, 0x14}, // VMWare
{0x00, 0x0C, 0x29}, // VMWare
{0x00, 0x50, 0x56}, // VMWare
{0x08, 0x00, 0x27}, // Virtualbox
{0x0A, 0x00, 0x27}, // Virtualbox
{0x00, 0x03, (byte) 0xFF}, // Virtual-PC
{0x00, 0x15, 0x5D} // Hyper-V
};
/** /**
* 内网ip * key name
* websocket user info
*/ */
private static String lAN_IP = ""; private static final String KEY_BASE = "wui:";
// 获取机器内网ip // /**
static { // * 用户在线状态fieldKey
lAN_IP = getLocalIpAddress(); // */
log.info("lAN_IP:" + lAN_IP); // private static final String ONLINE_STATUS_KEY = "ost";
}
/** /**
* 判断是否为虚拟mac地址 * 用户所连机器ip的fieldKey
*
* @param mac
* @return
*/ */
public static boolean isVmMac(byte[] mac) { private static final String LAN_IP = "lip";
if (null == mac) {
return false;
}
for (byte[] invalid : INVALID_MACS) {
if (invalid[0] == mac[0] && invalid[1] == mac[1] && invalid[2] == mac[2]) {
return true;
}
}
return false;
}
/** /**
* 获取本机地址 * 用户公网ip,在公网部署集群需要用到
*/ */
public static String getLocalIpAddress() { private static final String PUBLIC_IP = "pip";
try { private static final String type = "ty";
Enumeration<NetworkInterface> networkInterfaces = NetworkInterface.getNetworkInterfaces(); private static final String deviceToken = "dt";
while (networkInterfaces.hasMoreElements()) {
NetworkInterface ni = networkInterfaces.nextElement();
/*
排除docker虚拟网卡
*/
String docker0 = "docker0";
if (ni.getName().equals(docker0)) {
continue;
}
if (!ni.isUp() || ni.isLoopback() || ni.isVirtual()) {
continue;
}
if (isVmMac(ni.getHardwareAddress())) {
continue;
}
Enumeration<InetAddress> inetAddresses = ni.getInetAddresses();
while (inetAddresses.hasMoreElements()) {
InetAddress inetAddress = inetAddresses.nextElement();
if (inetAddress.isLinkLocalAddress()) {
continue;
}
return inetAddress.getHostAddress();
}
}
} catch (SocketException e) {
log.info("获取本机IP地址失败。" + e);
}
return StringUtils.EMPTY;
}
/** /**
...@@ -138,7 +54,7 @@ public class UserCache { ...@@ -138,7 +54,7 @@ public class UserCache {
* @param id * @param id
*/ */
public void online(String id) { public void online(String id) {
log.info("ws用户上线保存redis连接ip:" + lAN_IP, ",uid:"); log.info("ws用户上线保存redis连接ip:" + InitIp.lAN_IP, ",uid:");
// redisUtils.hset(KEY_BASE + id, PRIVATE_IP_KEY, lAN_IP); // redisUtils.hset(KEY_BASE + id, PRIVATE_IP_KEY, lAN_IP);
// redisUtils.hset(KEY_BASE + id, ONLINE_STATUS_KEY, String.valueOf(ONLINE)); // redisUtils.hset(KEY_BASE + id, ONLINE_STATUS_KEY, String.valueOf(ONLINE));
} }
......
package com.wecloud.im.ws.model;
import lombok.Data;
import java.io.Serializable;
@Data
public class UserSession implements Serializable {
/**
* 通道id
*/
private String channelId;
private String deviceType;
private String deviceToken;
private String token;
}
package com.wecloud.im.ws.model.redis;
import lombok.Data;
import java.io.Serializable;
/**
* 维护client多端的ip及平台类型:
*/
@Data
public class UserConnectionInfo implements Serializable {
private String pubIp;
private String lanIp;
private String type;
private String deviceToken;
}
package com.wecloud.im.ws.service; package com.wecloud.im.ws.service;
import com.wecloud.im.ws.model.UserSession;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.AttributeKey; import io.netty.util.AttributeKey;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
...@@ -21,6 +23,12 @@ public interface MangerChannelService { ...@@ -21,6 +23,12 @@ public interface MangerChannelService {
*/ */
Map<String, NioSocketChannel> CHANNEL_MAP = new ConcurrentHashMap<>(); Map<String, NioSocketChannel> CHANNEL_MAP = new ConcurrentHashMap<>();
/**
* 本地维护 uid 对应 channel_shortID
*/
Map<String, List<UserSession>> USER_SESSION_MAP = new ConcurrentHashMap<>();
/** /**
* CLIENT_ID,是客户端的字符串id * CLIENT_ID,是客户端的字符串id
*/ */
......
...@@ -66,9 +66,6 @@ public class MangerChannelServiceImpl implements MangerChannelService { ...@@ -66,9 +66,6 @@ public class MangerChannelServiceImpl implements MangerChannelService {
nioSocketChannel.close(); nioSocketChannel.close();
} }
// AppHashValueModel appHashValueModel = new AppHashValueModel();
// appHashValueModel.setOnlineStatus(UserCache.ONLINE);
// userCache.online(userId);
MangerChannelService.CHANNEL_MAP.put(appKey + clientId, channel); MangerChannelService.CHANNEL_MAP.put(appKey + clientId, channel);
...@@ -106,176 +103,6 @@ public class MangerChannelServiceImpl implements MangerChannelService { ...@@ -106,176 +103,6 @@ public class MangerChannelServiceImpl implements MangerChannelService {
// //
// } // }
// //
//
// /**
// * TODO 待完成: 根据ACK回执 以及线程等待超时机制来判断客户端是否离线和超时;
// * TODO 待完成: 发送后阻塞当前子线程2秒后获取ack回执 如客户端发起ack回执则需要主动唤醒当前子线程 立马唤醒当前子线程, 判断如果已回执则返回发送成功, 如果未回执则判断客户端是否断线或发送错误
// *
// * @param msg
// * @param userId
// * @return
// */
// @Override
// public boolean rpcWriteData(String msg, Long userId) {
//
// Future<Boolean> future = THREAD_POOL_RPC_WRITE_EXECUTOR.submit(() -> {
//
// NioSocketChannel nioSocketChannel = get(String.valueOf(userId));
// if (null == nioSocketChannel) {
// userCache.offline(String.valueOf(userId));
// log.info("rpc-writeData连接为空:" + userId + "," + msg);
// return false;
// }
//
// // 判断连接是否断开
// if (nioSocketChannel.isShutdown()) {
// log.info("rpc-writeData连接断开:" + userId + "," + msg + ",\nchannelId:" + nioSocketChannel.id().asLongText());
// return false;
// }
//
// if (log.isInfoEnabled()) {
// log.info("rpc-writeData:" + userId + "," + msg + ",\nchannelId:" + nioSocketChannel.id().asLongText());
// }
//
// ChannelFuture channelFuture = nioSocketChannel.writeAndFlush(new TextWebSocketFrame(msg));
// channelFuture.addListener(
// //执行后回调的方法
// (ChannelFutureListener) channelFuture1 -> {
// if (log.isInfoEnabled()) {
// log.info("rpc-netty线程异步执行结果:" + channelFuture1.isDone() + ",业务执行结果:" + channelFuture1.isSuccess()
// + ";\nwriteData:" + userId + "," + msg + ",channelId:" + nioSocketChannel.id().asLongText());
// }
// });
//
// channelFuture.get();
//
// return true;
// });
//
// boolean resultStatus = false;
// try {
// resultStatus = future.get();
// } catch (InterruptedException | ExecutionException e) {
// e.printStackTrace();
// }
// return resultStatus;
//
// }
//
// @Override
// public boolean rpcKickWriteData(String msg, Long userId) {
// Future<Boolean> future = THREAD_POOL_RPC_WRITE_EXECUTOR.submit(() -> {
//
// NioSocketChannel nioSocketChannel = get(String.valueOf(userId));
// if (null == nioSocketChannel) {
// log.info("rpc-kickWriteData连接为空:" + userId + "," + msg);
// return false;
// }
//
// // 判断连接是否断开
// if (nioSocketChannel.isShutdown()) {
// log.info("rpc-kickWriteData连接断开:" + userId + "," + msg + ",\nchannelId:" + nioSocketChannel.id().asLongText());
// nioSocketChannel.close();
// return false;
// }
//
// if (log.isDebugEnabled()) {
// log.info("rpc-kickWriteData:" + userId + "," + msg + ",\nchannelId:" + nioSocketChannel.id().asLongText());
// }
//
// ChannelFuture channelFuture = nioSocketChannel.writeAndFlush(new TextWebSocketFrame(msg));
// channelFuture.addListener(
// //执行后回调的方法
// (ChannelFutureListener) channelFuture1 -> {
// if (log.isDebugEnabled()) {
// log.info("rpc-netty踢人线程异步执行结果:" + channelFuture1.isDone() + ",业务执行结果:" + channelFuture1.isSuccess()
// + ";\nkickWriteData:" + userId + "," + msg + ",channelId:" + nioSocketChannel.id().asLongText());
// }
// });
//
// channelFuture.get();
// // 关闭
// nioSocketChannel.close();
// return true;
// });
//
// boolean resultStatus = false;
// try {
// resultStatus = future.get();
// } catch (InterruptedException | ExecutionException e) {
// e.printStackTrace();
// }
// return resultStatus;
// }
//
// @Override
// public boolean rpcCloseOldChannel(Long userId) {
// Future<Boolean> future = THREAD_POOL_RPC_WRITE_EXECUTOR.submit(() -> {
// NioSocketChannel nioSocketChannel = get(String.valueOf(userId));
// if (null == nioSocketChannel) {
// log.info("rpc-closeOldChannel连接为空:" + userId);
// return false;
// }
// // 关闭
// nioSocketChannel.close();
// return true;
// });
// boolean resultStatus = false;
// try {
// resultStatus = future.get();
// } catch (InterruptedException | ExecutionException e) {
// e.printStackTrace();
// }
// return resultStatus;
// }
//
// @Override
// public boolean writeData(String msg, Long userId) {
//
//// Future<Boolean> future = THREAD_POOL_EXECUTOR.submit(() -> {
//
// NioSocketChannel nioSocketChannel = get(String.valueOf(userId));
// if (null == nioSocketChannel) {
// userCache.offline(String.valueOf(userId));
// log.info("writeData连接为空:" + userId + "," + msg);
// return false;
// }
//
// // 判断连接是否断开
// if (nioSocketChannel.isShutdown()) {
// log.info("writeData连接断开:" + userId + "," + msg + ",\nchannelId:" + nioSocketChannel.id().asLongText());
// return false;
// }
//
// if (log.isDebugEnabled()) {
// log.info("writeData:" + userId + "," + msg + ",\nchannelId:" + nioSocketChannel.id().asLongText());
// }
//
// ChannelFuture channelFuture = nioSocketChannel.writeAndFlush(new TextWebSocketFrame(msg));
// channelFuture.addListener(
// //执行后回调的方法
// (ChannelFutureListener) channelFuture1 -> {
// if (log.isDebugEnabled()) {
// log.info("netty线程异步执行结果:" + channelFuture1.isDone() + ",业务执行结果:" + channelFuture1.isSuccess()
// + ";\nwriteData:" + userId + "," + msg + ",channelId:" + nioSocketChannel.id().asLongText());
// }
// });
//
//// channelFuture.get();
//
// return true;
//// });
//
//// Boolean resultStatus = false;
//// try {
//// resultStatus = future.get();
//// } catch (InterruptedException | ExecutionException e) {
//// e.printStackTrace();
//// }
//// return resultStatus;
//
// }
/** /**
* 获取用户在线状态 * 获取用户在线状态
* *
...@@ -287,7 +114,6 @@ public class MangerChannelServiceImpl implements MangerChannelService { ...@@ -287,7 +114,6 @@ public class MangerChannelServiceImpl implements MangerChannelService {
public boolean getOnlineStatus(String toAppKey, String toClientId) { public boolean getOnlineStatus(String toAppKey, String toClientId) {
NioSocketChannel nioSocketChannel = get(toAppKey, toClientId); NioSocketChannel nioSocketChannel = get(toAppKey, toClientId);
if (null == nioSocketChannel) { if (null == nioSocketChannel) {
// userCache.offline(toAppKey + toClientId);
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.info("writeData 不存在 连接为空:" + toAppKey + toClientId); log.info("writeData 不存在 连接为空:" + toAppKey + toClientId);
} }
...@@ -309,23 +135,16 @@ public class MangerChannelServiceImpl implements MangerChannelService { ...@@ -309,23 +135,16 @@ public class MangerChannelServiceImpl implements MangerChannelService {
NioSocketChannel nioSocketChannel = get(toAppKey, toClientId); NioSocketChannel nioSocketChannel = get(toAppKey, toClientId);
if (null == nioSocketChannel) { if (null == nioSocketChannel) {
// userCache.offline(toAppKey + toClientId);
// if (log.isDebugEnabled()) {
log.info("writeData连接为空:" + toAppKey + toClientId + "," + msg); log.info("writeData连接为空:" + toAppKey + toClientId + "," + msg);
// }
return false; return false;
} }
// 判断连接是否断开 // 判断连接是否断开
if (nioSocketChannel.isShutdown()) { if (nioSocketChannel.isShutdown()) {
// if (log.isDebugEnabled()) {
log.info("writeData连接断开:" + toAppKey + toClientId + "," + msg + ",\nchannelId:" + nioSocketChannel.id().asLongText()); log.info("writeData连接断开:" + toAppKey + toClientId + "," + msg + ",\nchannelId:" + nioSocketChannel.id().asLongText());
// }
return false; return false;
} }
// if (log.isDebugEnabled()) {
log.info("writeData:" + toAppKey + "," + toClientId + "," + msg + ",\nchannelId:" + nioSocketChannel.id().asLongText()); log.info("writeData:" + toAppKey + "," + toClientId + "," + msg + ",\nchannelId:" + nioSocketChannel.id().asLongText());
// }
ChannelFuture channelFuture = nioSocketChannel.writeAndFlush(new TextWebSocketFrame(msg)); ChannelFuture channelFuture = nioSocketChannel.writeAndFlush(new TextWebSocketFrame(msg));
channelFuture.addListener( channelFuture.addListener(
...@@ -337,7 +156,6 @@ public class MangerChannelServiceImpl implements MangerChannelService { ...@@ -337,7 +156,6 @@ public class MangerChannelServiceImpl implements MangerChannelService {
} }
}); });
return true; return true;
} }
} }
package com.wecloud.im.ws.utils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.util.Enumeration;
@Slf4j
@Service
public class InitIp {
/**
* 排除无效的mac地址
*/
private final static byte[][] INVALID_MACS = {
{0x00, 0x05, 0x69}, // VMWare
{0x00, 0x1C, 0x14}, // VMWare
{0x00, 0x0C, 0x29}, // VMWare
{0x00, 0x50, 0x56}, // VMWare
{0x08, 0x00, 0x27}, // Virtualbox
{0x0A, 0x00, 0x27}, // Virtualbox
{0x00, 0x03, (byte) 0xFF}, // Virtual-PC
{0x00, 0x15, 0x5D} // Hyper-V
};
/**
* 内网ip
*/
public static String lAN_IP = "";
// 获取机器内网ip
static {
lAN_IP = getLocalIpAddress();
log.info("lAN_IP:" + lAN_IP);
}
/**
* 判断是否为虚拟mac地址
*
* @param mac
* @return
*/
public static boolean isVmMac(byte[] mac) {
if (null == mac) {
return false;
}
for (byte[] invalid : INVALID_MACS) {
if (invalid[0] == mac[0] && invalid[1] == mac[1] && invalid[2] == mac[2]) {
return true;
}
}
return false;
}
/**
* 获取本机地址
*/
public static String getLocalIpAddress() {
try {
Enumeration<NetworkInterface> networkInterfaces = NetworkInterface.getNetworkInterfaces();
while (networkInterfaces.hasMoreElements()) {
NetworkInterface ni = networkInterfaces.nextElement();
/*
排除docker虚拟网卡
*/
String docker0 = "docker0";
if (ni.getName().equals(docker0)) {
continue;
}
if (!ni.isUp() || ni.isLoopback() || ni.isVirtual()) {
continue;
}
if (isVmMac(ni.getHardwareAddress())) {
continue;
}
Enumeration<InetAddress> inetAddresses = ni.getInetAddresses();
while (inetAddresses.hasMoreElements()) {
InetAddress inetAddress = inetAddresses.nextElement();
if (inetAddress.isLinkLocalAddress()) {
continue;
}
return inetAddress.getHostAddress();
}
}
} catch (SocketException e) {
log.info("获取本机IP地址失败。" + e);
}
return StringUtils.EMPTY;
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment