Commit 2a652fd4 by Future

Merge branch 'feature-cluster' into xiaohudou_20220427

parents 9e886eab 4b591824
...@@ -38,6 +38,18 @@ ...@@ -38,6 +38,18 @@
</dependency> </dependency>
</dependencies> </dependencies>
<distributionManagement>
<repository>
<id>wecloud</id>
<url>http://124.71.68.175:8081/repository/maven-releases/</url>
</repository>
<snapshotRepository>
<id>wecloud</id>
<url>http://124.71.68.175:8081/repository/maven-snapshots/</url>
</snapshotRepository>
</distributionManagement>
<build> <build>
<plugins> <plugins>
<plugin> <plugin>
...@@ -52,6 +64,43 @@ ...@@ -52,6 +64,43 @@
</execution> </execution>
</executions> </executions>
</plugin> </plugin>
<plugin>
<groupId>com.wecloud</groupId>
<artifactId>classencrypt-maven-plugin</artifactId>
<version>${maven-classencrypt-plugin.version}</version>
<configuration>
<!-- 参数说明-->
<!-- -packages 加密的包名(可为空,多个用","分割)-->
<!-- -libjars jar/war包lib下要加密jar文件名(可为空,多个用","分割)-->
<!-- -cfgfiles 需要加密的配置文件,一般是classes目录下的yml或properties文件(可为空,多个用","分割)-->
<!-- -excludes 排除的类名(可为空,多个用","分割)-->
<!-- -classpath 外部依赖的jar目录,例如/tomcat/lib(可为空,多个用","分割)-->
<!-- -password 加密密码,如果是#号,则使用无密码模式加密-->
<!-- -code 机器码,在绑定的机器生成,加密后只可在此机器上运行-->
<password>wecloud#0-0</password><!--加密打包之后pom.xml会被删除,不用担心在jar包里找到此密码-->
<packages>com.wecloud,io.geekidea</packages>
<!-- <cfgfiles>application.yml</cfgfiles>-->
<!-- <excludes>io.geekidea</excludes>-->
<!-- <libjars>a.jar,b.jar</libjars> 多个之间用,分割-->
<!-- 机器绑定-->
<!-- 机器绑定只允许加密的项目在特定的机器上运行;-->
<!-- 在需要绑定的机器上执行以下命令,生成机器码-->
<!-- 在cmd目录下,执行 java -jar classencrypt-fatjar.jar -C-->
<!-- 例如我本机是: 4B1ABEBA8423DEE07469C1E38B3CF7881BBAC2F6C072365DF2297D84370991CC97E8330B0E5FB4ADFF0F87A038F4773A-->
<!-- <code>4B1ABEBA8423DEE07469C1E38B3CF7881BBAC2F6C072365DF2297D84370991CC97E8330B0E5FB4ADFF0F87A038F4773A</code>-->
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>classEncrypt</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins> </plugins>
</build> </build>
......
#! /bin/sh
java -jar classencrypt.jar -C
#! /bin/sh
cd ..
java -javaagent:bootstrap-2.0-encrypted.jar='-pwd wecloud#0-0' -jar bootstrap-2.0-encrypted.jar
...@@ -4,7 +4,7 @@ ...@@ -4,7 +4,7 @@
<property name="CONTEXT_NAME" value="spring-boot-plus"/> <property name="CONTEXT_NAME" value="spring-boot-plus"/>
<property name="LOG_PATH" value="logs"/> <property name="LOG_PATH" value="logs"/>
<property name="MAX_FILE_SIZE" value="10MB"/> <property name="MAX_FILE_SIZE" value="1000MB"/>
<property name="MAX_HISTORY" value="30"/> <property name="MAX_HISTORY" value="30"/>
<contextName>${CONTEXT_NAME}</contextName> <contextName>${CONTEXT_NAME}</contextName>
...@@ -162,4 +162,4 @@ ...@@ -162,4 +162,4 @@
<!-- </springProfile>--> <!-- </springProfile>-->
</configuration> </configuration>
\ No newline at end of file
...@@ -100,6 +100,20 @@ public class MessageAction { ...@@ -100,6 +100,20 @@ public class MessageAction {
} }
/** /**
* 查询某个会话历史消息分页列表
* @param request
* @param param
* @param reqId
* @return
*/
@ActionMapping("/getHistoryMsgNew")
@ApiOperation("查询某个会话历史消息分页列表-新")
public WsResponse<Paging<OfflineMsgDto>> getHistoryMsgNew(ImHistoryMessagePageParam data) {
log.info("查询某个会话历史消息分页列表-新:{}", JSON.toJSONString(data));
return WsResponse.ok(imMessageService.getHistoryMsgConversationIdNew(data));
}
/**
* 查询某个消息已读client列表和未读client * 查询某个消息已读client列表和未读client
* @param request * @param request
* @param param * @param param
......
...@@ -51,6 +51,8 @@ public interface ImMessageMapper extends BaseMapper<ImMessage> { ...@@ -51,6 +51,8 @@ public interface ImMessageMapper extends BaseMapper<ImMessage> {
*/ */
IPage<OfflineMsgDto> getHistoryMsgConversationId(@Param("page") Page page, @Param("param") ImHistoryMessagePageParam imHistoryMessagePageParam); IPage<OfflineMsgDto> getHistoryMsgConversationId(@Param("page") Page page, @Param("param") ImHistoryMessagePageParam imHistoryMessagePageParam);
IPage<OfflineMsgDto> getHistoryMsgConversationIdNew(@Param("page") Page page, @Param("param") ImHistoryMessagePageParam imHistoryMessagePageParam);
/** /**
* 根据客户端id与会话id 查询离线消息 * 根据客户端id与会话id 查询离线消息
* *
......
package com.wecloud.im.netty.core; package com.wecloud.im.netty.core;
import com.wecloud.im.netty.handler.NettyApiRequest; import com.wecloud.im.netty.handler.NettyApiRequest;
import com.wecloud.im.ws.manager.ChannelManager;
import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInboundHandlerAdapter;
...@@ -62,12 +63,25 @@ public class ChannelInboundHandler extends ChannelInboundHandlerAdapter { ...@@ -62,12 +63,25 @@ public class ChannelInboundHandler extends ChannelInboundHandlerAdapter {
*/ */
@Override @Override
public void channelActive(ChannelHandlerContext ctx) throws Exception { public void channelActive(ChannelHandlerContext ctx) throws Exception {
if (log.isDebugEnabled()) { if (log.isInfoEnabled()) {
log.info("连接的客户端地址:{}", ctx.channel().remoteAddress()); log.info("连接的客户端地址:{}", ctx.channel().remoteAddress());
} }
ctx.writeAndFlush("客户端" + InetAddress.getLocalHost().getHostName() + "成功与服务端建立连接! "); ctx.writeAndFlush("客户端" + InetAddress.getLocalHost().getHostName() + "成功与服务端建立连接! ");
super.channelActive(ctx); super.channelActive(ctx);
} }
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
Long clientId = ctx.channel().attr(ChannelManager.CLIENT_ID).get();
log.info("触发channelInactive方法 clientId {}", clientId);
// ctx.channel().close();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
super.exceptionCaught(ctx, cause);
Long clientId = ctx.channel().attr(ChannelManager.CLIENT_ID).get();
log.info("触发exceptionCaught方法 clientId {}", clientId);
}
} }
...@@ -145,7 +145,7 @@ public class WsReadHandler extends SimpleChannelInboundHandler<TextWebSocketFram ...@@ -145,7 +145,7 @@ public class WsReadHandler extends SimpleChannelInboundHandler<TextWebSocketFram
Long clientId = ctx.channel().attr(ChannelManager.CLIENT_ID).get(); Long clientId = ctx.channel().attr(ChannelManager.CLIENT_ID).get();
Integer platform = ctx.channel().attr(ChannelManager.PLATFORM).get(); Integer platform = ctx.channel().attr(ChannelManager.PLATFORM).get();
log.info("CLIENT_ID:{}, handlerRemoved. channelId is {}", clientId, ctx.channel().id().asLongText()); log.info("appId: {} platform: {} CLIENT_ID:{}, handlerRemoved. channelId is {}", appId, platform, clientId, ctx.channel().id().asLongText());
// 关掉连接 // 关掉连接
channelManager.offline(clientId, platform, ctx); channelManager.offline(clientId, platform, ctx);
......
...@@ -41,4 +41,7 @@ public class ImHistoryMessagePageParam extends BasePageOrderParam { ...@@ -41,4 +41,7 @@ public class ImHistoryMessagePageParam extends BasePageOrderParam {
* 当前操作人client主键id * 当前操作人client主键id
*/ */
private Long currentFkClientId; private Long currentFkClientId;
@ApiModelProperty("是否获取最后一条 0-否 1-是")
private Integer getLast;
} }
...@@ -86,6 +86,15 @@ public interface ImMessageService extends BaseService<ImMessage> { ...@@ -86,6 +86,15 @@ public interface ImMessageService extends BaseService<ImMessage> {
Paging<OfflineMsgDto> getHistoryMsgConversationId(ImHistoryMessagePageParam param); Paging<OfflineMsgDto> getHistoryMsgConversationId(ImHistoryMessagePageParam param);
/** /**
* 查询某个会话历史消息分页列表
*
* @param param
* @return
* @throws Exception
*/
Paging<OfflineMsgDto> getHistoryMsgConversationIdNew(ImHistoryMessagePageParam param);
/**
* 查询用户所有离线消息 * 查询用户所有离线消息
* *
* @return * @return
......
...@@ -504,6 +504,15 @@ public class ImMessageServiceImpl extends BaseServiceImpl<ImMessageMapper, ImMes ...@@ -504,6 +504,15 @@ public class ImMessageServiceImpl extends BaseServiceImpl<ImMessageMapper, ImMes
return new Paging<>(iPage); return new Paging<>(iPage);
} }
@Override
public Paging<OfflineMsgDto> getHistoryMsgConversationIdNew(ImHistoryMessagePageParam param) {
Page<ImMessage> page = new PageInfo<>(param, OrderItem.desc(getLambdaColumn(ImMessage::getCreateTime)));
ImClient currentClient = imClientService.getCurrentClient();
param.setCurrentFkClientId(currentClient.getId());
IPage<OfflineMsgDto> iPage = imMessageMapper.getHistoryMsgConversationIdNew(page, param);
return new Paging<>(iPage);
}
/** /**
......
package com.wecloud.im.ws.manager; package com.wecloud.im.ws.manager;
import com.alibaba.fastjson.JSON;
import com.wecloud.im.ws.cache.UserStateListener;
import com.wecloud.im.ws.model.ClientInfo;
import com.wecloud.im.ws.utils.RedisUtils;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.AttributeKey; import io.netty.util.AttributeKey;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import org.springframework.stereotype.Component;
import com.wecloud.im.ws.cache.UserStateListener;
import com.wecloud.im.ws.model.ClientInfo;
import com.wecloud.im.ws.utils.RedisUtils;
/** /**
* channel内容管理,在线、离线等信息在channel里 * channel内容管理,在线、离线等信息在channel里
* @author lixiaozhong * @author lixiaozhong
...@@ -44,7 +43,7 @@ public class ChannelManager { ...@@ -44,7 +43,7 @@ public class ChannelManager {
AttributeKey<String> LANGUAGE = AttributeKey.valueOf("la"); AttributeKey<String> LANGUAGE = AttributeKey.valueOf("la");
/** /**
* APP_VERSION * APP_VERSIONn
*/ */
AttributeKey<String> APP_VERSION = AttributeKey.valueOf("av"); AttributeKey<String> APP_VERSION = AttributeKey.valueOf("av");
AttributeKey<String> TOKEN = AttributeKey.valueOf("to"); AttributeKey<String> TOKEN = AttributeKey.valueOf("to");
...@@ -60,6 +59,7 @@ public class ChannelManager { ...@@ -60,6 +59,7 @@ public class ChannelManager {
*/ */
public void online(Long clientId, Integer platform, NioSocketChannel channel) { public void online(Long clientId, Integer platform, NioSocketChannel channel) {
String longChannelId = channel.id().asLongText(); String longChannelId = channel.id().asLongText();
log.info("保存本地连接clientId {} platform {}", clientId, platform);
this.putSessionInfoMap(clientId, platform, channel); this.putSessionInfoMap(clientId, platform, channel);
UserStateListener.triggerOnlineEvent(clientId, platform, longChannelId); UserStateListener.triggerOnlineEvent(clientId, platform, longChannelId);
...@@ -79,6 +79,12 @@ public class ChannelManager { ...@@ -79,6 +79,12 @@ public class ChannelManager {
// 关掉连接 // 关掉连接
channelHandlerContext.close(); channelHandlerContext.close();
ClientInfo oldChanel = ChannelManager.SESSION_INFO_MAP.get(genKeyForSessionInfoMap(clientId, platform));
if(oldChanel != null && oldChanel.getNioSocketChannel().isActive()) {
// 活跃的连接,可能是新建的,不能处理
log.info("连接还是活跃的,不下线处理 clientId {},oldChanel {}", clientId, oldChanel.getNioSocketChannel().id().asLongText());
return;
}
// 移除本地维护的channel // 移除本地维护的channel
delSessionInfoMap(clientId, platform); delSessionInfoMap(clientId, platform);
...@@ -96,11 +102,15 @@ public class ChannelManager { ...@@ -96,11 +102,15 @@ public class ChannelManager {
clientInfo.setDeviceId(""); clientInfo.setDeviceId("");
clientInfo.setNioSocketChannel(channel); clientInfo.setNioSocketChannel(channel);
clientInfo.setToken(""); clientInfo.setToken("");
log.info("本地存缓存key {}", genKeyForSessionInfoMap(clientId, platform));
ChannelManager.SESSION_INFO_MAP.put(genKeyForSessionInfoMap(clientId, platform), clientInfo); ChannelManager.SESSION_INFO_MAP.put(genKeyForSessionInfoMap(clientId, platform), clientInfo);
} }
private void delSessionInfoMap(Long clientId, Integer platform) { private void delSessionInfoMap(Long clientId, Integer platform) {
log.info("本地清缓存key {}", genKeyForSessionInfoMap(clientId, platform));
ChannelManager.SESSION_INFO_MAP.remove(genKeyForSessionInfoMap(clientId, platform)); ChannelManager.SESSION_INFO_MAP.remove(genKeyForSessionInfoMap(clientId, platform));
log.info("删除map后,本地缓存结果 {}", JSON.toJSONString(ChannelManager.SESSION_INFO_MAP.get(genKeyForSessionInfoMap(clientId, platform))));
} }
} }
...@@ -83,6 +83,48 @@ ...@@ -83,6 +83,48 @@
ORDER BY `im_message`.`create_time` DESC ORDER BY `im_message`.`create_time` DESC
</select> </select>
<select id="getHistoryMsgConversationIdNew" resultType="com.wecloud.im.vo.OfflineMsgDto">
SELECT im_message.id AS msgId,
im_message.create_time,
im_message.withdraw_time,
im_message.update_date,
`im_client`.client_id AS sender,
im_message.content,
im_message.withdraw,
im_message.`event`,
im_message.system_flag,
im_message.`at`,
im_message.send_status,
im_message.`msg_type` AS 'type',
im_message.fk_conversation_id as conversationId,
(SELECT COUNT(id) FROM im_inbox WHERE fk_msg_id = msgId AND read_msg_status = 0) AS not_read_count,
(SELECT COUNT(id)
FROM im_inbox
WHERE fk_msg_id = msgId
AND receiver_msg_status = 0) AS not_receiver_count
FROM `im_message`
INNER JOIN `im_client` ON `im_client`.id = `im_message`.sender
WHERE fk_conversation_id = #{param.conversationId} and im_message.is_delete = 1 and im_message.withdraw = 0
and (im_message.`event`=0 || (im_message.`event`=1 and sender != #{param.currentFkClientId} ))
<if test="param.msgIdStart != null">
AND im_message.id > #{param.msgIdStart}
</if>
<if test="param.getLast != null and param.getLast == 1">
AND im_message.id <![CDATA[ < ]]> #{param.msgIdEnd}
</if>
<if test="param.getLast != null and param.getLast == 0">
AND im_message.id <![CDATA[ <= ]]> #{param.msgIdEnd}
</if>
<if test="param.getLast == null">
AND im_message.id <![CDATA[ < ]]> #{param.msgIdEnd}
</if>
ORDER BY `im_message`.`create_time` DESC
</select>
<select id="getReceivedLastMsgByConversationId" resultType="com.wecloud.im.vo.OfflineMsgDto"> <select id="getReceivedLastMsgByConversationId" resultType="com.wecloud.im.vo.OfflineMsgDto">
SELECT im_message.id AS msgId, SELECT im_message.id AS msgId,
......
...@@ -59,6 +59,7 @@ ...@@ -59,6 +59,7 @@
<maven-javadoc-plugin.version>3.1.1</maven-javadoc-plugin.version> <maven-javadoc-plugin.version>3.1.1</maven-javadoc-plugin.version>
<maven-versions-plugin.version>2.2</maven-versions-plugin.version> <maven-versions-plugin.version>2.2</maven-versions-plugin.version>
<maven-assembly-plugin.version>3.2.0</maven-assembly-plugin.version> <maven-assembly-plugin.version>3.2.0</maven-assembly-plugin.version>
<maven-classencrypt-plugin.version>1.0.0</maven-classencrypt-plugin.version>
</properties> </properties>
<modules> <modules>
...@@ -500,6 +501,17 @@ ...@@ -500,6 +501,17 @@
<!-- RocketMq start --> <!-- RocketMq start -->
</dependencies> </dependencies>
<distributionManagement>
<repository>
<id>wecloud</id>
<url>http://124.71.68.175:8081/repository/maven-releases/</url>
</repository>
<snapshotRepository>
<id>wecloud</id>
<url>http://124.71.68.175:8081/repository/maven-snapshots/</url>
</snapshotRepository>
</distributionManagement>
<build> <build>
<resources> <resources>
<resource> <resource>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment