package org.springblade.mq.kafka.handler;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springblade.core.tool.jackson.JsonUtil;
import org.springblade.enums.*;
import org.springblade.etl.service.IEtlTaskInfoService;
import org.springblade.modules.baseinfo.rtu.service.IRtuBaseInfoService;
import org.springblade.modules.business.check.order.dto.CheckOrderInfoDTO;
import org.springblade.modules.business.check.order.service.IRtuCheckOrderService;
import org.springblade.modules.business.data.service.IRtuDataGroundService;
import org.springblade.modules.business.data.service.IRtuDataRainService;
import org.springblade.modules.business.data.service.IRtuDataRiverService;
import org.springblade.modules.business.data.service.IRtuDataRsvrService;
import org.springblade.modules.business.rtumanage.dto.RtuStatusDTO;
import org.springblade.modules.business.rtumanage.entity.RtuStatusEntity;
import org.springblade.modules.business.rtumanage.service.IRtuManageService;
import org.springblade.modules.baseinfo.warn.entity.WarnSettingEntity;
import org.springblade.modules.business.warning.entity.RtuWarningInfoEntity;
import org.springblade.modules.baseinfo.warn.service.IWarnSettingService;
import org.springblade.modules.business.warning.service.IRtuWarningService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.List;

/**
 * Kafka message handlers for RTU (monitoring station) status reports and warning events.
 *
 * <p>Every handler acknowledges its record in a {@code finally} block, so a record whose
 * processing throws is logged and then skipped; it is not left unacknowledged.
 *
 * <p>The {@code @KafkaListener} bindings on all three handlers are currently commented out.
 * They are kept next to each method to record the topic each handler was written for.
 */
@Slf4j
@Component
public class MessageHandler {

    // NOTE(review): several of the injected fields below are not referenced by any method in
    // this class. They are kept so the bean's wiring (including the required
    // "spring.mq-topic.ywxt-rtu-status" property) stays exactly as before.

    @Resource
    private IRtuBaseInfoService rtuBaseInfoService;

    @Resource
    private IRtuCheckOrderService rtuCheckOrderService;

    @Resource
    private IRtuManageService rtuManageService;

    @Resource
    private IRtuWarningService rtuWarningService;

    @Resource
    private IEtlTaskInfoService etlTaskInfoService;

    @Resource
    private IRtuDataRainService rtuDataRainService;

    @Resource
    private IRtuDataRiverService rtuDataRiverService;

    @Resource
    private IRtuDataRsvrService rtuDataRsvrService;

    @Resource
    private IRtuDataGroundService rtuDataGroundService;

    @Resource
    private IWarnSettingService rtuWarnSettingService;

    // NOTE(review): raw type kept on purpose; the parameterization of the configured bean is
    // not visible from this file.
    @Autowired
    private KafkaTemplate kafkaTemplate;

    // NOTE(review): raw type kept on purpose, see kafkaTemplate.
    @Resource
    private RedisTemplate redisTemplate;

    @Value("${spring.mq-topic.ywxt-rtu-status}")
    private String topicYwxtRtuStatus;

    /**
     * Logs the payload of a record at INFO level and acknowledges it.
     *
     * @param record         the consumed record; its value is cast to {@code String}
     * @param acknowledgment handle used to acknowledge the record (always invoked)
     */
    // @KafkaListener(groupId = "${spring.kafka.consumer.group-id}", topics = "dcs.power01.test.1", containerFactory = "ackContainerFactory")
    public void dcs(ConsumerRecord<?, ?> record, Acknowledgment acknowledgment) {
        try {
            String message = (String) record.value();
            log.info(message);
        } catch (Exception e) {
            log.error("Failed to handle record {}", describe(record), e);
        } finally {
            // Manually commit the offset, whether or not processing succeeded.
            acknowledgment.acknowledge();
        }
    }

    /**
     * Station status check: records an upload for the reporting station and stores its latest
     * upload time.
     *
     * @param record         the consumed record; its value is a JSON-encoded {@link RtuStatusDTO}
     * @param acknowledgment handle used to acknowledge the record (always invoked)
     */
    // @KafkaListener(groupId = "${spring.kafka.consumer.group-id}", topics = "${spring.mq-topic.ywxt-rtu-status}", containerFactory = "ackContainerFactory")
    public void ywxtRtuStatus(ConsumerRecord<?, ?> record, Acknowledgment acknowledgment) {
        try {
            String message = (String) record.value();
            RtuStatusDTO dto = JsonUtil.parse(message, RtuStatusDTO.class);
            // NOTE(review): JsonUtil.parse appears able to return null for a payload it cannot
            // parse; confirm against the blade-tool version in use.
            if (dto == null) {
                log.warn("Skipping RTU status record {}: payload could not be parsed", describe(record));
                return;
            }

            // Update the upload-interval statistics.
            rtuManageService.rtuUpCount(dto);

            // Store the station's latest upload time.
            RtuStatusEntity statusEntity = new RtuStatusEntity();
            statusEntity.setRtuCode(dto.getRtuCode());
            statusEntity.setLastUpTime(dto.getLastUpTime());
            rtuManageService.updateLastTime(statusEntity);
        } catch (Exception e) {
            log.error("Failed to handle RTU status record {}", describe(record), e);
        } finally {
            // Manually commit the offset, whether or not processing succeeded.
            acknowledgment.acknowledge();
        }
    }

