Commit 382aa512 by zhangjw

新增Redis锁解决多消费者消费事件数据重复问题

parent 13e6fbbb
......@@ -10,6 +10,7 @@ import org.jeecg.pm.entity.enums.EventLevel;
import org.jeecg.pm.entity.enums.EventState;
import org.jeecg.pm.service.IPmEventInfoService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
......@@ -40,16 +41,27 @@ public class EventMessageListener {
private IPmEventInfoService pmEventInfoService;
@Autowired
private RedisTemplate redisTemplate;
@JmsListener(destination = VSS_TOPIC)
public void vssProcessMessage(BytesMessage bytesMessage) throws Exception {
final byte[] bytes = new byte[(int) bytesMessage.getBodyLength()];
bytesMessage.readBytes(bytes);
// 壳文件字段,EventDis类为event_dis.proto文件解析而来,CommEventLog类为事件壳文件类
final EventDis.CommEventLog commEventLog = EventDis.CommEventLog.parseFrom(bytes);
final PmEventInfo pmEventInfo = this.recode(commEventLog);
pmEventInfo.setSubsystem(PmEventInfo.VSS);
pmEventInfoService.saveEvent(pmEventInfo);
// log.info("VSS_TOPIC:{}", pmEventInfo);
String lock = VSS_TOPIC;
try {
final byte[] bytes = new byte[(int) bytesMessage.getBodyLength()];
bytesMessage.readBytes(bytes);
// 壳文件字段,EventDis类为event_dis.proto文件解析而来,CommEventLog类为事件壳文件类
final EventDis.CommEventLog commEventLog = EventDis.CommEventLog.parseFrom(bytes);
lock = lock + commEventLog.getLogId();
if (redisTemplate.opsForValue().setIfAbsent(lock, lock)) {
final PmEventInfo pmEventInfo = this.recode(commEventLog);
pmEventInfo.setSubsystem(PmEventInfo.VSS);
pmEventInfoService.saveEvent(pmEventInfo);
log.info("VSS_TOPIC:{}", pmEventInfo);
}
} finally {
redisTemplate.delete(lock);
}
}
......@@ -69,7 +81,7 @@ public class EventMessageListener {
public PmEventInfo recode(EventDis.CommEventLog commEventLog) {
final EventLevel eventLevel = Arrays.asList(EventLevel.values()).stream().filter(level -> level.getLevel().equals(commEventLog.getEventLevel())).findFirst().orElse(EventLevel.UNKNOWN);
final EventState eventState = Arrays.asList(EventState.values()).stream().filter(state -> state.getState().equals(commEventLog.getEventState())).findFirst().orElse(EventState.UNKNOWN);
DateFormat fmt =new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
DateFormat fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
return new PmEventInfo()
.setLogId(commEventLog.getLogId())
.setEventState(eventState.getRemark())
......@@ -80,7 +92,7 @@ public class EventMessageListener {
.setSubSysType(commEventLog.getSubSysType())
.setEventName(commEventLog.getEventName())
.setCreateTime(fmt.parse(commEventLog.getStartTime()))
.setStopTime(StringUtils.isBlank(commEventLog.getStopTime()) ? null :fmt.parse(commEventLog.getStopTime()))
.setStopTime(StringUtils.isBlank(commEventLog.getStopTime()) ? null : fmt.parse(commEventLog.getStopTime()))
.setSourceIdx(commEventLog.getSourceIdx())
.setSourceType(commEventLog.getSourceType())
.setSourceName(commEventLog.getSourceName())
......
......@@ -23,13 +23,8 @@ public class PmEventInfoServiceImpl extends ServiceImpl<PmEventInfoMapper, PmEve
@Override
public void saveEvent(PmEventInfo pmEventInfo) {
final PmEventInfo eventInfo = baseMapper.selectOne(new QueryWrapper<PmEventInfo>().lambda().eq(PmEventInfo::getLogId, pmEventInfo.getLogId()));//事件唯一ID
if (eventInfo == null) {
pmEventInfo.setId(IdWorker.getId());
baseMapper.insert(pmEventInfo);
return;
}
eventInfo.setEventLevel(pmEventInfo.getEventLevel()).setEventState(pmEventInfo.getEventState()).setStopTime(pmEventInfo.getStopTime());
baseMapper.updateById(eventInfo);
pmEventInfo.setId(eventInfo == null ? IdWorker.getId() : eventInfo.getId());
this.saveOrUpdate(pmEventInfo);
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment