Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
W
wecloud_im_server
Overview
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
hewei
wecloud_im_server
Commits
dfcf66df
Commit
dfcf66df
authored
Dec 30, 2021
by
lixiaozhong
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
1、ws包下的文件结构优化
2、改造一个sender,强化面向对象思想
parent
60b245ba
Hide whitespace changes
Inline
Side-by-side
Showing
19 changed files
with
410 additions
and
629 deletions
+410
-629
core/src/main/java/com/wecloud/im/controller/ImClientController.java
+3
-3
core/src/main/java/com/wecloud/im/netty/core/WsReadHandler.java
+13
-13
core/src/main/java/com/wecloud/im/netty/handler/NettyApiRequest.java
+5
-5
core/src/main/java/com/wecloud/im/router/RouterSendService.java
+6
-1
core/src/main/java/com/wecloud/im/router/RouterSendServiceImpl.java
+4
-4
core/src/main/java/com/wecloud/im/service/impl/ImConversationMembersServiceImpl.java
+2
-2
core/src/main/java/com/wecloud/im/service/impl/ImConversationServiceImpl.java
+8
-8
core/src/main/java/com/wecloud/im/service/impl/ImInboxServiceImpl.java
+3
-3
core/src/main/java/com/wecloud/im/service/impl/ImMessageServiceImpl.java
+4
-4
core/src/main/java/com/wecloud/im/ws/manager/ChannelManager.java
+176
-0
core/src/main/java/com/wecloud/im/ws/sender/ChannelSender.java
+167
-0
core/src/main/java/com/wecloud/im/ws/sender/WsDataWriter.java
+0
-11
core/src/main/java/com/wecloud/im/ws/service/MangerChannelService.java
+0
-155
core/src/main/java/com/wecloud/im/ws/service/WriteDataService.java
+0
-63
core/src/main/java/com/wecloud/im/ws/service/impl/MangerChannelServiceImpl.java
+0
-220
core/src/main/java/com/wecloud/im/ws/service/impl/WriteDataServiceImpl.java
+0
-115
core/src/main/java/com/wecloud/im/ws/strategy/concrete/NormalChatStrategy.java
+6
-8
core/src/main/java/com/wecloud/rtc/service/impl/RtcServiceImpl.java
+5
-6
core/src/main/java/com/wecloud/rtc/service/impl/WsRtcWriteImpl.java
+8
-8
No files found.
core/src/main/java/com/wecloud/im/controller/ImClientController.java
View file @
dfcf66df
...
...
@@ -11,7 +11,7 @@ import com.wecloud.im.service.ImApplicationService;
import
com.wecloud.im.service.ImClientService
;
import
com.wecloud.im.vo.GetInfoListVo
;
import
com.wecloud.im.vo.ImOnlineStatusVo
;
import
com.wecloud.im.ws.
service.MangerChannelService
;
import
com.wecloud.im.ws.
manager.ChannelManager
;
import
io.geekidea.springbootplus.framework.common.api.ApiResult
;
import
io.geekidea.springbootplus.framework.common.controller.BaseController
;
import
io.geekidea.springbootplus.framework.core.validator.groups.Add
;
...
...
@@ -46,7 +46,7 @@ public class ImClientController extends BaseController {
private
ImClientService
imClientService
;
@Autowired
private
MangerChannelService
mangerChannelService
;
private
ChannelManager
channelManager
;
@Autowired
private
ImApplicationService
imApplicationService
;
...
...
@@ -123,7 +123,7 @@ public class ImClientController extends BaseController {
for
(
String
clientId
:
getOnlineStatusParam
.
getClientIds
())
{
ImOnlineStatusVo
imOnlineStatusVo
=
new
ImOnlineStatusVo
();
imOnlineStatusVo
.
setStatus
(
mangerChannelService
.
getOnlineStatus
(
imApplication
.
getAppKey
(),
clientId
));
imOnlineStatusVo
.
setStatus
(
channelManager
.
getOnlineStatus
(
imApplication
.
getAppKey
(),
clientId
));
imOnlineStatusVo
.
setClientId
(
clientId
);
imOnlineStatusVos
.
add
(
imOnlineStatusVo
);
}
...
...
core/src/main/java/com/wecloud/im/netty/core/WsReadHandler.java
View file @
dfcf66df
package
com
.
wecloud
.
im
.
netty
.
core
;
import
com.wecloud.im.executor.BusinessThreadPool
;
import
com.wecloud.im.ws.
service.MangerChannelService
;
import
com.wecloud.im.ws.
manager.ChannelManager
;
import
com.wecloud.im.ws.strategy.AbstractImCmdStrategy
;
import
com.wecloud.rtc.service.RtcService
;
import
io.netty.channel.ChannelHandler
;
...
...
@@ -31,11 +31,11 @@ public class WsReadHandler extends SimpleChannelInboundHandler<TextWebSocketFram
@Autowired
private
RtcService
rtcService
;
@Resource
private
MangerChannelService
mangerChannelService
;
private
ChannelManager
channelManager
;
@Override
protected
void
channelRead0
(
ChannelHandlerContext
ctx
,
TextWebSocketFrame
msg
)
{
ctx
.
channel
().
attr
(
MangerChannelService
.
READ_IDLE_TIMES
).
set
(
0
);
// 读空闲的计数清零
ctx
.
channel
().
attr
(
ChannelManager
.
READ_IDLE_TIMES
).
set
(
0
);
// 读空闲的计数清零
String
data
=
msg
.
text
();
try
{
...
...
@@ -57,8 +57,8 @@ public class WsReadHandler extends SimpleChannelInboundHandler<TextWebSocketFram
}
private
void
execute
(
ChannelHandlerContext
ctx
,
String
data
)
{
String
appKey
=
ctx
.
channel
().
attr
(
MangerChannelService
.
APP_KEY
).
get
();
String
clientId
=
ctx
.
channel
().
attr
(
MangerChannelService
.
CLIENT_ID
).
get
();
String
appKey
=
ctx
.
channel
().
attr
(
ChannelManager
.
APP_KEY
).
get
();
String
clientId
=
ctx
.
channel
().
attr
(
ChannelManager
.
CLIENT_ID
).
get
();
try
{
if
(
PING
.
equals
(
data
))
{
...
...
@@ -82,10 +82,10 @@ public class WsReadHandler extends SimpleChannelInboundHandler<TextWebSocketFram
@Override
public
void
userEventTriggered
(
ChannelHandlerContext
ctx
,
Object
evt
)
throws
Exception
{
String
clientId
=
ctx
.
channel
().
attr
(
MangerChannelService
.
CLIENT_ID
).
get
();
String
clientId
=
ctx
.
channel
().
attr
(
ChannelManager
.
CLIENT_ID
).
get
();
//读超时计时器
Integer
readIdleTimes
=
ctx
.
channel
().
attr
(
MangerChannelService
.
READ_IDLE_TIMES
).
get
();
Integer
readIdleTimes
=
ctx
.
channel
().
attr
(
ChannelManager
.
READ_IDLE_TIMES
).
get
();
IdleStateEvent
event
=
(
IdleStateEvent
)
evt
;
...
...
@@ -95,7 +95,7 @@ public class WsReadHandler extends SimpleChannelInboundHandler<TextWebSocketFram
switch
(
event
.
state
())
{
case
READER_IDLE:
eventType
=
"读空闲:readIdleTimes="
+
readIdleTimes
;
ctx
.
channel
().
attr
(
MangerChannelService
.
READ_IDLE_TIMES
).
set
(
readIdleTimes
+
1
);
// 读空闲的计数加1
ctx
.
channel
().
attr
(
ChannelManager
.
READ_IDLE_TIMES
).
set
(
readIdleTimes
+
1
);
// 读空闲的计数加1
// 发ping
ctx
.
channel
().
writeAndFlush
(
new
TextWebSocketFrame
(
PING
));
...
...
@@ -132,7 +132,7 @@ public class WsReadHandler extends SimpleChannelInboundHandler<TextWebSocketFram
@Override
public
void
handlerAdded
(
ChannelHandlerContext
ctx
)
{
String
userIdByChannel
=
mangerChannelService
.
getStringInfoByChannel
(
ctx
);
String
userIdByChannel
=
channelManager
.
getStringInfoByChannel
(
ctx
);
log
.
info
(
"连接WS成功handlerAdded,uid:"
+
userIdByChannel
+
","
+
",channelId:"
+
ctx
.
channel
().
id
().
asLongText
());
}
...
...
@@ -155,13 +155,13 @@ public class WsReadHandler extends SimpleChannelInboundHandler<TextWebSocketFram
@Override
public
void
handlerRemoved
(
ChannelHandlerContext
ctx
)
{
String
appKey
=
ctx
.
channel
().
attr
(
MangerChannelService
.
APP_KEY
).
get
();
String
clientId
=
ctx
.
channel
().
attr
(
MangerChannelService
.
CLIENT_ID
).
get
();
String
appKey
=
ctx
.
channel
().
attr
(
ChannelManager
.
APP_KEY
).
get
();
String
clientId
=
ctx
.
channel
().
attr
(
ChannelManager
.
CLIENT_ID
).
get
();
String
userIdByChannel
=
mangerChannelService
.
getStringInfoByChannel
(
ctx
);
String
userIdByChannel
=
channelManager
.
getStringInfoByChannel
(
ctx
);
log
.
info
(
"uid:"
+
userIdByChannel
+
","
+
"handlerRemoved"
+
",channelId:"
+
ctx
.
channel
().
id
().
asLongText
());
// 关掉连接
mangerChannelService
.
offline
(
ctx
);
channelManager
.
offline
(
ctx
);
// rtc清空缓存
rtcService
.
clientOffline
(
appKey
,
clientId
);
...
...
core/src/main/java/com/wecloud/im/netty/handler/NettyApiRequest.java
View file @
dfcf66df
...
...
@@ -5,7 +5,7 @@ import com.alibaba.fastjson.JSONObject;
import
com.auth0.jwt.interfaces.DecodedJWT
;
import
com.wecloud.im.netty.core.WsReadHandler
;
import
com.wecloud.im.ws.model.WsConstants
;
import
com.wecloud.im.ws.
service.MangerChannelService
;
import
com.wecloud.im.ws.
manager.ChannelManager
;
import
com.wecloud.im.ws.utils.FullHttpRequestUtils
;
import
com.wecloud.utils.JsonUtils
;
import
io.geekidea.springbootplus.config.constant.CommonConstant
;
...
...
@@ -41,7 +41,7 @@ public class NettyApiRequest {
private
ShiroLoginService
shiroLoginService
;
@Resource
private
MangerChannelService
appUserChannelsService
;
private
ChannelManager
appUserChannelsService
;
@Resource
private
WsReadHandler
appImReadHandler
;
...
...
@@ -127,9 +127,9 @@ public class NettyApiRequest {
ctx
.
fireChannelRead
(
httpRequest
.
retain
());
// 设置属性值 userid - channel
ctx
.
channel
().
attr
(
MangerChannelService
.
CLIENT_ID
).
set
(
clientId
);
ctx
.
channel
().
attr
(
MangerChannelService
.
APP_KEY
).
set
(
appKey
);
ctx
.
channel
().
attr
(
MangerChannelService
.
READ_IDLE_TIMES
).
set
(
0
);
// 读空闲的计数=0
ctx
.
channel
().
attr
(
ChannelManager
.
CLIENT_ID
).
set
(
clientId
);
ctx
.
channel
().
attr
(
ChannelManager
.
APP_KEY
).
set
(
appKey
);
ctx
.
channel
().
attr
(
ChannelManager
.
READ_IDLE_TIMES
).
set
(
0
);
// 读空闲的计数=0
// 添加长连接handler
ctx
.
pipeline
().
addLast
(
"appImHandler"
,
appImReadHandler
);
...
...
core/src/main/java/com/wecloud/im/router/RouterSendService.java
View file @
dfcf66df
...
...
@@ -5,6 +5,11 @@ package com.wecloud.im.router;
*/
public
interface
RouterSendService
{
void
rpcSend
(
String
msg
,
String
channelId
);
/**
* 通过rpc调用发送,解决channel不在本机时调用
* @param channelId
* @param msg
*/
void
sendMsgRemote
(
String
channelId
,
String
msg
);
}
core/src/main/java/com/wecloud/im/router/RouterSendServiceImpl.java
View file @
dfcf66df
package
com
.
wecloud
.
im
.
router
;
import
com.wecloud.im.ws.se
rvice.MangerChannelService
;
import
com.wecloud.im.ws.se
nder.ChannelSender
;
import
org.apache.dubbo.config.annotation.DubboService
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.stereotype.Service
;
...
...
@@ -10,11 +10,11 @@ import org.springframework.stereotype.Service;
public
class
RouterSendServiceImpl
implements
RouterSendService
{
@Autowired
private
MangerChannelService
mangerChannelService
;
private
ChannelSender
channelSender
;
@Override
public
void
rpcSend
(
String
msg
,
String
channelId
)
{
mangerChannelService
.
writeDataToChannel
(
msg
,
channelId
);
public
void
sendMsgRemote
(
String
channelId
,
String
msg
)
{
channelSender
.
sendMsgLocal
(
channelId
,
msg
);
}
}
core/src/main/java/com/wecloud/im/service/impl/ImConversationMembersServiceImpl.java
View file @
dfcf66df
...
...
@@ -22,7 +22,7 @@ import com.wecloud.im.service.ImConversationMembersService;
import
com.wecloud.im.service.ImConversationService
;
import
com.wecloud.im.service.ImMessageService
;
import
com.wecloud.im.vo.ImConversationMemberListVo
;
import
com.wecloud.im.ws.se
rvice.WriteDataService
;
import
com.wecloud.im.ws.se
nder.ChannelSender
;
import
com.wecloud.utils.JsonUtils
;
import
io.geekidea.springbootplus.framework.common.api.ApiResult
;
import
io.geekidea.springbootplus.framework.common.service.impl.BaseServiceImpl
;
...
...
@@ -48,7 +48,7 @@ import java.util.List;
public
class
ImConversationMembersServiceImpl
extends
BaseServiceImpl
<
ImConversationMembersMapper
,
ImConversationMembers
>
implements
ImConversationMembersService
{
@Autowired
private
WriteDataService
writeDataService
;
private
ChannelSender
channelSender
;
@Autowired
private
ImConversationMapper
imConversationMapper
;
...
...
core/src/main/java/com/wecloud/im/service/impl/ImConversationServiceImpl.java
View file @
dfcf66df
...
...
@@ -31,7 +31,7 @@ import com.wecloud.im.vo.OfflineMsgDto;
import
com.wecloud.im.ws.enums.MsgTypeEnum
;
import
com.wecloud.im.ws.enums.WsResponseCmdEnum
;
import
com.wecloud.im.ws.model.WsResponseModel
;
import
com.wecloud.im.ws.se
rvice.WriteDataService
;
import
com.wecloud.im.ws.se
nder.ChannelSender
;
import
com.wecloud.utils.JsonUtils
;
import
io.geekidea.springbootplus.framework.common.api.ApiCode
;
import
io.geekidea.springbootplus.framework.common.api.ApiResult
;
...
...
@@ -63,7 +63,7 @@ import java.util.Map;
public
class
ImConversationServiceImpl
extends
BaseServiceImpl
<
ImConversationMapper
,
ImConversation
>
implements
ImConversationService
{
@Autowired
private
WriteDataService
writeDataService
;
private
ChannelSender
channelSender
;
@Autowired
private
ImConversationMapper
imConversationMapper
;
...
...
@@ -325,7 +325,7 @@ public class ImConversationServiceImpl extends BaseServiceImpl<ImConversationMap
responseModel
.
setMsg
(
result
.
getMessage
());
responseModel
.
setData
(
imMessageOnlineSend
);
responseModel
.
setReqId
(
null
);
writeDataService
.
write
(
responseModel
,
imApplication
.
getAppKey
(),
imClientReceiver
.
getClientId
());
channelSender
.
sendMsg
(
responseModel
,
imApplication
.
getAppKey
(),
imClientReceiver
.
getClientId
());
}
...
...
@@ -404,7 +404,7 @@ public class ImConversationServiceImpl extends BaseServiceImpl<ImConversationMap
imMessage
.
setSystemFlag
(
false
);
imMessage
.
setSendStatus
(
2
);
imMessage
.
setFkConversationId
(
imClientToConversation
.
getConversationId
());
boolean
save
=
imMessageService
.
save
(
imMessage
);
imMessageService
.
save
(
imMessage
);
// 遍历发送
for
(
ImConversationMembers
conversationMembers
:
membersList
)
{
...
...
@@ -435,7 +435,7 @@ public class ImConversationServiceImpl extends BaseServiceImpl<ImConversationMap
responseModel
.
setMsg
(
result
.
getMessage
());
responseModel
.
setData
(
imMessageOnlineSend
);
responseModel
.
setReqId
(
null
);
writeDataService
.
write
(
responseModel
,
imApplication
.
getAppKey
(),
imClientReceiver
.
getClientId
());
channelSender
.
sendMsg
(
responseModel
,
imApplication
.
getAppKey
(),
imClientReceiver
.
getClientId
());
}
...
...
@@ -542,7 +542,7 @@ public class ImConversationServiceImpl extends BaseServiceImpl<ImConversationMap
responseModel
.
setMsg
(
result
.
getMessage
());
responseModel
.
setData
(
imMessageOnlineSend
);
responseModel
.
setReqId
(
null
);
writeDataService
.
write
(
responseModel
,
imApplication
.
getAppKey
(),
imClientReceiver
.
getClientId
());
channelSender
.
sendMsg
(
responseModel
,
imApplication
.
getAppKey
(),
imClientReceiver
.
getClientId
());
}
...
...
@@ -629,7 +629,7 @@ public class ImConversationServiceImpl extends BaseServiceImpl<ImConversationMap
responseModel
.
setMsg
(
result
.
getMessage
());
responseModel
.
setData
(
imMessageOnlineSend
);
responseModel
.
setReqId
(
null
);
writeDataService
.
write
(
responseModel
,
imApplication
.
getAppKey
(),
imClientReceiver
.
getClientId
());
channelSender
.
sendMsg
(
responseModel
,
imApplication
.
getAppKey
(),
imClientReceiver
.
getClientId
());
}
return
ApiResult
.
ok
();
...
...
@@ -710,7 +710,7 @@ public class ImConversationServiceImpl extends BaseServiceImpl<ImConversationMap
responseModel
.
setMsg
(
result
.
getMessage
());
responseModel
.
setData
(
imMessageOnlineSend
);
responseModel
.
setReqId
(
null
);
writeDataService
.
write
(
responseModel
,
imApplication
.
getAppKey
(),
imClientReceiver
.
getClientId
());
channelSender
.
sendMsg
(
responseModel
,
imApplication
.
getAppKey
(),
imClientReceiver
.
getClientId
());
}
...
...
core/src/main/java/com/wecloud/im/service/impl/ImInboxServiceImpl.java
View file @
dfcf66df
...
...
@@ -18,7 +18,7 @@ import com.wecloud.im.service.ImMessageService;
import
com.wecloud.im.ws.enums.MsgTypeEnum
;
import
com.wecloud.im.ws.enums.WsResponseCmdEnum
;
import
com.wecloud.im.ws.model.WsResponseModel
;
import
com.wecloud.im.ws.se
rvice.WriteDataService
;
import
com.wecloud.im.ws.se
nder.ChannelSender
;
import
io.geekidea.springbootplus.framework.common.api.ApiCode
;
import
io.geekidea.springbootplus.framework.common.api.ApiResult
;
import
io.geekidea.springbootplus.framework.common.service.impl.BaseServiceImpl
;
...
...
@@ -53,7 +53,7 @@ public class ImInboxServiceImpl extends BaseServiceImpl<ImInboxMapper, ImInbox>
@Autowired
private
ImConversationMembersService
imConversationMembersService
;
@Autowired
private
WriteDataService
writeDataService
;
private
ChannelSender
channelSender
;
@Autowired
private
ImApplicationService
imApplicationService
;
...
...
@@ -213,7 +213,7 @@ public class ImInboxServiceImpl extends BaseServiceImpl<ImInboxMapper, ImInbox>
responseModel
.
setMsg
(
result
.
getMessage
());
responseModel
.
setData
(
imMessageOnlineSend
);
responseModel
.
setReqId
(
null
);
writeDataService
.
write
(
responseModel
,
application
.
getAppKey
(),
imClientReceiver
.
getClientId
());
channelSender
.
sendMsg
(
responseModel
,
application
.
getAppKey
(),
imClientReceiver
.
getClientId
());
}
}
}
...
...
core/src/main/java/com/wecloud/im/service/impl/ImMessageServiceImpl.java
View file @
dfcf66df
...
...
@@ -28,7 +28,7 @@ import com.wecloud.im.ws.model.WsResponseModel;
import
com.wecloud.im.ws.model.request.PushVO
;
import
com.wecloud.im.ws.model.request.ReceiveDataVO
;
import
com.wecloud.im.ws.sender.AsyncPush
;
import
com.wecloud.im.ws.se
rvice.WriteDataService
;
import
com.wecloud.im.ws.se
nder.ChannelSender
;
import
com.wecloud.utils.JsonUtils
;
import
io.geekidea.springbootplus.framework.common.api.ApiCode
;
import
io.geekidea.springbootplus.framework.common.api.ApiResult
;
...
...
@@ -75,7 +75,7 @@ public class ImMessageServiceImpl extends BaseServiceImpl<ImMessageMapper, ImMes
private
ImConversationMembersService
imConversationMembersService
;
@Autowired
private
WriteDataService
writeDataService
;
private
ChannelSender
channelSender
;
@Override
@Transactional
(
rollbackFor
=
Exception
.
class
)
...
...
@@ -123,7 +123,7 @@ public class ImMessageServiceImpl extends BaseServiceImpl<ImMessageMapper, ImMes
responseModel
.
setReqId
(
null
);
// 向接收方推送
writeDataService
.
write
(
responseModel
,
imApplication
.
getAppKey
(),
imClientReceiver
.
getClientId
());
channelSender
.
sendMsg
(
responseModel
,
imApplication
.
getAppKey
(),
imClientReceiver
.
getClientId
());
}
...
...
@@ -234,7 +234,7 @@ public class ImMessageServiceImpl extends BaseServiceImpl<ImMessageMapper, ImMes
responseModel
.
setMsg
(
result
.
getMessage
());
responseModel
.
setData
(
imMessageOnlineSend
);
responseModel
.
setReqId
(
null
);
writeDataService
.
write
(
responseModel
,
imApplication
.
getAppKey
(),
imClientReceiver
.
getClientId
());
channelSender
.
sendMsg
(
responseModel
,
imApplication
.
getAppKey
(),
imClientReceiver
.
getClientId
());
// 获取自定义推送字段
PushVO
pushVO
=
imMsgRecall
.
getPush
();
...
...
core/src/main/java/com/wecloud/im/ws/manager/ChannelManager.java
0 → 100644
View file @
dfcf66df
package
com
.
wecloud
.
im
.
ws
.
manager
;
import
com.wecloud.im.ws.cache.UserCacheService
;
import
com.wecloud.im.ws.model.ClientInfo
;
import
com.wecloud.im.ws.model.redis.ClientChannelInfo
;
import
com.wecloud.rtc.service.RtcService
;
import
io.netty.channel.ChannelHandlerContext
;
import
io.netty.channel.socket.nio.NioSocketChannel
;
import
io.netty.util.AttributeKey
;
import
lombok.extern.slf4j.Slf4j
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.stereotype.Component
;
import
java.util.HashSet
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.Set
;
import
java.util.concurrent.ConcurrentHashMap
;
/**
* channel内容管理,在线、离线等信息在channel里
* @author lixiaozhong
*/
@Component
@Slf4j
public
class
ChannelManager
{
/**
* 本地维护 uid 对应多个 channel的shortID
*/
public
static
final
Map
<
String
,
Set
<
String
>>
CLIENTS_MAP
=
new
ConcurrentHashMap
<>();
/**
* channel的shortID对应client端数据
*/
public
static
final
Map
<
String
,
ClientInfo
>
SESSION_INFO_MAP
=
new
ConcurrentHashMap
<>();
/**
* CLIENT_ID,是客户端的字符串id
*/
public
static
final
AttributeKey
<
String
>
CLIENT_ID
=
AttributeKey
.
valueOf
(
"ci"
);
/**
* 是app的字符串id
*/
public
static
final
AttributeKey
<
String
>
APP_KEY
=
AttributeKey
.
valueOf
(
"ak"
);
/**
* LANGUAGE
*/
AttributeKey
<
String
>
LANGUAGE
=
AttributeKey
.
valueOf
(
"la"
);
/**
* APP_VERSION
*/
AttributeKey
<
String
>
APP_VERSION
=
AttributeKey
.
valueOf
(
"av"
);
AttributeKey
<
String
>
TOKEN
=
AttributeKey
.
valueOf
(
"to"
);
AttributeKey
<
String
>
DEVICEID
=
AttributeKey
.
valueOf
(
"dc"
);
AttributeKey
<
String
>
PLATFORM
=
AttributeKey
.
valueOf
(
"pt"
);
public
static
final
AttributeKey
<
Integer
>
READ_IDLE_TIMES
=
AttributeKey
.
valueOf
(
"readIdleTimes"
);
@Autowired
private
RtcService
rtcService
;
@Autowired
private
UserCacheService
userCacheService
;
/**
* client上线
* userID绑定channel
*
* @param channel
*/
public
void
online
(
String
appKey
,
String
clientId
,
NioSocketChannel
channel
)
{
String
longChannelId
=
channel
.
id
().
asLongText
();
this
.
putClientsMap
(
appKey
,
clientId
,
longChannelId
);
this
.
putSessionInfoMap
(
longChannelId
,
channel
);
userCacheService
.
online
(
appKey
,
clientId
,
longChannelId
);
}
/**
* 下线移除channel
*
* @param channelHandlerContext
*/
public
void
offline
(
ChannelHandlerContext
channelHandlerContext
)
{
String
appKey
=
channelHandlerContext
.
channel
().
attr
(
ChannelManager
.
APP_KEY
).
get
();
String
clientId
=
channelHandlerContext
.
channel
().
attr
(
ChannelManager
.
CLIENT_ID
).
get
();
String
userIdByChannelString
=
this
.
getStringInfoByChannel
(
channelHandlerContext
);
String
longChannelId
=
channelHandlerContext
.
channel
().
id
().
asLongText
();
log
.
info
(
"uid:"
+
userIdByChannelString
+
","
+
"handlerRemoved"
+
",channelId:"
+
longChannelId
);
// 关掉连接
channelHandlerContext
.
close
();
// 移除本地维护的channel
delSessionInfoMap
(
longChannelId
);
delClientsMap
(
appKey
,
clientId
,
longChannelId
);
// 移除redis缓存
userCacheService
.
offline
(
appKey
,
clientId
,
longChannelId
);
// rtc清空缓存
rtcService
.
clientOffline
(
appKey
,
clientId
);
}
/**
* 根据channel返回客户端key和id
*
* @param channelHandlerContext
* @return
*/
public
String
getStringInfoByChannel
(
ChannelHandlerContext
channelHandlerContext
)
{
return
"APP_KEY:"
+
channelHandlerContext
.
channel
().
attr
(
ChannelManager
.
APP_KEY
).
get
()
+
",CLIENT_ID:"
+
channelHandlerContext
.
channel
().
attr
(
ChannelManager
.
CLIENT_ID
).
get
();
}
/**
* 获取用户在线状态
*
* @param toAppKey
* @param toClientId
* @return true:在线, false 不在线
*/
public
boolean
getOnlineStatus
(
String
toAppKey
,
String
toClientId
)
{
List
<
ClientChannelInfo
>
channelInfos
=
userCacheService
.
getIpByClientIdAndOnline
(
toAppKey
,
toClientId
);
boolean
flag
=
false
;
for
(
ClientChannelInfo
channelInfo
:
channelInfos
)
{
if
(
channelInfo
.
getOnlineStatus
().
equals
(
1
))
{
return
true
;
}
}
return
flag
;
}
private
void
putSessionInfoMap
(
String
longChannelId
,
NioSocketChannel
channel
)
{
ClientInfo
clientInfo
=
new
ClientInfo
();
clientInfo
.
setDeviceId
(
""
);
clientInfo
.
setNioSocketChannel
(
channel
);
clientInfo
.
setToken
(
""
);
ChannelManager
.
SESSION_INFO_MAP
.
put
(
longChannelId
,
clientInfo
);
}
private
void
putClientsMap
(
String
appKey
,
String
clientId
,
String
longChannelId
)
{
Set
<
String
>
set
=
ChannelManager
.
CLIENTS_MAP
.
get
(
appKey
+
":"
+
clientId
);
if
(
set
==
null
||
set
.
isEmpty
())
{
HashSet
<
String
>
hashSet
=
new
HashSet
<>();
hashSet
.
add
(
longChannelId
);
ChannelManager
.
CLIENTS_MAP
.
put
(
appKey
+
":"
+
clientId
,
hashSet
);
}
else
{
set
.
add
(
longChannelId
);
}
}
private
void
delSessionInfoMap
(
String
longChannelId
)
{
ChannelManager
.
SESSION_INFO_MAP
.
remove
(
longChannelId
);
}
private
void
delClientsMap
(
String
appKey
,
String
clientId
,
String
longChannelId
)
{
Set
<
String
>
set
=
ChannelManager
.
CLIENTS_MAP
.
get
(
appKey
+
":"
+
clientId
);
if
(
set
!=
null
)
{
set
.
remove
(
longChannelId
);
}
}
}
core/src/main/java/com/wecloud/im/ws/sender/ChannelSender.java
0 → 100644
View file @
dfcf66df
package
com
.
wecloud
.
im
.
ws
.
sender
;
import
com.wecloud.im.router.RouterSendService
;
import
com.wecloud.im.ws.cache.UserCacheService
;
import
com.wecloud.im.ws.model.ClientInfo
;
import
com.wecloud.im.ws.model.WsResponseModel
;
import
com.wecloud.im.ws.model.redis.ClientChannelInfo
;
import
com.wecloud.im.ws.model.request.ReceiveVO
;
import
com.wecloud.im.ws.manager.ChannelManager
;
import
com.wecloud.im.ws.utils.InitIp
;
import
com.wecloud.utils.JsonUtils
;
import
io.geekidea.springbootplus.framework.common.api.ApiCode
;
import
io.geekidea.springbootplus.framework.common.api.ApiResult
;
import
io.netty.channel.socket.nio.NioSocketChannel
;
import
io.netty.handler.codec.http.websocketx.TextWebSocketFrame
;
import
lombok.extern.slf4j.Slf4j
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.stereotype.Component
;
import
java.util.HashMap
;
import
java.util.List
;
/**
* @Description 下发数据
* @Author hewei hwei1233@163.com
* @Date 2019-12-05
*/
@Component
@Slf4j
public
class
ChannelSender
{
@Autowired
private
UserCacheService
userCacheService
;
@Autowired
private
RouterSendService
routerSendService
;
/**
* 固定"成功"状态码 带data
*
* @param receiveVO
* @param data
* @param toAppKey
* @param toClientId
*/
public
void
sendMsgData
(
ReceiveVO
receiveVO
,
Object
data
,
String
toAppKey
,
String
toClientId
)
{
this
.
sendMsg
(
receiveVO
,
ApiCode
.
SUCCESS
,
data
,
toAppKey
,
toClientId
);
}
/**
* 固定"成功"状态码 无data
*
* @param receiveVO
* @param apiCode
* @param toAppKey
* @param toClientId
*/
public
void
sendMsgSucess
(
ReceiveVO
receiveVO
,
ApiCode
apiCode
,
String
toAppKey
,
String
toClientId
)
{
this
.
sendMsg
(
receiveVO
,
apiCode
,
new
HashMap
<>(
1
),
toAppKey
,
toClientId
);
}
/**
* 固定"参数错误"状态码 无data
*
* @param receiveVO
* @param toAppKey
* @param toClientId
*/
public
void
sendMsgIllegeArgs
(
ReceiveVO
receiveVO
,
String
toAppKey
,
String
toClientId
)
{
// this.nullDataSuccess(requestModel, ResultStatus.PARAM_ERROR, userId);
}
/**
* 可自定义状态码 带data
*
* @param receiveVO
* @param data
*/
public
void
sendMsg
(
ReceiveVO
receiveVO
,
ApiCode
apiCode
,
Object
data
,
String
toAppKey
,
String
toClientId
)
{
ApiResult
<
Boolean
>
apiResult
=
ApiResult
.
result
(
apiCode
);
WsResponseModel
responseModel
=
new
WsResponseModel
();
responseModel
.
setMsg
(
apiResult
.
getMessage
());
responseModel
.
setCmd
(
receiveVO
.
getCmd
());
responseModel
.
setReqId
(
receiveVO
.
getReqId
());
responseModel
.
setData
(
data
);
responseModel
.
setCode
(
apiResult
.
getCode
());
this
.
sendMsg
(
responseModel
,
toAppKey
,
toClientId
);
}
/**
* 调用ws处理响应逻辑
*
* @param responseModel
* @param toAppKey
* @param toClientId
*/
public
void
sendMsg
(
WsResponseModel
responseModel
,
String
toAppKey
,
String
toClientId
)
{
String
msgJson
=
JsonUtils
.
encodeJson
(
responseModel
);
List
<
ClientChannelInfo
>
channelInfos
=
userCacheService
.
getIpByClientIdAndOnline
(
toAppKey
,
toClientId
);
for
(
ClientChannelInfo
channelInfo
:
channelInfos
)
{
// 是否为当前机器的ip
if
(
InitIp
.
lAN_IP
.
equals
(
channelInfo
.
getLanIp
()))
{
// 调用本地下发
this
.
sendMsgLocal
(
channelInfo
.
getChannelId
(),
msgJson
);
}
else
{
// todo rpc调用下发
routerSendService
.
sendMsgRemote
(
channelInfo
.
getChannelId
(),
msgJson
);
}
}
}
/**
* 本地直接下发,限定本机有的channel
*
* @param nioSocketChannel
* @param responseModel
*/
public
void
sendMsgLocal
(
NioSocketChannel
nioSocketChannel
,
WsResponseModel
responseModel
)
{
String
msgJson
=
JsonUtils
.
encodeJson
(
responseModel
);
// 本地直接下发
nioSocketChannel
.
writeAndFlush
(
new
TextWebSocketFrame
(
msgJson
));
}
/**
* 向指定channelId下发数据,限定本机有的channel
*
* @param toChannelId
* @param msg
* @return
*/
public
boolean
sendMsgLocal
(
String
toChannelId
,
String
msg
)
{
ClientInfo
clientInfo
=
ChannelManager
.
SESSION_INFO_MAP
.
get
(
toChannelId
);
if
(
clientInfo
==
null
)
{
return
false
;
}
NioSocketChannel
nioSocketChannel
=
clientInfo
.
getNioSocketChannel
();
if
(
null
==
nioSocketChannel
)
{
log
.
info
(
"writeData连接为空:"
+
msg
);
return
false
;
}
// 判断连接是否断开
if
(
nioSocketChannel
.
isShutdown
())
{
log
.
info
(
"writeData连接断开:"
+
msg
+
",\nchannelId:"
+
nioSocketChannel
.
id
().
asLongText
());
return
false
;
}
log
.
info
(
"writeData:"
+
",\nchannelId:"
+
nioSocketChannel
.
id
().
asLongText
());
nioSocketChannel
.
writeAndFlush
(
new
TextWebSocketFrame
(
msg
));
return
true
;
}
}
core/src/main/java/com/wecloud/im/ws/sender/WsDataWriter.java
deleted
100644 → 0
View file @
60b245ba
package
com
.
wecloud
.
im
.
ws
.
sender
;
import
lombok.extern.slf4j.Slf4j
;
import
org.springframework.stereotype.Service
;
@Service
@Slf4j
public
class
WsDataWriter
{
}
core/src/main/java/com/wecloud/im/ws/service/MangerChannelService.java
deleted
100644 → 0
View file @
60b245ba
package
com
.
wecloud
.
im
.
ws
.
service
;
import
com.wecloud.im.ws.model.ClientInfo
;
import
io.netty.channel.ChannelHandlerContext
;
import
io.netty.channel.socket.nio.NioSocketChannel
;
import
io.netty.util.AttributeKey
;
import
java.util.Map
;
import
java.util.Set
;
import
java.util.concurrent.ConcurrentHashMap
;
/**
* @Description 管理netty用户channel对象
* @Author hewei hwei1233@163.com
* @Date 2019-08-01
*/
public
interface
MangerChannelService
{
// /**
// * channel对象
// * 用户id为key
// * context为值
// */
// Map<String, NioSocketChannel> CHANNEL_MAP = new ConcurrentHashMap<>();
/**
* 本地维护 uid 对应多个 channel的shortID
*/
Map
<
String
,
Set
<
String
>>
CLIENTS_MAP
=
new
ConcurrentHashMap
<>();
/**
* channel的shortID对应client端数据
*/
Map
<
String
,
ClientInfo
>
SESSION_INFO_MAP
=
new
ConcurrentHashMap
<>();
/**
* CLIENT_ID,是客户端的字符串id
*/
AttributeKey
<
String
>
CLIENT_ID
=
AttributeKey
.
valueOf
(
"ci"
);
/**
* 是app的字符串id
*/
AttributeKey
<
String
>
APP_KEY
=
AttributeKey
.
valueOf
(
"ak"
);
/**
* LANGUAGE
*/
AttributeKey
<
String
>
LANGUAGE
=
AttributeKey
.
valueOf
(
"la"
);
/**
* APP_VERSION
*/
AttributeKey
<
String
>
APP_VERSION
=
AttributeKey
.
valueOf
(
"av"
);
AttributeKey
<
String
>
TOKEN
=
AttributeKey
.
valueOf
(
"to"
);
AttributeKey
<
String
>
DEVICEID
=
AttributeKey
.
valueOf
(
"dc"
);
AttributeKey
<
String
>
PLATFORM
=
AttributeKey
.
valueOf
(
"pt"
);
AttributeKey
<
Integer
>
READ_IDLE_TIMES
=
AttributeKey
.
valueOf
(
"readIdleTimes"
);
/**
* 根据userID获取channel
*
* @return
*/
// NioSocketChannel get(String appKey, String clientId);
/**
* client上线
* userID绑定channel
*
* @param channel
*/
void
online
(
String
appKey
,
String
clientId
,
NioSocketChannel
channel
);
/**
* 下线移除channel
*
* @param channelHandlerContext
*/
void
offline
(
ChannelHandlerContext
channelHandlerContext
);
/**
* 根据channel返回客户端key和id
*
* @param channelHandlerContext
* @return
*/
String
getStringInfoByChannel
(
ChannelHandlerContext
channelHandlerContext
);
/**
* 下发数据
*
* @param msg
* @param userId
* @return
*/
// boolean writeData(String msg, Long userId);
/**
* 获取用户在线状态
*
* @param toAppKey
* @param toClientId
* @return true:在线, false 不在线
*/
boolean
getOnlineStatus
(
String
toAppKey
,
String
toClientId
);
/**
* 下发数据
*
* @param msg
* @return
*/
// boolean writeData(String msg, String toAppKey, String toClientId);
/**
* 向指定channel_Id下发数据
*
* @param msg
* @return
*/
boolean
writeDataToChannel
(
String
msg
,
String
toChannelId
);
// /**
// * rpc异步下发数据
// *
// * @param msg
// * @param userId
// * @return
// */
// boolean rpcWriteData(String msg, Long userId);
//
// /**
// * rpc-异步下发踢人数据及关闭旧通道
// *
// * @param msg
// * @param userId
// * @return
// */
// boolean rpcKickWriteData(String msg, Long userId);
//
// /**
// * rpc-异步关闭旧通道
// *
// * @param userId
// * @return
// */
// boolean rpcCloseOldChannel(Long userId);
//
// Boolean isOnLocal(Long userId);
}
core/src/main/java/com/wecloud/im/ws/service/WriteDataService.java
deleted
100644 → 0
View file @
60b245ba
package
com
.
wecloud
.
im
.
ws
.
service
;
import
com.wecloud.im.ws.model.WsResponseModel
;
import
com.wecloud.im.ws.model.request.ReceiveVO
;
import
io.geekidea.springbootplus.framework.common.api.ApiCode
;
import
io.netty.channel.socket.nio.NioSocketChannel
;
/**
* @Description ws响应数据,各种状态码封装
* @Author hewei hwei1233@163.com
* @Date 2019-12-05
*/
public
interface
WriteDataService
{
/**
* 可自定义状态码 带data
*
* @param receiveVO
* @param data
*/
void
dataAndStatus
(
ReceiveVO
receiveVO
,
ApiCode
apiCode
,
Object
data
,
String
toAppKey
,
String
toClientId
);
/**
* 固定"成功"状态码 带data
*
* @param requestModel
* @param data
*/
void
successAndData
(
ReceiveVO
requestModel
,
Object
data
,
String
toAppKey
,
String
toClientId
);
/**
* 固定"成功"状态码 无data
*
* @param requestModel
*/
void
nullDataSuccess
(
ReceiveVO
requestModel
,
ApiCode
apiCode
,
String
toAppKey
,
String
toClientId
);
/**
* 固定"参数错误"状态码 无data
*
* @param requestModel
*/
void
paramErrorAndNullData
(
ReceiveVO
requestModel
,
String
toAppKey
,
String
toClientId
);
/**
* 调用ws处理响应逻辑
*
* @param responseModel
*/
void
write
(
WsResponseModel
responseModel
,
String
toAppKey
,
String
toClientId
);
/**
* 本地直接下发
*
* @param responseModel
* @param nioSocketChannel
*/
void
response
(
WsResponseModel
responseModel
,
NioSocketChannel
nioSocketChannel
);
}
core/src/main/java/com/wecloud/im/ws/service/impl/MangerChannelServiceImpl.java
deleted
100644 → 0
View file @
60b245ba
package
com
.
wecloud
.
im
.
ws
.
service
.
impl
;
import
com.wecloud.im.ws.cache.UserCacheService
;
import
com.wecloud.im.ws.model.ClientInfo
;
import
com.wecloud.im.ws.model.redis.ClientChannelInfo
;
import
com.wecloud.im.ws.service.MangerChannelService
;
import
com.wecloud.rtc.service.RtcService
;
import
io.netty.channel.ChannelHandlerContext
;
import
io.netty.channel.socket.nio.NioSocketChannel
;
import
io.netty.handler.codec.http.websocketx.TextWebSocketFrame
;
import
lombok.extern.slf4j.Slf4j
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.stereotype.Component
;
import
java.util.HashSet
;
import
java.util.List
;
import
java.util.Set
;
/**
* @Description 维护netty用户channel对象
* @Author hewei hwei1233@163.com
* @Date 2019-12-03
*/
@Component
@Slf4j
public
class
MangerChannelServiceImpl
implements
MangerChannelService
{
@Autowired
private
RtcService
rtcService
;
@Autowired
private
UserCacheService
userCacheService
;
// @Override
// public NioSocketChannel get(String appKey, String clientId) {
//
// return MangerChannelService.CHANNEL_MAP.get(appKey + clientId);
// }
@Override
public
void
online
(
String
appKey
,
String
clientId
,
NioSocketChannel
channel
)
{
// 断掉旧链接
// NioSocketChannel nioSocketChannel = get(appKey, clientId);
// if (null != nioSocketChannel) {
// log.info("put新连接关掉旧链接:" + appKey + "," + clientId + ",\nchannelId:" + nioSocketChannel.id().asLongText());
// nioSocketChannel.close();
// }
// MangerChannelService.CHANNEL_MAP.put(appKey + clientId, channel);
String
longChannelId
=
channel
.
id
().
asLongText
();
this
.
putClientsMap
(
appKey
,
clientId
,
longChannelId
);
this
.
putSessionInfoMap
(
longChannelId
,
channel
);
userCacheService
.
online
(
appKey
,
clientId
,
longChannelId
);
}
void
putSessionInfoMap
(
String
longChannelId
,
NioSocketChannel
channel
)
{
ClientInfo
clientInfo
=
new
ClientInfo
();
clientInfo
.
setDeviceId
(
""
);
clientInfo
.
setNioSocketChannel
(
channel
);
clientInfo
.
setToken
(
""
);
MangerChannelService
.
SESSION_INFO_MAP
.
put
(
longChannelId
,
clientInfo
);
}
void
putClientsMap
(
String
appKey
,
String
clientId
,
String
longChannelId
)
{
Set
<
String
>
set
=
MangerChannelService
.
CLIENTS_MAP
.
get
(
appKey
+
":"
+
clientId
);
if
(
set
==
null
||
set
.
isEmpty
())
{
HashSet
<
String
>
hashSet
=
new
HashSet
<>();
hashSet
.
add
(
longChannelId
);
MangerChannelService
.
CLIENTS_MAP
.
put
(
appKey
+
":"
+
clientId
,
hashSet
);
}
else
{
set
.
add
(
longChannelId
);
}
}
void
delSessionInfoMap
(
String
longChannelId
)
{
MangerChannelService
.
SESSION_INFO_MAP
.
remove
(
longChannelId
);
}
void
delClientsMap
(
String
appKey
,
String
clientId
,
String
longChannelId
)
{
Set
<
String
>
set
=
MangerChannelService
.
CLIENTS_MAP
.
get
(
appKey
+
":"
+
clientId
);
if
(
set
!=
null
)
{
set
.
remove
(
longChannelId
);
}
}
@Override
public
void
offline
(
ChannelHandlerContext
channelHandlerContext
)
{
String
appKey
=
channelHandlerContext
.
channel
().
attr
(
MangerChannelService
.
APP_KEY
).
get
();
String
clientId
=
channelHandlerContext
.
channel
().
attr
(
MangerChannelService
.
CLIENT_ID
).
get
();
String
userIdByChannelString
=
this
.
getStringInfoByChannel
(
channelHandlerContext
);
String
longChannelId
=
channelHandlerContext
.
channel
().
id
().
asLongText
();
log
.
info
(
"uid:"
+
userIdByChannelString
+
","
+
"handlerRemoved"
+
",channelId:"
+
longChannelId
);
// 关掉连接
channelHandlerContext
.
close
();
// 移除本地维护的channel
delSessionInfoMap
(
longChannelId
);
delClientsMap
(
appKey
,
clientId
,
longChannelId
);
// 移除redis缓存
userCacheService
.
offline
(
appKey
,
clientId
,
longChannelId
);
// rtc清空缓存
rtcService
.
clientOffline
(
appKey
,
clientId
);
}
@Override
public
String
getStringInfoByChannel
(
ChannelHandlerContext
channelHandlerContext
)
{
return
"APP_KEY:"
+
channelHandlerContext
.
channel
().
attr
(
MangerChannelService
.
APP_KEY
).
get
()
+
",CLIENT_ID:"
+
channelHandlerContext
.
channel
().
attr
(
MangerChannelService
.
CLIENT_ID
).
get
();
}
// @Override
// public Boolean isOnLocal(Long userId) {
// NioSocketChannel nioSocketChannel = MangerChannelService.get(String.valueOf(userId));
// return null != nioSocketChannel;
// }
/**
* 获取用户在线状态
*
* @param toAppKey
* @param toClientId
* @return true:在线, false 不在线
*/
@Override
public
boolean
getOnlineStatus
(
String
toAppKey
,
String
toClientId
)
{
List
<
ClientChannelInfo
>
channelInfos
=
userCacheService
.
getIpByClientIdAndOnline
(
toAppKey
,
toClientId
);
boolean
flag
=
false
;
for
(
ClientChannelInfo
channelInfo
:
channelInfos
)
{
if
(
channelInfo
.
getOnlineStatus
().
equals
(
1
))
{
return
true
;
}
}
// NioSocketChannel nioSocketChannel = get(toAppKey, toClientId);
// if (null == nioSocketChannel) {
// if (log.isDebugEnabled()) {
// log.info("writeData 不存在 连接为空:" + toAppKey + toClientId);
// }
// return false;
// }
// // 判断连接是否断开
// if (nioSocketChannel.isShutdown()) {
// if (log.isDebugEnabled()) {
// log.info("writeData连接断开:" + toAppKey + toClientId + "channelId:" + nioSocketChannel.id().asLongText());
// }
// return false;
// }
return
flag
;
}
// @Override
// public boolean writeData(String msg, String toAppKey, String toClientId) {
//
// NioSocketChannel nioSocketChannel = get(toAppKey, toClientId);
// if (null == nioSocketChannel) {
// log.info("writeData连接为空:" + toAppKey + toClientId + "," + msg);
// return false;
// }
// // 判断连接是否断开
// if (nioSocketChannel.isShutdown()) {
// log.info("writeData连接断开:" + toAppKey + toClientId + "," + msg + ",\nchannelId:" + nioSocketChannel.id().asLongText());
// return false;
// }
//
// log.info("writeData:" + toAppKey + "," + toClientId + "," + msg + ",\nchannelId:" + nioSocketChannel.id().asLongText());
//
// ChannelFuture channelFuture = nioSocketChannel.writeAndFlush(new TextWebSocketFrame(msg));
// channelFuture.addListener(
// //执行后回调的方法
// (ChannelFutureListener) channelFuture1 -> {
// if (log.isDebugEnabled()) {
// log.info("netty线程异步执行结果:" + channelFuture1.isDone() + ",业务执行结果:" + channelFuture1.isSuccess()
// + ";\nwriteData:" + toAppKey + toClientId + "," + msg + ",channelId:" + nioSocketChannel.id().asLongText());
// }
// });
// return true;
// }
@Override
public
boolean
writeDataToChannel
(
String
msg
,
String
toChannelId
)
{
ClientInfo
clientInfo
=
SESSION_INFO_MAP
.
get
(
toChannelId
);
if
(
clientInfo
==
null
)
{
return
false
;
}
NioSocketChannel
nioSocketChannel
=
clientInfo
.
getNioSocketChannel
();
if
(
null
==
nioSocketChannel
)
{
log
.
info
(
"writeData连接为空:"
+
msg
);
return
false
;
}
// 判断连接是否断开
if
(
nioSocketChannel
.
isShutdown
())
{
log
.
info
(
"writeData连接断开:"
+
msg
+
",\nchannelId:"
+
nioSocketChannel
.
id
().
asLongText
());
return
false
;
}
log
.
info
(
"writeData:"
+
",\nchannelId:"
+
nioSocketChannel
.
id
().
asLongText
());
nioSocketChannel
.
writeAndFlush
(
new
TextWebSocketFrame
(
msg
));
return
true
;
}
}
core/src/main/java/com/wecloud/im/ws/service/impl/WriteDataServiceImpl.java
deleted
100644 → 0
View file @
60b245ba
package
com
.
wecloud
.
im
.
ws
.
service
.
impl
;
import
cn.hutool.core.thread.ThreadFactoryBuilder
;
import
com.wecloud.im.router.RouterSendService
;
import
com.wecloud.im.ws.cache.UserCacheService
;
import
com.wecloud.im.ws.model.WsResponseModel
;
import
com.wecloud.im.ws.model.redis.ClientChannelInfo
;
import
com.wecloud.im.ws.model.request.ReceiveVO
;
import
com.wecloud.im.ws.service.MangerChannelService
;
import
com.wecloud.im.ws.service.WriteDataService
;
import
com.wecloud.im.ws.utils.InitIp
;
import
com.wecloud.utils.JsonUtils
;
import
io.geekidea.springbootplus.framework.common.api.ApiCode
;
import
io.geekidea.springbootplus.framework.common.api.ApiResult
;
import
io.netty.channel.socket.nio.NioSocketChannel
;
import
io.netty.handler.codec.http.websocketx.TextWebSocketFrame
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.stereotype.Component
;
import
java.util.HashMap
;
import
java.util.List
;
import
java.util.concurrent.ThreadFactory
;
/**
* @Description 下发数据
* @Author hewei hwei1233@163.com
* @Date 2019-12-05
*/
@Component
public
class
WriteDataServiceImpl
implements
WriteDataService
{
private
final
static
ThreadFactory
WRITE_NAMED_THREAD_FACTORY
=
new
ThreadFactoryBuilder
()
.
setNamePrefix
(
"ws-WRITE-"
).
build
();
/**
* 耗时核心业务处理线程池
* 属于io密集型业务
* io密集型任务配置尽可能多的线程数量
*/
// private final static ExecutorService WRITE_TASK_THREAD_POOL_EXECUTOR =
// new ThreadPoolExecutor(WsConstants.CPU_PROCESSORS * 2, WsConstants.CPU_PROCESSORS * 3,
// 1L, TimeUnit.MILLISECONDS,
// new LinkedBlockingQueue<Runnable>(), WRITE_NAMED_THREAD_FACTORY, new ThreadPoolExecutor.CallerRunsPolicy());
@Autowired
private
MangerChannelService
mangerChannelService
;
@Autowired
private
UserCacheService
userCacheService
;
@Autowired
private
RouterSendService
routerSendService
;
@Override
public
void
successAndData
(
ReceiveVO
receiveVO
,
Object
data
,
String
toAppKey
,
String
toClientId
)
{
this
.
dataAndStatus
(
receiveVO
,
ApiCode
.
SUCCESS
,
data
,
toAppKey
,
toClientId
);
}
@Override
public
void
nullDataSuccess
(
ReceiveVO
receiveVO
,
ApiCode
apiCode
,
String
toAppKey
,
String
toClientId
)
{
this
.
dataAndStatus
(
receiveVO
,
apiCode
,
new
HashMap
<>(
1
),
toAppKey
,
toClientId
);
}
@Override
public
void
paramErrorAndNullData
(
ReceiveVO
receiveVO
,
String
toAppKey
,
String
toClientId
)
{
// this.nullDataSuccess(requestModel, ResultStatus.PARAM_ERROR, userId);
}
@Override
public
void
dataAndStatus
(
ReceiveVO
receiveVO
,
ApiCode
apiCode
,
Object
data
,
String
toAppKey
,
String
toClientId
)
{
ApiResult
<
Boolean
>
apiResult
=
ApiResult
.
result
(
apiCode
);
WsResponseModel
responseModel
=
new
WsResponseModel
();
responseModel
.
setMsg
(
apiResult
.
getMessage
());
responseModel
.
setCmd
(
receiveVO
.
getCmd
());
responseModel
.
setReqId
(
receiveVO
.
getReqId
());
responseModel
.
setData
(
data
);
responseModel
.
setCode
(
apiResult
.
getCode
());
this
.
write
(
responseModel
,
toAppKey
,
toClientId
);
}
@Override
public
void
write
(
WsResponseModel
responseModel
,
String
toAppKey
,
String
toClientId
)
{
String
msgJson
=
JsonUtils
.
encodeJson
(
responseModel
);
List
<
ClientChannelInfo
>
channelInfos
=
userCacheService
.
getIpByClientIdAndOnline
(
toAppKey
,
toClientId
);
for
(
ClientChannelInfo
channelInfo
:
channelInfos
)
{
// 是否为当前机器的ip
if
(
InitIp
.
lAN_IP
.
equals
(
channelInfo
.
getLanIp
()))
{
// 调用本地下发
mangerChannelService
.
writeDataToChannel
(
msgJson
,
channelInfo
.
getChannelId
());
}
else
{
// rpc调用下发
routerSendService
.
rpcSend
(
msgJson
,
channelInfo
.
getChannelId
());
}
}
}
@Override
public
void
response
(
WsResponseModel
responseModel
,
NioSocketChannel
nioSocketChannel
)
{
String
msgJson
=
JsonUtils
.
encodeJson
(
responseModel
);
// 本地直接下发
nioSocketChannel
.
writeAndFlush
(
new
TextWebSocketFrame
(
msgJson
));
}
}
core/src/main/java/com/wecloud/im/ws/strategy/concrete/NormalChatStrategy.java
View file @
dfcf66df
package
com
.
wecloud
.
im
.
ws
.
strategy
.
concrete
;
import
com.baomidou.mybatisplus.core.conditions.query.QueryWrapper
;
import
com.fasterxml.jackson.core.JsonProcessingException
;
import
com.wecloud.im.entity.ImApplication
;
import
com.wecloud.im.entity.ImClient
;
import
com.wecloud.im.entity.ImConversationMembers
;
...
...
@@ -17,10 +16,9 @@ import com.wecloud.im.ws.annotation.ImCmdType;
import
com.wecloud.im.ws.enums.WsRequestCmdEnum
;
import
com.wecloud.im.ws.enums.WsResponseCmdEnum
;
import
com.wecloud.im.ws.model.WsResponseModel
;
import
com.wecloud.im.ws.model.request.PushVO
;
import
com.wecloud.im.ws.model.request.ReceiveVO
;
import
com.wecloud.im.ws.sender.AsyncPush
;
import
com.wecloud.im.ws.se
rvice.WriteDataService
;
import
com.wecloud.im.ws.se
nder.ChannelSender
;
import
com.wecloud.im.ws.strategy.AbstractImCmdStrategy
;
import
com.wecloud.utils.JsonUtils
;
import
io.geekidea.springbootplus.framework.common.api.ApiCode
;
...
...
@@ -50,7 +48,7 @@ public class NormalChatStrategy extends AbstractImCmdStrategy {
private
ImClientBlacklistService
imClientBlacklistService
;
@Autowired
private
WriteDataService
writeDataService
;
private
ChannelSender
channelSender
;
@Autowired
private
ImMessageService
imMessageService
;
...
...
@@ -139,7 +137,7 @@ public class NormalChatStrategy extends AbstractImCmdStrategy {
responseModel
.
setMsg
(
result
.
getMessage
());
responseModel
.
setData
(
imMessageOnlineSend
);
responseModel
.
setReqId
(
null
);
writeDataService
.
write
(
responseModel
,
imApplication
.
getAppKey
(),
imClientReceiver
.
getClientId
());
channelSender
.
sendMsg
(
responseModel
,
imApplication
.
getAppKey
(),
imClientReceiver
.
getClientId
());
// 异步推送系统通知消息
systemPush
.
push
(
receiveVO
.
getData
().
getPush
(),
imClientReceiver
,
imApplication
);
...
...
@@ -190,7 +188,7 @@ public class NormalChatStrategy extends AbstractImCmdStrategy {
responseModel
.
setReqId
(
receiveVO
.
getReqId
());
// 响应发送方
// writeDataService.write(responseModel, appKey, clientId);
writeDataService
.
response
(
responseModel
,
chann
el
);
channelSender
.
sendMsgLocal
(
channel
,
responseMod
el
);
}
/**
...
...
@@ -216,7 +214,7 @@ public class NormalChatStrategy extends AbstractImCmdStrategy {
responseModel
.
setReqId
(
receiveVO
.
getReqId
());
// writeDataService.write(responseModel, appKey, clientUniId);
writeDataService
.
response
(
responseModel
,
chann
el
);
channelSender
.
sendMsgLocal
(
channel
,
responseMod
el
);
return
true
;
}
...
...
@@ -234,7 +232,7 @@ public class NormalChatStrategy extends AbstractImCmdStrategy {
responseModel
.
setReqId
(
receiveVO
.
getReqId
());
// writeDataService.write(responseModel, appKey, clientUniId);
writeDataService
.
response
(
responseModel
,
chann
el
);
channelSender
.
sendMsgLocal
(
channel
,
responseMod
el
);
return
true
;
}
...
...
core/src/main/java/com/wecloud/rtc/service/impl/RtcServiceImpl.java
View file @
dfcf66df
...
...
@@ -12,7 +12,7 @@ import com.wecloud.im.param.rtc.RejectRtcChannelParam;
import
com.wecloud.im.param.rtc.SdpForwardParam
;
import
com.wecloud.im.service.ImApplicationService
;
import
com.wecloud.im.service.ImClientService
;
import
com.wecloud.im.ws.
service.MangerChannelService
;
import
com.wecloud.im.ws.
manager.ChannelManager
;
import
com.wecloud.rtc.entity.response.RtcCallResponse
;
import
com.wecloud.rtc.entity.response.RtcCandidateForwardResponse
;
import
com.wecloud.rtc.entity.response.RtcClientJoinResponse
;
...
...
@@ -28,7 +28,6 @@ import lombok.extern.slf4j.Slf4j;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.stereotype.Service
;
import
java.util.Date
;
import
java.util.List
;
@Slf4j
...
...
@@ -51,7 +50,7 @@ public class RtcServiceImpl implements RtcService {
private
ImClientService
imClientService
;
@Autowired
private
MangerChannelService
mangerChannelService
;
private
ChannelManager
channelManager
;
@Override
public
void
clientOffline
(
String
appKey
,
String
clientId
)
{
...
...
@@ -82,7 +81,7 @@ public class RtcServiceImpl implements RtcService {
ImApplication
imApplication
=
imApplicationService
.
getById
(
client
.
getFkAppid
());
// 判断发起方必须在线
boolean
onlineStatus
=
mangerChannelService
.
getOnlineStatus
(
imApplication
.
getAppKey
(),
client
.
getClientId
());
boolean
onlineStatus
=
channelManager
.
getOnlineStatus
(
imApplication
.
getAppKey
(),
client
.
getClientId
());
if
(!
onlineStatus
)
{
log
.
info
(
"发起方必须在线"
+
imApplication
.
getAppKey
()
+
client
.
getClientId
());
ApiResult
.
fail
();
...
...
@@ -214,7 +213,7 @@ public class RtcServiceImpl implements RtcService {
ImApplication
imApplication
=
imApplicationService
.
getById
(
client
.
getFkAppid
());
// 判断发起方必须在线
boolean
onlineStatus
=
mangerChannelService
.
getOnlineStatus
(
imApplication
.
getAppKey
(),
client
.
getClientId
());
boolean
onlineStatus
=
channelManager
.
getOnlineStatus
(
imApplication
.
getAppKey
(),
client
.
getClientId
());
if
(!
onlineStatus
)
{
log
.
info
(
"发起方必须在线"
+
imApplication
.
getAppKey
()
+
client
.
getClientId
());
ApiResult
.
fail
();
...
...
@@ -255,7 +254,7 @@ public class RtcServiceImpl implements RtcService {
ImApplication
imApplication
=
imApplicationService
.
getById
(
client
.
getFkAppid
());
// 判断发起方必须在线
boolean
onlineStatus
=
mangerChannelService
.
getOnlineStatus
(
imApplication
.
getAppKey
(),
client
.
getClientId
());
boolean
onlineStatus
=
channelManager
.
getOnlineStatus
(
imApplication
.
getAppKey
(),
client
.
getClientId
());
if
(!
onlineStatus
)
{
log
.
info
(
"发起方必须在线"
+
imApplication
.
getAppKey
()
+
client
.
getClientId
());
ApiResult
.
fail
();
...
...
core/src/main/java/com/wecloud/rtc/service/impl/WsRtcWriteImpl.java
View file @
dfcf66df
...
...
@@ -3,7 +3,7 @@ package com.wecloud.rtc.service.impl;
import
com.wecloud.im.ws.enums.WsResponseCmdEnum
;
import
com.wecloud.im.ws.enums.WsRtcResponseSubCmdEnum
;
import
com.wecloud.im.ws.model.WsResponseModel
;
import
com.wecloud.im.ws.se
rvice.WriteDataService
;
import
com.wecloud.im.ws.se
nder.ChannelSender
;
import
com.wecloud.rtc.entity.response.RtcCallResponse
;
import
com.wecloud.rtc.entity.response.RtcCandidateForwardResponse
;
import
com.wecloud.rtc.entity.response.RtcClientJoinResponse
;
...
...
@@ -21,7 +21,7 @@ import org.springframework.stereotype.Service;
public
class
WsRtcWriteImpl
implements
WsRtcWrite
{
@Autowired
private
WriteDataService
writeDataService
;
private
ChannelSender
channelSender
;
@Override
public
void
rtcCall
(
RtcCallResponse
rtcCallResponse
,
String
toAppKey
,
String
toClientId
)
{
...
...
@@ -39,7 +39,7 @@ public class WsRtcWriteImpl implements WsRtcWrite {
responseModel
.
setMsg
(
result
.
getMessage
());
responseModel
.
setData
(
rtcResponseBase
);
responseModel
.
setReqId
(
null
);
writeDataService
.
write
(
responseModel
,
toAppKey
,
toClientId
);
channelSender
.
sendMsg
(
responseModel
,
toAppKey
,
toClientId
);
}
...
...
@@ -59,7 +59,7 @@ public class WsRtcWriteImpl implements WsRtcWrite {
responseModel
.
setMsg
(
result
.
getMessage
());
responseModel
.
setData
(
rtcResponseBase
);
responseModel
.
setReqId
(
null
);
writeDataService
.
write
(
responseModel
,
toAppKey
,
toClientId
);
channelSender
.
sendMsg
(
responseModel
,
toAppKey
,
toClientId
);
}
...
...
@@ -81,7 +81,7 @@ public class WsRtcWriteImpl implements WsRtcWrite {
responseModel
.
setMsg
(
result
.
getMessage
());
responseModel
.
setData
(
rtcResponseBase
);
responseModel
.
setReqId
(
null
);
writeDataService
.
write
(
responseModel
,
toAppKey
,
toClientId
);
channelSender
.
sendMsg
(
responseModel
,
toAppKey
,
toClientId
);
}
...
...
@@ -101,7 +101,7 @@ public class WsRtcWriteImpl implements WsRtcWrite {
responseModel
.
setMsg
(
result
.
getMessage
());
responseModel
.
setData
(
rtcResponseBase
);
responseModel
.
setReqId
(
null
);
writeDataService
.
write
(
responseModel
,
toAppKey
,
toClientId
);
channelSender
.
sendMsg
(
responseModel
,
toAppKey
,
toClientId
);
}
@Override
...
...
@@ -120,7 +120,7 @@ public class WsRtcWriteImpl implements WsRtcWrite {
responseModel
.
setMsg
(
result
.
getMessage
());
responseModel
.
setData
(
rtcResponseBase
);
responseModel
.
setReqId
(
null
);
writeDataService
.
write
(
responseModel
,
toAppKey
,
toClientId
);
channelSender
.
sendMsg
(
responseModel
,
toAppKey
,
toClientId
);
}
@Override
...
...
@@ -139,7 +139,7 @@ public class WsRtcWriteImpl implements WsRtcWrite {
responseModel
.
setMsg
(
result
.
getMessage
());
responseModel
.
setData
(
rtcResponseBase
);
responseModel
.
setReqId
(
null
);
writeDataService
.
write
(
responseModel
,
toAppKey
,
toClientId
);
channelSender
.
sendMsg
(
responseModel
,
toAppKey
,
toClientId
);
}
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment