| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237 |
- package org.springblade.mq.kafka.handler;
- import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
- import com.baomidou.mybatisplus.core.toolkit.Wrappers;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.kafka.clients.consumer.ConsumerRecord;
- import org.springblade.core.tool.jackson.JsonUtil;
- import org.springblade.enums.*;
- import org.springblade.etl.service.IEtlTaskInfoService;
- import org.springblade.modules.baseinfo.rtu.service.IRtuBaseInfoService;
- import org.springblade.modules.business.check.order.dto.CheckOrderInfoDTO;
- import org.springblade.modules.business.check.order.service.IRtuCheckOrderService;
- import org.springblade.modules.business.data.service.IRtuDataGroundService;
- import org.springblade.modules.business.data.service.IRtuDataRainService;
- import org.springblade.modules.business.data.service.IRtuDataRiverService;
- import org.springblade.modules.business.data.service.IRtuDataRsvrService;
- import org.springblade.modules.business.rtumanage.dto.RtuStatusDTO;
- import org.springblade.modules.business.rtumanage.entity.RtuStatusEntity;
- import org.springblade.modules.business.rtumanage.service.IRtuManageService;
- import org.springblade.modules.baseinfo.warn.entity.WarnSettingEntity;
- import org.springblade.modules.business.warning.entity.RtuWarningInfoEntity;
- import org.springblade.modules.baseinfo.warn.service.IWarnSettingService;
- import org.springblade.modules.business.warning.service.IRtuWarningService;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.data.redis.core.RedisTemplate;
- import org.springframework.kafka.annotation.KafkaListener;
- import org.springframework.kafka.core.KafkaTemplate;
- import org.springframework.kafka.support.Acknowledgment;
- import org.springframework.stereotype.Component;
- import javax.annotation.Resource;
- import java.util.List;
- /**
- * 消息处理器
- *
- * @author
- */
- @Slf4j
- @Component
- public class MessageHandler {
- @Resource
- private IRtuBaseInfoService rtuBaseInfoService;
- @Resource
- private IRtuCheckOrderService rtuCheckOrderService;
- @Resource
- private IRtuManageService rtuManageService;
- @Resource
- private IRtuWarningService rtuWarningService;
- @Resource
- private IEtlTaskInfoService etlTaskInfoService;
- @Resource
- private IRtuDataRainService rtuDataRainService;
- @Resource
- private IRtuDataRiverService rtuDataRiverService;
- @Resource
- private IRtuDataRsvrService rtuDataRsvrService;
- @Resource
- private IRtuDataGroundService rtuDataGroundService;
- @Resource
- private IWarnSettingService rtuWarnSettingService;
- @Autowired
- private KafkaTemplate<String, String> kafkaTemplate;
- @Resource
- private RedisTemplate redisTemplate;
- @Value("${spring.mq-topic.ywxt-rtu-status}")
- private String topicYwxtRtuStatus;
- // @KafkaListener(groupId = "${spring.kafka.consumer.group-id}", topics = "dcs.power01.test.1", containerFactory = "ackContainerFactory")
- public void dcs(ConsumerRecord record, Acknowledgment acknowledgment) {
- try {
- String message = (String) record.value();
- log.info(message);
- } catch (Exception e) {
- log.error(e.getMessage(), e);
- } finally {
- // 手动提交 offset
- acknowledgment.acknowledge();
- }
- }
- /**
- * 测站状态检测
- *
- * @param record
- * @param acknowledgment
- */
- // @KafkaListener(groupId = "${spring.kafka.consumer.group-id}", topics = "${spring.mq-topic.ywxt-rtu-status}", containerFactory = "ackContainerFactory")
- public void ywxtRtuStatus(ConsumerRecord record, Acknowledgment acknowledgment) {
- try {
- String message = (String) record.value();
- RtuStatusDTO dto = JsonUtil.parse(message, RtuStatusDTO.class);
- //统计上报间隔
- rtuManageService.rtuUpCount(dto);
- //更新测站最新上报时间状态
- RtuStatusEntity statusEntity = new RtuStatusEntity();
- statusEntity.setRtuCode(dto.getRtuCode());
- statusEntity.setLastUpTime(dto.getLastUpTime());
- rtuManageService.updateLastTime(statusEntity);
- } catch (Exception e) {
- log.error(e.getMessage(), e);
- } finally {
- // 手动提交 offset
- acknowledgment.acknowledge();
- }
- }
- /**
- * 预警信息
- *
- * @param record
- * @param acknowledgment
- */
- // @KafkaListener(groupId = "${spring.kafka.consumer.group-id}", topics = "${spring.mq-topic.ywxt-warning}", containerFactory = "ackContainerFactory")
- public void handleMessage(ConsumerRecord record, Acknowledgment acknowledgment) {
- try {
- String message = (String) record.value();
- RtuWarningInfoEntity warningInfoEntity = JsonUtil.parse(message, RtuWarningInfoEntity.class);
- int statusCode = warningInfoEntity.getWarningStatus();
- if (statusCode == WarningStatusEnum.STATUS_HAPPEN.getCode()) {
- //更新测站运行状态
- RtuStatusEntity statusEntity = new RtuStatusEntity();
- statusEntity.setRtuCode(warningInfoEntity.getRtuCode());
- statusEntity.setRunStatusId(EquipmentStatusEnum.STATUS_FAILURE.getCode());
- rtuManageService.updateRtuStatus(statusEntity);
- //测站检测参数
- int warnClockAutoTask = 0;
- int warnDelayAutoTask = 0;
- int warnOfflineAutoTask = 0;
- int warnMissOutAutoTask = 0;
- int warnOutlierValueAutoTask = 0;
- int warnHourUpDelayAutoTask = 0;
- LambdaQueryWrapper<WarnSettingEntity> warnSettingWrapper = Wrappers.<WarnSettingEntity>query().lambda();
- warnSettingWrapper.eq(WarnSettingEntity::getIsDeleted, 0);
- List<WarnSettingEntity> warnSettingEntityList = rtuWarnSettingService.list(warnSettingWrapper);
- if (null != warnSettingEntityList && warnSettingEntityList.size() > 0) {
- if (warnSettingEntityList.get(0).getWarnClockAutoTask() != null) {
- warnClockAutoTask = warnSettingEntityList.get(0).getWarnClockAutoTask();
- }
- if (warnSettingEntityList.get(0).getWarnDelayAutoTask() != null) {
- warnDelayAutoTask = warnSettingEntityList.get(0).getWarnDelayAutoTask();
- }
- if (warnSettingEntityList.get(0).getWarnOfflineAutoTask() != null) {
- warnOfflineAutoTask = warnSettingEntityList.get(0).getWarnOfflineAutoTask();
- }
- if (warnSettingEntityList.get(0).getWarnMissOutAutoTask() != null) {
- warnMissOutAutoTask = warnSettingEntityList.get(0).getWarnMissOutAutoTask();
- }
- if (warnSettingEntityList.get(0).getWarnOutlierValueAutoTask() != null) {
- warnOutlierValueAutoTask = warnSettingEntityList.get(0).getWarnOutlierValueAutoTask();
- }
- if (warnSettingEntityList.get(0).getWarnHourUpDelayAutoTask() != null) {
- warnHourUpDelayAutoTask = warnSettingEntityList.get(0).getWarnHourUpDelayAutoTask();
- }
- }
- // if (WarnKindEnum.WARN_CLOCK.getCode() == warningInfoEntity.getWarningKind()) {
- //// if (warnClockAutoTask == WarningActiveEnum.ACTIVE_OPEN.getCode()) {
- //// //创建或更新维修任务
- //// rtuCheckOrderService.createOrUpdateOrder(warningInfoEntity);
- //// }
- //// } else if (WarnKindEnum.WARN_UP_MIN_DELAY.getCode() == warningInfoEntity.getWarningKind()) {
- //// if (warnDelayAutoTask == WarningActiveEnum.ACTIVE_OPEN.getCode()) {
- //// //创建或更新维修任务
- //// rtuCheckOrderService.createOrUpdateOrder(warningInfoEntity);
- //// }
- //// } else if (WarnKindEnum.WARN_OFFLINE.getCode() == warningInfoEntity.getWarningKind()) {
- //// if (warnOfflineAutoTask == WarningActiveEnum.ACTIVE_OPEN.getCode()) {
- //// //创建或更新维修任务
- //// rtuCheckOrderService.createOrUpdateOrder(warningInfoEntity);
- //// }
- //// } else if (WarnKindEnum.WARN_RAIN_MISS_OUT.getCode() == warningInfoEntity.getWarningKind()) {
- //// if (warnMissOutAutoTask == WarningActiveEnum.ACTIVE_OPEN.getCode()) {
- //// //创建或更新维修任务
- //// rtuCheckOrderService.createOrUpdateOrder(warningInfoEntity);
- //// }
- //// } else if (WarnKindEnum.WARN_WL_MISS_OUT.getCode() == warningInfoEntity.getWarningKind()) {
- //// if (warnMissOutAutoTask == WarningActiveEnum.ACTIVE_OPEN.getCode()) {
- //// //创建或更新维修任务
- //// rtuCheckOrderService.createOrUpdateOrder(warningInfoEntity);
- //// }
- //// } else if (WarnKindEnum.WARN_OUTLIER_VALUE.getCode() == warningInfoEntity.getWarningKind()){
- //// if (warnOutlierValueAutoTask == WarningActiveEnum.ACTIVE_OPEN.getCode()) {
- //// //创建或更新维修任务
- //// rtuCheckOrderService.createOrUpdateOrder(warningInfoEntity);
- //// }
- //// }else if (WarnKindEnum.WARN_RAIN_UP_HOUR_DELAY.getCode() == warningInfoEntity.getWarningKind()) {
- //// if (warnHourUpDelayAutoTask == WarningActiveEnum.ACTIVE_OPEN.getCode()) {
- //// //创建或更新维修任务
- //// rtuCheckOrderService.createOrUpdateOrder(warningInfoEntity);
- //// }
- //// } else if (WarnKindEnum.WARN_WL_UP_HOUR_DELAY.getCode() == warningInfoEntity.getWarningKind()) {
- //// if (warnHourUpDelayAutoTask == WarningActiveEnum.ACTIVE_OPEN.getCode()) {
- //// //创建或更新维修任务
- //// rtuCheckOrderService.createOrUpdateOrder(warningInfoEntity);
- //// }
- //// }
- } else if (statusCode == WarningStatusEnum.STATUS_CLOSE.getCode()) {
- //预警恢复
- LambdaQueryWrapper<RtuWarningInfoEntity> warningWrapper = Wrappers.<RtuWarningInfoEntity>query().lambda();
- warningWrapper.eq(RtuWarningInfoEntity::getIsDeleted, 0);
- warningWrapper.eq(RtuWarningInfoEntity::getRtuCode, warningInfoEntity.getRtuCode());
- warningWrapper.eq(RtuWarningInfoEntity::getWarningStatus, WarningStatusEnum.STATUS_HAPPEN.getCode());
- List<RtuWarningInfoEntity> warningInfoEntityList = rtuWarningService.list(warningWrapper);
- if (null == warningInfoEntityList || warningInfoEntityList.size() == 0) {
- //更新测站运行状态
- RtuStatusEntity statusEntity = new RtuStatusEntity();
- statusEntity.setRtuCode(warningInfoEntity.getRtuCode());
- statusEntity.setRunStatusId(EquipmentStatusEnum.STATUS_RUN.getCode());
- rtuManageService.updateRtuStatus(statusEntity);
- //没有预警,可以关闭维修任务工单
- CheckOrderInfoDTO checkOrderInfoDTO = new CheckOrderInfoDTO();
- checkOrderInfoDTO.setRtuCode(warningInfoEntity.getRtuCode());
- rtuCheckOrderService.closeOrder(checkOrderInfoDTO);
- }
- }
- } catch (Exception e) {
- log.error(e.getMessage(), e);
- } finally {
- // 手动提交 offset
- acknowledgment.acknowledge();
- }
- }
- }
|