|
@@ -5,11 +5,23 @@ import lombok.extern.slf4j.Slf4j;
|
|
|
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
|
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
|
|
|
|
|
|
|
import org.springblade.constant.RtuDataInfoConstant;
|
|
import org.springblade.constant.RtuDataInfoConstant;
|
|
|
|
|
+import org.springblade.core.tool.utils.ConcurrentDateFormat;
|
|
|
|
|
+import org.springblade.core.tool.utils.Func;
|
|
|
|
|
+import org.springblade.modules.business.data.entity.*;
|
|
|
|
|
+import org.springblade.modules.business.data.service.*;
|
|
|
|
|
+import org.springblade.modules.business.datagram.entity.RtuDatagramInfoEntity;
|
|
|
|
|
+import org.springblade.modules.business.datagram.service.IRtuDatagramService;
|
|
|
|
|
+import org.springblade.modules.business.rtumanage.entity.RtuStatusEntity;
|
|
|
|
|
+import org.springblade.modules.business.rtumanage.service.IRtuManageService;
|
|
|
import org.springframework.beans.factory.annotation.Value;
|
|
import org.springframework.beans.factory.annotation.Value;
|
|
|
import org.springframework.kafka.annotation.KafkaListener;
|
|
import org.springframework.kafka.annotation.KafkaListener;
|
|
|
|
|
+import org.springframework.kafka.core.KafkaTemplate;
|
|
|
import org.springframework.kafka.support.Acknowledgment;
|
|
import org.springframework.kafka.support.Acknowledgment;
|
|
|
import org.springframework.stereotype.Component;
|
|
import org.springframework.stereotype.Component;
|
|
|
|
|
|
|
|
|
|
+import javax.annotation.Resource;
|
|
|
|
|
+import java.util.Date;
|
|
|
|
|
+
|
|
|
/**
|
|
/**
|
|
|
* KAFKA消息处理器
|
|
* KAFKA消息处理器
|
|
|
*
|
|
*
|
|
@@ -29,6 +41,222 @@ public class MessageHandler {
|
|
|
@Value("${spring.mq-topic.ywxt-cloud-rtu-data-rsvr}")
|
|
@Value("${spring.mq-topic.ywxt-cloud-rtu-data-rsvr}")
|
|
|
private String topicRtuDataRsvr;
|
|
private String topicRtuDataRsvr;
|
|
|
|
|
|
|
|
|
|
+ @Value("${spring.mq-topic.ywxt-rtu-status}")
|
|
|
|
|
+ private String topicRtuStatus;
|
|
|
|
|
+
|
|
|
|
|
+ @Resource
|
|
|
|
|
+ private KafkaTemplate<String, String> kafkaTemplate;
|
|
|
|
|
+
|
|
|
|
|
+ @Resource
|
|
|
|
|
+ private IRtuDataRainService rtuDataRainService;
|
|
|
|
|
+
|
|
|
|
|
+ @Resource
|
|
|
|
|
+ private IRtuDataRiverService rtuDataRiverService;
|
|
|
|
|
+
|
|
|
|
|
+ @Resource
|
|
|
|
|
+ private IRtuDataRsvrService rtuDataRsvrService;
|
|
|
|
|
+
|
|
|
|
|
+ @Resource
|
|
|
|
|
+ private IRtuDataRainHistoreService rtuDataRainHistoreService;
|
|
|
|
|
+
|
|
|
|
|
+ @Resource
|
|
|
|
|
+ private IRtuDataRiverHistoreService rtuDataRiverHistoreService;
|
|
|
|
|
+
|
|
|
|
|
+ @Resource
|
|
|
|
|
+ private IRtuDataRsvrHistoreService rtuDataRsvrHistoreService;
|
|
|
|
|
+
|
|
|
|
|
+ @Resource
|
|
|
|
|
+ private IRtuRealTimeStatusService rtuRealTimeStatusService;
|
|
|
|
|
+
|
|
|
|
|
+ @Resource
|
|
|
|
|
+ private IRtuRealTimeStatusHistoreService rtuRealTimeStatusHistoreService;
|
|
|
|
|
+
|
|
|
|
|
+ @Resource
|
|
|
|
|
+ private IRtuDatagramService rtuDatagramService;
|
|
|
|
|
+
|
|
|
|
|
+ @Resource
|
|
|
|
|
+ private IRtuDataSzyMpService mpService;
|
|
|
|
|
+
|
|
|
|
|
+ @Resource
|
|
|
|
|
+ private IRtuDataSzyMpHistoreService mpHistoreService;
|
|
|
|
|
+
|
|
|
|
|
+ @Resource
|
|
|
|
|
+ private IRtuDataSzyRiverService szyRiverService;
|
|
|
|
|
+
|
|
|
|
|
+ @Resource
|
|
|
|
|
+ private IRtuDataSzyRiverHistoreService szyRiverHistoreService;
|
|
|
|
|
+
|
|
|
|
|
+ @Resource
|
|
|
|
|
+ private IRtuManageService rtuManageService;
|
|
|
|
|
+
|
|
|
|
|
+ @Resource
|
|
|
|
|
+ private IRtuDataWeatherService weatherService;
|
|
|
|
|
+
|
|
|
|
|
+ @KafkaListener(groupId = "${spring.kafka.consumer.group-id}", topics = "${spring.mq-topic.ywxt-cloud-collect-datagram}", containerFactory = "ackContainerFactory")
|
|
|
|
|
+ public void datagramInfoMessage(ConsumerRecord record, Acknowledgment acknowledgment) {
|
|
|
|
|
+ try {
|
|
|
|
|
+ String message = (String) record.value();
|
|
|
|
|
+ log.info("{} ,{}", "datagram info", message);
|
|
|
|
|
+
|
|
|
|
|
+ JSONObject data = JSONObject.parseObject(message, JSONObject.class);
|
|
|
|
|
+ String rtuCode = data.getString(RtuDataInfoConstant.BASE_KEY_RTU);
|
|
|
|
|
+ String datagram = data.getString(RtuDataInfoConstant.BASE_KEY_DATAGRAM);
|
|
|
|
|
+ String pickTm = data.getString(RtuDataInfoConstant.BASE_KEY_DATAGRAM_PICK_TM);
|
|
|
|
|
+ String agreement = data.getString(RtuDataInfoConstant.BASE_KEY_AGREEMENT);
|
|
|
|
|
+
|
|
|
|
|
+ RtuDatagramInfoEntity entity = new RtuDatagramInfoEntity();
|
|
|
|
|
+ entity.setRtuCode(rtuCode);
|
|
|
|
|
+ entity.setAgreement(agreement);
|
|
|
|
|
+ entity.setDatagram(datagram);
|
|
|
|
|
+ Date pickDt = Func.parse(pickTm, ConcurrentDateFormat.of("yyyyMMddHHmmss"));
|
|
|
|
|
+ entity.setPickTime(pickDt);
|
|
|
|
|
+ rtuDatagramService.save(entity);
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ log.error(e.getMessage(), e);
|
|
|
|
|
+ } finally {
|
|
|
|
|
+ // 手动提交 offset
|
|
|
|
|
+ acknowledgment.acknowledge();
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+ @KafkaListener(groupId = "${spring.kafka.consumer.group-id}", topics = "${spring.mq-topic.ywxt-cloud-collect-datagram-ext}", containerFactory = "ackContainerFactory")
|
|
|
|
|
+ public void datagramExtInfoMessage(ConsumerRecord record, Acknowledgment acknowledgment) {
|
|
|
|
|
+ try {
|
|
|
|
|
+ String message = (String) record.value();
|
|
|
|
|
+ log.info("{} ,{}", "datagram info ext", message);
|
|
|
|
|
+ JSONObject data = JSONObject.parseObject(message, JSONObject.class);
|
|
|
|
|
+ String rtuCode = data.getString(RtuDataInfoConstant.BASE_KEY_RTU);
|
|
|
|
|
+ String datagram = data.getString(RtuDataInfoConstant.BASE_KEY_DATAGRAM);
|
|
|
|
|
+ String pickTm = data.getString(RtuDataInfoConstant.BASE_KEY_DATAGRAM_PICK_TM);
|
|
|
|
|
+ String agreement = data.getString(RtuDataInfoConstant.BASE_KEY_AGREEMENT);
|
|
|
|
|
+
|
|
|
|
|
+ RtuDatagramInfoEntity entity = new RtuDatagramInfoEntity();
|
|
|
|
|
+ entity.setRtuCode(rtuCode);
|
|
|
|
|
+ entity.setAgreement(agreement);
|
|
|
|
|
+ entity.setDatagram(datagram);
|
|
|
|
|
+ Date pickDt = Func.parse(pickTm, ConcurrentDateFormat.of("yyyyMMddHHmmss"));
|
|
|
|
|
+ entity.setPickTime(pickDt);
|
|
|
|
|
+ rtuDatagramService.save(entity);
|
|
|
|
|
+
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ log.error(e.getMessage(), e);
|
|
|
|
|
+ } finally {
|
|
|
|
|
+ // 手动提交 offset
|
|
|
|
|
+ acknowledgment.acknowledge();
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+ @KafkaListener(groupId = "${spring.kafka.consumer.group-id}", topics = "${spring.mq-topic.ywxt-cloud-rtu-data-szy-river}", containerFactory = "ackContainerFactory")
|
|
|
|
|
+ public void szyRiverInfoMessage(ConsumerRecord record, Acknowledgment acknowledgment) {
|
|
|
|
|
+ try {
|
|
|
|
|
+ String message = (String) record.value();
|
|
|
|
|
+ log.info("{} ,{}", "rtu data szy river info", message);
|
|
|
|
|
+ JSONObject data = JSONObject.parseObject(message, JSONObject.class);
|
|
|
|
|
+ if (data.containsKey(RtuDataInfoConstant.RTU_DATA_RTUCODE_KEY)) {
|
|
|
|
|
+ log.info("rtu code {}", data.getString(RtuDataInfoConstant.RTU_DATA_RTUCODE_KEY));
|
|
|
|
|
+ RtuDataWqRiverEntity entity = new RtuDataWqRiverEntity();
|
|
|
|
|
+ entity.setRtuCode(data.getString(RtuDataInfoConstant.RTU_DATA_RTUCODE_KEY));
|
|
|
|
|
+ if (data.containsKey(RtuDataInfoConstant.RTU_DATA_COLLECT_TIME_KEY)) {
|
|
|
|
|
+ entity.setTm(data.getDate(RtuDataInfoConstant.RTU_DATA_COLLECT_TIME_KEY));
|
|
|
|
|
+ }
|
|
|
|
|
+ if (data.containsKey(RtuDataInfoConstant.RTU_DATA_UP_TIME_KEY)) {
|
|
|
|
|
+ entity.setUpTime(data.getDate(RtuDataInfoConstant.RTU_DATA_UP_TIME_KEY));
|
|
|
|
|
+ }
|
|
|
|
|
+ if (data.containsKey(RtuDataInfoConstant.RTU_DATA_PICK_TIME_KEY)) {
|
|
|
|
|
+ entity.setFromTime(data.getDate(RtuDataInfoConstant.RTU_DATA_PICK_TIME_KEY));
|
|
|
|
|
+ }
|
|
|
|
|
+ if (data.containsKey(RtuDataInfoConstant.RTU_DATA_SZY_RIVER_HIGH)) {
|
|
|
|
|
+ entity.setWaterLevelHigh(data.getString(RtuDataInfoConstant.RTU_DATA_SZY_RIVER_HIGH));
|
|
|
|
|
+ }
|
|
|
|
|
+ if (data.containsKey(RtuDataInfoConstant.RTU_DATA_SZY_RIVER_SPEED)) {
|
|
|
|
|
+ entity.setWaterFlowSpeed(data.getString(RtuDataInfoConstant.RTU_DATA_SZY_RIVER_SPEED));
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ szyRiverService.updateInfo(entity);
|
|
|
|
|
+
|
|
|
|
|
+ RtuDataWqRiverStoreEntity storeEntity = new RtuDataWqRiverStoreEntity();
|
|
|
|
|
+ storeEntity.setRtuCode(entity.getRtuCode());
|
|
|
|
|
+ storeEntity.setTm(entity.getTm());
|
|
|
|
|
+ storeEntity.setUpTime(entity.getUpTime());
|
|
|
|
|
+ storeEntity.setFromTime(entity.getFromTime());
|
|
|
|
|
+ storeEntity.setWaterLevelHigh(entity.getWaterLevelHigh());
|
|
|
|
|
+ storeEntity.setWaterFlowSpeed(entity.getWaterFlowSpeed());
|
|
|
|
|
+ szyRiverHistoreService.save(storeEntity);
|
|
|
|
|
+
|
|
|
|
|
+ RtuStatusEntity rtuStatusEntity = new RtuStatusEntity();
|
|
|
|
|
+ rtuStatusEntity.setRtuCode(entity.getRtuCode());
|
|
|
|
|
+ rtuStatusEntity.setLastUpTime(entity.getUpTime());
|
|
|
|
|
+ kafkaTemplate.send(topicRtuStatus, Func.toJson(rtuStatusEntity));
|
|
|
|
|
+ }
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ log.error(e.getMessage(), e);
|
|
|
|
|
+ } finally {
|
|
|
|
|
+ // 手动提交 offset
|
|
|
|
|
+ acknowledgment.acknowledge();
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+ @KafkaListener(groupId = "${spring.kafka.consumer.group-id}", topics = "${spring.mq-topic.ywxt-cloud-rtu-data-szy-mp-q-r}", containerFactory = "ackContainerFactory")
|
|
|
|
|
+ public void szyMpQRInfoMessage(ConsumerRecord record, Acknowledgment acknowledgment) {
|
|
|
|
|
+ try {
|
|
|
|
|
+ String message = (String) record.value();
|
|
|
|
|
+ log.info("{} ,{}", "rtu data mp info", message);
|
|
|
|
|
+ JSONObject data = JSONObject.parseObject(message, JSONObject.class);
|
|
|
|
|
+ if (data.containsKey(RtuDataInfoConstant.RTU_DATA_RTUCODE_KEY)) {
|
|
|
|
|
+ log.info("rtu code {}", data.getString(RtuDataInfoConstant.RTU_DATA_RTUCODE_KEY));
|
|
|
|
|
+ RtuDataSzyFlowRateEntity entity = new RtuDataSzyFlowRateEntity();
|
|
|
|
|
+ entity.setRtuCode(data.getString(RtuDataInfoConstant.RTU_DATA_RTUCODE_KEY));
|
|
|
|
|
+ if (data.containsKey(RtuDataInfoConstant.RTU_DATA_COLLECT_TIME_KEY)) {
|
|
|
|
|
+ entity.setTm(data.getDate(RtuDataInfoConstant.RTU_DATA_COLLECT_TIME_KEY));
|
|
|
|
|
+ }
|
|
|
|
|
+ if (data.containsKey(RtuDataInfoConstant.RTU_DATA_UP_TIME_KEY)) {
|
|
|
|
|
+ entity.setUpTime(data.getDate(RtuDataInfoConstant.RTU_DATA_UP_TIME_KEY));
|
|
|
|
|
+ }
|
|
|
|
|
+ if (data.containsKey(RtuDataInfoConstant.RTU_DATA_PICK_TIME_KEY)) {
|
|
|
|
|
+ entity.setFromTime(data.getDate(RtuDataInfoConstant.RTU_DATA_PICK_TIME_KEY));
|
|
|
|
|
+ }
|
|
|
|
|
+ if (data.containsKey(RtuDataInfoConstant.RTU_DATA_SZY_MP_Q_R_FIRST_FLOWRATE)) {
|
|
|
|
|
+ entity.setMpQ(data.getString(RtuDataInfoConstant.RTU_DATA_SZY_MP_Q_R_FIRST_FLOWRATE));
|
|
|
|
|
+ }
|
|
|
|
|
+ if (data.containsKey(RtuDataInfoConstant.RTU_DATA_SZY_MP_Q_R_FIRST_ACC_FLOWRATE)) {
|
|
|
|
|
+ entity.setAccW(data.getString(RtuDataInfoConstant.RTU_DATA_SZY_MP_Q_R_FIRST_ACC_FLOWRATE));
|
|
|
|
|
+ }
|
|
|
|
|
+ if (data.containsKey(RtuDataInfoConstant.RTU_DATA_SZY_MP_Q_R_SECOND_FLOWRATE)) {
|
|
|
|
|
+ entity.setMpSencondQ(data.getString(RtuDataInfoConstant.RTU_DATA_SZY_MP_Q_R_SECOND_FLOWRATE));
|
|
|
|
|
+ }
|
|
|
|
|
+ if (data.containsKey(RtuDataInfoConstant.RTU_DATA_SZY_MP_Q_R_SECOND_ACC_FLOWRATE)) {
|
|
|
|
|
+ entity.setAccSencondW(data.getString(RtuDataInfoConstant.RTU_DATA_SZY_MP_Q_R_SECOND_ACC_FLOWRATE));
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ mpService.updateInfo(entity);
|
|
|
|
|
+ RtuDataSzyFlowRateStoreEntity storeEntity = new RtuDataSzyFlowRateStoreEntity();
|
|
|
|
|
+ storeEntity.setRtuCode(entity.getRtuCode());
|
|
|
|
|
+ storeEntity.setTm(entity.getTm());
|
|
|
|
|
+ storeEntity.setUpTime(entity.getUpTime());
|
|
|
|
|
+ storeEntity.setFromTime(entity.getFromTime());
|
|
|
|
|
+ storeEntity.setMpQ(entity.getMpQ());
|
|
|
|
|
+ storeEntity.setAccW(entity.getAccW());
|
|
|
|
|
+ storeEntity.setMpSencondQ(entity.getMpSencondQ());
|
|
|
|
|
+ storeEntity.setAccSencondW(entity.getAccSencondW());
|
|
|
|
|
+
|
|
|
|
|
+ mpHistoreService.save(storeEntity);
|
|
|
|
|
+
|
|
|
|
|
+ RtuStatusEntity rtuStatusEntity = new RtuStatusEntity();
|
|
|
|
|
+ rtuStatusEntity.setRtuCode(entity.getRtuCode());
|
|
|
|
|
+ rtuStatusEntity.setLastUpTime(entity.getUpTime());
|
|
|
|
|
+ kafkaTemplate.send(topicRtuStatus, Func.toJson(rtuStatusEntity));
|
|
|
|
|
+ }
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ log.error(e.getMessage(), e);
|
|
|
|
|
+ } finally {
|
|
|
|
|
+ // 手动提交 offset
|
|
|
|
|
+ acknowledgment.acknowledge();
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
* RTU雨情数据
|
|
* RTU雨情数据
|
|
@@ -44,6 +272,53 @@ public class MessageHandler {
|
|
|
JSONObject data = JSONObject.parseObject(message, JSONObject.class);
|
|
JSONObject data = JSONObject.parseObject(message, JSONObject.class);
|
|
|
if (data.containsKey(RtuDataInfoConstant.RTU_DATA_RTUCODE_KEY)) {
|
|
if (data.containsKey(RtuDataInfoConstant.RTU_DATA_RTUCODE_KEY)) {
|
|
|
log.info("rtu code {}", data.getString(RtuDataInfoConstant.RTU_DATA_RTUCODE_KEY));
|
|
log.info("rtu code {}", data.getString(RtuDataInfoConstant.RTU_DATA_RTUCODE_KEY));
|
|
|
|
|
+ RtuDataRainEntity entity = new RtuDataRainEntity();
|
|
|
|
|
+ entity.setRtuCode(data.getString(RtuDataInfoConstant.RTU_DATA_RTUCODE_KEY));
|
|
|
|
|
+ if (data.containsKey(RtuDataInfoConstant.RTU_DATA_COLLECT_TIME_KEY)) {
|
|
|
|
|
+ entity.setTm(data.getDate(RtuDataInfoConstant.RTU_DATA_COLLECT_TIME_KEY));
|
|
|
|
|
+ }
|
|
|
|
|
+ if (data.containsKey(RtuDataInfoConstant.RTU_DATA_UP_TIME_KEY)) {
|
|
|
|
|
+ entity.setUpTime(data.getDate(RtuDataInfoConstant.RTU_DATA_UP_TIME_KEY));
|
|
|
|
|
+ }
|
|
|
|
|
+ if (data.containsKey(RtuDataInfoConstant.RTU_DATA_PICK_TIME_KEY)) {
|
|
|
|
|
+ entity.setFromTime(data.getDate(RtuDataInfoConstant.RTU_DATA_PICK_TIME_KEY));
|
|
|
|
|
+ }
|
|
|
|
|
+ if (data.containsKey(RtuDataInfoConstant.RTU_DATA_RAIN_DRP_KEY)) {
|
|
|
|
|
+ entity.setDrp(data.getDouble(RtuDataInfoConstant.RTU_DATA_RAIN_DRP_KEY));
|
|
|
|
|
+ }
|
|
|
|
|
+ if (data.containsKey(RtuDataInfoConstant.RTU_DATA_RAIN_INTV_KEY)) {
|
|
|
|
|
+ entity.setIntv(data.getDouble(RtuDataInfoConstant.RTU_DATA_RAIN_INTV_KEY));
|
|
|
|
|
+ }
|
|
|
|
|
+ if (data.containsKey(RtuDataInfoConstant.RTU_DATA_RAIN_PDR_KEY)) {
|
|
|
|
|
+ entity.setPdr(data.getDouble(RtuDataInfoConstant.RTU_DATA_RAIN_PDR_KEY));
|
|
|
|
|
+ }
|
|
|
|
|
+ if (data.containsKey(RtuDataInfoConstant.RTU_DATA_RAIN_DYP_KEY)) {
|
|
|
|
|
+ entity.setDyp(data.getDouble(RtuDataInfoConstant.RTU_DATA_RAIN_DYP_KEY));
|
|
|
|
|
+ }
|
|
|
|
|
+ if (data.containsKey(RtuDataInfoConstant.RTU_DATA_RAIN_WTH_KEY)) {
|
|
|
|
|
+ entity.setWth(data.getString(RtuDataInfoConstant.RTU_DATA_RAIN_WTH_KEY));
|
|
|
|
|
+ }
|
|
|
|
|
+ if (data.containsKey(RtuDataInfoConstant.RTU_DATA_RAIN_PT_KEY)) {
|
|
|
|
|
+ entity.setPt(data.getDouble(RtuDataInfoConstant.RTU_DATA_RAIN_PT_KEY));
|
|
|
|
|
+ }
|
|
|
|
|
+ rtuDataRainService.updateRainInfo(entity);
|
|
|
|
|
+ RtuDataRainStoreEntity storeEntity = new RtuDataRainStoreEntity();
|
|
|
|
|
+ storeEntity.setRtuCode(entity.getRtuCode());
|
|
|
|
|
+ storeEntity.setTm(entity.getTm());
|
|
|
|
|
+ storeEntity.setUpTime(entity.getUpTime());
|
|
|
|
|
+ storeEntity.setFromTime(entity.getFromTime());
|
|
|
|
|
+ storeEntity.setDrp(entity.getDrp());
|
|
|
|
|
+ storeEntity.setIntv(entity.getIntv());
|
|
|
|
|
+ storeEntity.setPdr(entity.getPdr());
|
|
|
|
|
+ storeEntity.setDyp(entity.getDyp());
|
|
|
|
|
+ storeEntity.setWth(entity.getWth());
|
|
|
|
|
+ storeEntity.setPt(entity.getPt());
|
|
|
|
|
+ rtuDataRainHistoreService.save(storeEntity);
|
|
|
|
|
+
|
|
|
|
|
+ RtuStatusEntity rtuStatusEntity = new RtuStatusEntity();
|
|
|
|
|
+ rtuStatusEntity.setRtuCode(entity.getRtuCode());
|
|
|
|
|
+ rtuStatusEntity.setLastUpTime(entity.getUpTime());
|
|
|
|
|
+ kafkaTemplate.send(topicRtuStatus, Func.toJson(rtuStatusEntity));
|
|
|
}
|
|
}
|
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
|
log.error(e.getMessage(), e);
|
|
log.error(e.getMessage(), e);
|
|
@@ -67,6 +342,34 @@ public class MessageHandler {
|
|
|
JSONObject data = JSONObject.parseObject(message, JSONObject.class);
|
|
JSONObject data = JSONObject.parseObject(message, JSONObject.class);
|
|
|
if (data.containsKey(RtuDataInfoConstant.RTU_DATA_RTUCODE_KEY)) {
|
|
if (data.containsKey(RtuDataInfoConstant.RTU_DATA_RTUCODE_KEY)) {
|
|
|
log.info("rtu code {}", data.getString(RtuDataInfoConstant.RTU_DATA_RTUCODE_KEY));
|
|
log.info("rtu code {}", data.getString(RtuDataInfoConstant.RTU_DATA_RTUCODE_KEY));
|
|
|
|
|
+ RtuDataRiverEntity entity = new RtuDataRiverEntity();
|
|
|
|
|
+ entity.setRtuCode(data.getString(RtuDataInfoConstant.RTU_DATA_RTUCODE_KEY));
|
|
|
|
|
+ if (data.containsKey(RtuDataInfoConstant.RTU_DATA_COLLECT_TIME_KEY)) {
|
|
|
|
|
+ entity.setTm(data.getDate(RtuDataInfoConstant.RTU_DATA_COLLECT_TIME_KEY));
|
|
|
|
|
+ }
|
|
|
|
|
+ if (data.containsKey(RtuDataInfoConstant.RTU_DATA_UP_TIME_KEY)) {
|
|
|
|
|
+ entity.setUpTime(data.getDate(RtuDataInfoConstant.RTU_DATA_UP_TIME_KEY));
|
|
|
|
|
+ }
|
|
|
|
|
+ if (data.containsKey(RtuDataInfoConstant.RTU_DATA_PICK_TIME_KEY)) {
|
|
|
|
|
+ entity.setFromTime(data.getDate(RtuDataInfoConstant.RTU_DATA_PICK_TIME_KEY));
|
|
|
|
|
+ }
|
|
|
|
|
+ if (data.containsKey(RtuDataInfoConstant.RTU_DATA_RIVER_Z_KEY)) {
|
|
|
|
|
+ entity.setZ(data.getDouble(RtuDataInfoConstant.RTU_DATA_RIVER_Z_KEY));
|
|
|
|
|
+ }
|
|
|
|
|
+ rtuDataRiverService.updateRiverInfo(entity);
|
|
|
|
|
+
|
|
|
|
|
+ RtuDataRiverStoreEntity storeEntity = new RtuDataRiverStoreEntity();
|
|
|
|
|
+ storeEntity.setRtuCode(entity.getRtuCode());
|
|
|
|
|
+ storeEntity.setTm(entity.getTm());
|
|
|
|
|
+ storeEntity.setUpTime(entity.getUpTime());
|
|
|
|
|
+ storeEntity.setFromTime(entity.getFromTime());
|
|
|
|
|
+ storeEntity.setZ(entity.getZ());
|
|
|
|
|
+ rtuDataRiverHistoreService.save(storeEntity);
|
|
|
|
|
+
|
|
|
|
|
+ RtuStatusEntity rtuStatusEntity = new RtuStatusEntity();
|
|
|
|
|
+ rtuStatusEntity.setRtuCode(entity.getRtuCode());
|
|
|
|
|
+ rtuStatusEntity.setLastUpTime(entity.getUpTime());
|
|
|
|
|
+ kafkaTemplate.send(topicRtuStatus, Func.toJson(rtuStatusEntity));
|
|
|
}
|
|
}
|
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
|
log.error(e.getMessage(), e);
|
|
log.error(e.getMessage(), e);
|
|
@@ -90,6 +393,58 @@ public class MessageHandler {
|
|
|
JSONObject data = JSONObject.parseObject(message, JSONObject.class);
|
|
JSONObject data = JSONObject.parseObject(message, JSONObject.class);
|
|
|
if (data.containsKey(RtuDataInfoConstant.RTU_DATA_RTUCODE_KEY)) {
|
|
if (data.containsKey(RtuDataInfoConstant.RTU_DATA_RTUCODE_KEY)) {
|
|
|
log.info("rtu code {}", data.getString(RtuDataInfoConstant.RTU_DATA_RTUCODE_KEY));
|
|
log.info("rtu code {}", data.getString(RtuDataInfoConstant.RTU_DATA_RTUCODE_KEY));
|
|
|
|
|
+ RtuDataRsvrEntity entity = new RtuDataRsvrEntity();
|
|
|
|
|
+ entity.setRtuCode(data.getString(RtuDataInfoConstant.RTU_DATA_RTUCODE_KEY));
|
|
|
|
|
+ if (data.containsKey(RtuDataInfoConstant.RTU_DATA_COLLECT_TIME_KEY)) {
|
|
|
|
|
+ entity.setTm(data.getDate(RtuDataInfoConstant.RTU_DATA_COLLECT_TIME_KEY));
|
|
|
|
|
+ }
|
|
|
|
|
+ if (data.containsKey(RtuDataInfoConstant.RTU_DATA_UP_TIME_KEY)) {
|
|
|
|
|
+ entity.setUpTime(data.getDate(RtuDataInfoConstant.RTU_DATA_UP_TIME_KEY));
|
|
|
|
|
+ }
|
|
|
|
|
+ if (data.containsKey(RtuDataInfoConstant.RTU_DATA_PICK_TIME_KEY)) {
|
|
|
|
|
+ entity.setFromTime(data.getDate(RtuDataInfoConstant.RTU_DATA_PICK_TIME_KEY));
|
|
|
|
|
+ }
|
|
|
|
|
+ if (data.containsKey(RtuDataInfoConstant.RTU_DATA_RSVR_RZ_KEY)) {
|
|
|
|
|
+ entity.setRz(data.getDouble(RtuDataInfoConstant.RTU_DATA_RSVR_RZ_KEY));
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ rtuDataRsvrService.updateRsvrInfo(entity);
|
|
|
|
|
+
|
|
|
|
|
+ RtuDataRsvrStoreEntity storeEntity = new RtuDataRsvrStoreEntity();
|
|
|
|
|
+ storeEntity.setRtuCode(entity.getRtuCode());
|
|
|
|
|
+ storeEntity.setTm(entity.getTm());
|
|
|
|
|
+ storeEntity.setUpTime(entity.getUpTime());
|
|
|
|
|
+ storeEntity.setFromTime(entity.getFromTime());
|
|
|
|
|
+ storeEntity.setRz(entity.getRz());
|
|
|
|
|
+ rtuDataRsvrHistoreService.save(storeEntity);
|
|
|
|
|
+
|
|
|
|
|
+ RtuStatusEntity rtuStatusEntity = new RtuStatusEntity();
|
|
|
|
|
+ rtuStatusEntity.setRtuCode(entity.getRtuCode());
|
|
|
|
|
+ rtuStatusEntity.setLastUpTime(entity.getUpTime());
|
|
|
|
|
+ kafkaTemplate.send(topicRtuStatus, Func.toJson(rtuStatusEntity));
|
|
|
|
|
+ }
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ log.error(e.getMessage(), e);
|
|
|
|
|
+ } finally {
|
|
|
|
|
+ // 手动提交 offset
|
|
|
|
|
+ acknowledgment.acknowledge();
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 测站实时状态信息
|
|
|
|
|
+ *
|
|
|
|
|
+ * @param record
|
|
|
|
|
+ * @param acknowledgment
|
|
|
|
|
+ */
|
|
|
|
|
+ @KafkaListener(groupId = "${spring.kafka.consumer.group-id}", topics = "${spring.mq-topic.ywxt-cloud-rtu-realtime-info}", containerFactory = "ackContainerFactory")
|
|
|
|
|
+ public void rtuRealtimeStatusInfoMessage(ConsumerRecord record, Acknowledgment acknowledgment) {
|
|
|
|
|
+ try {
|
|
|
|
|
+ String message = (String) record.value();
|
|
|
|
|
+ log.info("{} ,{}", "rtu realtime status info", message);
|
|
|
|
|
+ JSONObject data = JSONObject.parseObject(message, JSONObject.class);
|
|
|
|
|
+ if (rtuRealTimeStatusService.updateExtStatus(data)) {
|
|
|
|
|
+ rtuRealTimeStatusHistoreService.saveExtStatus(data);
|
|
|
}
|
|
}
|
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
|
log.error(e.getMessage(), e);
|
|
log.error(e.getMessage(), e);
|
|
@@ -99,4 +454,67 @@ public class MessageHandler {
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+ @KafkaListener(groupId = "${spring.kafka.consumer.group-id}", topics = "${spring.mq-topic.ywxt-cloud-rtu-warn-info}", containerFactory = "ackContainerFactory")
|
|
|
|
|
+ public void rtuWarnStatusInfoMessage(ConsumerRecord record, Acknowledgment acknowledgment) {
|
|
|
|
|
+ try {
|
|
|
|
|
+ String message = (String) record.value();
|
|
|
|
|
+ log.info("{} ,{}", "rtu warn status info", message);
|
|
|
|
|
+ JSONObject data = JSONObject.parseObject(message, JSONObject.class);
|
|
|
|
|
+ log.info("rtu code {}", data.getString(RtuDataInfoConstant.RTU_DATA_RTUCODE_KEY));
|
|
|
|
|
+ if (rtuRealTimeStatusService.updateWarnStatus(data)) {
|
|
|
|
|
+ rtuRealTimeStatusHistoreService.saveWarnStatus(data);
|
|
|
|
|
+ }
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ log.error(e.getMessage(), e);
|
|
|
|
|
+ } finally {
|
|
|
|
|
+ // 手动提交 offset
|
|
|
|
|
+ acknowledgment.acknowledge();
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ @KafkaListener(groupId = "${spring.kafka.consumer.group-id}", topics = "${spring.mq-topic.ywxt-cloud-rtu-data-weather}", containerFactory = "ackContainerFactory")
|
|
|
|
|
+ public void rtuWeatherMessage(ConsumerRecord record, Acknowledgment acknowledgment) {
|
|
|
|
|
+ try {
|
|
|
|
|
+ String message = (String) record.value();
|
|
|
|
|
+ JSONObject data = JSONObject.parseObject(message, JSONObject.class);
|
|
|
|
|
+ RtuDataWeatherEntity entity = new RtuDataWeatherEntity();
|
|
|
|
|
+ entity.setRtuCode(data.getString(RtuDataInfoConstant.RTU_DATA_RTUCODE_KEY));
|
|
|
|
|
+ if (data.containsKey(RtuDataInfoConstant.RTU_DATA_COLLECT_TIME_KEY)) {
|
|
|
|
|
+ entity.setTm(data.getDate(RtuDataInfoConstant.RTU_DATA_COLLECT_TIME_KEY));
|
|
|
|
|
+ }
|
|
|
|
|
+ if (data.containsKey(RtuDataInfoConstant.RTU_DATA_UP_TIME_KEY)) {
|
|
|
|
|
+ entity.setUpTime(data.getDate(RtuDataInfoConstant.RTU_DATA_UP_TIME_KEY));
|
|
|
|
|
+ }
|
|
|
|
|
+ if (data.containsKey(RtuDataInfoConstant.RTU_DATA_PICK_TIME_KEY)) {
|
|
|
|
|
+ entity.setFromTime(data.getDate(RtuDataInfoConstant.RTU_DATA_PICK_TIME_KEY));
|
|
|
|
|
+ }
|
|
|
|
|
+ if (data.containsKey("AI")) {
|
|
|
|
|
+ entity.setAi(data.getDouble("AI"));
|
|
|
|
|
+ }
|
|
|
|
|
+ if (data.containsKey("MST")) {
|
|
|
|
|
+ entity.setMst(data.getDouble("MST"));
|
|
|
|
|
+ }
|
|
|
|
|
+ weatherService.save(entity);
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ log.error(e.getMessage(), e);
|
|
|
|
|
+ } finally {
|
|
|
|
|
+ // 手动提交 offset
|
|
|
|
|
+ acknowledgment.acknowledge();
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ @KafkaListener(groupId = "${spring.kafka.consumer.group-id}", topics = "${spring.mq-topic.ywxt-rtu-status}", containerFactory = "ackContainerFactory")
|
|
|
|
|
+ public void rtuStatusMessage(ConsumerRecord record, Acknowledgment acknowledgment) {
|
|
|
|
|
+ try {
|
|
|
|
|
+ String message = (String) record.value();
|
|
|
|
|
+ RtuStatusEntity rtuStatusEntity = JSONObject.parseObject(message, RtuStatusEntity.class);
|
|
|
|
|
+ rtuManageService.updateLastTime(rtuStatusEntity);
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ log.error(e.getMessage(), e);
|
|
|
|
|
+ } finally {
|
|
|
|
|
+ // 手动提交 offset
|
|
|
|
|
+ acknowledgment.acknowledge();
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
}
|
|
}
|