    /**
     * Warning events.
     *
     * <ul>
     *   <li>Status {@code WarningStatusEnum.STATUS_HAPPEN}: the station's run status is set to
     *       {@code EquipmentStatusEnum.STATUS_FAILURE}.</li>
     *   <li>Status {@code WarningStatusEnum.STATUS_CLOSE}: if the station has no remaining
     *       warning in status {@code STATUS_HAPPEN}, its run status is set to
     *       {@code EquipmentStatusEnum.STATUS_RUN} and its maintenance order is closed.</li>
     *   <li>Any other status is ignored.</li>
     * </ul>
     *
     * @param record         the consumed record; its value is a JSON-encoded
     *                       {@link RtuWarningInfoEntity}
     * @param acknowledgment handle used to acknowledge the record (always invoked)
     */
    // @KafkaListener(groupId = "${spring.kafka.consumer.group-id}", topics = "${spring.mq-topic.ywxt-warning}", containerFactory = "ackContainerFactory")
    public void handleMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowledgment) {
        try {
            String message = (String) record.value();
            RtuWarningInfoEntity warning = JsonUtil.parse(message, RtuWarningInfoEntity.class);
            // NOTE(review): JsonUtil.parse appears able to return null for a payload it cannot
            // parse; confirm against the blade-tool version in use.
            if (warning == null) {
                log.warn("Skipping warning record {}: payload could not be parsed", describe(record));
                return;
            }

            // NOTE(review): if getWarningStatus() returns a boxed Integer, a missing status
            // unboxes to a NullPointerException here, which the catch below logs.
            int statusCode = warning.getWarningStatus();
            if (statusCode == WarningStatusEnum.STATUS_HAPPEN.getCode()) {
                // Mark the station as faulty.
                updateRunStatus(warning, EquipmentStatusEnum.STATUS_FAILURE);

                // Automatic creation of maintenance orders is disabled. The logic that used to
                // sit here (it was fully commented out) loaded the first non-deleted
                // WarnSettingEntity and called rtuCheckOrderService.createOrUpdateOrder(warning)
                // when the setting matching the warning kind equalled
                // WarningActiveEnum.ACTIVE_OPEN:
                //   WARN_CLOCK                                      -> warnClockAutoTask
                //   WARN_UP_MIN_DELAY                               -> warnDelayAutoTask
                //   WARN_OFFLINE                                    -> warnOfflineAutoTask
                //   WARN_RAIN_MISS_OUT, WARN_WL_MISS_OUT            -> warnMissOutAutoTask
                //   WARN_OUTLIER_VALUE                              -> warnOutlierValueAutoTask
                //   WARN_RAIN_UP_HOUR_DELAY, WARN_WL_UP_HOUR_DELAY  -> warnHourUpDelayAutoTask
                // The settings query is not executed while that logic is disabled, because its
                // result was not used by anything else.
            } else if (statusCode == WarningStatusEnum.STATUS_CLOSE.getCode()) {
                // Warning recovered.
                recoverStationIfNoOpenWarnings(warning);
            }
        } catch (Exception e) {
            log.error("Failed to handle warning record {}", describe(record), e);
        } finally {
            // Manually commit the offset, whether or not processing succeeded.
            acknowledgment.acknowledge();
        }
    }

    /** Sets the run status of the station named by {@code warning} to {@code status}. */
    private void updateRunStatus(RtuWarningInfoEntity warning, EquipmentStatusEnum status) {
        RtuStatusEntity statusEntity = new RtuStatusEntity();
        statusEntity.setRtuCode(warning.getRtuCode());
        statusEntity.setRunStatusId(status.getCode());
        rtuManageService.updateRtuStatus(statusEntity);
    }

    /**
     * Puts the station back into the running state and closes its maintenance order, but only
     * when no non-deleted warning in status {@code STATUS_HAPPEN} remains for the station.
     */
    private void recoverStationIfNoOpenWarnings(RtuWarningInfoEntity warning) {
        LambdaQueryWrapper<RtuWarningInfoEntity> openWarnings =
                Wrappers.<RtuWarningInfoEntity>query().lambda();
        openWarnings.eq(RtuWarningInfoEntity::getIsDeleted, 0);
        openWarnings.eq(RtuWarningInfoEntity::getRtuCode, warning.getRtuCode());
        openWarnings.eq(RtuWarningInfoEntity::getWarningStatus, WarningStatusEnum.STATUS_HAPPEN.getCode());

        List<RtuWarningInfoEntity> stillOpen = rtuWarningService.list(openWarnings);
        if (stillOpen != null && !stillOpen.isEmpty()) {
            // Other warnings are still active for this station; leave it marked as faulty.
            return;
        }

        // Update the station's run status.
        updateRunStatus(warning, EquipmentStatusEnum.STATUS_RUN);

        // No warnings remain, so the maintenance order can be closed.
        CheckOrderInfoDTO checkOrderInfoDTO = new CheckOrderInfoDTO();
        checkOrderInfoDTO.setRtuCode(warning.getRtuCode());
        rtuCheckOrderService.closeOrder(checkOrderInfoDTO);
    }

    /** Formats a record's coordinates as {@code topic-partition@offset} for log messages. */
    private static String describe(ConsumerRecord<?, ?> record) {
        if (record == null) {
            return "null";
        }
        return record.topic() + "-" + record.partition() + "@" + record.offset();
    }
}