Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
W
wecloud_im_server
Overview
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
hewei
wecloud_im_server
Commits
e31b24d3
Commit
e31b24d3
authored
Nov 04, 2020
by
JJww
Browse files
Options
Browse Files
Download
Plain Diff
Merge branch 'Jw' into 'master'
Jw See merge request hewei/Jumeirah!63
parents
12a337b4
ddcf6b33
Show whitespace changes
Inline
Side-by-side
Showing
18 changed files
with
84 additions
and
260 deletions
+84
-260
api-app/src/main/java/com/jumeirah/api/app/controller/AppUserController.java
+1
-0
common/src/main/java/com/jumeirah/common/mq/Queue.java
+1
-16
common/src/main/java/com/jumeirah/common/service/impl/MerchantServiceImpl.java
+0
-1
common/src/main/java/com/jumeirah/common/service/impl/StrokeServiceImpl.java
+3
-1
config/src/main/resources/config/application-dev.yml
+1
-2
config/src/main/resources/config/application-test.yml
+7
-0
customer-service/pom.xml
+24
-28
customer-service/src/main/java/com/ym/im/config/RabbitConfig.java
+5
-14
customer-service/src/main/java/com/ym/im/entity/Stroke.java
+7
-0
customer-service/src/main/java/com/ym/im/entity/model/OrderModel.java
+0
-37
customer-service/src/main/java/com/ym/im/handler/BaseHandler.java
+4
-5
customer-service/src/main/java/com/ym/im/mq/Queue.java
+0
-14
customer-service/src/main/java/com/ym/im/mq/Receiver.java
+19
-121
customer-service/src/main/java/com/ym/im/service/impl/MsgBodyServiceImpl.java
+2
-9
customer-service/src/main/java/com/ym/im/service/impl/StaffServiceImpl.java
+4
-4
customer-service/src/main/resources/application-dev.yml
+4
-4
customer-service/src/main/resources/application-prd.yml
+0
-1
customer-service/src/main/resources/application-test.yml
+2
-3
No files found.
api-app/src/main/java/com/jumeirah/api/app/controller/AppUserController.java
View file @
e31b24d3
...
...
@@ -27,6 +27,7 @@ import org.springframework.web.bind.annotation.RequestHeader;
import
org.springframework.web.bind.annotation.RequestMapping
;
import
org.springframework.web.bind.annotation.RequestParam
;
import
org.springframework.web.bind.annotation.RestController
;
import
org.springframework.web.bind.annotation.*
;
import
java.util.List
;
import
java.util.Set
;
...
...
common/src/main/java/com/jumeirah/common/mq/Queue.java
View file @
e31b24d3
package
com
.
jumeirah
.
common
.
mq
;
import
com.jumeirah.common.entity.Stroke
;
import
org.springframework.amqp.core.AmqpTemplate
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.beans.factory.annotation.Value
;
...
...
@@ -18,30 +17,17 @@ public class Queue {
@Autowired
private
AmqpTemplate
rabbitTemplate
;
@Value
(
"${spring.rabbitmq.user-queue-name}"
)
private
String
userQueueName
;
@Value
(
"${spring.rabbitmq.order-queue-name}"
)
private
String
orderQueueName
;
/**
* 推送订单状态到客服系统
*
* @param orderInteractionModel
*/
public
void
pushOrder
(
Str
oke
stroke
)
{
public
void
pushOrder
(
Str
ing
stroke
)
{
rabbitTemplate
.
convertAndSend
(
orderQueueName
,
stroke
);
}
/**
* 禁用用户
*
* @param userId
*/
public
void
disableUserQueue
(
String
userId
)
{
rabbitTemplate
.
convertAndSend
(
userQueueName
,
userId
);
}
}
\ No newline at end of file
common/src/main/java/com/jumeirah/common/service/impl/MerchantServiceImpl.java
View file @
e31b24d3
...
...
@@ -67,7 +67,6 @@ public class MerchantServiceImpl extends BaseServiceImpl<MerchantMapper, Merchan
return
baseMapper
.
selectList
(
new
QueryWrapper
<
Merchant
>().
lambda
()
.
eq
(
Merchant:
:
getState
,
1
)
.
eq
(
Merchant:
:
getDeleted
,
0
)
.
eq
(
Merchant:
:
getAuditRegisterStatus
,
1
)
.
orderByDesc
(
Merchant:
:
getCreateTime
));
}
...
...
common/src/main/java/com/jumeirah/common/service/impl/StrokeServiceImpl.java
View file @
e31b24d3
package
com
.
jumeirah
.
common
.
service
.
impl
;
import
com.alibaba.fastjson.JSON
;
import
com.baomidou.mybatisplus.core.metadata.IPage
;
import
com.baomidou.mybatisplus.core.metadata.OrderItem
;
import
com.baomidou.mybatisplus.extension.plugins.pagination.Page
;
...
...
@@ -15,6 +16,7 @@ import com.jumeirah.common.vo.McStrokePaymentQueryVo;
import
com.jumeirah.common.vo.McStrokeQueryVo
;
import
com.jumeirah.common.vo.StrokeDetailVo
;
import
com.jumeirah.common.vo.StrokeQueryVo
;
import
com.rabbitmq.tools.json.JSONUtil
;
import
io.geekidea.springbootplus.framework.common.service.impl.BaseServiceImpl
;
import
io.geekidea.springbootplus.framework.core.pagination.PageInfo
;
import
io.geekidea.springbootplus.framework.core.pagination.Paging
;
...
...
@@ -46,7 +48,7 @@ public class StrokeServiceImpl extends BaseServiceImpl<StrokeMapper, Stroke> imp
public
boolean
saveStroke
(
Stroke
stroke
)
throws
Exception
{
final
boolean
save
=
super
.
save
(
stroke
);
if
(
save
)
{
//推送订单到客服系统
queue
.
pushOrder
(
stroke
);
queue
.
pushOrder
(
JSON
.
toJSONString
(
stroke
)
);
}
return
save
;
}
...
...
config/src/main/resources/config/application-dev.yml
View file @
e31b24d3
...
...
@@ -30,8 +30,7 @@ spring:
port
:
5672
username
:
root
password
:
root
user-queue-name
:
disable.user.dev
order-queue-name
:
push.order
order-queue-name
:
push.order.dev
# 打印SQL语句和结果集,本地开发环境可开启,线上注释掉
mybatis-plus
:
...
...
config/src/main/resources/config/application-test.yml
View file @
e31b24d3
...
...
@@ -26,6 +26,13 @@ spring:
password
:
temple123456
port
:
6379
rabbitmq
:
host
:
47.99.47.225
port
:
5672
username
:
root
password
:
root
order-queue-name
:
push.order
# 打印SQL语句和结果集,本地开发环境可开启,线上注释掉
mybatis-plus
:
configuration
:
...
...
customer-service/pom.xml
View file @
e31b24d3
...
...
@@ -2,10 +2,12 @@
<project
xmlns=
"http://maven.apache.org/POM/4.0.0"
xmlns:xsi=
"http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation=
"http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
>
<parent>
<groupId>
io.geekidea.springbootplus
</groupId>
<artifactId>
parent
</artifactId>
<version>
2.0
</version>
<groupId>
org.springframework.boot
</groupId>
<artifactId>
spring-boot-starter-parent
</artifactId>
<version>
2.2.5.RELEASE
</version>
<relativePath/>
</parent>
<modelVersion>
4.0.0
</modelVersion>
...
...
@@ -47,30 +49,23 @@
</dependency>
<dependency>
<groupId>
org.apache.httpcomponents
</groupId>
<artifactId>
httpclient
</artifactId>
</dependency>
<dependency>
<groupId>
org.apache.httpcomponents
</groupId>
<artifactId>
httpmime
</artifactId>
<groupId>
org.springframework.boot
</groupId>
<artifactId>
spring-boot-starter-data-redis
</artifactId>
</dependency>
<dependency>
<groupId>
org.springframework.boot
</groupId>
<artifactId>
spring-boot-configuration-processor
</artifactId>
<optional>
true
</optional>
<artifactId>
spring-boot-starter-amqp
</artifactId>
</dependency>
<dependency>
<groupId>
org.springframework.boot
</groupId>
<artifactId>
spring-boot-starter-test
</artifactId>
<scope>
test
</scope>
<groupId>
org.apache.httpcomponents
</groupId>
<artifactId>
httpclient
</artifactId>
</dependency>
<dependency>
<groupId>
org.
springframework.boot
</groupId>
<artifactId>
spring-boot-starter-amqp
</artifactId>
<groupId>
org.
apache.httpcomponents
</groupId>
<artifactId>
httpmime
</artifactId>
</dependency>
<dependency>
...
...
@@ -81,6 +76,7 @@
<dependency>
<groupId>
com.alibaba
</groupId>
<artifactId>
druid-spring-boot-starter
</artifactId>
<version>
1.1.10
</version>
</dependency>
<dependency>
...
...
@@ -92,39 +88,39 @@
<dependency>
<groupId>
com.github.pagehelper
</groupId>
<artifactId>
pagehelper-spring-boot-starter
</artifactId>
<version>
1.2.3
</version>
</dependency>
<dependency>
<groupId>
org.mybatis.spring.boot
</groupId>
<artifactId>
mybatis-spring-boot-starter
</artifactId>
<version>
2.0.1
</version>
</dependency>
<dependency>
<groupId>
org.springframework.boot
</groupId>
<artifactId>
spring-boot-starter-data-redis
</artifactId>
<groupId>
com.baomidou
</groupId>
<artifactId>
mybatis-plus-boot-starter
</artifactId>
<version>
3.3.1
</version>
</dependency>
<dependency>
<groupId>
io.springfox
</groupId>
<artifactId>
springfox-swagger2
</artifactId>
<version>
2.9.2
</version>
</dependency>
<dependency>
<groupId>
io.springfox
</groupId>
<artifactId>
springfox-swagger-ui
</artifactId>
<version>
2.9.2
</version>
</dependency>
<dependency>
<groupId>
org.
springframework.boot
</groupId>
<artifactId>
spring-boot-devtools
</artifactId>
<
optional>
true
</optional
>
<groupId>
org.
projectlombok
</groupId>
<artifactId>
lombok
</artifactId>
<
version>
1.18.12
</version
>
</dependency>
<dependency>
<groupId>
com.baomidou
</groupId>
<artifactId>
mybatis-plus-boot-starter
</artifactId>
</dependency>
</dependencies>
<build>
...
...
customer-service/src/main/java/com/ym/im/config/RabbitConfig.java
View file @
e31b24d3
...
...
@@ -22,15 +22,15 @@ import java.util.Map;
@ConfigurationProperties
(
prefix
=
"spring.rabbitmq"
)
public
class
RabbitConfig
{
private
String
delayQueu
eName
;
private
String
exchang
eName
;
private
String
staffOffline
QueueName
;
private
String
order
QueueName
;
private
String
exchang
eName
;
private
String
staffOfflineQueu
eName
;
@Bean
public
Queue
delay
Queue
()
{
return
new
Queue
(
delay
QueueName
);
public
Queue
order
Queue
()
{
return
new
Queue
(
order
QueueName
);
}
@Bean
...
...
@@ -38,7 +38,6 @@ public class RabbitConfig {
return
new
Queue
(
staffOfflineQueueName
);
}
/**
* 配置默认的交换机
*/
...
...
@@ -53,14 +52,6 @@ public class RabbitConfig {
* 绑定队列到交换器
*/
@Bean
Binding
bindingDelayQueue
(
Queue
delayQueue
,
CustomExchange
customExchange
)
{
return
BindingBuilder
.
bind
(
delayQueue
).
to
(
customExchange
).
with
(
delayQueueName
).
noargs
();
}
/**
* 绑定队列到交换器
*/
@Bean
Binding
bindingStaffOfflineQueue
(
Queue
staffOfflineQueue
,
CustomExchange
customExchange
)
{
return
BindingBuilder
.
bind
(
staffOfflineQueue
).
to
(
customExchange
).
with
(
staffOfflineQueueName
).
noargs
();
}
...
...
customer-service/src/main/java/com/ym/im/entity/Stroke.java
View file @
e31b24d3
package
com
.
ym
.
im
.
entity
;
import
com.ym.im.validation.group.ChatRecordSaveGroup
;
import
com.ym.im.validation.group.ChatRecordSendGroup
;
import
io.swagger.annotations.ApiModelProperty
;
import
lombok.Data
;
import
lombok.experimental.Accessors
;
import
javax.validation.constraints.NotNull
;
import
javax.validation.groups.Default
;
import
java.io.Serializable
;
import
java.math.BigDecimal
;
import
java.sql.Timestamp
;
import
java.util.Date
;
/**
* 行程表
...
...
@@ -139,4 +144,6 @@ public class Stroke implements Serializable {
@ApiModelProperty
(
"是否是优惠调机,0-否,1-是"
)
private
Boolean
isDiscount
;
@ApiModelProperty
(
value
=
"发送时间"
)
private
Date
sendTime
;
}
customer-service/src/main/java/com/ym/im/entity/model/OrderModel.java
deleted
100644 → 0
View file @
12a337b4
package
com
.
ym
.
im
.
entity
.
model
;
import
lombok.Data
;
import
java.io.Serializable
;
/**
* @author: JJww
* @Date:2019-07-19
*/
@Data
public
class
OrderModel
implements
Serializable
{
private
static
final
long
serialVersionUID
=
1L
;
/**
* 订单类型
*/
private
String
type
;
/**
* 订单ID
*/
private
String
orderId
;
/**
* 订单状态
*/
private
String
orderStatus
;
/**
* 用户Id
*/
private
String
userId
;
private
Long
merchantId
;
}
customer-service/src/main/java/com/ym/im/handler/BaseHandler.java
View file @
e31b24d3
package
com
.
ym
.
im
.
handler
;
import
com.ym.im.entity.base.ChannelAttributeKey
;
import
com.ym.im.exception.HttpException
;
import
com.ym.im.factory.SingleChatFactory
;
import
io.netty.channel.ChannelHandler
;
import
io.netty.channel.ChannelHandlerContext
;
...
...
@@ -30,10 +29,10 @@ public abstract class BaseHandler<T> extends SimpleChannelInboundHandler<T> {
@Override
public
void
exceptionCaught
(
ChannelHandlerContext
ctx
,
Throwable
cause
)
{
if
(
cause
instanceof
HttpException
)
{
return
;
}
singleChatFactory
.
getService
(
ctx
.
channel
().
attr
(
ChannelAttributeKey
.
ROLE_TYPE
).
get
()).
offline
(
ctx
);
//
if (cause instanceof HttpException) {
//
return;
//
}
//
singleChatFactory.getService(ctx.channel().attr(ChannelAttributeKey.ROLE_TYPE).get()).offline(ctx);
}
}
customer-service/src/main/java/com/ym/im/mq/Queue.java
View file @
e31b24d3
package
com
.
ym
.
im
.
mq
;
import
com.ym.im.config.RabbitConfig
;
import
com.ym.im.entity.MsgBody
;
import
com.ym.im.entity.StaffSocketInfo
;
import
org.springframework.amqp.core.AmqpTemplate
;
import
org.springframework.beans.factory.annotation.Autowired
;
...
...
@@ -21,19 +20,6 @@ public class Queue {
private
RabbitConfig
rabbitConfig
;
/**
* 延迟队列 重发未回应消息
*
* @param msgBody
*/
public
void
delaysQueue
(
MsgBody
msgBody
)
{
// 通过广播模式发布延时消息 延时3秒 消费后销毁 这里无需指定路由,会广播至每个绑定此交换机的队列
rabbitTemplate
.
convertAndSend
(
rabbitConfig
.
getExchangeName
(),
rabbitConfig
.
getDelayQueueName
(),
msgBody
,
message
->
{
message
.
getMessageProperties
().
setDelay
(
3
*
1000
);
// 毫秒为单位,指定此消息的延时时长
return
message
;
});
}
/**
* 客服离线 队列
*
* @param msgBody
...
...
customer-service/src/main/java/com/ym/im/mq/Receiver.java
View file @
e31b24d3
package
com
.
ym
.
im
.
mq
;
import
com.alibaba.fastjson.JSON
;
import
com.ym.im.entity.ChatRecord
;
import
com.ym.im.entity.MsgBody
;
import
com.ym.im.entity.StaffSocketInfo
;
import
com.ym.im.entity.Stroke
;
import
com.ym.im.entity.UserSocketInfo
;
import
com.ym.im.entity.base.NettyConstant
;
import
com.ym.im.entity.enums.RoleEnum
;
import
com.ym.im.entity.model.IdModel
;
import
com.ym.im.entity.model.OrderModel
;
import
com.ym.im.handler.ChannelGroupHandler
;
import
com.ym.im.service.StaffService
;
import
com.ym.im.util.JsonUtils
;
import
lombok.SneakyThrows
;
import
lombok.extern.slf4j.Slf4j
;
import
org.springframework.amqp.rabbit.annotation.RabbitListener
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.data.redis.core.RedisTemplate
;
import
org.springframework.stereotype.Component
;
import
java.io.IOException
;
import
java.text.SimpleDateFormat
;
import
javax.annotation.Resource
;
import
java.util.Date
;
import
java.util.Set
;
...
...
@@ -34,7 +31,7 @@ public class Receiver {
@Autowired
private
Queue
queue
;
@
Autowired
@
Resource
(
name
=
"myRedisTemplate"
)
private
RedisTemplate
redisTemplate
;
@Autowired
...
...
@@ -43,23 +40,6 @@ public class Receiver {
@Autowired
private
ChannelGroupHandler
channelGroup
;
/**
* 禁用用户 队列名称
*/
public
static
final
String
USER_QUEUE_NAME
=
"disable.user"
;
/**
* 订单队列
*/
public
static
final
String
ORDER_QUEUE_NAME
=
"push.order"
;
@RabbitListener
(
queues
=
"#{delayQueue.name}"
)
public
void
delayAckHandler
(
MsgBody
msgBody
)
throws
IOException
{
log
.
info
(
"接收时间:"
+
new
SimpleDateFormat
(
"yyyy-MM-dd HH:mm:ss"
).
format
(
new
Date
())
+
" 消息内容:"
+
JsonUtils
.
obj2Json
(
msgBody
));
retry
(
msgBody
);
}
@RabbitListener
(
queues
=
"#{staffOfflineQueue.name}"
)
public
void
offlineHandler
(
StaffSocketInfo
staffSocketInfo
)
{
...
...
@@ -68,9 +48,10 @@ public class Receiver {
redisTemplate
.
delete
(
NettyConstant
.
STAFF_USERIDS_KEY
+
staffId
);
//客服真离线后 才转发
if
(
channelGroup
.
getMerchantStaff
(
staffId
)
==
null
)
{
final
Set
<
Long
>
userIds
=
staffSocketInfo
.
getUserIds
();
final
Set
userIds
=
staffSocketInfo
.
getUserIds
();
log
.
info
(
"客服离线队列: "
+
"ID: "
+
"UserIds:"
+
userIds
);
userIds
.
forEach
((
Long
userId
)
->
{
userIds
.
forEach
(
uid
->
{
Long
userId
=
Long
.
valueOf
(
uid
.
toString
());
//用户在线才重新分配和转发
if
(
channelGroup
.
USER_GROUP
.
get
(
userId
)
!=
null
)
{
final
StaffSocketInfo
idleStaff
=
staffService
.
getIdleStaff
(
staffSocketInfo
.
getMerchantId
(),
userId
);
...
...
@@ -84,111 +65,29 @@ public class Receiver {
/**
* 禁用用户后 关闭socket
*
* @param userId
* @throws IOException
*/
@RabbitListener
(
queues
=
USER_QUEUE_NAME
)
public
void
disableUserHandler
(
String
userId
)
{
final
UserSocketInfo
userSocketInfo
=
channelGroup
.
USER_GROUP
.
get
(
Long
.
valueOf
(
userId
));
if
(
userSocketInfo
!=
null
)
{
userSocketInfo
.
writeAndFlush
(
new
MsgBody
<>().
setCode
(
MsgBody
.
LOGOUT
));
userSocketInfo
.
close
();
log
.
info
(
"用户: "
+
userId
+
"被禁用"
);
}
}
/**
* 订单相关处理
*
* @param orderModel
*/
@RabbitListener
(
queues
=
ORDER_QUEUE_NAME
)
public
void
orderHandler
(
OrderModel
orderModel
)
{
log
.
info
(
"Constants.ORDER_QUEUE_NAME: "
+
JSON
.
toJSONString
(
orderModel
));
final
UserSocketInfo
userSocketInfo
=
channelGroup
.
USER_GROUP
.
get
(
Long
.
valueOf
(
orderModel
.
getUserId
()));
@SneakyThrows
@RabbitListener
(
queues
=
"#{orderQueue.name}"
)
public
void
orderHandler
(
String
json
)
{
final
Stroke
stroke
=
JsonUtils
.
json2Obj
(
json
,
Stroke
.
class
);
log
.
info
(
"订单信息: "
+
json
);
final
Long
mcId
=
stroke
.
getMcId
();
final
Long
userId
=
stroke
.
getUserId
();
final
UserSocketInfo
userSocketInfo
=
channelGroup
.
USER_GROUP
.
get
(
userId
);
if
(
userSocketInfo
==
null
)
{
return
;
}
StaffSocketInfo
staffSocketInfo
=
channelGroup
.
getMerchantStaff
(
userSocketInfo
.
getStaffId
(
orderModel
.
getMerchantId
()));
final
MsgBody
<
OrderModel
>
orderInfo
=
new
MsgBody
<
OrderModel
>().
setCode
(
MsgBody
.
ORDER
).
setData
(
orderModel
);
/**
* 绑定客服在线,发送订单信息
*/
stroke
.
setSendTime
(
new
Date
());
final
MsgBody
<
Stroke
>
orderInfo
=
new
MsgBody
<
Stroke
>().
setCode
(
MsgBody
.
ORDER
).
setData
(
stroke
);
final
StaffSocketInfo
staffSocketInfo
=
channelGroup
.
getMerchantStaff
(
userSocketInfo
.
getStaffId
(
stroke
.
getMcId
()))
==
null
?
staffService
.
getIdleStaff
(
mcId
,
userId
)
:
channelGroup
.
getMerchantStaff
(
userSocketInfo
.
getStaffId
(
stroke
.
getMcId
()));
if
(
staffSocketInfo
!=
null
)
{
staffSocketInfo
.
writeAndFlush
(
orderInfo
);
log
.
info
(
"客服订单: "
+
"给客服("
+
staffSocketInfo
.
getStaffId
()
+
")发送订单:"
+
orderInfo
.
toString
());
return
;
log
.
info
(
"客服订单: "
+
"给客服("
+
staffSocketInfo
.
getStaffId
()
+
")发送订单:"
+
json
);
}
/**
* 绑定客服不在线,给历史客服发送订单信息
*/
final
Long
staffId
=
(
Long
)
redisTemplate
.
opsForHash
().
get
(
NettyConstant
.
IM_USERS
,
orderModel
.
getUserId
());
if
(
staffId
!=
null
)
{
log
.
info
(
"客服订单: "
+
"尝试给历史客服("
+
staffId
+
")发送订单:"
+
orderInfo
.
toString
());
staffSocketInfo
=
channelGroup
.
getMerchantStaff
(
staffId
);
if
(
staffSocketInfo
!=
null
)
{
staffSocketInfo
.
writeAndFlush
(
orderInfo
);
log
.
info
(
"客服订单: "
+
"给历史客服("
+
staffId
+
")发送订单:"
+
orderInfo
.
toString
());
}
}
}
/**
* 重发未回执的消息
*
* @param msgBody
* @throws IOException
*/
public
void
retry
(
MsgBody
<
ChatRecord
>
msgBody
)
throws
IOException
{
final
ChatRecord
chatRecord
=
msgBody
.
getData
();
final
Long
userId
=
Long
.
valueOf
(
chatRecord
.
getUserId
());
final
String
recordId
=
String
.
valueOf
(
chatRecord
.
getId
());
if
(
msgBody
!=
null
&&
chatRecord
.
getRetryCount
().
intValue
()
<
NettyConstant
.
RETRY_COUNT
.
intValue
())
{
UserSocketInfo
userSocketInfo
=
channelGroup
.
USER_GROUP
.
get
(
userId
);
if
(
userSocketInfo
==
null
)
{
return
;
}
MsgBody
<
ChatRecord
>
msg
=
JsonUtils
.
json2Obj
(
String
.
valueOf
(
redisTemplate
.
opsForHash
().
get
(
NettyConstant
.
MSG_KEY
+
userId
,
recordId
)),
JsonUtils
.
getJavaType
(
MsgBody
.
class
,
ChatRecord
.
class
));
if
(
msg
!=
null
&&
msg
.
getCode
().
equals
(
MsgBody
.
HAVE_READ
))
{
redisTemplate
.
opsForHash
().
delete
(
NettyConstant
.
MSG_KEY
+
userId
,
recordId
);
return
;
}
final
Integer
sendReceive
=
chatRecord
.
getSendReceive
();
switch
(
RoleEnum
.
get
(
sendReceive
))
{
case
APP:
Long
staffId
=
userSocketInfo
.
getStaffId
(
chatRecord
.
getMerchantId
());
StaffSocketInfo
staffSocketInfo
=
channelGroup
.
getMerchantStaff
(
staffId
);
if
(
staffSocketInfo
!=
null
)
{
staffSocketInfo
.
writeAndFlush
(
msgBody
);
}
break
;
case
merchant:
userSocketInfo
.
writeAndFlush
(
msgBody
);
break
;
default
:
}
//重发三次
chatRecord
.
setRetryCount
(
chatRecord
.
getRetryCount
()
+
1
);
queue
.
delaysQueue
(
msgBody
);
}
else
{
//移除失败消息
redisTemplate
.
opsForHash
().
delete
(
NettyConstant
.
MSG_KEY
+
userId
,
recordId
);
}
}
}
\ No newline at end of file
customer-service/src/main/java/com/ym/im/service/impl/MsgBodyServiceImpl.java
View file @
e31b24d3
...
...
@@ -6,7 +6,6 @@ import com.ym.im.entity.MsgBody;
import
com.ym.im.entity.base.ChannelAttributeKey
;
import
com.ym.im.entity.base.NettyConstant
;
import
com.ym.im.factory.SingleChatFactory
;
import
com.ym.im.mq.Queue
;
import
com.ym.im.service.ChatService
;
import
com.ym.im.service.MsgBodyService
;
import
com.ym.im.util.JsonUtils
;
...
...
@@ -36,15 +35,11 @@ import static com.ym.im.entity.MsgBody.SEND_MSG;
@Validated
({
MsgBodyGroup
.
class
})
public
class
MsgBodyServiceImpl
implements
MsgBodyService
{
@Autowired
private
Queue
queue
;
@Autowired
private
SingleChatFactory
singleChatFactory
;
@Resource
(
name
=
"myRedisTemplate"
)
private
RedisTemplate
redisTemplate
;
@Autowired
private
SingleChatFactory
singleChatFactory
;
@Override
public
void
msgBodyHandle
(
@NotNull
ChannelHandlerContext
ctx
,
@Valid
MsgBody
<
ChatRecord
>
msgBody
)
throws
JsonProcessingException
{
...
...
@@ -85,8 +80,6 @@ public class MsgBodyServiceImpl implements MsgBodyService {
msgBody
.
setCode
(
SEND_MSG
);
// 先保存消息至Redis
redisTemplate
.
opsForHash
().
put
(
NettyConstant
.
MSG_KEY
+
msgBody
.
getData
().
getUserId
(),
msgBody
.
getData
().
getId
(),
JsonUtils
.
obj2Json
(
msgBody
));
// 再默认以用户没有收到消息为前提,做循环、延迟通知
queue
.
delaysQueue
(
msgBody
);
// 最后发送聊天信息
channel
.
writeAndFlush
(
msgBody
);
}
...
...
customer-service/src/main/java/com/ym/im/service/impl/StaffServiceImpl.java
View file @
e31b24d3
...
...
@@ -28,10 +28,10 @@ public class StaffServiceImpl implements StaffService {
@Override
public
StaffSocketInfo
getIdleStaff
(
Long
merchantId
,
Long
userId
)
{
// final LinkedHashMap<Long, StaffSocketInfo> socketInfoMap = channelGroup.STAFF_GROUP.get(merchantId);
// if (socketInfoMap == null || socketInfoMap.size() == 0) {
// return null;
// }
if
(
channelGroup
.
STAFF_GROUP
.
get
(
merchantId
)
==
null
)
{
return
null
;
}
final
LinkedHashMap
<
Long
,
StaffSocketInfo
>
collect
=
channelGroup
.
STAFF_GROUP
.
get
(
merchantId
)
.
entrySet
()
.
stream
()
...
...
customer-service/src/main/resources/application-dev.yml
View file @
e31b24d3
...
...
@@ -24,12 +24,12 @@ spring:
jackson
:
default-property-inclusion
:
non_null
rabbitmq
:
host
:
127.0.0.1
host
:
47.99.47.225
port
:
5672
username
:
admin
password
:
admin
delay-queue-name
:
delay.ack.dev
username
:
root
password
:
root
staff-offline-Queue-Name
:
staff.offline.dev
order-queue-name
:
push.order.dev
exchange-name
:
delay.exchange.dev
listener
:
simple
:
...
...
customer-service/src/main/resources/application-prd.yml
View file @
e31b24d3
...
...
@@ -29,7 +29,6 @@ spring:
port
:
5672
username
:
admin
password
:
Yum123456
delay-queue-name
:
delay.ack
staff-offline-Queue-Name
:
staff.offline
exchange-name
:
delayAck
listener
:
...
...
customer-service/src/main/resources/application-test.yml
View file @
e31b24d3
...
...
@@ -29,9 +29,8 @@ spring:
port
:
5672
username
:
root
password
:
root
delay-queue-name
:
delay.ack-Jw
staff-offline-Queue-Name
:
staff.offline-Jw
exchange-name
:
delay.exchange
staff-offline-Queue-Name
:
staff.offline.dev
exchange-name
:
delay.exchange.dev
listener
:
simple
:
default-requeue-rejected
:
false
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment