Commit 6c5b3d30 by lixiaozhong

标准化,将websocket的消息体改成actionMapping的方式

parent be6999d5
......@@ -155,7 +155,7 @@ public class Swagger2Config {
@Bean
public Docket restAppApi() {
// 获取需要扫描的包
String[] basePackages = {"com.wecloud.im.controller", "com.wecloud.im"};
String[] basePackages = {"com.wecloud"};
ApiSelectorBuilder apiSelectorBuilder = new Docket(DocumentationType.SWAGGER_2)
.apiInfo(apiInfo())
.groupName("app")
......
......@@ -2,7 +2,7 @@ package com.wecloud.dispatch;
import com.wecloud.dispatch.common.ActionMethod;
import com.wecloud.dispatch.common.ApplyInfo;
import com.wecloud.dispatch.common.RequestVO;
import com.wecloud.dispatch.common.BaseRequest;
import com.wecloud.dispatch.config.ActionConfigurer;
import com.wecloud.dispatch.exception.ActionNotFoundException;
import com.wecloud.dispatch.extend.ActionBox;
......@@ -18,8 +18,8 @@ import com.wecloud.dispatch.registry.ActionBoxRegistry;
import com.wecloud.dispatch.registry.ActionInterceptorRegistry;
import com.wecloud.dispatch.registry.ActionRegistry;
import com.wecloud.dispatch.registry.MethodArgumentResolverRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.channel.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.MethodParameter;
import java.lang.reflect.InvocationTargetException;
......@@ -30,18 +30,18 @@ import java.util.List;
/**
* @author lixiaozhong
*/
@Slf4j
public class ActionDispatcher {
protected final Logger logger = LoggerFactory.getLogger(this.getClass());
protected ActionContext actionContext = new ActionContext(this);
private final DefaultMethodArgumentResolver defaultMethodArgumentResolver = new DefaultMethodArgumentResolver();
public Object action(String action, RequestVO message) {
public Object action(String action, BaseRequest message) {
ActionMessage am = resolver(action, message);
return action(am, getArgumentBox());
}
public Object action(String action, RequestVO message, ArgumentBox argumentBox) {
public Object action(String action, BaseRequest message, ArgumentBox argumentBox) {
Object returnValue = null;
ActionMessage am = resolver(action, message);
returnValue = action(am, argumentBox == null ? getArgumentBox() : argumentBox);
......@@ -74,7 +74,6 @@ public class ActionDispatcher {
public ActionRequest getActionRequest(ActionMessage am) {
String actionCode = am.getAction();
RequestVO data = am.getMessage();
ActionMethod actionMethod = actionContext.getActionRegistry().getActionMethod(actionCode);
if (null == actionMethod) {
throw new ActionNotFoundException(actionCode + " not found");
......@@ -82,7 +81,7 @@ public class ActionDispatcher {
Object action = null;
Class<?> type = actionMethod.getActionClass();
action = getAction(type);
return createActionRequest(actionCode, action, actionMethod, data);
return createActionRequest(action, actionMethod, am);
}
public Object getAction(Class<?> type) {
......@@ -93,13 +92,8 @@ public class ActionDispatcher {
return a;
}
public ActionRequest createActionRequest(String path, Object action, Method method, RequestVO data) {
ActionRequest actionRequest = new ActionRequestImpl(path, action, new ActionMethod(method, path), data);
return actionRequest;
}
public ActionRequest createActionRequest(String path, Object action, ActionMethod actionMethod, RequestVO data) {
ActionRequest actionRequest = new ActionRequestImpl(path, action, actionMethod, data);
public ActionRequest createActionRequest(Object action, ActionMethod actionMethod, ActionMessage am) {
ActionRequest actionRequest = new ActionRequestImpl(action, actionMethod, am);
return actionRequest;
}
......@@ -126,7 +120,7 @@ public class ActionDispatcher {
public Object invokeForRequest(ActionRequest request, ArgumentBox argumentBox) {
Object[] args = getMethodArgumentValues(request, argumentBox);
if (logger.isTraceEnabled()) {
if (log.isTraceEnabled()) {
// StringBuilder sb = new StringBuilder("Invoking [");
// logger.debug(sb.toString());
}
......@@ -136,8 +130,8 @@ public class ActionDispatcher {
ApplyInfo applyInfo = applyActionMethodInterceptor(bean, actionMethod.getMethod(), args, actionMethod.getMethodParameters(), request, argumentBox);
if (applyInfo.isApprove()) {
returnValue = doInvoke(bean, actionMethod.getMethod(), args);
if (logger.isTraceEnabled()) {
logger.trace("Method [" + actionMethod.getMethod().getName() + "] returned [" + returnValue + "]");
if (log.isTraceEnabled()) {
log.trace("Method [" + actionMethod.getMethod().getName() + "] returned [" + returnValue + "]");
}
} else {
returnValue = applyInfo.getValue();
......@@ -227,7 +221,7 @@ public class ActionDispatcher {
return ab;
}
public ActionMessage resolver(String action, RequestVO message) {
public ActionMessage resolver(String action, BaseRequest message) {
ActionMessage am = new DefaultActionMessage(action, message);
return am;
}
......@@ -272,14 +266,27 @@ public class ActionDispatcher {
private ActionMethod actionMethod;
private Object action;
private RequestVO data;
private BaseRequest data;
private String path;
private Long senderClientId;
private Channel senderChannel;
ActionRequestImpl(String path, Object action, ActionMethod actionMethod, RequestVO data) {
ActionRequestImpl(String path, Object action, ActionMethod actionMethod, BaseRequest data, Long senderClientId, Channel senderChannel) {
this.path = path;
this.action = action;
this.actionMethod = actionMethod;
this.data = data;
this.senderClientId = senderClientId;
this.senderChannel = senderChannel;
}
ActionRequestImpl(Object action, ActionMethod actionMethod, ActionMessage am) {
this.path = am.getAction();
this.action = action;
this.actionMethod = actionMethod;
this.data = am.getMessage();
this.senderClientId = am.getSenderClientId();
this.senderChannel = am.getSenderChannel();
}
@Override
......@@ -288,7 +295,7 @@ public class ActionDispatcher {
}
@Override
public RequestVO getData() {
public BaseRequest getData() {
return this.data;
}
......@@ -298,8 +305,19 @@ public class ActionDispatcher {
}
@Override
public Long getSenderClientId() {
return this.senderClientId;
}
@Override
public Channel getSenderChannel() {
return this.senderChannel;
}
@Override
public String getPath() {
return path;
}
}
}
package com.wecloud.dispatch;
import com.wecloud.dispatch.annotation.ActionMapping;
import com.wecloud.dispatch.extend.ActionRequest;
import com.wecloud.im.ws.model.WsResponse;
import com.wecloud.im.ws.model.request.PushVO;
import com.wecloud.utils.JsonUtils;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import java.util.ArrayList;
......@@ -48,8 +55,10 @@ import java.util.Set;
* @author lixiaozhong
*/
@Component
@ActionMapping(value = "sendtest")
@Controller
@ActionMapping(value = "/sendtest")
@RequestMapping("/sendtest")
@Api(value = "测试方法类", tags = {"websocket报文-全部采用json传输"})
@Slf4j
public class WsTestAction {
......@@ -63,18 +72,31 @@ public class WsTestAction {
* @param pMap
*/
@ActionMapping(value = "gogo")
@ApiOperation(value = "测试方法")
@PostMapping("gogo")
@ApiImplicitParams({
@ApiImplicitParam(name="reqId", value = "请求id", required = false),
@ApiImplicitParam(name="hello", value = "测试参数hello", required = true),
@ApiImplicitParam(name="push", value = "push", dataTypeClass = PushVO.class),
@ApiImplicitParam(name="pList", value = "请求pList", dataTypeClass = List.class),
@ApiImplicitParam(name="pSet", value = "参数pSet", dataTypeClass = Set.class),
@ApiImplicitParam(name="pMap", value = "参数pMap", dataTypeClass = Map.class),
@ApiImplicitParam(name="request", value = "请求上下文,内部生成,不用传参", required = false),
})
public WsResponse<Boolean> testMyDispatch(String reqId,
@RequestParam("hello") String nihao,
PushVO push,
@RequestParam("push") PushVO push,
ArrayList<PushVO> pList,
Set<PushVO> pSet,
Map<String, String> pMap) {
Map<String, String> pMap,
@RequestParam("request") ActionRequest request) {
log.info(reqId);
log.info(nihao);
log.info(JsonUtils.encodeJson(push));
log.info(JsonUtils.encodeJson(pList));
log.info(JsonUtils.encodeJson(pSet));
log.info(JsonUtils.encodeJson(pMap));
log.info(request.getSenderClientId()+"");
WsResponse<Boolean> wsResponse = new WsResponse<>();
wsResponse.setCmd(0).setData(true).setCode(0).setMsg("msg").setReqId(reqId);
return wsResponse;
......
......@@ -8,7 +8,10 @@ import java.util.HashMap;
* @author lixiaozhong
*/
@NoArgsConstructor
public class RequestVO extends HashMap<String, Object> {
public class BaseRequest extends HashMap<String, Object> implements Request{
public static final String ACTION = "action";
public static final String REQ_ID = "reqId";
/**
* 请求的参数 action,用来寻找action类的path
......@@ -21,20 +24,22 @@ public class RequestVO extends HashMap<String, Object> {
*/
private String reqId;
@Override
public String getAction() {
return (String)this.get("action");
return (String)this.get(ACTION);
}
public void setAction(String action) {
this.put("action", action);
this.put(ACTION, action);
}
@Override
public String getReqId() {
return (String)this.get("reqId");
return (String)this.get(REQ_ID);
}
public void setReqId(String reqId) {
this.put("reqId", reqId);
this.put(REQ_ID, reqId);
}
}
package com.wecloud.dispatch.common;
/**
* @author lixiaozhong
*/
public interface Request {
/**
* 请求的参数 action,用来寻找action类的path
*/
String getAction();
/**
* 请求id, 以判空是否请求成功, 服务端处理完成后 返回此id
* 由前端生成,可以用uuid,也可以用时间戳
*/
String getReqId();
}
package com.wecloud.dispatch.extend;
import com.wecloud.dispatch.common.RequestVO;
import com.wecloud.dispatch.common.BaseRequest;
import io.netty.channel.Channel;
/**
* @author lixiaozhong
*/
public interface ActionMessage {
void setMessage(RequestVO message);
void setMessage(BaseRequest message);
RequestVO getMessage();
BaseRequest getMessage();
String getAction();
void setAction(String action);
void setSenderClientId(Long senderClientId);
Long getSenderClientId();
void setSenderChannel(Channel channel);
Channel getSenderChannel();
}
package com.wecloud.dispatch.extend;
import com.wecloud.dispatch.common.ActionMethod;
import com.wecloud.dispatch.common.RequestVO;
import com.wecloud.dispatch.common.BaseRequest;
import io.netty.channel.Channel;
/**
* @author lixiaozhong
......@@ -12,7 +13,12 @@ public interface ActionRequest {
Object getAction();
RequestVO getData();
BaseRequest getData();
ActionMethod getActionMethod();
Long getSenderClientId();
Channel getSenderChannel();
}
package com.wecloud.dispatch.extend.impl;
import com.wecloud.dispatch.common.RequestVO;
import com.wecloud.dispatch.common.BaseRequest;
import com.wecloud.dispatch.extend.ActionMessage;
import io.netty.channel.Channel;
/**
* @author lixiaozhong
......@@ -9,23 +10,32 @@ import com.wecloud.dispatch.extend.ActionMessage;
public class DefaultActionMessage implements ActionMessage {
String action;
RequestVO message;
BaseRequest message;
Long senderClientId;
Channel senderChannel;
public DefaultActionMessage() {
}
public DefaultActionMessage(String action, RequestVO message) {
public DefaultActionMessage(String action, BaseRequest message) {
this.message = message;
this.action = action;
}
public DefaultActionMessage(String action, BaseRequest message, Long senderClientId, Channel senderChannel) {
this.message = message;
this.action = action;
this.senderClientId = senderClientId;
this.senderChannel = senderChannel;
}
@Override
public RequestVO getMessage() {
public BaseRequest getMessage() {
return message;
}
@Override
public void setMessage(RequestVO message) {
public void setMessage(BaseRequest message) {
this.message = message;
}
......@@ -39,4 +49,24 @@ public class DefaultActionMessage implements ActionMessage {
this.action = action;
}
@Override
public void setSenderClientId(Long senderClientId) {
this.senderClientId = senderClientId;
}
@Override
public Long getSenderClientId() {
return senderClientId;
}
@Override
public void setSenderChannel(Channel senderChannel) {
this.senderChannel = senderChannel;
}
@Override
public Channel getSenderChannel() {
return this.senderChannel;
}
}
package com.wecloud.dispatch.factory;
import com.wecloud.dispatch.util.ClassUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import lombok.extern.slf4j.Slf4j;
import java.lang.reflect.Array;
import java.util.Map;
......@@ -12,9 +11,9 @@ import java.util.concurrent.locks.ReentrantLock;
/**
* @author lixiaozhong
*/
@Slf4j
public abstract class AbstractFactory {
protected final Logger logger = LoggerFactory.getLogger(this.getClass());
private Map<Class<?>, Object> objectMap = new ConcurrentHashMap<>();
private Map<Class<?>, Boolean> canNotInstanceMap = new ConcurrentHashMap<>();
private ReentrantLock lock = new ReentrantLock();
......@@ -99,7 +98,7 @@ public abstract class AbstractFactory {
}
} catch (Exception e) {
canNotInstanceMap.put(clazz, false);
logger.error("初始化对象失败!", e);
log.error("初始化对象失败!", e);
}
}
return object;
......
package com.wecloud.dispatch.general;
import com.wecloud.dispatch.common.RequestVO;
import com.wecloud.dispatch.common.BaseRequest;
import com.wecloud.dispatch.extend.ActionMessage;
import com.wecloud.dispatch.extend.ArgumentBox;
import com.wecloud.dispatch.extend.impl.DefaultArgumentBox;
......@@ -35,16 +35,18 @@ public class GeneralMessageHandler {
log.debug("appWS收到data: {}\n senderClientId:{}, channelId:{}", data, senderClientId, ctx.channel().id().asLongText());
}
// 解析jsonO
RequestVO requestVO = JsonUtils.decodeJson(data, RequestVO.class);
BaseRequest baseRequest = JsonUtils.decodeJson(data, BaseRequest.class);
if (null == requestVO || null == requestVO.getAction()) {
throw new BusinessException("null == requestVO || null == requestVO.getAction()");
if (null == baseRequest || null == baseRequest.getAction()) {
throw new BusinessException("null == baseRequest || null == baseRequest.getAction()");
}
ActionMessage am = actionMessageResolver.resolver(generalActionDispatcher, requestVO);
ActionMessage am = actionMessageResolver.resolver(generalActionDispatcher, baseRequest);
am.setSenderChannel(ctx.channel());
am.setSenderClientId(senderClientId);
Object res = generalActionDispatcher.action(am);
channelSender.sendMsgLocal((NioSocketChannel)ctx.channel(), res);
channelSender.sendMsgLocal((NioSocketChannel)am.getSenderChannel(), res);
}
public ArgumentBox getArgumentBox() {
......
package com.wecloud.dispatch.general.extend;
import com.wecloud.dispatch.ActionDispatcher;
import com.wecloud.dispatch.common.RequestVO;
import com.wecloud.dispatch.common.BaseRequest;
import com.wecloud.dispatch.extend.ActionMessage;
/**
......@@ -16,6 +16,6 @@ public interface ActionMessageResolver {
* @param data
* @return
*/
ActionMessage resolver(ActionDispatcher actionDispatcher, RequestVO data);
ActionMessage resolver(ActionDispatcher actionDispatcher, BaseRequest data);
}
package com.wecloud.dispatch.general.extend.impl;
import com.wecloud.dispatch.ActionDispatcher;
import com.wecloud.dispatch.common.RequestVO;
import com.wecloud.dispatch.common.BaseRequest;
import com.wecloud.dispatch.extend.ActionMessage;
import com.wecloud.dispatch.extend.impl.DefaultActionMessage;
import com.wecloud.dispatch.general.extend.ActionMessageResolver;
......@@ -14,7 +14,7 @@ import org.springframework.stereotype.Component;
public class ActionMessageResolverImpl implements ActionMessageResolver {
@Override
public ActionMessage resolver(ActionDispatcher actionDispatcher, RequestVO data) {
public ActionMessage resolver(ActionDispatcher actionDispatcher, BaseRequest data) {
ActionMessage am = new DefaultActionMessage();
am.setAction("");
......
package com.wecloud.dispatch.general.impl;
import cn.hutool.core.bean.copier.BeanCopier;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.wecloud.dispatch.ActionContext;
......@@ -55,49 +56,14 @@ public class GeneralMethodArgumentResolver implements MethodArgumentResolver {
Object value = request.getData().get(name);
if (value instanceof JSONObject) {
JSONObject jo = (JSONObject) value;
if (isCustomMap(clazz)) {
object = JSONObject.parseObject(jo.toJSONString(), clazz);
if(ActionRequest.class.isAssignableFrom(clazz)) {
object = request;
} else if (ArgumentBox.class.isAssignableFrom(clazz)) {
object = argumentBox;
} else {
object = jo.toJavaObject(t);
object = JsonUtils.beanCopyDeep(value, clazz);
}
} else if (value instanceof JSONArray) {
JSONArray ja = (JSONArray) value;
object = ja.toJavaObject(t);
}
else if (value instanceof Map) {
if(clazz.isInstance(value)) {
object = value;
} else {
object = JsonUtils.mapToBean((Map) value, getObject(clazz));
}
}
else if (value instanceof List) {
if(clazz.isInstance(value)) {
object = value;
} else if(Set.class.isAssignableFrom(clazz)) {
Set objectSet = new HashSet();
Type actualTypeArgument = ((ParameterizedType) parameter.getGenericParameterType()).getActualTypeArguments()[0];
((List<Map<String, Object>>) value).forEach(p -> objectSet.add(JsonUtils.mapToBean(p, getObject((Class)actualTypeArgument))));
object = objectSet;
} else {
List objectList = new ArrayList();
Type actualTypeArgument = ((ParameterizedType) parameter.getGenericParameterType()).getActualTypeArguments()[0];
((List<Map<String, Object>>) value).forEach(p -> objectList.add(JsonUtils.mapToBean(p, getObject((Class)actualTypeArgument))));
object = objectList;
}
}
else if (null != value) {
if (clazz.isInstance(value)) {
object = value;
} else {
if (conversionService.canConvert(value.getClass(), clazz)) {
object = conversionService.convert(value, clazz);
}
}
}
if (null == object) {
if(object == null) {
object = getDefaultValue(clazz);
}
if (null == object && isCanInstance(clazz) && !canNotInstanceMap.containsKey(clazz)) {
......
......@@ -9,7 +9,7 @@ import lombok.experimental.Accessors;
import javax.validation.constraints.NotNull;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
/**
* 消息在线推送
......@@ -41,7 +41,7 @@ public class ImMessageOnlineSend extends BaseEntity {
private String sender;
@ApiModelProperty("内容")
private HashMap content;
private Map content;
@ApiModelProperty("0未撤回; 1已撤回")
private Boolean withdraw;
......
......@@ -5,6 +5,7 @@ import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.FullHttpRequest;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
......@@ -14,8 +15,8 @@ import java.net.InetAddress;
@Component
@ChannelHandler.Sharable
@Slf4j
public class ChannelInboundHandler extends ChannelInboundHandlerAdapter {
private final Logger logger = LoggerFactory.getLogger(ChannelInboundHandler.class);
private final NettyApiRequest nettyApiRequest;
public ChannelInboundHandler(NettyApiRequest nettyApiRequest) {
......@@ -32,7 +33,7 @@ public class ChannelInboundHandler extends ChannelInboundHandlerAdapter {
try {
nettyApiRequest.handle(ctx, msg, httpRequest);
} catch (Exception e) {
logger.error("SingleNettyServer处理请求失败!", e);
log.error("SingleNettyServer处理请求失败!", e);
// this.sendBad(ctx);
} finally {
//释放请求
......@@ -61,8 +62,8 @@ public class ChannelInboundHandler extends ChannelInboundHandlerAdapter {
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
if (logger.isDebugEnabled()) {
logger.info("连接的客户端地址:{}", ctx.channel().remoteAddress());
if (log.isDebugEnabled()) {
log.info("连接的客户端地址:{}", ctx.channel().remoteAddress());
}
ctx.writeAndFlush("客户端" + InetAddress.getLocalHost().getHostName() + "成功与服务端建立连接! ");
super.channelActive(ctx);
......
......@@ -3,7 +3,6 @@ package com.wecloud.im.netty.core;
import com.wecloud.dispatch.general.GeneralMessageHandler;
import com.wecloud.im.executor.BusinessThreadPool;
import com.wecloud.im.ws.manager.ChannelManager;
import com.wecloud.im.ws.strategy.AbstractImCmdStrategy;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
......@@ -72,8 +71,7 @@ public class WsReadHandler extends SimpleChannelInboundHandler<TextWebSocketFram
log.info("收到心跳应用Pong,clientId:" + clientId);
return;
}
AbstractImCmdStrategy.process(clientId, ctx, data);
// AbstractImCmdStrategy.process(clientId, ctx, data);
generalMessageHandler.doMessage(clientId, ctx, data);
......
package com.wecloud.im.param;
import com.wecloud.im.ws.model.request.PushVO;
import lombok.Data;
import javax.validation.constraints.NotNull;
import java.io.Serializable;
/**
* @Description TODO
* @Author lixiaozhong
* @Date 2022/1/26 6:11 下午
*/
@Data
public class ChatContentVo implements Serializable {
PushVO push;
@NotNull
Long toConversation;
@NotNull
Integer type;
String text;
}
package com.wecloud.im.param;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
/**
* @Description TODO
* @Author lixiaozhong
* @Date 2022/1/27 9:02 上午
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class MsgVo implements Serializable {
private Long msgId;
}
......@@ -3,6 +3,8 @@ package com.wecloud.im.service;
import com.wecloud.im.entity.ImApplication;
import com.wecloud.im.entity.ImClient;
import com.wecloud.im.entity.ImMessage;
import com.wecloud.im.param.ChatContentVo;
import com.wecloud.im.param.ImClientSimpleDto;
import com.wecloud.im.param.ImHistoryMessagePageParam;
import com.wecloud.im.param.add.ImMsgRecall;
import com.wecloud.im.param.add.ImMsgSendToOnlineClient;
......@@ -14,6 +16,7 @@ import com.wecloud.im.ws.model.request.ReceiveVO;
import io.geekidea.springbootplus.framework.common.api.ApiResult;
import io.geekidea.springbootplus.framework.common.service.BaseService;
import io.geekidea.springbootplus.framework.core.pagination.Paging;
import org.springframework.transaction.annotation.Transactional;
import java.util.List;
......@@ -38,6 +41,9 @@ public interface ImMessageService extends BaseService<ImMessage> {
ImMessage saveImMessage(ImApplication imApplication, ImClient imClientSender, long messageId, ReceiveVO receiveVO, ReceiveDataVO sysParam);
@Transactional(rollbackFor = Exception.class)
ImMessage saveImMessage(ImClientSimpleDto client, long messageId, ChatContentVo data);
/**
* 消息撤回 只能撤回客户端自己发送的消息
*
......
......@@ -13,6 +13,8 @@ import com.wecloud.im.entity.ImMessage;
import com.wecloud.im.entity.ImMessageOnlineSend;
import com.wecloud.im.enums.ChatTypeEnum;
import com.wecloud.im.mapper.ImMessageMapper;
import com.wecloud.im.param.ChatContentVo;
import com.wecloud.im.param.ImClientSimpleDto;
import com.wecloud.im.param.ImHistoryMessagePageParam;
import com.wecloud.im.param.add.ImMsgRecall;
import com.wecloud.im.param.add.ImMsgSendToOnlineClient;
......@@ -163,6 +165,30 @@ public class ImMessageServiceImpl extends BaseServiceImpl<ImMessageMapper, ImMes
}
@Override
public ImMessage saveImMessage(ImClientSimpleDto client, long messageId, ChatContentVo data) {
ImMessage imMessage = new ImMessage();
// 数据库字段类型为JSON格式
// 因mysql关系型数据库非MongoDB文档类型数据库,第三方应用拓展的自定义参数名和值需使用json格式落库
String contentJsonString = JsonUtils.encodeJson(data);
imMessage.setContent(contentJsonString);
imMessage.setId(messageId);
imMessage.setCreateTime(new Date());
imMessage.setFkAppid(client.getFkAppid());
imMessage.setSender(client.getId());
imMessage.setWithdraw(false);
imMessage.setEvent(false);
imMessage.setSystemFlag(false);
imMessage.setSendStatus(2);
imMessage.setMsgType(data.getType());
imMessage.setFkConversationId(data.getToConversation());
this.save(imMessage);
return imMessage;
}
@Override
@Transactional(rollbackFor = Exception.class)
public ApiResult<Boolean> updateMsgWithdrawById(ImMsgRecall imMsgRecall) {
ImClient imClientSender = imClientService.getCurentClient();
......
package com.wecloud.im.thousandchat.action;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.wecloud.dispatch.annotation.ActionMapping;
import com.wecloud.dispatch.common.BaseRequest;
import com.wecloud.dispatch.extend.ActionRequest;
import com.wecloud.im.entity.ImApplication;
import com.wecloud.im.entity.ImClient;
import com.wecloud.im.entity.ImConversationMembers;
import com.wecloud.im.entity.ImMessage;
import com.wecloud.im.entity.ImMessageOnlineSend;
import com.wecloud.im.param.ChatContentVo;
import com.wecloud.im.param.ImClientSimpleDto;
import com.wecloud.im.param.ImConversationQueryVo;
import com.wecloud.im.param.MsgVo;
import com.wecloud.im.service.ImApplicationService;
import com.wecloud.im.service.ImClientService;
import com.wecloud.im.service.ImConversationMembersService;
import com.wecloud.im.service.ImConversationService;
import com.wecloud.im.service.ImMessageService;
import com.wecloud.im.thousandchat.cache.ThousandChatCacheManager;
import com.wecloud.im.ws.enums.WsResponseCmdEnum;
import com.wecloud.im.ws.model.WsResponse;
import com.wecloud.im.ws.sender.AsyncPush;
import com.wecloud.im.ws.sender.ChannelSender;
import com.wecloud.utils.JsonUtils;
import com.wecloud.utils.SnowflakeUtil;
import io.geekidea.springbootplus.framework.common.api.ApiCode;
import io.geekidea.springbootplus.framework.common.api.ApiResult;
import io.netty.channel.Channel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Map;
/**
* @Description 处理Cmd请求
* 抽象类 策略设计模式
* @Author hewei hwei1233@163.com
* @Date 2020-01-02
*/
@Slf4j
@Component
@ActionMapping(value = "/chat")
public class ThousandChatAction {
@Autowired
private ImApplicationService imApplicationService;
@Autowired
private ImClientService imClientService;
@Autowired
private ImConversationService imConversationService;
@Autowired
private AsyncPush systemPush;
@Autowired
private ImConversationMembersService imConversationMembersService;
@Autowired
private ImMessageService imMessageService;
@Autowired
private ChannelSender channelSender;
@Autowired
private ThousandChatCacheManager thousandChatCacheManager;
@ActionMapping("/thousand/send")
public void sendMsg(ActionRequest request, ChatContentVo data, String reqId) {
if (log.isDebugEnabled()) {
log.debug("接收到参数,reqId: {},\n data: {}, ", data);
}
//查看接收的群属性,是否万人群
ImConversationQueryVo conversation = imConversationService.getCacheImConversationById(data.getToConversation());
if (conversation == null) {
log.warn("会reqId: {} ,会话id: {}db中不存在", reqId, data.getToConversation());
return;
}
// 查询发送者client
ImClient imClientSender = imClientService.getCacheImClient(request.getSenderClientId());
if (imClientSender == null) {
log.warn("根据senderClientId: {} 查找不到 imClientSender!", request.getSenderClientId());
return;
}
// 查询imApplication
ImApplication imApplication = imApplicationService.getCacheById(imClientSender.getFkAppid());
if (imApplication == null) {
log.warn("根据appId: {} 查找不到 imApplication!", imClientSender.getFkAppid());
return;
}
ImMessageOnlineSend imMessageOnlineSend = assembleImMessageOnlineSend(data, imClientSender, imApplication.getId());
// 在线用户直接发消息
sendMsgForOnline(data, imMessageOnlineSend);
// 给所有人(在线+离线)遍历发送
// 先查询该会话所有成员
List<ImConversationMembers> membersList = imConversationMembersService.list(
new QueryWrapper<ImConversationMembers>().lambda()
.eq(ImConversationMembers::getFkConversationId, data.getToConversation())
);
// 再给所有人发 todo 需要改成批量
for (ImConversationMembers conversationMembers : membersList) {
// 入库 保存收件箱
// long imInboxId = SnowflakeUtil.getId();
// saveImInbox(imApplication, toConversationId, messageId, conversationMembers, imInboxId);
// 查询接收方
ImClient imClientReceiver = imClientService.getOne(new QueryWrapper<ImClient>().lambda()
.eq(ImClient::getFkAppid, imApplication.getId())
.eq(ImClient::getId, conversationMembers.getFkClientId()));
if (imClientReceiver == null) {
continue;
}
// 异步推送系统通知消息
systemPush.push(data.getPush(), imClientReceiver, imApplication);
}
// 响应发送方消息id等信息
response(reqId, imMessageOnlineSend.getMsgId(), request.getSenderChannel());
}
/**
* 发送消息给在线客户
* @param data
* @param imMessageOnlineSend
*/
private void sendMsgForOnline(ChatContentVo data, ImMessageOnlineSend imMessageOnlineSend) {
Map<String /** ip **/, List<String /** client的主键ID:platform **/>> onlineIpClientMap =
thousandChatCacheManager.findOnlineHostsByThousandGroupId(data.getToConversation());
// 封装要推给接收方的消息
WsResponse<ImMessageOnlineSend> responseModel = new WsResponse<>();
responseModel.setCmd(WsResponseCmdEnum.ONLINE_MSG.getCmdCode());
ApiResult<Boolean> result = ApiResult.result(ApiCode.SUCCESS);
responseModel.setCode(result.getCode());
responseModel.setMsg(result.getMessage());
responseModel.setData(imMessageOnlineSend);
responseModel.setReqId(null);
onlineIpClientMap.forEach((ip, clientIdAndPlatforms) -> {
channelSender.batchSendMsg(responseModel, ip, clientIdAndPlatforms);
});
}
/**
* 拼装发送消息体
* @param data
* @param imClientSender
* @param appId
* @return
*/
private ImMessageOnlineSend assembleImMessageOnlineSend(ChatContentVo data, ImClient imClientSender, Long appId) {
// 生成消息id
long messageId = SnowflakeUtil.getId();
// 入库 保存消息至消息表
ImClientSimpleDto client = new ImClientSimpleDto().setId(imClientSender.getId()).setFkAppid(appId);
ImMessage imMessage = imMessageService.saveImMessage(client, messageId, data);
// 封装响应的实体
ImMessageOnlineSend imMessageOnlineSend = new ImMessageOnlineSend();
imMessageOnlineSend.setMsgId(imMessage.getId());
imMessageOnlineSend.setSender(imClientSender.getClientId());
Map<String, Object> content = JsonUtils.beanToMap(data);
//action的属性无需要返回
content.remove(BaseRequest.ACTION);
imMessageOnlineSend.setContent(content);
imMessageOnlineSend.setConversationId(data.getToConversation());
imMessageOnlineSend.setCreateTime(imMessage.getCreateTime());
imMessageOnlineSend.setWithdrawTime(imMessage.getWithdrawTime());
imMessageOnlineSend.setWithdraw(imMessage.getWithdraw());
imMessageOnlineSend.setEvent(imMessage.getEvent());
imMessageOnlineSend.setSystemFlag(imMessage.getSystemFlag());
imMessageOnlineSend.setType(data.getType());
imMessageOnlineSend.setAt(imMessage.getAt());
return imMessageOnlineSend;
}
/**
* 响应发送方消息id等信息
*
* @param reqId
* @param messageId
* @param channel
*/
private void response(String reqId, long messageId, Channel channel) {
WsResponse<MsgVo> responseModel = new WsResponse<>();
ApiResult<Boolean> result = ApiResult.result(ApiCode.SUCCESS);
responseModel.setCmd(WsResponseCmdEnum.RES.getCmdCode());
responseModel.setCode(result.getCode());
responseModel.setMsg(result.getMessage());
responseModel.setData(new MsgVo(messageId));
responseModel.setReqId(reqId);
// 响应发送方
channelSender.sendMsgLocal((NioSocketChannel)channel, responseModel);
}
}
......@@ -12,8 +12,7 @@ import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StringUtils;
import java.io.InputStream;
......@@ -31,11 +30,11 @@ import java.util.concurrent.atomic.AtomicLong;
* 使用AtomicLong完成匿名内部类operationComplete方法中的计数;
* 使用Netty的Future对象进行消息推送结果的判断。
*/
@Slf4j
public class IosPush {
private final static EventLoopGroup EVENT_LOOP_GROUP = new NioEventLoopGroup(4);
private static final Logger logger = LoggerFactory.getLogger(IosPush.class);
/**
* Semaphore又称信号量,是操作系统中的一个概念,在Java并发编程中,信号量控制的是线程并发的数量。
*/
......@@ -85,7 +84,7 @@ public class IosPush {
String sound) {
if (deviceToken == null || "".equals(deviceToken)) {
logger.error("deviceToken=null");
log.error("deviceToken=null");
return;
}
......@@ -126,30 +125,30 @@ public class IosPush {
SEMAPHORE.acquire();
} catch (Exception e) {
//线程太多了,没有多余的信号量可以获取了
logger.error("ios push get semaphore failed, deviceToken:{}", deviceToken);
logger.error("ios push get semaphore failed", e);
log.error("ios push get semaphore failed, deviceToken:{}", deviceToken);
log.error("ios push get semaphore failed", e);
}
logger.debug("token={},payload={}", token, payload);
log.debug("token={},payload={}", token, payload);
ApnsClient apnsClient = getAPNSConnect(inputStream, productFlag, certificatePassword);
final Future<PushNotificationResponse<SimpleApnsPushNotification>> future = apnsClient.sendNotification(pushNotification);
future.addListener((GenericFutureListener<Future<PushNotificationResponse>>) pushNotificationResponseFuture -> {
if (future.isSuccess()) {
final PushNotificationResponse<SimpleApnsPushNotification> response = future.getNow();
if (response.isAccepted()) {
logger.debug("success token{}", token);
log.debug("success token{}", token);
successCnt.incrementAndGet();
} else {
Date invalidTime = response.getTokenInvalidationTimestamp();
logger.error("Notification rejected by the APNs gateway: " + response.getRejectionReason() + ",payload:" + payload +
log.error("Notification rejected by the APNs gateway: " + response.getRejectionReason() + ",payload:" + payload +
"\n,deviceToken:" + deviceToken
, ",alertBody:" + alertBody + ",sanitizeTokenString:" + token);
if (invalidTime != null) {
logger.error("\t…and the token is invalid as of " + response.getTokenInvalidationTimestamp());
log.error("\t…and the token is invalid as of " + response.getTokenInvalidationTimestamp());
}
}
} else {
logger.error("send notification device token={} is failed {} ", token, future.cause().getMessage());
log.error("send notification device token={} is failed {} ", token, future.cause().getMessage());
}
latch.countDown();
//释放允许,将占有的信号量归还
......@@ -158,10 +157,10 @@ public class IosPush {
try {
latch.await(20, TimeUnit.SECONDS);
} catch (InterruptedException e) {
logger.error("ios push latch await failed!", e);
log.error("ios push latch await failed!", e);
}
long endPushTime = System.currentTimeMillis();
logger.debug("pushMessage success. [成功{}个,耗时{}ms]", successCnt.get(), endPushTime - startTime);
log.debug("pushMessage success. [成功{}个,耗时{}ms]", successCnt.get(), endPushTime - startTime);
}
......@@ -187,7 +186,7 @@ public class IosPush {
String certificatePassword = "123456";
if (deviceToken == null || "".equals(deviceToken)) {
logger.error("deviceToken=null");
log.error("deviceToken=null");
return;
}
......@@ -228,30 +227,30 @@ public class IosPush {
SEMAPHORE.acquire();
} catch (Exception e) {
//线程太多了,没有多余的信号量可以获取了
logger.error("ios push get semaphore failed, deviceToken:{}", deviceToken);
logger.error("ios push get semaphore failed", e);
log.error("ios push get semaphore failed, deviceToken:{}", deviceToken);
log.error("ios push get semaphore failed", e);
}
logger.debug("token={},payload={}", token, payload);
log.debug("token={},payload={}", token, payload);
ApnsClient apnsClient = getAPNSConnect(apnsCertificatePath, productFlag, certificatePassword);
final Future<PushNotificationResponse<SimpleApnsPushNotification>> future = apnsClient.sendNotification(pushNotification);
future.addListener((GenericFutureListener<Future<PushNotificationResponse>>) pushNotificationResponseFuture -> {
if (future.isSuccess()) {
final PushNotificationResponse<SimpleApnsPushNotification> response = future.getNow();
if (response.isAccepted()) {
logger.debug("success token{}", token);
log.debug("success token{}", token);
successCnt.incrementAndGet();
} else {
Date invalidTime = response.getTokenInvalidationTimestamp();
logger.error("Notification rejected by the APNs gateway: " + response.getRejectionReason() + ",payload:" + payload +
log.error("Notification rejected by the APNs gateway: " + response.getRejectionReason() + ",payload:" + payload +
"\n,deviceToken:" + deviceToken
, ",alertBody:" + alertBody + ",sanitizeTokenString:" + token);
if (invalidTime != null) {
logger.error("\t…and the token is invalid as of " + response.getTokenInvalidationTimestamp());
log.error("\t…and the token is invalid as of " + response.getTokenInvalidationTimestamp());
}
}
} else {
logger.error("send notification device token={} is failed {} ", token, future.cause().getMessage());
log.error("send notification device token={} is failed {} ", token, future.cause().getMessage());
}
latch.countDown();
//释放允许,将占有的信号量归还
......@@ -260,10 +259,10 @@ public class IosPush {
try {
latch.await(20, TimeUnit.SECONDS);
} catch (InterruptedException e) {
logger.error("ios push latch await failed!", e);
log.error("ios push latch await failed!", e);
}
long endPushTime = System.currentTimeMillis();
logger.debug("pushMessage success. [成功{}个,耗时{}ms]", successCnt.get(), endPushTime - startTime);
log.debug("pushMessage success. [成功{}个,耗时{}ms]", successCnt.get(), endPushTime - startTime);
}
// public static void main(String[] args) {
......@@ -293,7 +292,7 @@ public class IosPush {
.setClientCredentials(inputStream, certificatePassword)
.setConcurrentConnections(4).setEventLoopGroup(EVENT_LOOP_GROUP).build();
} catch (Exception e) {
logger.error("ios get push apns client failed!", e);
log.error("ios get push apns client failed!", e);
}
return apnsXTClient;
}
......@@ -315,7 +314,7 @@ public class IosPush {
.setClientCredentials(resourceAsStream, certificatePassword)
.setConcurrentConnections(4).setEventLoopGroup(EVENT_LOOP_GROUP).build();
} catch (Exception e) {
logger.error("ios get push apns client failed!", e);
log.error("ios get push apns client failed!", e);
}
return apnsXTClient;
}
......
package com.wecloud.im.ws.strategy;
import com.wecloud.im.ws.annotation.ImCmdType;
import com.wecloud.im.ws.enums.WsRequestCmdEnum;
import com.wecloud.im.ws.utils.ClassScanner;
import com.wecloud.im.ws.utils.SpringBeanUtils;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.boot.context.event.ApplicationStartedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import javax.annotation.PostConstruct;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
/**
* @Description 策略模式 上下文
* 维护指令码与策略实现的对应
* @Author hewei hwei1233@163.com
*/
@Component
public class ImCmdStrategyFactory {
private final Map<WsRequestCmdEnum, AbstractImCmdStrategy> strategyMap = new HashMap<>();
public AbstractImCmdStrategy getStrategy(WsRequestCmdEnum wsRequestPathEnum) {
if (wsRequestPathEnum == null) {
throw new IllegalArgumentException("not fond enum");
}
if (CollectionUtils.isEmpty(strategyMap)) {
throw new IllegalArgumentException("strategy map is empty,please check you strategy package path");
}
return strategyMap.get(wsRequestPathEnum);
}
@EventListener(ApplicationStartedEvent.class)
private void init() {
// 扫码ReceiveTypeAnnotation注解的类
Set<Class<?>> classSet = ClassScanner.scan(ImCmdStrategyFactory.class.getPackage().getName(), ImCmdType.class);
classSet.forEach(clazz -> {
// 获取注解中的类型值,与枚举类一一对应
WsRequestCmdEnum type = clazz.getAnnotation(ImCmdType.class).type();
strategyMap.put(type, (AbstractImCmdStrategy)SpringBeanUtils.getBean(clazz));
});
}
}
//package com.wecloud.im.ws.strategy;
//
//import com.wecloud.im.ws.annotation.ImCmdType;
//import com.wecloud.im.ws.enums.WsRequestCmdEnum;
//import com.wecloud.im.ws.utils.ClassScanner;
//import com.wecloud.im.ws.utils.SpringBeanUtils;
//import org.springframework.beans.factory.InitializingBean;
//import org.springframework.boot.context.event.ApplicationStartedEvent;
//import org.springframework.context.event.EventListener;
//import org.springframework.stereotype.Component;
//import org.springframework.util.CollectionUtils;
//
//import javax.annotation.PostConstruct;
//import java.util.HashMap;
//import java.util.Map;
//import java.util.Set;
//
///**
// * @Description 策略模式 上下文
// * 维护指令码与策略实现的对应
// * @Author hewei hwei1233@163.com
// */
//@Component
//public class ImCmdStrategyFactory {
//
// private final Map<WsRequestCmdEnum, AbstractImCmdStrategy> strategyMap = new HashMap<>();
//
// public AbstractImCmdStrategy getStrategy(WsRequestCmdEnum wsRequestPathEnum) {
//
// if (wsRequestPathEnum == null) {
// throw new IllegalArgumentException("not fond enum");
// }
//
// if (CollectionUtils.isEmpty(strategyMap)) {
// throw new IllegalArgumentException("strategy map is empty,please check you strategy package path");
// }
//
// return strategyMap.get(wsRequestPathEnum);
// }
//
// @EventListener(ApplicationStartedEvent.class)
// private void init() {
//
// // 扫码ReceiveTypeAnnotation注解的类
// Set<Class<?>> classSet = ClassScanner.scan(ImCmdStrategyFactory.class.getPackage().getName(), ImCmdType.class);
//
// classSet.forEach(clazz -> {
// // 获取注解中的类型值,与枚举类一一对应
// WsRequestCmdEnum type = clazz.getAnnotation(ImCmdType.class).type();
// strategyMap.put(type, (AbstractImCmdStrategy)SpringBeanUtils.getBean(clazz));
// });
// }
//}
......@@ -4,12 +4,13 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.wecloud.im.ws.model.request.ReceiveVO;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.cglib.beans.BeanMap;
import java.io.IOException;
import java.lang.reflect.Field;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
......@@ -220,15 +221,103 @@ public class JsonUtils {
}
/**
* 将map装换为javabean对象
* 将map装换为javabean对象,不支持深度转换,支持深度请看 mapToBeanDeep 方法
*
* @param map
* @param bean
* @param clazz
* @return
*/
public static <T> T mapToBean(Map<String, Object> map, T bean) {
BeanMap beanMap = BeanMap.create(bean);
public static <T> T mapToBean(Map<String, Object> map, Class<T> clazz) {
T obj = null;
try {
obj = clazz.newInstance();
} catch (Exception e) {
log.error("mapToBean转换,创建实例对象失败", e);
return null;
}
BeanMap beanMap = BeanMap.create(obj);
beanMap.putAll(map);
return bean;
return obj;
}
/**
* 将一个 Map/javaBean 对象转化为一个 javaBean 深度
* @param srcObject 要转化的对象,可以是map,bean
* @param destClass 要转化的类型
* @return 转化出来的 JavaBean 对象
*/
public static <T>T beanCopyDeep(Object srcObject, Class<T> destClass) {
String s = encodeJson(srcObject);
return decodeJson(s, destClass);
}
private static boolean isFinalType(Class clazz) {
if (clazz == String.class ) {
return true;
}
if (clazz == Boolean.class) {
return true;
}
if (clazz == Long.class) {
return true;
}
if (clazz == Integer.class) {
return true;
}
if (clazz == Double.class) {
return true;
}
if (clazz == Float.class) {
return true;
}
if (clazz == Short.class) {
return true;
}
if (clazz == BigDecimal.class) {
return true;
}
if (clazz == Byte.class) {
return true;
}
if (clazz == int.class) {
return true;
}
if (clazz == long.class) {
return true;
}
if (clazz == float.class) {
return true;
}
if (clazz == double.class) {
return true;
}
if (clazz == byte.class) {
return true;
}
if (clazz == char.class) {
return true;
}
if (clazz == short.class) {
return true;
}
if (clazz == boolean.class) {
return true;
}
return false;
}
/**
* 将javabean对象转换为map
*/
public static <T> Map<String, Object> beanToMap(T bean) {
Map<String, Object> map = new HashMap();
if (bean != null) {
BeanMap beanMap = BeanMap.create(bean);
for (Object key : beanMap.keySet()) {
Object value = beanMap.get(key);
map.put(key + "", value);
}
}
return map;
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment