Commit 84190ade by giaogiao

redis管理rtc频道

parent 81e302e0
package com.wecloud.im.ws.strategy.concrete; package com.wecloud.im.ws.strategy.concrete;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.json.JsonMapper; import com.fasterxml.jackson.databind.json.JsonMapper;
import com.wecloud.im.entity.ImApplication;
import com.wecloud.im.entity.ImClient;
import com.wecloud.im.entity.ImConversationMembers;
import com.wecloud.im.entity.ImMessage;
import com.wecloud.im.service.ImApplicationService; import com.wecloud.im.service.ImApplicationService;
import com.wecloud.im.service.ImClientBlacklistService; import com.wecloud.im.service.ImClientBlacklistService;
import com.wecloud.im.service.ImClientService; import com.wecloud.im.service.ImClientService;
...@@ -15,34 +10,24 @@ import com.wecloud.im.service.ImInboxService; ...@@ -15,34 +10,24 @@ import com.wecloud.im.service.ImInboxService;
import com.wecloud.im.service.ImMessageService; import com.wecloud.im.service.ImMessageService;
import com.wecloud.im.ws.annotation.CmdTypeAnnotation; import com.wecloud.im.ws.annotation.CmdTypeAnnotation;
import com.wecloud.im.ws.enums.WsRequestCmdEnum; import com.wecloud.im.ws.enums.WsRequestCmdEnum;
import com.wecloud.im.ws.enums.WsResponseCmdEnum;
import com.wecloud.im.ws.model.ResponseModel;
import com.wecloud.im.ws.model.request.ReceiveModel; import com.wecloud.im.ws.model.request.ReceiveModel;
import com.wecloud.im.ws.sender.SystemPush; import com.wecloud.im.ws.sender.SystemPush;
import com.wecloud.im.ws.service.WriteDataService; import com.wecloud.im.ws.service.WriteDataService;
import com.wecloud.im.ws.strategy.ImCmdAbstract; import com.wecloud.im.ws.strategy.ImCmdAbstract;
import io.geekidea.springbootplus.framework.common.api.ApiCode; import com.wecloud.rtc.SubCmd;
import io.geekidea.springbootplus.framework.common.api.ApiResult;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
/** /**
* @Description 处理app数据消息 * 处理RTC信令消息
*/ */
@CmdTypeAnnotation(type = WsRequestCmdEnum.SINGLE_RTC) @CmdTypeAnnotation(type = WsRequestCmdEnum.SINGLE_RTC)
@Service @Service
@Slf4j @Slf4j
public class SingleRtcConcrete extends ImCmdAbstract { public class SingleRtcConcrete extends ImCmdAbstract {
private static final String TO_CONVERSATION_KEY = "toConversation";
private static final String PUSH_KEY = "push";
private static final String MSG_ID = "msgId";
private static final JsonMapper JSON_MAPPER = new JsonMapper(); private static final JsonMapper JSON_MAPPER = new JsonMapper();
...@@ -73,188 +58,35 @@ public class SingleRtcConcrete extends ImCmdAbstract { ...@@ -73,188 +58,35 @@ public class SingleRtcConcrete extends ImCmdAbstract {
@Override @Override
public void process(ReceiveModel receiveModel, ChannelHandlerContext ctx, String data, String appKey, String clientId) throws JsonProcessingException { public void process(ReceiveModel receiveModel, ChannelHandlerContext ctx, String data, String appKey, String clientId) throws JsonProcessingException {
// 查询imApplication // 指令判空
ImApplication imApplication = imApplicationService.getOneByAppKey(appKey); if (receiveModel.getData().get(SubCmd.SUB_CMD) == null) {
if (imApplication == null) {
log.info("imApplication为空");
return;
}
// 查询发送者client
ImClient imClientSender = getClientSender(clientId, imApplication);
if (imClientSender == null) {
return;
}
// 获取会话id
if (receiveModel.getData().get(TO_CONVERSATION_KEY) == null) {
return;
}
Long toConversationId = Long.valueOf(receiveModel.getData().get(TO_CONVERSATION_KEY).toString());
// 查询该会话所有成员
List<ImConversationMembers> membersList = imConversationMembersService.list(
new QueryWrapper<ImConversationMembers>().lambda()
.eq(ImConversationMembers::getFkConversationId, toConversationId)
.notIn(ImConversationMembers::getFkClientId, imClientSender.getId())
);
if (membersList.isEmpty()) {
log.info("membersList为空,toConversationId:" + toConversationId);
return;
}
receiveModel.getData().remove(TO_CONVERSATION_KEY);
// 获取自定义推送字段
HashMap<String, String> pushMap = null;
if (receiveModel.getData().get(PUSH_KEY) != null) {
pushMap = (HashMap<String, String>) receiveModel.getData().get(PUSH_KEY);
receiveModel.getData().remove(PUSH_KEY);
}
String content = null;
try {
content = JSON_MAPPER.writeValueAsString(receiveModel.getData());
} catch (JsonProcessingException e) {
e.printStackTrace();
}
// 判断为单聊
if (membersList.size() == 1) {
// 拉黑逻辑
if (black(receiveModel, appKey, clientId, imClientSender, membersList)) {
return;
}
} else {
// TODO 群聊不能发送音视频通话
return; return;
} }
String cmd = receiveModel.getData().get(SubCmd.SUB_CMD).toString();
switch (cmd) {
//创建频道
case SubCmd.CREATE:
break;
// // 生成消息id //加入频道
// long messageId = SnowflakeUtil.getId(); case SubCmd.JOIN:
// // 保存消息至消息表 break;
// ImMessage imMessage = saveImMessage(imApplication, imClientSender, toConversationId, messageId, content);
//
// // 封装响应的实体
// ImMessageOnlineSend imMessageOnlineSend = new ImMessageOnlineSend();
// BeanUtils.copyProperties(imMessage, imMessageOnlineSend);
// imMessageOnlineSend.setMsgId(imMessage.getId());
// imMessageOnlineSend.setSender(clientId);
// imMessageOnlineSend.setContent(receiveModel.getData());
// imMessageOnlineSend.setConversationId(toConversationId);
//
// // 遍历发送
// for (ImConversationMembers conversationMembers : membersList) {
// // 保存收件箱
// long imInboxId = SnowflakeUtil.getId();
// ImInbox imInbox = new ImInbox();
// imInbox.setId(imInboxId);
// imInbox.setCreateTime(new Date());
// imInbox.setFkAppid(imApplication.getId());
// imInbox.setReceiver(conversationMembers.getFkClientId());
// imInbox.setFkMsgId(messageId);
// imInbox.setReadMsgStatus(0);
// imInbox.setReceiverMsgStatus(0);
// imInbox.setFkConversationId(toConversationId);
// imInboxService.save(imInbox);
//
// // 查询接收方
// ImClient imClientReceiver = imClientService.getOne(new QueryWrapper<ImClient>().lambda()
// .eq(ImClient::getFkAppid, imApplication.getId())
// .eq(ImClient::getId, conversationMembers.getFkClientId()));
// if (imClientReceiver == null) {
// continue;
// }
//
// // 向接收方推送
// ResponseModel<ImMessageOnlineSend> responseModel = new ResponseModel<>();
// responseModel.setCmd(WsResponseCmdEnum.ONLINE_MSG.getCmdCode());
// ApiResult<Boolean> result = ApiResult.result(ApiCode.SUCCESS);
// responseModel.setCode(result.getCode());
// responseModel.setMsg(result.getMessage());
// responseModel.setData(imMessageOnlineSend);
// responseModel.setReqId(null);
// writeDataService.write(responseModel, appKey, imClientReceiver.getClientId());
//
// // 异步推送系统通知消息
// pushTask.push(pushMap, imClientReceiver, imApplication, PushType.VOIP);
// }
//
// // 响应发送方消息id等信息
// ResponseModel<HashMap<String, Long>> responseModel = new ResponseModel<>();
// ApiResult<Boolean> result = ApiResult.result(ApiCode.SUCCESS);
// responseModel.setCmd(WsResponseCmdEnum.RES.getCmdCode());
// responseModel.setCode(result.getCode());
// responseModel.setMsg(result.getMessage());
// HashMap<String, Long> stringHashMap = new HashMap<>(3);
// stringHashMap.put(MSG_ID, messageId);
// responseModel.setData(stringHashMap);
// responseModel.setReqId(receiveModel.getReqId());
// // 响应发送方
// writeDataService.write(responseModel, appKey, clientId);
}
private ImClient getClientSender(String clientUniId, ImApplication imApplication) { //拒绝加入频道
ImClient imClientSender = imClientService.getOne(new QueryWrapper<ImClient>().lambda() case SubCmd.REJECT:
.eq(ImClient::getFkAppid, imApplication.getId()) break;
.eq(ImClient::getClientId, clientUniId));
if (imClientSender == null) {
log.info("imClientSender为空");
return null;
}
return imClientSender;
}
private ImMessage saveImMessage(ImApplication imApplication, ImClient imClientSender, Long toConversationId, long messageId, String content) { //SDP数据转发
ImMessage imMessage = new ImMessage(); case SubCmd.SDP:
imMessage.setId(messageId); break;
imMessage.setCreateTime(new Date());
imMessage.setFkAppid(imApplication.getId());
imMessage.setSender(imClientSender.getId());
imMessage.setContent(content);
imMessage.setWithdraw(false);
imMessage.setEvent(true);
imMessage.setSystem(false);
imMessage.setSendStatus(2);
imMessage.setFkConversationId(toConversationId);
imMessageService.save(imMessage);
return imMessage;
}
private boolean black(ReceiveModel receiveModel, String appKey, String clientUniId, ImClient imClientSender, List<ImConversationMembers> membersList) { //主动挂断(离开频道)
// 判断是否被拉黑 case SubCmd.LEAVE:
boolean beBlack = imClientBlacklistService.isBeBlack(membersList.get(0).getFkClientId(), imClientSender.getId()); break;
if (beBlack) {
log.info("被对方拉黑了");
// 响应发送方
ResponseModel<HashMap<String, Long>> responseModel = new ResponseModel<>();
ApiResult<Boolean> result = ApiResult.result(ApiCode.IS_BE_BLACK);
responseModel.setCmd(WsResponseCmdEnum.RES.getCmdCode());
responseModel.setCode(result.getCode());
responseModel.setMsg(result.getMessage());
responseModel.setReqId(receiveModel.getReqId());
writeDataService.write(responseModel, appKey, clientUniId);
return true;
} }
// 是否把对方拉黑
boolean black = imClientBlacklistService.isBeBlack(imClientSender.getId(), membersList.get(0).getFkClientId());
if (black) {
log.info("你把对方拉黑了");
// 响应发送方
ResponseModel<HashMap<String, Long>> responseModel = new ResponseModel<>();
ApiResult<Boolean> result = ApiResult.result(ApiCode.IS_TO_BLACK);
responseModel.setCmd(WsResponseCmdEnum.RES.getCmdCode());
responseModel.setCode(result.getCode());
responseModel.setMsg(result.getMessage());
responseModel.setReqId(receiveModel.getReqId());
writeDataService.write(responseModel, appKey, clientUniId);
return true;
}
return false;
} }
} }
...@@ -22,6 +22,52 @@ public class RedisUtils { ...@@ -22,6 +22,52 @@ public class RedisUtils {
@Autowired @Autowired
private StringRedisTemplate redisTemplate; private StringRedisTemplate redisTemplate;
public StringRedisTemplate redisTemplate() {
return redisTemplate;
}
/**
* 添加Key:value
*
* @param key
* @param value
*/
public void setKey(String key, String value) {
redisTemplate.opsForValue().set(key, value);
}
/**
* 删除Key
*
* @param key
*/
public boolean delKey(String key) {
return redisTemplate.delete(key);
}
/**
* 获取Key
*
* @param key
*/
public String getKey(String key) {
return redisTemplate.opsForValue().get(key);
}
/**
* 模糊查询
*
* @param key
* @return
*/
public Set<String> keys(String key) {
return redisTemplate.keys(key);
}
/** /**
* 获取hash中field对应的值 * 获取hash中field对应的值
* *
......
package com.wecloud.rtc;
import java.io.Serializable;
public class RtcRedisKey implements Serializable {
/**
* 维护所有用户当前在线的频道ID
* new Map<String,Long> map
* map.put("clientA",10001)
* map.put("clientB",10001)
* map.put("clientC",10002)
* map.put("clientD",10003)
* <p>
* redis Key:
* user_join_channel = ujc
* rcu:clientA:10001
* rcu:clientB:10001
* rcu:clientC:10002
* rcu:clientD:10003
*/
public static final String USER_JOIN_CHANNEL = "ujc:%s:%s";
/**
* 维护频道中存在的用户
* Map<Long,List<String>> map
* <p>
* new List<String> list
* list.add("clientA")
* list.add("clientB")
* <p>
* map.put(10001,list)
* <p>
* redis Key:
* rtc_channel_users = rcu
* key = rcu:10001:clientA
* key = rcu:10001:clientB
* key = rcu:10002:clientC
* key = rcu:10003:clientD
*/
public static final String RTC_CHANNEL_USERS = "rcu:%s:%s";
}
package com.wecloud.rtc.service;
import java.util.List;
/**
* 管理rtc频道
*/
public interface MangerRtcChannelService {
/**
* 创建一个频道
*/
Long create(String appKey, String clientId, Long rtcChannelId);
/**
* 加入频道
*/
void join(String appKey, String clientId, Long rtcChannelId);
/**
* 退出频道
*/
void remove(String appKey, String clientId, Long rtcChannelId);
/**
* 根据频道ID获取频道内所有client
*/
List<String> getClientListByRtcChannelId(Long rtcChannelId);
/**
* 根据客户端ID获取该客户端加入的频道ID
*/
Long getRtcChannelIdListByClientId(String appKey, String clientId);
/**
* 获取客户端忙线/空闲状态
* @param appKey
* @param clientId
* @return true:忙线,false空闲
*/
boolean getBusyStatus(String appKey, String clientId);
}
package com.wecloud.rtc.service.impl;
import com.wecloud.im.ws.utils.RedisUtils;
import com.wecloud.rtc.RtcRedisKey;
import com.wecloud.rtc.service.MangerRtcChannelService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
@Service
public class MangerRtcChannelServiceImpl implements MangerRtcChannelService {
@Autowired
private RedisUtils redisUtils;
@Override
public Long create(String appKey, String clientId, Long rtcChannelId) {
//用户当前在线的频道ID
String userJoinChannelKey = String.format(RtcRedisKey.USER_JOIN_CHANNEL, appKey + clientId, rtcChannelId);
//频道中存在的用户
String rtcChannelUsersKey = String.format(RtcRedisKey.RTC_CHANNEL_USERS, rtcChannelId, appKey + clientId);
redisUtils.setKey(userJoinChannelKey, "");
redisUtils.setKey(rtcChannelUsersKey, "");
return null;
}
@Override
public void join(String appKey, String clientId, Long rtcChannelId) {
//用户当前在线的频道ID
String userJoinChannelKey = String.format(RtcRedisKey.USER_JOIN_CHANNEL, appKey + clientId, rtcChannelId);
//频道中存在的用户
String rtcChannelUsersKey = String.format(RtcRedisKey.RTC_CHANNEL_USERS, rtcChannelId, appKey + clientId);
redisUtils.setKey(userJoinChannelKey, "");
redisUtils.setKey(rtcChannelUsersKey, "");
}
@Override
public void remove(String appKey, String clientId, Long rtcChannelId) {
//用户当前在线的频道ID
String userJoinChannelKey = String.format(RtcRedisKey.USER_JOIN_CHANNEL, appKey + clientId, rtcChannelId);
//频道中存在的用户
String rtcChannelUsersKey = String.format(RtcRedisKey.RTC_CHANNEL_USERS, rtcChannelId, appKey + clientId);
redisUtils.delKey(userJoinChannelKey);
redisUtils.delKey(rtcChannelUsersKey);
}
@Override
public List<String> getClientListByRtcChannelId(Long rtcChannelId) {
String rtcChannelUsersKey = String.format(RtcRedisKey.RTC_CHANNEL_USERS, rtcChannelId, "*");
Set<String> keys = redisUtils.keys(rtcChannelUsersKey);
List<String> clientList = new ArrayList<>();
for (String next:keys){
String s = next.split(":")[2];
clientList.add(s);
}
return clientList;
}
@Override
public Long getRtcChannelIdListByClientId(String appKey, String clientId) {
String userJoinChannelKey = String.format(RtcRedisKey.USER_JOIN_CHANNEL, appKey + clientId, "*");
Set<String> keys = redisUtils.keys(userJoinChannelKey);
String next = keys.iterator().next();
return Long.valueOf(next.split(":")[2]);
}
@Override
public boolean getBusyStatus(String appKey, String clientId) {
//用户当前在线的频道ID
String userJoinChannelKey = String.format(RtcRedisKey.USER_JOIN_CHANNEL, appKey + clientId, "*");
Set<String> keys = redisUtils.keys(userJoinChannelKey);
if (keys == null || keys.isEmpty()) {
return false;
} else {
return true;
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment