MessageHandler.java 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237
  1. package org.springblade.mq.kafka.handler;
  2. import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
  3. import com.baomidou.mybatisplus.core.toolkit.Wrappers;
  4. import lombok.extern.slf4j.Slf4j;
  5. import org.apache.kafka.clients.consumer.ConsumerRecord;
  6. import org.springblade.core.tool.jackson.JsonUtil;
  7. import org.springblade.enums.*;
  8. import org.springblade.etl.service.IEtlTaskInfoService;
  9. import org.springblade.modules.baseinfo.rtu.service.IRtuBaseInfoService;
  10. import org.springblade.modules.business.check.order.dto.CheckOrderInfoDTO;
  11. import org.springblade.modules.business.check.order.service.IRtuCheckOrderService;
  12. import org.springblade.modules.business.data.service.IRtuDataGroundService;
  13. import org.springblade.modules.business.data.service.IRtuDataRainService;
  14. import org.springblade.modules.business.data.service.IRtuDataRiverService;
  15. import org.springblade.modules.business.data.service.IRtuDataRsvrService;
  16. import org.springblade.modules.business.rtumanage.dto.RtuStatusDTO;
  17. import org.springblade.modules.business.rtumanage.entity.RtuStatusEntity;
  18. import org.springblade.modules.business.rtumanage.service.IRtuManageService;
  19. import org.springblade.modules.baseinfo.warn.entity.WarnSettingEntity;
  20. import org.springblade.modules.business.warning.entity.RtuWarningInfoEntity;
  21. import org.springblade.modules.baseinfo.warn.service.IWarnSettingService;
  22. import org.springblade.modules.business.warning.service.IRtuWarningService;
  23. import org.springframework.beans.factory.annotation.Autowired;
  24. import org.springframework.beans.factory.annotation.Value;
  25. import org.springframework.data.redis.core.RedisTemplate;
  26. import org.springframework.kafka.annotation.KafkaListener;
  27. import org.springframework.kafka.core.KafkaTemplate;
  28. import org.springframework.kafka.support.Acknowledgment;
  29. import org.springframework.stereotype.Component;
  30. import javax.annotation.Resource;
  31. import java.util.List;
  32. /**
  33. * 消息处理器
  34. *
  35. * @author
  36. */
  37. @Slf4j
  38. @Component
  39. public class MessageHandler {
  40. @Resource
  41. private IRtuBaseInfoService rtuBaseInfoService;
  42. @Resource
  43. private IRtuCheckOrderService rtuCheckOrderService;
  44. @Resource
  45. private IRtuManageService rtuManageService;
  46. @Resource
  47. private IRtuWarningService rtuWarningService;
  48. @Resource
  49. private IEtlTaskInfoService etlTaskInfoService;
  50. @Resource
  51. private IRtuDataRainService rtuDataRainService;
  52. @Resource
  53. private IRtuDataRiverService rtuDataRiverService;
  54. @Resource
  55. private IRtuDataRsvrService rtuDataRsvrService;
  56. @Resource
  57. private IRtuDataGroundService rtuDataGroundService;
  58. @Resource
  59. private IWarnSettingService rtuWarnSettingService;
  60. @Autowired
  61. private KafkaTemplate<String, String> kafkaTemplate;
  62. @Resource
  63. private RedisTemplate redisTemplate;
  64. @Value("${spring.mq-topic.ywxt-rtu-status}")
  65. private String topicYwxtRtuStatus;
  66. // @KafkaListener(groupId = "${spring.kafka.consumer.group-id}", topics = "dcs.power01.test.1", containerFactory = "ackContainerFactory")
  67. public void dcs(ConsumerRecord record, Acknowledgment acknowledgment) {
  68. try {
  69. String message = (String) record.value();
  70. log.info(message);
  71. } catch (Exception e) {
  72. log.error(e.getMessage(), e);
  73. } finally {
  74. // 手动提交 offset
  75. acknowledgment.acknowledge();
  76. }
  77. }
  78. /**
  79. * 测站状态检测
  80. *
  81. * @param record
  82. * @param acknowledgment
  83. */
  84. // @KafkaListener(groupId = "${spring.kafka.consumer.group-id}", topics = "${spring.mq-topic.ywxt-rtu-status}", containerFactory = "ackContainerFactory")
  85. public void ywxtRtuStatus(ConsumerRecord record, Acknowledgment acknowledgment) {
  86. try {
  87. String message = (String) record.value();
  88. RtuStatusDTO dto = JsonUtil.parse(message, RtuStatusDTO.class);
  89. //统计上报间隔
  90. rtuManageService.rtuUpCount(dto);
  91. //更新测站最新上报时间状态
  92. RtuStatusEntity statusEntity = new RtuStatusEntity();
  93. statusEntity.setRtuCode(dto.getRtuCode());
  94. statusEntity.setLastUpTime(dto.getLastUpTime());
  95. rtuManageService.updateLastTime(statusEntity);
  96. } catch (Exception e) {
  97. log.error(e.getMessage(), e);
  98. } finally {
  99. // 手动提交 offset
  100. acknowledgment.acknowledge();
  101. }
  102. }
  103. /**
  104. * 预警信息
  105. *
  106. * @param record
  107. * @param acknowledgment
  108. */
  109. // @KafkaListener(groupId = "${spring.kafka.consumer.group-id}", topics = "${spring.mq-topic.ywxt-warning}", containerFactory = "ackContainerFactory")
  110. public void handleMessage(ConsumerRecord record, Acknowledgment acknowledgment) {
  111. try {
  112. String message = (String) record.value();
  113. RtuWarningInfoEntity warningInfoEntity = JsonUtil.parse(message, RtuWarningInfoEntity.class);
  114. int statusCode = warningInfoEntity.getWarningStatus();
  115. if (statusCode == WarningStatusEnum.STATUS_HAPPEN.getCode()) {
  116. //更新测站运行状态
  117. RtuStatusEntity statusEntity = new RtuStatusEntity();
  118. statusEntity.setRtuCode(warningInfoEntity.getRtuCode());
  119. statusEntity.setRunStatusId(EquipmentStatusEnum.STATUS_FAILURE.getCode());
  120. rtuManageService.updateRtuStatus(statusEntity);
  121. //测站检测参数
  122. int warnClockAutoTask = 0;
  123. int warnDelayAutoTask = 0;
  124. int warnOfflineAutoTask = 0;
  125. int warnMissOutAutoTask = 0;
  126. int warnOutlierValueAutoTask = 0;
  127. int warnHourUpDelayAutoTask = 0;
  128. LambdaQueryWrapper<WarnSettingEntity> warnSettingWrapper = Wrappers.<WarnSettingEntity>query().lambda();
  129. warnSettingWrapper.eq(WarnSettingEntity::getIsDeleted, 0);
  130. List<WarnSettingEntity> warnSettingEntityList = rtuWarnSettingService.list(warnSettingWrapper);
  131. if (null != warnSettingEntityList && warnSettingEntityList.size() > 0) {
  132. if (warnSettingEntityList.get(0).getWarnClockAutoTask() != null) {
  133. warnClockAutoTask = warnSettingEntityList.get(0).getWarnClockAutoTask();
  134. }
  135. if (warnSettingEntityList.get(0).getWarnDelayAutoTask() != null) {
  136. warnDelayAutoTask = warnSettingEntityList.get(0).getWarnDelayAutoTask();
  137. }
  138. if (warnSettingEntityList.get(0).getWarnOfflineAutoTask() != null) {
  139. warnOfflineAutoTask = warnSettingEntityList.get(0).getWarnOfflineAutoTask();
  140. }
  141. if (warnSettingEntityList.get(0).getWarnMissOutAutoTask() != null) {
  142. warnMissOutAutoTask = warnSettingEntityList.get(0).getWarnMissOutAutoTask();
  143. }
  144. if (warnSettingEntityList.get(0).getWarnOutlierValueAutoTask() != null) {
  145. warnOutlierValueAutoTask = warnSettingEntityList.get(0).getWarnOutlierValueAutoTask();
  146. }
  147. if (warnSettingEntityList.get(0).getWarnHourUpDelayAutoTask() != null) {
  148. warnHourUpDelayAutoTask = warnSettingEntityList.get(0).getWarnHourUpDelayAutoTask();
  149. }
  150. }
  151. // if (WarnKindEnum.WARN_CLOCK.getCode() == warningInfoEntity.getWarningKind()) {
  152. //// if (warnClockAutoTask == WarningActiveEnum.ACTIVE_OPEN.getCode()) {
  153. //// //创建或更新维修任务
  154. //// rtuCheckOrderService.createOrUpdateOrder(warningInfoEntity);
  155. //// }
  156. //// } else if (WarnKindEnum.WARN_UP_MIN_DELAY.getCode() == warningInfoEntity.getWarningKind()) {
  157. //// if (warnDelayAutoTask == WarningActiveEnum.ACTIVE_OPEN.getCode()) {
  158. //// //创建或更新维修任务
  159. //// rtuCheckOrderService.createOrUpdateOrder(warningInfoEntity);
  160. //// }
  161. //// } else if (WarnKindEnum.WARN_OFFLINE.getCode() == warningInfoEntity.getWarningKind()) {
  162. //// if (warnOfflineAutoTask == WarningActiveEnum.ACTIVE_OPEN.getCode()) {
  163. //// //创建或更新维修任务
  164. //// rtuCheckOrderService.createOrUpdateOrder(warningInfoEntity);
  165. //// }
  166. //// } else if (WarnKindEnum.WARN_RAIN_MISS_OUT.getCode() == warningInfoEntity.getWarningKind()) {
  167. //// if (warnMissOutAutoTask == WarningActiveEnum.ACTIVE_OPEN.getCode()) {
  168. //// //创建或更新维修任务
  169. //// rtuCheckOrderService.createOrUpdateOrder(warningInfoEntity);
  170. //// }
  171. //// } else if (WarnKindEnum.WARN_WL_MISS_OUT.getCode() == warningInfoEntity.getWarningKind()) {
  172. //// if (warnMissOutAutoTask == WarningActiveEnum.ACTIVE_OPEN.getCode()) {
  173. //// //创建或更新维修任务
  174. //// rtuCheckOrderService.createOrUpdateOrder(warningInfoEntity);
  175. //// }
  176. //// } else if (WarnKindEnum.WARN_OUTLIER_VALUE.getCode() == warningInfoEntity.getWarningKind()){
  177. //// if (warnOutlierValueAutoTask == WarningActiveEnum.ACTIVE_OPEN.getCode()) {
  178. //// //创建或更新维修任务
  179. //// rtuCheckOrderService.createOrUpdateOrder(warningInfoEntity);
  180. //// }
  181. //// }else if (WarnKindEnum.WARN_RAIN_UP_HOUR_DELAY.getCode() == warningInfoEntity.getWarningKind()) {
  182. //// if (warnHourUpDelayAutoTask == WarningActiveEnum.ACTIVE_OPEN.getCode()) {
  183. //// //创建或更新维修任务
  184. //// rtuCheckOrderService.createOrUpdateOrder(warningInfoEntity);
  185. //// }
  186. //// } else if (WarnKindEnum.WARN_WL_UP_HOUR_DELAY.getCode() == warningInfoEntity.getWarningKind()) {
  187. //// if (warnHourUpDelayAutoTask == WarningActiveEnum.ACTIVE_OPEN.getCode()) {
  188. //// //创建或更新维修任务
  189. //// rtuCheckOrderService.createOrUpdateOrder(warningInfoEntity);
  190. //// }
  191. //// }
  192. } else if (statusCode == WarningStatusEnum.STATUS_CLOSE.getCode()) {
  193. //预警恢复
  194. LambdaQueryWrapper<RtuWarningInfoEntity> warningWrapper = Wrappers.<RtuWarningInfoEntity>query().lambda();
  195. warningWrapper.eq(RtuWarningInfoEntity::getIsDeleted, 0);
  196. warningWrapper.eq(RtuWarningInfoEntity::getRtuCode, warningInfoEntity.getRtuCode());
  197. warningWrapper.eq(RtuWarningInfoEntity::getWarningStatus, WarningStatusEnum.STATUS_HAPPEN.getCode());
  198. List<RtuWarningInfoEntity> warningInfoEntityList = rtuWarningService.list(warningWrapper);
  199. if (null == warningInfoEntityList || warningInfoEntityList.size() == 0) {
  200. //更新测站运行状态
  201. RtuStatusEntity statusEntity = new RtuStatusEntity();
  202. statusEntity.setRtuCode(warningInfoEntity.getRtuCode());
  203. statusEntity.setRunStatusId(EquipmentStatusEnum.STATUS_RUN.getCode());
  204. rtuManageService.updateRtuStatus(statusEntity);
  205. //没有预警,可以关闭维修任务工单
  206. CheckOrderInfoDTO checkOrderInfoDTO = new CheckOrderInfoDTO();
  207. checkOrderInfoDTO.setRtuCode(warningInfoEntity.getRtuCode());
  208. rtuCheckOrderService.closeOrder(checkOrderInfoDTO);
  209. }
  210. }
  211. } catch (Exception e) {
  212. log.error(e.getMessage(), e);
  213. } finally {
  214. // 手动提交 offset
  215. acknowledgment.acknowledge();
  216. }
  217. }
  218. }