Commit 104ec405 by giaogiao

1.完成同意进入频道;

2.ws不活跃rtc清空缓存;
3.优化维护频道信息 (kv);
4.优化rtc leave;
parent 4f218eb9
...@@ -47,8 +47,7 @@ public class ImRtcController extends BaseController { ...@@ -47,8 +47,7 @@ public class ImRtcController extends BaseController {
@PostMapping("/join") @PostMapping("/join")
@ApiOperation(value = "同意进入频道", notes = "") @ApiOperation(value = "同意进入频道", notes = "")
public ApiResult<Boolean> join(@RequestBody JoinRtcChannelParam joinRtcChannelParam) { public ApiResult<Boolean> join(@RequestBody JoinRtcChannelParam joinRtcChannelParam) {
return ApiResult.result(true); return rtcService.join(joinRtcChannelParam);
} }
@PostMapping("/reject") @PostMapping("/reject")
......
...@@ -32,7 +32,7 @@ public class SignController extends BaseController { ...@@ -32,7 +32,7 @@ public class SignController extends BaseController {
* 根据客户方生成签名字符串 验证通过则下发token * 根据客户方生成签名字符串 验证通过则下发token
*/ */
@PostMapping("/get") @PostMapping("/get")
@ApiOperation(value = "获取sign(仅试使用)", notes = "生成签名测试,在生产环境中,此步骤需要第三方应用的服务端进行生成") @ApiOperation(value = "获取sign(仅提供测试调试使用)", notes = "生成签名测试,在生产环境中,此步骤需要第三方应用的服务端进行生成")
public String get(@RequestBody GetSignParam getSignParam) throws Exception { public String get(@RequestBody GetSignParam getSignParam) throws Exception {
return new MD5().digestHex(getSignParam.getTimestamp() + getSignParam.getClientId() + getSignParam.getAppKey() + getSignParam.getAppSecret()); return new MD5().digestHex(getSignParam.getTimestamp() + getSignParam.getClientId() + getSignParam.getAppKey() + getSignParam.getAppSecret());
......
...@@ -4,12 +4,14 @@ import cn.hutool.core.thread.ThreadFactoryBuilder; ...@@ -4,12 +4,14 @@ import cn.hutool.core.thread.ThreadFactoryBuilder;
import com.wecloud.im.ws.model.WsConstants; import com.wecloud.im.ws.model.WsConstants;
import com.wecloud.im.ws.receive.ReadWsData; import com.wecloud.im.ws.receive.ReadWsData;
import com.wecloud.im.ws.service.MangerChannelService; import com.wecloud.im.ws.service.MangerChannelService;
import com.wecloud.rtc.service.RtcService;
import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.timeout.IdleStateEvent; import io.netty.handler.timeout.IdleStateEvent;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.Resource; import javax.annotation.Resource;
...@@ -35,6 +37,9 @@ public class WsReadHandler extends SimpleChannelInboundHandler<TextWebSocketFram ...@@ -35,6 +37,9 @@ public class WsReadHandler extends SimpleChannelInboundHandler<TextWebSocketFram
@Resource @Resource
private ReadWsData readWsData; private ReadWsData readWsData;
@Autowired
private RtcService rtcService;
@Resource @Resource
private MangerChannelService mangerChannelService; private MangerChannelService mangerChannelService;
...@@ -170,9 +175,16 @@ public class WsReadHandler extends SimpleChannelInboundHandler<TextWebSocketFram ...@@ -170,9 +175,16 @@ public class WsReadHandler extends SimpleChannelInboundHandler<TextWebSocketFram
*/ */
@Override @Override
public void handlerRemoved(ChannelHandlerContext ctx) { public void handlerRemoved(ChannelHandlerContext ctx) {
String appKey = ctx.channel().attr(MangerChannelService.APP_KEY).get();
String clientId = ctx.channel().attr(MangerChannelService.CLIENT_ID).get();
String userIdByChannel = mangerChannelService.getInfoByChannel(ctx); String userIdByChannel = mangerChannelService.getInfoByChannel(ctx);
log.info("uid:" + userIdByChannel + "," + "handlerRemoved" + ",channelId:" + ctx.channel().id().asLongText()); log.info("uid:" + userIdByChannel + "," + "handlerRemoved" + ",channelId:" + ctx.channel().id().asLongText());
// 关掉连接 // 关掉连接
ctx.close(); ctx.close();
// rtc清空缓存
rtcService.clientOffline(appKey, clientId);
} }
} }
...@@ -7,17 +7,17 @@ public class RtcRedisKey implements Serializable { ...@@ -7,17 +7,17 @@ public class RtcRedisKey implements Serializable {
/** /**
* 维护频道信息 (kv) * 维护频道信息 (kv)
*/ */
public static final String RTC_CHANNEL_INFO = "rci:%s"; public static final String RTC_CHANNEL_INFO = "r:ci:%s";
/** /**
* 维护所有用户当前在线的频道ID ( kv) * 维护用户当前在线的频道ID ( kv)
* user_join_channel = ujc * user_join_channel = ujc
* rcu:clientA = 10001 * rcu:clientA = 10001
* rcu:clientB = 10001 * rcu:clientB = 10001
* rcu:clientC = 10002 * rcu:clientC = 10002
* rcu:clientD = 10003 * rcu:clientD = 10003
*/ */
public static final String USER_JOIN_CHANNEL = "ujc:%s"; public static final String USER_JOIN_CHANNEL = "r:ujc:%s";
/** /**
* 维护频道中存在的用户 (set 集合): * 维护频道中存在的用户 (set 集合):
...@@ -26,6 +26,6 @@ public class RtcRedisKey implements Serializable { ...@@ -26,6 +26,6 @@ public class RtcRedisKey implements Serializable {
* rcu:10002 = clientC * rcu:10002 = clientC
* rcu:10003 = clientD * rcu:10003 = clientD
*/ */
public static final String RTC_CHANNEL_USERS = "rcu:%s"; public static final String RTC_CHANNEL_USERS = "r:cu:%s";
} }
...@@ -28,17 +28,17 @@ public interface MangerRtcCacheService { ...@@ -28,17 +28,17 @@ public interface MangerRtcCacheService {
/** /**
* 退出频道 * 退出频道
*/ */
void remove(String appKey, String clientId, Long rtcChannelId); void leave(String appKey, String clientId, Long rtcChannelId);
/** /**
* 根据频道ID获取频道内所有client * 根据频道ID获取频道内所有client
*/ */
List<String> getClientListByRtcChannelId(Long rtcChannelId); List<String> getClientListByRtcChannelId(Long rtcChannelId);
// /** /**
// * 根据客户端ID获取该客户端加入的频道ID * 根据客户端ID获取该客户端加入的频道ID
// */ */
// Long getRtcChannelIdListByClientId(String appKey, String clientId); Long getRtcChannelIdListByClientId(String appKey, String clientId);
/** /**
* 获取客户端忙线/空闲状态 * 获取客户端忙线/空闲状态
......
...@@ -15,6 +15,10 @@ import io.geekidea.springbootplus.framework.common.api.ApiResult; ...@@ -15,6 +15,10 @@ import io.geekidea.springbootplus.framework.common.api.ApiResult;
*/ */
public interface RtcService { public interface RtcService {
/**
* 客户端离线
*/
void clientOffline(String appKey, String clientId);
/** /**
* 创建一个频道,并向接收方发送系统推送 * 创建一个频道,并向接收方发送系统推送
......
...@@ -6,6 +6,7 @@ import com.wecloud.im.ws.utils.RedisUtils; ...@@ -6,6 +6,7 @@ import com.wecloud.im.ws.utils.RedisUtils;
import com.wecloud.rtc.entity.redis.RtcChannelInfo; import com.wecloud.rtc.entity.redis.RtcChannelInfo;
import com.wecloud.rtc.entity.redis.RtcRedisKey; import com.wecloud.rtc.entity.redis.RtcRedisKey;
import com.wecloud.rtc.service.MangerRtcCacheService; import com.wecloud.rtc.service.MangerRtcCacheService;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
...@@ -43,8 +44,8 @@ public class MangerRtcCacheServiceImpl implements MangerRtcCacheService { ...@@ -43,8 +44,8 @@ public class MangerRtcCacheServiceImpl implements MangerRtcCacheService {
String rtcChannelInfoJson = new JsonMapper().writeValueAsString(rtcChannelInfo); String rtcChannelInfoJson = new JsonMapper().writeValueAsString(rtcChannelInfo);
// --- 保存频道信息 // --- 保存频道信息
redisUtils.setKey(RtcRedisKey.RTC_CHANNEL_INFO, rtcChannelInfoJson); String channelKey = String.format(RtcRedisKey.RTC_CHANNEL_INFO, rtcChannelId);
redisUtils.setKey(channelKey, rtcChannelInfoJson);
//用户当前在线的频道ID //用户当前在线的频道ID
String userJoinChannelKey = String.format(RtcRedisKey.USER_JOIN_CHANNEL, appKey + clientId); String userJoinChannelKey = String.format(RtcRedisKey.USER_JOIN_CHANNEL, appKey + clientId);
...@@ -52,7 +53,7 @@ public class MangerRtcCacheServiceImpl implements MangerRtcCacheService { ...@@ -52,7 +53,7 @@ public class MangerRtcCacheServiceImpl implements MangerRtcCacheService {
//频道中存在的用户 //频道中存在的用户
String rtcChannelUsers = String.format(RtcRedisKey.RTC_CHANNEL_USERS, rtcChannelId); String rtcChannelUsers = String.format(RtcRedisKey.RTC_CHANNEL_USERS, rtcChannelId);
redisUtils.addForSet(rtcChannelUsers, appKey + clientId); redisUtils.addForSet(rtcChannelUsers, clientId);
} }
...@@ -65,12 +66,12 @@ public class MangerRtcCacheServiceImpl implements MangerRtcCacheService { ...@@ -65,12 +66,12 @@ public class MangerRtcCacheServiceImpl implements MangerRtcCacheService {
//频道中存在的用户 //频道中存在的用户
String rtcChannelUsers = String.format(RtcRedisKey.RTC_CHANNEL_USERS, rtcChannelId); String rtcChannelUsers = String.format(RtcRedisKey.RTC_CHANNEL_USERS, rtcChannelId);
redisUtils.addForSet(rtcChannelUsers, appKey + clientId); redisUtils.addForSet(rtcChannelUsers, clientId);
} }
@Override @Override
public void remove(String appKey, String clientId, Long rtcChannelId) { public void leave(String appKey, String clientId, Long rtcChannelId) {
//用户当前在线的频道ID //用户当前在线的频道ID
String userJoinChannelKey = String.format(RtcRedisKey.USER_JOIN_CHANNEL, appKey + clientId); String userJoinChannelKey = String.format(RtcRedisKey.USER_JOIN_CHANNEL, appKey + clientId);
...@@ -78,7 +79,7 @@ public class MangerRtcCacheServiceImpl implements MangerRtcCacheService { ...@@ -78,7 +79,7 @@ public class MangerRtcCacheServiceImpl implements MangerRtcCacheService {
//频道中存在的用户 //频道中存在的用户
String rtcChannelUsers = String.format(RtcRedisKey.RTC_CHANNEL_USERS, rtcChannelId); String rtcChannelUsers = String.format(RtcRedisKey.RTC_CHANNEL_USERS, rtcChannelId);
redisUtils.removeForSet(rtcChannelUsers, appKey + clientId); redisUtils.removeForSet(rtcChannelUsers, clientId);
} }
@Override @Override
...@@ -94,15 +95,17 @@ public class MangerRtcCacheServiceImpl implements MangerRtcCacheService { ...@@ -94,15 +95,17 @@ public class MangerRtcCacheServiceImpl implements MangerRtcCacheService {
} }
// @Override @Override
// public Long getRtcChannelIdListByClientId(String appKey, String clientId) { public Long getRtcChannelIdListByClientId(String appKey, String clientId) {
//
// //用户当前在线的频道ID //用户当前在线的频道ID
// String userJoinChannelKey = String.format(RtcRedisKey.USER_JOIN_CHANNEL, appKey + clientId); String userJoinChannelKey = String.format(RtcRedisKey.USER_JOIN_CHANNEL, appKey + clientId);
// String key = redisUtils.getKey(userJoinChannelKey); String key = redisUtils.getKey(userJoinChannelKey);
// if (StringUtils.isBlank(key)) {
// return Long.valueOf(key); return null;
// } }
return Long.valueOf(key);
}
@Override @Override
public boolean getBusyStatus(String appKey, String clientId) { public boolean getBusyStatus(String appKey, String clientId) {
......
...@@ -15,6 +15,9 @@ import com.wecloud.im.service.ImClientService; ...@@ -15,6 +15,9 @@ import com.wecloud.im.service.ImClientService;
import com.wecloud.im.ws.service.MangerChannelService; import com.wecloud.im.ws.service.MangerChannelService;
import com.wecloud.rtc.entity.response.RtcCallResponse; import com.wecloud.rtc.entity.response.RtcCallResponse;
import com.wecloud.rtc.entity.response.RtcCandidateForwardResponse; import com.wecloud.rtc.entity.response.RtcCandidateForwardResponse;
import com.wecloud.rtc.entity.response.RtcClientJoinResponse;
import com.wecloud.rtc.entity.response.RtcClientLeaveResponse;
import com.wecloud.rtc.entity.response.RtcClientRejectResponse;
import com.wecloud.rtc.entity.response.RtcSdpForwardResponse; import com.wecloud.rtc.entity.response.RtcSdpForwardResponse;
import com.wecloud.rtc.service.MangerRtcCacheService; import com.wecloud.rtc.service.MangerRtcCacheService;
import com.wecloud.rtc.service.RtcService; import com.wecloud.rtc.service.RtcService;
...@@ -51,6 +54,26 @@ public class RtcServiceImpl implements RtcService { ...@@ -51,6 +54,26 @@ public class RtcServiceImpl implements RtcService {
private MangerChannelService mangerChannelService; private MangerChannelService mangerChannelService;
@Override @Override
public void clientOffline(String appKey, String clientId) {
// 根据appKey查询appid
ImApplication imApplication = imApplicationService.getOneByAppKey(appKey);
ImClient client = imClientService.getCacheImClient(imApplication.getId(), clientId);
// 获取该客户端加入的频道ID
Long listByClientId = mangerRtcCacheService.getRtcChannelIdListByClientId(imApplication.getAppKey(), client.getClientId());
if (listByClientId == null) {
return;
}
LeaveRtcChannelParam leaveRtcChannelParam = new LeaveRtcChannelParam();
leaveRtcChannelParam.setChannelId(listByClientId);
// websocket离线逻辑 服务端踢出频道
this.leave(leaveRtcChannelParam, client, imApplication);
}
@Override
public ApiResult<CreateRtcChannelResult> createAndCall(CreateRtcChannelParam createRtcChannelParam) throws JsonProcessingException { public ApiResult<CreateRtcChannelResult> createAndCall(CreateRtcChannelParam createRtcChannelParam) throws JsonProcessingException {
ImClient client = imClientService.getCurentClient(); ImClient client = imClientService.getCurentClient();
Long rtcChannelId = SnowflakeUtil.getId(); Long rtcChannelId = SnowflakeUtil.getId();
...@@ -98,17 +121,85 @@ public class RtcServiceImpl implements RtcService { ...@@ -98,17 +121,85 @@ public class RtcServiceImpl implements RtcService {
// 修改缓存 // 修改缓存
mangerRtcCacheService.join(imApplication.getAppKey(), client.getClientId(), joinRtcChannelParam.getChannelId()); mangerRtcCacheService.join(imApplication.getAppKey(), client.getClientId(), joinRtcChannelParam.getChannelId());
return null; //获取频道内所有client
List<String> clientListByRtcChannelId = mangerRtcCacheService.getClientListByRtcChannelId(joinRtcChannelParam.getChannelId());
// 移除自己
clientListByRtcChannelId.remove(imApplication.getAppKey() + client.getClientId());
for (String toClientId : clientListByRtcChannelId) {
// ws向接收方发送通知
RtcClientJoinResponse rtcSdpForwardResponse = new RtcClientJoinResponse();
rtcSdpForwardResponse.setChannelId(joinRtcChannelParam.getChannelId());
rtcSdpForwardResponse.setClientId(client.getClientId());
rtcSdpForwardResponse.setTimestamp(new Date().getTime());
wsRtcWrite.clientJoin(rtcSdpForwardResponse, imApplication.getAppKey(), toClientId);
}
return ApiResult.ok(true);
} }
@Override @Override
public ApiResult<Boolean> reject(RejectRtcChannelParam rejectRtcChannelParam) { public ApiResult<Boolean> reject(RejectRtcChannelParam rejectRtcChannelParam) {
return null;
ImClient client = imClientService.getCurentClient();
ImApplication imApplication = imApplicationService.getById(client.getFkAppid());
//获取频道内所有client
List<String> clientListByRtcChannelId = mangerRtcCacheService.getClientListByRtcChannelId(rejectRtcChannelParam.getChannelId());
// 移除自己
clientListByRtcChannelId.remove(imApplication.getAppKey() + client.getClientId());
for (String toClientId : clientListByRtcChannelId) {
// ws向接收方发送通知
RtcClientRejectResponse rtcClientRejectResponse = new RtcClientRejectResponse();
rtcClientRejectResponse.setChannelId(rejectRtcChannelParam.getChannelId());
rtcClientRejectResponse.setClientId(client.getClientId());
rtcClientRejectResponse.setTimestamp(new Date().getTime());
wsRtcWrite.clientReject(rtcClientRejectResponse, imApplication.getAppKey(), toClientId);
}
return ApiResult.ok(true);
} }
@Override @Override
public ApiResult<Boolean> leave(LeaveRtcChannelParam leaveRtcChannelParam) { public ApiResult<Boolean> leave(LeaveRtcChannelParam leaveRtcChannelParam) {
return null;
ImClient client = imClientService.getCurentClient();
// 根据appKey查询appid
ImApplication imApplication = imApplicationService.getById(client.getFkAppid());
this.leave(leaveRtcChannelParam, client, imApplication);
return ApiResult.ok(true);
}
private void leave(LeaveRtcChannelParam leaveRtcChannelParam, ImClient client, ImApplication imApplication) {
// 修改缓存
mangerRtcCacheService.leave(imApplication.getAppKey(), client.getClientId(), leaveRtcChannelParam.getChannelId());
//获取频道内所有client
List<String> clientListByRtcChannelId = mangerRtcCacheService.getClientListByRtcChannelId(leaveRtcChannelParam.getChannelId());
// 移除自己
clientListByRtcChannelId.remove(imApplication.getAppKey() + client.getClientId());
for (String toClientId : clientListByRtcChannelId) {
// ws向接收方发送通知
RtcClientLeaveResponse rtcClientLeaveResponse = new RtcClientLeaveResponse();
rtcClientLeaveResponse.setChannelId(leaveRtcChannelParam.getChannelId());
rtcClientLeaveResponse.setClientId(client.getClientId());
rtcClientLeaveResponse.setTimestamp(new Date().getTime());
wsRtcWrite.clientLeave(rtcClientLeaveResponse, imApplication.getAppKey(), toClientId);
}
// 判断频道内是否无其他人了
// if (clientListByRtcChannelId == null) {
// }
} }
@Override @Override
...@@ -135,22 +226,21 @@ public class RtcServiceImpl implements RtcService { ...@@ -135,22 +226,21 @@ public class RtcServiceImpl implements RtcService {
// 移除自己 // 移除自己
clientListByRtcChannelId.remove(imApplication.getAppKey() + client.getClientId()); clientListByRtcChannelId.remove(imApplication.getAppKey() + client.getClientId());
for (String id : clientListByRtcChannelId) { for (String toClientId : clientListByRtcChannelId) {
// ws向接收方发送通知 // ws向接收方发送通知
RtcSdpForwardResponse rtcSdpForwardResponse = new RtcSdpForwardResponse(); RtcSdpForwardResponse rtcSdpForwardResponse = new RtcSdpForwardResponse();
rtcSdpForwardResponse.setSdpData(sdpForwardParam.getSdpData()); rtcSdpForwardResponse.setSdpData(sdpForwardParam.getSdpData());
rtcSdpForwardResponse.setSdpType(sdpForwardParam.getSdpType()); rtcSdpForwardResponse.setSdpType(sdpForwardParam.getSdpType());
// rtcSdpForwardResponse.setConversationId(createRtcChannelParam.getConversationId());
rtcSdpForwardResponse.setChannelId(rtcChannelId); rtcSdpForwardResponse.setChannelId(rtcChannelId);
rtcSdpForwardResponse.setClientId(client.getClientId()); rtcSdpForwardResponse.setClientId(client.getClientId());
rtcSdpForwardResponse.setTimestamp(new Date().getTime()); rtcSdpForwardResponse.setTimestamp(new Date().getTime());
wsRtcWrite.sdpForward(rtcSdpForwardResponse, imApplication.getAppKey(), id); wsRtcWrite.sdpForward(rtcSdpForwardResponse, imApplication.getAppKey(), toClientId);
} }
return null; return ApiResult.ok(true);
} }
@Override @Override
...@@ -177,23 +267,19 @@ public class RtcServiceImpl implements RtcService { ...@@ -177,23 +267,19 @@ public class RtcServiceImpl implements RtcService {
// 移除自己 // 移除自己
clientListByRtcChannelId.remove(imApplication.getAppKey() + client.getClientId()); clientListByRtcChannelId.remove(imApplication.getAppKey() + client.getClientId());
for (String id : clientListByRtcChannelId) { for (String toClientId : clientListByRtcChannelId) {
// ws向接收方发送通知 // ws向接收方发送通知
RtcCandidateForwardResponse rtcCandidateForwardResponse = new RtcCandidateForwardResponse(); RtcCandidateForwardResponse rtcCandidateForwardResponse = new RtcCandidateForwardResponse();
rtcCandidateForwardResponse.setCandidateData(candidateForwardParam.getCandidateData()); rtcCandidateForwardResponse.setCandidateData(candidateForwardParam.getCandidateData());
// rtcCandidateForwardResponse.setConversationId(createRtcChannelParam.getConversationId());
rtcCandidateForwardResponse.setChannelId(rtcChannelId); rtcCandidateForwardResponse.setChannelId(rtcChannelId);
rtcCandidateForwardResponse.setClientId(client.getClientId()); rtcCandidateForwardResponse.setClientId(client.getClientId());
rtcCandidateForwardResponse.setTimestamp(new Date().getTime()); rtcCandidateForwardResponse.setTimestamp(new Date().getTime());
wsRtcWrite.candidateForward(rtcCandidateForwardResponse, imApplication.getAppKey(), id); wsRtcWrite.candidateForward(rtcCandidateForwardResponse, imApplication.getAppKey(), toClientId);
} }
return ApiResult.ok(true);
return null;
} }
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment