|
|
@@ -9,33 +9,43 @@ package org.springblade.etl;
|
|
|
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
import org.glassfish.jersey.internal.guava.ThreadFactoryBuilder;
|
|
|
+import org.springblade.core.tool.jackson.JsonUtil;
|
|
|
+import org.springblade.core.tool.utils.ConcurrentDateFormat;
|
|
|
+import org.springblade.core.tool.utils.Func;
|
|
|
+import org.springblade.etl.dto.EtlGroundDataDTO;
|
|
|
+import org.springblade.etl.dto.EtlRainDataDTO;
|
|
|
+import org.springblade.etl.dto.EtlRiverDataDTO;
|
|
|
+import org.springblade.etl.dto.EtlRsvrDataDTO;
|
|
|
import org.springblade.etl.service.*;
|
|
|
+import org.springblade.modules.baseinfo.rtu.entity.RtuInfoEntity;
|
|
|
+import org.springblade.modules.baseinfo.rtu.service.IRtuBaseInfoService;
|
|
|
import org.springblade.modules.baseinfo.stbase.service.IMvAttStBaseService;
|
|
|
-import org.springblade.modules.business.project.base.service.IProjectBaseInfoService;
|
|
|
-import org.springblade.modules.business.rtu.base.service.IRtuBaseInfoService;
|
|
|
-import org.springblade.modules.business.rtu.data.service.IRtuDataGroundService;
|
|
|
-import org.springblade.modules.business.rtu.data.service.IRtuDataRainService;
|
|
|
-import org.springblade.modules.business.rtu.data.service.IRtuDataRiverService;
|
|
|
-import org.springblade.modules.business.rtu.data.service.IRtuDataRsvrService;
|
|
|
-import org.springblade.modules.business.rtumanage.service.IRtuManageService;
|
|
|
-import org.springblade.modules.business.warning.service.IOriginalWarningService;
|
|
|
-import org.springblade.modules.business.warning.service.IRtuWarningService;
|
|
|
-import org.springblade.modules.system.service.IDictBizService;
|
|
|
+import org.springblade.modules.business.data.entity.RtuDataGroundEntity;
|
|
|
+import org.springblade.modules.business.data.entity.RtuDataRainEntity;
|
|
|
+import org.springblade.modules.business.data.entity.RtuDataRiverEntity;
|
|
|
+import org.springblade.modules.business.data.entity.RtuDataRsvrEntity;
|
|
|
+import org.springblade.modules.business.data.service.IRtuDataGroundService;
|
|
|
+import org.springblade.modules.business.data.service.IRtuDataRainService;
|
|
|
+import org.springblade.modules.business.data.service.IRtuDataRiverService;
|
|
|
+import org.springblade.modules.business.data.service.IRtuDataRsvrService;
|
|
|
+import org.springblade.modules.business.rtumanage.entity.RtuStatusEntity;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.data.redis.core.HashOperations;
|
|
|
import org.springframework.data.redis.core.RedisTemplate;
|
|
|
import org.springframework.kafka.core.KafkaTemplate;
|
|
|
-import org.springframework.messaging.simp.SimpMessagingTemplate;
|
|
|
import org.springframework.scheduling.annotation.EnableScheduling;
|
|
|
import org.springframework.scheduling.annotation.Scheduled;
|
|
|
import org.springframework.stereotype.Component;
|
|
|
|
|
|
import javax.annotation.Resource;
|
|
|
+import java.util.Date;
|
|
|
+import java.util.List;
|
|
|
import java.util.concurrent.*;
|
|
|
|
|
|
/***
|
|
|
* Date:2022/5/22
|
|
|
- * Title: 环境辐射与气象监测系统(KRS) 模块
|
|
|
- * Description:MODBUS TCP 协议传输 ,任务管理
|
|
|
+ * Title: ETL模块
|
|
|
+ * Description: 定时任务管理
|
|
|
* @author swp
|
|
|
* @version 1.0
|
|
|
* Remark:认为有必要的其他信息
|
|
|
@@ -48,97 +58,185 @@ public class EtlTaskManager {
|
|
|
/**
|
|
|
* 公共线程池
|
|
|
**/
|
|
|
- private static ThreadFactory publicThreadFactory = new ThreadFactoryBuilder().setNameFormat("public-thread-pool-%d").build();
|
|
|
+ private static ThreadFactory publicThreadFactory = new ThreadFactoryBuilder().setNameFormat("etl-thread-pool-%d").build();
|
|
|
private static ExecutorService publicThreadPool = new ThreadPoolExecutor(1, 1,
|
|
|
0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(1024), publicThreadFactory, new ThreadPoolExecutor.AbortPolicy());
|
|
|
|
|
|
- @Resource
|
|
|
- private IEtlWarningService etlWarningService;
|
|
|
-
|
|
|
- @Resource
|
|
|
- private IRtuWarningService rtuWarningService;
|
|
|
-
|
|
|
- @Resource
|
|
|
- private IRtuBaseInfoService rtuBaseInfoService;
|
|
|
-
|
|
|
- @Resource
|
|
|
- private IProjectBaseInfoService projectBaseInfoService;
|
|
|
-
|
|
|
- @Resource
|
|
|
- private IRtuDataRiverService dataRiverService;
|
|
|
-
|
|
|
- @Resource
|
|
|
- private IRtuDataRsvrService dataRsvrService;
|
|
|
-
|
|
|
- @Resource
|
|
|
- private IRtuDataRainService dataRainService;
|
|
|
-
|
|
|
- @Resource
|
|
|
- private IRtuDataGroundService dataGroundService;
|
|
|
|
|
|
@Autowired
|
|
|
private KafkaTemplate<String, String> kafkaTemplate;
|
|
|
|
|
|
- @Autowired
|
|
|
- private SimpMessagingTemplate messagingTemplate;
|
|
|
-
|
|
|
@Autowired
|
|
|
private RedisTemplate redisTemplate;
|
|
|
|
|
|
@Resource
|
|
|
- private IDictBizService dictBizService;
|
|
|
+ private IEtlRainDataService etlRainDataService;
|
|
|
|
|
|
@Resource
|
|
|
- private IRtuManageService rtuManageService;
|
|
|
+ private IRtuDataRainService rtuDataRainService;
|
|
|
|
|
|
@Resource
|
|
|
- private IOriginalWarningService originalWarningService;
|
|
|
+ private IEtlRiverDataService etlRiverDataService;
|
|
|
|
|
|
@Resource
|
|
|
- private IEtlRainDataService etlRainDataService;
|
|
|
+ private IRtuDataRiverService rtuDataRiverService;
|
|
|
|
|
|
@Resource
|
|
|
- private IEtlRiverDataService etlRiverDataService;
|
|
|
+ private IEtlRsvrDataService etlRsvrDataService;
|
|
|
|
|
|
@Resource
|
|
|
- private IEtlRsvrDataService etlRsvrDataService;
|
|
|
+ private IRtuDataRsvrService rtuDataRsvrService;
|
|
|
|
|
|
@Resource
|
|
|
private IEtlGroundDataService etlGroundDataService;
|
|
|
|
|
|
+ @Resource
|
|
|
+ private IRtuDataGroundService rtuDataGroundService;
|
|
|
+
|
|
|
@Resource
|
|
|
private IEtlMvAttStBaseService etlMvAttStBaseService;
|
|
|
|
|
|
@Resource
|
|
|
private IMvAttStBaseService mvAttStBaseService;
|
|
|
|
|
|
+ @Resource
|
|
|
+ private IRtuBaseInfoService rtuBaseInfoService;
|
|
|
|
|
|
- @Scheduled(cron = "0 0/1 * * * * ")
|
|
|
- public void warnTask() {
|
|
|
+ @Scheduled(cron = "0 1 * * * * ")
|
|
|
+ public void attStBaseReadTask() {
|
|
|
try {
|
|
|
-// EtlMvAttStBaseDataTask task = new EtlMvAttStBaseDataTask(etlMvAttStBaseService, mvAttStBaseService);
|
|
|
-// FutureTask<Integer> futureTask = new FutureTask<>(task);
|
|
|
-// publicThreadPool.execute(futureTask);
|
|
|
-
|
|
|
-
|
|
|
-// EtlRainDataTask task = new EtlRainDataTask(redisTemplate, etlRainDataService, dataRainService);
|
|
|
-// FutureTask<Integer> futureTask = new FutureTask<>(task);
|
|
|
-// publicThreadPool.execute(futureTask);
|
|
|
-
|
|
|
-
|
|
|
-// TestWarnTask task2 = new TestWarnTask(redisTemplate,kafkaTemplate,messagingTemplate,etlWarningService,rtuWarningService,rtuBaseInfoService,projectBaseInfoService,dataRainService,dataRiverService,dataRsvrService,dataGroundService,originalWarningService);
|
|
|
-// FutureTask<Integer> futureTask2 = new FutureTask<>(task2);
|
|
|
-// publicThreadPool.execute(futureTask2);
|
|
|
-
|
|
|
-
|
|
|
-// TestDataTask task = new TestDataTask(redisTemplate, kafkaTemplate, messagingTemplate, etlWarningService, rtuWarningService, rtuBaseInfoService, projectBaseInfoService, dataRainService, dataRiverService, dataRsvrService, dataGroundService, dictBizService, rtuManageService,etlRainDataService,etlRiverDataService,etlRsvrDataService,etlGroundDataService);
|
|
|
-// FutureTask<Integer> futureTask = new FutureTask<>(task);
|
|
|
-// publicThreadPool.execute(futureTask);
|
|
|
-
|
|
|
+ log.info("ETL-测站-定时同步任务开始执行*********************");
|
|
|
+ EtlMvAttStBaseDataTask mvAttStBaseDataTask = new EtlMvAttStBaseDataTask(etlMvAttStBaseService, mvAttStBaseService);
|
|
|
+ FutureTask<Integer> futureTask = new FutureTask<>(mvAttStBaseDataTask);
|
|
|
+ publicThreadPool.execute(futureTask);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("{}", e.getMessage());
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
+ @Scheduled(cron = "0 5 * * * * ")
|
|
|
+ public void rtuDataReadTask() {
|
|
|
+ try {
|
|
|
+ log.info("ETL-数据-定时同步任务开始执行*********************");
|
|
|
+ EtlRtuDataTask etlRtuDataTask = new EtlRtuDataTask();
|
|
|
+ FutureTask<Integer> futureTask = new FutureTask<>(etlRtuDataTask);
|
|
|
+ publicThreadPool.execute(futureTask);
|
|
|
} catch (Exception e) {
|
|
|
log.error("{}", e.getMessage());
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ private class EtlRtuDataTask implements Callable<Integer> {
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Integer call() {
|
|
|
+ try {
|
|
|
+ log.info("测站定时报数据抽取开始**************");
|
|
|
+ HashOperations<String, String, String> opsHash = redisTemplate.opsForHash();
|
|
|
+ List<RtuInfoEntity> rtuInfoEntityList = rtuBaseInfoService.list();
|
|
|
+ for (RtuInfoEntity rtuInfoEntity : rtuInfoEntityList) {
|
|
|
+ Date rtuUpTm = null;
|
|
|
+ //读取最新一条雨情数据
|
|
|
+ EtlRainDataDTO etlRainDataDTO = new EtlRainDataDTO();
|
|
|
+ etlRainDataDTO.setRtuCode(rtuInfoEntity.getRtuCode());
|
|
|
+ if (opsHash.hasKey("etl_rtu_rain_read_time", rtuInfoEntity.getRtuCode())) {
|
|
|
+ String dt = opsHash.get("etl_rtu_rain_read_time", rtuInfoEntity.getRtuCode());
|
|
|
+ Date lastDate = Func.parse(dt, ConcurrentDateFormat.of("yyyy-MM-dd HH:mm:ss"));
|
|
|
+ etlRainDataDTO.setLastTime(lastDate);
|
|
|
+ }
|
|
|
+ RtuDataRainEntity rtuDataRainEntity = EtlRainDataTask.dataSync(etlRainDataDTO, etlRainDataService, rtuDataRainService);
|
|
|
+ if (null != rtuDataRainEntity) {
|
|
|
+ //保存时间
|
|
|
+ String dt = Func.formatDateTime(rtuDataRainEntity.getTm());
|
|
|
+ opsHash.put("etl_rtu_rain_read_time", rtuInfoEntity.getRtuCode(), dt);
|
|
|
+ //最后上报时间
|
|
|
+ if (rtuUpTm != null) {
|
|
|
+ if (rtuDataRainEntity.getTm().after(rtuUpTm)) {
|
|
|
+ rtuUpTm = rtuDataRainEntity.getTm();
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ rtuUpTm = rtuDataRainEntity.getTm();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ //读取最新一条河道数据
|
|
|
+ EtlRiverDataDTO etlRiverDataDTO = new EtlRiverDataDTO();
|
|
|
+ etlRiverDataDTO.setRtuCode(rtuInfoEntity.getRtuCode());
|
|
|
+ if (opsHash.hasKey("etl_rtu_river_read_time", rtuInfoEntity.getRtuCode())) {
|
|
|
+ String dt = opsHash.get("etl_rtu_river_read_time", rtuInfoEntity.getRtuCode());
|
|
|
+ Date lastDate = Func.parse(dt, ConcurrentDateFormat.of("yyyy-MM-dd HH:mm:ss"));
|
|
|
+ etlRiverDataDTO.setLastTime(lastDate);
|
|
|
+ }
|
|
|
+ RtuDataRiverEntity rtuDataRiverEntity = EtlRiverDataTask.dataSync(etlRiverDataDTO, etlRiverDataService, rtuDataRiverService);
|
|
|
+ if (null != rtuDataRiverEntity) {
|
|
|
+ //保存时间
|
|
|
+ String dt = Func.formatDateTime(rtuDataRiverEntity.getTm());
|
|
|
+ opsHash.put("etl_rtu_river_read_time", rtuInfoEntity.getRtuCode(), dt);
|
|
|
+ //最后上报时间
|
|
|
+ if (rtuUpTm != null) {
|
|
|
+ if (rtuDataRiverEntity.getTm().after(rtuUpTm)) {
|
|
|
+ rtuUpTm = rtuDataRiverEntity.getTm();
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ rtuUpTm = rtuDataRiverEntity.getTm();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ //读取最新一条水库数据
|
|
|
+ EtlRsvrDataDTO etlRsvrDataDTO = new EtlRsvrDataDTO();
|
|
|
+ etlRsvrDataDTO.setRtuCode(rtuInfoEntity.getRtuCode());
|
|
|
+ if (opsHash.hasKey("etl_rtu_rsvr_read_time", rtuInfoEntity.getRtuCode())) {
|
|
|
+ String dt = opsHash.get("etl_rtu_rsvr_read_time", rtuInfoEntity.getRtuCode());
|
|
|
+ Date lastDate = Func.parse(dt, ConcurrentDateFormat.of("yyyy-MM-dd HH:mm:ss"));
|
|
|
+ etlRsvrDataDTO.setLastTime(lastDate);
|
|
|
+ }
|
|
|
+ RtuDataRsvrEntity rsvrDataEntity = EtlRsvrDataTask.dataSync(etlRsvrDataDTO, etlRsvrDataService, rtuDataRsvrService);
|
|
|
+ if (null != rsvrDataEntity) {
|
|
|
+ //保存时间
|
|
|
+ String dt = Func.formatDateTime(rsvrDataEntity.getTm());
|
|
|
+ opsHash.put("etl_rtu_rsvr_read_time", rtuInfoEntity.getRtuCode(), dt);
|
|
|
+ //最后上报时间
|
|
|
+ if (rtuUpTm != null) {
|
|
|
+ if (rsvrDataEntity.getTm().after(rtuUpTm)) {
|
|
|
+ rtuUpTm = rsvrDataEntity.getTm();
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ rtuUpTm = rsvrDataEntity.getTm();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ //读取最新一条墒情数据
|
|
|
+ EtlGroundDataDTO groundDataDTO = new EtlGroundDataDTO();
|
|
|
+ groundDataDTO.setRtuCode(rtuInfoEntity.getRtuCode());
|
|
|
+ if (opsHash.hasKey("etl_rtu_ground_read_time", rtuInfoEntity.getRtuCode())) {
|
|
|
+ String dt = opsHash.get("etl_rtu_ground_read_time", rtuInfoEntity.getRtuCode());
|
|
|
+ Date lastDate = Func.parse(dt, ConcurrentDateFormat.of("yyyy-MM-dd HH:mm:ss"));
|
|
|
+ groundDataDTO.setLastTime(lastDate);
|
|
|
+ }
|
|
|
+ RtuDataGroundEntity groundDataEntity = EtlGroundDataTask.dataSync(groundDataDTO, etlGroundDataService, rtuDataGroundService);
|
|
|
+ if (null != groundDataEntity) {
|
|
|
+ //保存时间
|
|
|
+ String dt = Func.formatDateTime(groundDataEntity.getTm());
|
|
|
+ opsHash.put("etl_rtu_ground_read_time", rtuInfoEntity.getRtuCode(), dt);
|
|
|
+ //最后上报时间
|
|
|
+ if (rtuUpTm != null) {
|
|
|
+ if (groundDataEntity.getTm().after(rtuUpTm)) {
|
|
|
+ rtuUpTm = groundDataEntity.getTm();
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ rtuUpTm = groundDataEntity.getTm();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ //更新测站最近上报时间
|
|
|
+ if (null != rtuUpTm) {
|
|
|
+ RtuStatusEntity statusEntity = new RtuStatusEntity();
|
|
|
+ statusEntity.setLastUpTime(rtuUpTm);
|
|
|
+ statusEntity.setRtuCode(rtuInfoEntity.getRtuCode());
|
|
|
+ kafkaTemplate.send("ywxt.rtu.status", JsonUtil.toJson(statusEntity));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("实时数据统计任务异常 {}", e.getMessage());
|
|
|
+ }
|
|
|
+ return 0;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
}
|