Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
W
wecloud_im_server
Overview
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
hewei
wecloud_im_server
Commits
fd8a084b
Commit
fd8a084b
authored
Oct 21, 2022
by
陈前凌
Browse files
Options
Browse Files
Download
Plain Diff
Merge remote-tracking branch 'origin/xiaohudou_20220427' into xiaohudou_20220427
parents
33a88a98
8ea108dc
Expand all
Show whitespace changes
Inline
Side-by-side
Showing
8 changed files
with
128 additions
and
145 deletions
+128
-145
core/src/main/java/com/wecloud/im/netty/core/ChannelInboundHandler.java
+5
-2
core/src/main/java/com/wecloud/im/service/ImMessageService.java
+0
-6
core/src/main/java/com/wecloud/im/service/SaveMessageService.java
+86
-0
core/src/main/java/com/wecloud/im/service/impl/ImConversationServiceImpl.java
+10
-10
core/src/main/java/com/wecloud/im/service/impl/ImGroupServiceImpl.java
+9
-9
core/src/main/java/com/wecloud/im/service/impl/ImMessageServiceImpl.java
+13
-101
core/src/main/java/com/wecloud/im/ws/manager/ChannelManager.java
+1
-1
core/src/main/resources/mapper/ImMessageMapper.xml
+4
-16
No files found.
core/src/main/java/com/wecloud/im/netty/core/ChannelInboundHandler.java
View file @
fd8a084b
...
...
@@ -67,15 +67,18 @@ public class ChannelInboundHandler extends ChannelInboundHandlerAdapter {
log
.
info
(
"连接的客户端地址:{}"
,
ctx
.
channel
().
remoteAddress
());
}
ctx
.
writeAndFlush
(
"客户端"
+
InetAddress
.
getLocalHost
().
getHostName
()
+
"成功与服务端建立连接! "
);
log
.
info
(
"连接成功 {},clientId {}"
,
ctx
.
channel
().
remoteAddress
(),
ctx
.
channel
().
attr
(
ChannelManager
.
CLIENT_ID
).
get
());
super
.
channelActive
(
ctx
);
}
@Override
public
void
channelInactive
(
ChannelHandlerContext
ctx
)
throws
Exception
{
super
.
channelInactive
(
ctx
);
Long
clientId
=
ctx
.
channel
().
attr
(
ChannelManager
.
CLIENT_ID
).
get
();
log
.
info
(
"触发channelInactive方法 clientId {}"
,
clientId
);
// ctx.channel().close();
super
.
channelInactive
(
ctx
);
if
(
clientId
==
null
)
{
ctx
.
channel
().
close
();
}
}
@Override
public
void
exceptionCaught
(
ChannelHandlerContext
ctx
,
Throwable
cause
)
...
...
core/src/main/java/com/wecloud/im/service/ImMessageService.java
View file @
fd8a084b
...
...
@@ -180,10 +180,4 @@ public interface ImMessageService extends BaseService<ImMessage> {
List
<
WeCloudMessageVo
>
syncListMessage
(
SyncListMessageParam
param
);
/**
* 保存消息到数据库
* @param imMessage
*/
void
saveMessageToDb
(
ImMessage
imMessage
);
}
core/src/main/java/com/wecloud/im/service/SaveMessageService.java
0 → 100644
View file @
fd8a084b
package
com
.
wecloud
.
im
.
service
;
import
com.baomidou.lock.annotation.Lock4j
;
import
com.baomidou.mybatisplus.core.conditions.query.QueryWrapper
;
import
com.wecloud.im.entity.ImClient
;
import
com.wecloud.im.entity.ImConversationMembers
;
import
com.wecloud.im.entity.ImMessage
;
import
com.wecloud.im.param.ImConversationQueryVo
;
import
com.wecloud.im.ws.utils.RedisUtils
;
import
com.wecloud.utils.SnowflakeUtil
;
import
lombok.extern.slf4j.Slf4j
;
import
org.apache.commons.lang3.StringUtils
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.cache.Cache
;
import
org.springframework.cache.CacheManager
;
import
org.springframework.stereotype.Service
;
import
java.time.Duration
;
import
java.util.Date
;
import
java.util.List
;
/**
* @Author Future
* @Date 2022/8/24 17:18
* @Description 本地缓存服务
*/
@Slf4j
@Service
public
class
SaveMessageService
{
private
static
final
String
LAST_MSG_PREFIX
=
"last_msg_"
;
@Autowired
private
ImMessageService
imMessageService
;
@Autowired
private
RedisUtils
redisUtils
;
/**
* 持久化消息到数据库并更新相关缓存
* expire 锁过期时间 防止死锁
*
* @param imMessage
*/
@Lock4j
(
keys
=
{
"#imMessage.fkConversationId"
},
expire
=
3000
,
acquireTimeout
=
1000
)
public
void
saveMessageToDb
(
ImMessage
imMessage
)
{
imMessage
.
setId
(
SnowflakeUtil
.
getId
());
imMessage
.
setCreateTime
(
new
Date
());
this
.
setLastMessageIdAndFreshCash
(
imMessage
);
imMessageService
.
save
(
imMessage
);
}
/**
* 查找会话的最新一条消息
*
* @param message
* @return
*/
private
void
setLastMessageIdAndFreshCash
(
ImMessage
message
)
{
try
{
// 执行业务方法
String
key
=
LAST_MSG_PREFIX
+
message
.
getFkConversationId
();
String
messageId
=
redisUtils
.
getKey
(
key
);
if
(
StringUtils
.
isNotBlank
(
messageId
))
{
// 将Redis中最新的messageId存入
message
.
setPreMessageId
(
Long
.
valueOf
(
messageId
));
}
else
{
// 落库查
ImMessage
preMessage
=
imMessageService
.
getOne
(
new
QueryWrapper
<
ImMessage
>().
lambda
()
.
eq
(
ImMessage:
:
getFkConversationId
,
message
.
getFkConversationId
())
.
orderByDesc
(
ImMessage:
:
getId
)
.
last
(
"limit 1"
)
);
if
(
preMessage
!=
null
)
{
message
.
setPreMessageId
(
preMessage
.
getId
());
}
}
redisUtils
.
addKey
(
key
,
message
.
getId
().
toString
(),
Duration
.
ofMinutes
(
30
));
}
catch
(
Exception
e
)
{
log
.
info
(
"获取会话最后一条消息异常 "
,
e
);
}
}
}
core/src/main/java/com/wecloud/im/service/impl/ImConversationServiceImpl.java
View file @
fd8a084b
...
...
@@ -62,6 +62,7 @@ import com.wecloud.im.service.ImConversationMembersService;
import
com.wecloud.im.service.ImConversationService
;
import
com.wecloud.im.service.ImInboxService
;
import
com.wecloud.im.service.ImMessageService
;
import
com.wecloud.im.service.SaveMessageService
;
import
com.wecloud.im.vo.ChatRoomMemberVo
;
import
com.wecloud.im.vo.ConversationCountVo
;
import
com.wecloud.im.vo.ConversationMemberVo
;
...
...
@@ -162,6 +163,9 @@ public class ImConversationServiceImpl extends BaseServiceImpl<ImConversationMap
@Resource
private
EhcacheService
ehcacheService
;
@Autowired
private
SaveMessageService
saveMessageService
;
@Override
@Transactional
(
rollbackFor
=
Exception
.
class
)
public
ImConversationCreateVo
createImConversation
(
ImConversationCreate
imConversationCreate
)
{
...
...
@@ -341,7 +345,7 @@ public class ImConversationServiceImpl extends BaseServiceImpl<ImConversationMap
content
.
put
(
"operator"
,
imClientSender
.
getClientId
());
//操作的client ID
content
.
put
(
"passivityOperator"
,
clientToConversation
.
getClientId
());
//被操作的client ID
ImMessage
imMessage
=
MessageBuilder
.
buildEventMessage
(
MsgTypeEnum
.
INVITE_CLIENT_JOIN_CONVERSATION
,
imApplication
,
createClient
,
imConversation
,
JsonUtils
.
encodeJson
(
content
));
boolean
save
=
imMessageService
.
save
(
imMessage
);
saveMessageService
.
saveMessageToDb
(
imMessage
);
// 发送给在群内的成员
sendMsgToMembers
(
imConversation
,
membersList
,
createClient
,
imMessage
,
content
);
// 发送给被邀请人
...
...
@@ -403,7 +407,7 @@ public class ImConversationServiceImpl extends BaseServiceImpl<ImConversationMap
content
.
put
(
"passivityOperator"
,
clientToBeRemove
.
getClientId
());
//被操作的client ID
ImMessage
imMessage
=
MessageBuilder
.
buildEventMessage
(
MsgTypeEnum
.
REMOVE_CLIENT_CONVERSATION
,
imApplication
,
createClient
,
imConversation
,
JsonUtils
.
encodeJson
(
content
));
imMessageService
.
save
(
imMessage
);
saveMessageService
.
saveMessageToDb
(
imMessage
);
membersList
.
removeIf
(
e
->
e
.
getId
().
equals
(
members
.
getId
()));
...
...
@@ -451,7 +455,7 @@ public class ImConversationServiceImpl extends BaseServiceImpl<ImConversationMap
Map
<
String
,
Object
>
content
=
new
HashMap
<>();
content
.
put
(
"operator"
,
currentClient
.
getClientId
());
ImMessage
imMessage
=
MessageBuilder
.
buildEventMessage
(
MsgTypeEnum
.
CONVERSATION_DISBAND
,
imApplication
,
currentClient
,
imConversation
,
JsonUtils
.
encodeJson
(
content
));
imMessageService
.
save
(
imMessage
);
saveMessageService
.
saveMessageToDb
(
imMessage
);
sendMsgToMembers
(
imConversation
,
membersList
,
currentClient
,
imMessage
,
content
);
}
...
...
@@ -527,11 +531,7 @@ public class ImConversationServiceImpl extends BaseServiceImpl<ImConversationMap
// 生成消息id
ImMessage
imMessage
=
MessageBuilder
.
buildEventMessage
(
MsgTypeEnum
.
LEAVE_CONVERSATION
,
imApplication
,
currentClient
,
imConversation
,
""
);
// 保存消息至消息表
boolean
save
=
imMessageService
.
save
(
imMessage
);
if
(!
save
)
{
throw
new
BusinessException
(
"退出群聊错误"
);
}
saveMessageService
.
saveMessageToDb
(
imMessage
);
sendMsgToMembers
(
imConversation
,
membersList
,
currentClient
,
imMessage
,
null
);
// 群主退出 转移给下一个人
if
(
GroupRoleEnum
.
OWNER
.
getCode
().
equals
(
members
.
getRole
()))
{
...
...
@@ -1022,7 +1022,7 @@ public class ImConversationServiceImpl extends BaseServiceImpl<ImConversationMap
if
(!
membersList
.
isEmpty
())
{
// 保存事件消息
ImMessage
imMessage
=
MessageBuilder
.
buildEventMessage
(
msgType
,
application
,
operatorClient
,
group
,
JsonUtils
.
encodeJson
(
content
));
imMessageService
.
save
(
imMessage
);
saveMessageService
.
saveMessageToDb
(
imMessage
);
// 发送消息至群成员
sendMsgToMembers
(
group
,
membersList
,
operatorClient
,
imMessage
,
content
);
}
...
...
@@ -1541,7 +1541,7 @@ public class ImConversationServiceImpl extends BaseServiceImpl<ImConversationMap
content
.
put
(
"operator"
,
creator
.
getClientId
());
//操作的client ID
content
.
put
(
"passivityOperator"
,
member
.
getClientId
());
//被操作的client ID
ImMessage
imMessage
=
MessageBuilder
.
buildEventMessage
(
MsgTypeEnum
.
CLIENT_JOIN_NEW_CONVERSATION
,
application
,
creator
,
conversation
,
JsonUtils
.
encodeJson
(
content
));
imMessageService
.
save
(
imMessage
);
saveMessageService
.
saveMessageToDb
(
imMessage
);
// 投递消息
couriers
.
deliver
(
imMessage
,
content
,
creator
,
member
,
WsResponseCmdEnum
.
CONVERSATION_EVENT_MSG
);
}
...
...
core/src/main/java/com/wecloud/im/service/impl/ImGroupServiceImpl.java
View file @
fd8a084b
package
com
.
wecloud
.
im
.
service
.
impl
;
import
com.wecloud.im.service.SaveMessageService
;
import
io.geekidea.springbootplus.framework.common.exception.BusinessException
;
import
io.geekidea.springbootplus.framework.shiro.util.SecurityUtils
;
import
lombok.AllArgsConstructor
;
...
...
@@ -12,6 +13,7 @@ import java.util.List;
import
java.util.Map
;
import
java.util.stream.Collectors
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.stereotype.Service
;
import
org.springframework.transaction.annotation.Transactional
;
...
...
@@ -86,10 +88,8 @@ public class ImGroupServiceImpl implements ImGroupService {
*/
private
final
ImConversationMapper
imConversationMapper
;
/**
* 消息服务
*/
private
final
ImMessageService
imMessageService
;
@Autowired
private
SaveMessageService
saveMessageService
;
@Override
public
Long
createGroup
(
String
creatorClientId
,
String
groupName
,
List
<
String
>
memberIds
)
{
...
...
@@ -163,7 +163,7 @@ public class ImGroupServiceImpl implements ImGroupService {
ImMessage
imMessage
=
MessageBuilder
.
buildEventMessage
(
MsgTypeEnum
.
CONVERSATION_DISBAND
,
application
,
operator
,
conversation
,
JsonUtils
.
encodeJson
(
content
));
// 保存消息至消息表
imMessageService
.
save
(
imMessage
);
saveMessageService
.
saveMessageToDb
(
imMessage
);
conversationService
.
sendMsgToMembers
(
conversation
,
membersList
,
operator
,
imMessage
,
content
);
return
true
;
...
...
@@ -236,7 +236,7 @@ public class ImGroupServiceImpl implements ImGroupService {
content
.
put
(
"passivityOperator"
,
newMember
.
getClientId
());
ImMessage
imMessage
=
MessageBuilder
.
buildEventMessage
(
MsgTypeEnum
.
INVITE_CLIENT_JOIN_CONVERSATION
,
imApplication
,
inviter
,
conversation
,
JsonUtils
.
encodeJson
(
content
));
boolean
save
=
imMessageService
.
save
(
imMessage
);
saveMessageService
.
saveMessageToDb
(
imMessage
);
// 发送给在群内的成员
conversationService
.
sendMsgToMembers
(
conversation
,
oldMembers
,
inviter
,
imMessage
,
content
);
// 发送给被邀请人
...
...
@@ -286,7 +286,7 @@ public class ImGroupServiceImpl implements ImGroupService {
content
.
put
(
"operator"
,
operator
.
getClientId
());
// 自己主动退出
ImMessage
imMessage
=
MessageBuilder
.
buildEventMessage
(
MsgTypeEnum
.
LEAVE_CONVERSATION
,
imApplication
,
operator
,
conversation
,
JsonUtils
.
encodeJson
(
content
));
imMessageService
.
save
(
imMessage
);
saveMessageService
.
saveMessageToDb
(
imMessage
);
// 发送给在群内的成员
conversationService
.
sendMsgToMembers
(
conversation
,
membersList
,
operator
,
imMessage
,
content
);
}
else
{
...
...
@@ -294,7 +294,7 @@ public class ImGroupServiceImpl implements ImGroupService {
// 被操作的client ID
content
.
put
(
"passivityOperator"
,
members
.
getClientId
());
ImMessage
imMessage
=
MessageBuilder
.
buildEventMessage
(
MsgTypeEnum
.
REMOVE_CLIENT_CONVERSATION
,
imApplication
,
operator
,
conversation
,
JsonUtils
.
encodeJson
(
content
));
imMessageService
.
save
(
imMessage
);
saveMessageService
.
saveMessageToDb
(
imMessage
);
// 发送给在群内的成员
conversationService
.
sendMsgToMembers
(
conversation
,
membersList
,
operator
,
imMessage
,
content
);
}
...
...
@@ -394,7 +394,7 @@ public class ImGroupServiceImpl implements ImGroupService {
Map
<
String
,
Object
>
content
=
new
HashMap
<>();
content
.
put
(
"operator"
,
newGroupOwner
.
getClientId
());
ImMessage
imMessage
=
MessageBuilder
.
buildEventMessage
(
MsgTypeEnum
.
CONVERSATION_NEW_CREATOR
,
imApplication
,
newGroupOwnerClient
,
conversation
,
JsonUtils
.
encodeJson
(
content
));
imMessageService
.
save
(
imMessage
);
saveMessageService
.
saveMessageToDb
(
imMessage
);
// 发送给在群内的成员
List
<
ImConversationMembers
>
existMemberList
=
conversationMembersService
.
list
(
Wrappers
.<
ImConversationMembers
>
lambdaQuery
()
...
...
core/src/main/java/com/wecloud/im/service/impl/ImMessageServiceImpl.java
View file @
fd8a084b
This diff is collapsed.
Click to expand it.
core/src/main/java/com/wecloud/im/ws/manager/ChannelManager.java
View file @
fd8a084b
...
...
@@ -60,7 +60,7 @@ public class ChannelManager {
*/
public
void
online
(
Long
clientId
,
Integer
platform
,
NioSocketChannel
channel
)
{
String
longChannelId
=
channel
.
id
().
asLongText
();
log
.
info
(
"保存本地连接clientId {} platform {}
"
,
clientId
,
platform
);
log
.
info
(
"保存本地连接clientId {} platform {}
channel is null {}"
,
clientId
,
platform
,
channel
==
null
);
this
.
putSessionInfoMap
(
clientId
,
platform
,
channel
);
UserStateListener
.
triggerOnlineEvent
(
clientId
,
platform
,
longChannelId
);
...
...
core/src/main/resources/mapper/ImMessageMapper.xml
View file @
fd8a084b
...
...
@@ -67,14 +67,9 @@
im_message_new.send_status,
im_message_new.`msg_type` AS 'type',
im_message_new.fk_conversation_id as conversationId
-- (SELECT COUNT(id) FROM im_inbox WHERE fk_msg_id = msgId AND read_msg_status = 0) AS not_read_count,
-- (SELECT COUNT(id)
-- FROM im_inbox
-- WHERE fk_msg_id = msgId
-- AND receiver_msg_status = 0) AS not_receiver_count
FROM `im_message_new`
INNER JOIN `im_client` ON `im_client`.id = `im_message_new`.sender
WHERE fk_conversation_id = #{param.conversationId}
and im_message_new.is_delete = 1 and im_message_new.withdraw = 0
WHERE fk_conversation_id = #{param.conversationId}
and (im_message_new.`event`=0 || (im_message_new.`event`=1 and sender != #{param.currentFkClientId} and (receivers is
null || (receivers !=null and FIND_IN_SET(#{param.currentFkClientId}, receivers))) ))
<if
test=
"param.msgIdStart != null"
>
...
...
@@ -84,9 +79,7 @@
AND im_message_new.id
<![CDATA[ < ]]>
#{param.msgIdEnd}
</if>
and im_message_new.create_time > DATE_SUB(CURDATE(), INTERVAL 7 DAY)
ORDER BY `im_message_new`.`create_time` DESC
ORDER BY `im_message_new`.`id` DESC
</select>
...
...
@@ -103,15 +96,10 @@
im_message_new.`at`,
im_message_new.send_status,
im_message_new.`msg_type` AS 'type',
im_message_new.fk_conversation_id as conversationId,
(SELECT COUNT(id) FROM im_inbox WHERE fk_msg_id = msgId AND read_msg_status = 0) AS not_read_count,
(SELECT COUNT(id)
FROM im_inbox
WHERE fk_msg_id = msgId
AND receiver_msg_status = 0) AS not_receiver_count
im_message_new.fk_conversation_id as conversationId
FROM `im_message_new`
INNER JOIN `im_client` ON `im_client`.id = `im_message_new`.sender
WHERE fk_conversation_id = #{param.conversationId}
and im_message_new.is_delete = 1 and im_message_new.withdraw = 0
WHERE fk_conversation_id = #{param.conversationId}
and (im_message_new.`event`=0 || (im_message_new.`event`=1 and sender != #{param.currentFkClientId} and (receivers is
null || (receivers !=null and FIND_IN_SET(#{param.currentFkClientId}, receivers))) ))
<if
test=
"param.msgIdStart != null"
>
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment