Parcourir la source

兴安盟水资源运维系统-修改KAFKA监听

dylan il y a 2 ans
Parent
commit
fc8b04fa3d

+ 60 - 5
src/main/java/org/springblade/mq/kafka/handler/MessageHandler.java

@@ -7,6 +7,7 @@ import com.baomidou.mybatisplus.core.toolkit.Wrappers;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 
+import org.springblade.constant.RedisBusinessConstant;
 import org.springblade.core.tool.jackson.JsonUtil;
 import org.springblade.core.tool.utils.ConcurrentDateFormat;
 import org.springblade.core.tool.utils.Func;
@@ -25,12 +26,17 @@ import org.springblade.modules.business.warning.entity.RtuWarningInfoEntity;
 import org.springblade.modules.business.warning.service.IRtuWarningService;
 import org.springblade.utils.BytesHelp;
 import org.springframework.beans.factory.annotation.Value;
+import org.springframework.data.redis.core.HashOperations;
+import org.springframework.data.redis.core.RedisTemplate;
 import org.springframework.kafka.annotation.KafkaListener;
 import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.kafka.support.Acknowledgment;
 import org.springframework.stereotype.Component;
 
 import javax.annotation.Resource;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
 import java.util.Date;
 import java.util.List;
 
@@ -62,9 +68,12 @@ public class MessageHandler {
     @Resource
     private KafkaTemplate<String, String> kafkaTemplate;
 
-    @Value("${spring.mq-topic.szy-rtu-status}")
+    @Value("${spring.mq-topic.ywxt-rtu-status-update}")
     private String topicYwxtRtuStatus;
 
+    @Resource
+    private RedisTemplate<String, String> redisTemplate;
+
     /**
      * 报文
      *
@@ -75,7 +84,7 @@ public class MessageHandler {
     public void waterFlowRateMessage(ConsumerRecord record, Acknowledgment acknowledgment) {
         try {
             String message = (String) record.value();
-            log.info("{} ,{}", "water flow rate  info", message);
+            log.info("{} ,{}", "tddcs water flow rate  info", message);
             JSONObject body = JSONObject.parseObject(message);
             WaterFlowRateHistoryInfoEntity historyInfoEntity = new WaterFlowRateHistoryInfoEntity();
             String rtuCode = body.getString(SzyFlowRateMetadataEnum.METADATA_RTU_CODE.getName());
@@ -114,6 +123,24 @@ public class MessageHandler {
                 entity.setTs(historyInfoEntity.getTs());
                 entity.setTm(historyInfoEntity.getTm());
                 waterFlowRateService.saveOrUpdate(entity);
+
+
+                //更新测站最新上报时间
+                HashOperations<String, String, String> hashops = redisTemplate.opsForHash();
+                String key = RedisBusinessConstant.KEY_RTU_RUN_INFO + rtuCode;
+                String lastDatetimeText = hashops.get(key, RedisBusinessConstant.KEY_RTU_RUN_INFO_LAST_TIME);
+                if (null != lastDatetimeText && lastDatetimeText.length() > 0) {
+                    LocalDateTime lastDateTime = LocalDateTime.parse(lastDatetimeText, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
+                    LocalDateTime tm = LocalDateTime.ofInstant(entity.getTm().toInstant(), ZoneId.systemDefault());
+                    if (tm.isAfter(lastDateTime)) {
+                        lastDatetimeText = Func.formatDateTime(entity.getTm());
+                        hashops.put(key, RedisBusinessConstant.KEY_RTU_RUN_INFO_LAST_TIME, lastDatetimeText);
+                    }
+                } else {
+                    lastDatetimeText = Func.formatDateTime(entity.getTm());
+                    hashops.put(key, RedisBusinessConstant.KEY_RTU_RUN_INFO_LAST_TIME, lastDatetimeText);
+                }
+
                 //测站状态检测
                 RtuStatusDTO statusEntity = new RtuStatusDTO();
                 if (null != entity.getTm()) {
@@ -142,7 +169,35 @@ public class MessageHandler {
     public void rtuStatusMessage(ConsumerRecord record, Acknowledgment acknowledgment) {
         try {
             String message = (String) record.value();
-            log.info("{} ,{}", "rtu status info", message);
+            JSONObject body = JSONObject.parseObject(message);
+            log.info("{} ,{}", "tddcs rtu status info", body.toJSONString());
+        } catch (Exception e) {
+            log.error(e.getMessage(), e);
+        } finally {
+            // 手动提交 offset
+            acknowledgment.acknowledge();
+        }
+    }
+
+    @KafkaListener(groupId = "${spring.kafka.consumer.group-id}", topics = "${spring.mq-topic.szy-rtu-warn}", containerFactory = "ackContainerFactory")
+    public void rtuWarnMessage(ConsumerRecord record, Acknowledgment acknowledgment) {
+        try {
+            String message = (String) record.value();
+            JSONObject body = JSONObject.parseObject(message);
+            log.info("{} ,{}", "tddcs rtu warn info", body.toJSONString());
+        } catch (Exception e) {
+            log.error(e.getMessage(), e);
+        } finally {
+            // 手动提交 offset
+            acknowledgment.acknowledge();
+        }
+    }
+
+    @KafkaListener(groupId = "${spring.kafka.consumer.group-id}", topics = "${spring.mq-topic.ywxt-rtu-status-update}", containerFactory = "ackContainerFactory")
+    public void ywxtRtuStatusMessage(ConsumerRecord record, Acknowledgment acknowledgment) {
+        try {
+            String message = (String) record.value();
+            log.info("{} ,{}", "ywxt rtu status update", message);
             RtuStatusDTO dto = JsonUtil.parse(message, RtuStatusDTO.class);
             //更新测站最新上报时间状态
             RtuStatusEntity statusEntity = new RtuStatusEntity();
@@ -158,10 +213,10 @@ public class MessageHandler {
     }
 
     @KafkaListener(groupId = "${spring.kafka.consumer.group-id}", topics = "${spring.mq-topic.ywxt-warning}", containerFactory = "ackContainerFactory")
-    public void rtuWarnMessage(ConsumerRecord record, Acknowledgment acknowledgment) {
+    public void ywxtWarnMessage(ConsumerRecord record, Acknowledgment acknowledgment) {
         try {
             String message = (String) record.value();
-            log.info("{} ,{}", "rtu warn  info", message);
+            log.info("{} ,{}", "ywxt warn info", message);
             RtuWarningInfoEntity warningInfoEntity = JsonUtil.parse(message, RtuWarningInfoEntity.class);
             int statusCode = warningInfoEntity.getWarningStatus();
             if (statusCode == WarningStatusEnum.STATUS_HAPPEN.getCode()) {

+ 1 - 0
src/main/resources/application-dev.yml

@@ -104,6 +104,7 @@ spring:
     szy-rtu-status: topic-szy-rtu-status
     szy-rtu-warn: topic-szy-rtu-warn
     ywxt-warning: topic-ywxt-warning-message
+    ywxt-rtu-status-update: topic-ywxt-rtu-status-update
   apk:
     qr-url: http://111.204.228.227:20002/api/galaxy-business/version/app/qr/url
     url: http://111.204.228.227:20002/app/apk