EtlMpQrDataTask.java 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146
  1. /**
  2. * Copyright
  3. * All right reserved.
  4. * 项目名称:运维系统
  5. * 创建日期:2022/5/22
  6. */
  7. package org.springblade.modules.etl.task;
  8. import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
  9. import com.baomidou.mybatisplus.core.metadata.IPage;
  10. import com.baomidou.mybatisplus.core.toolkit.Wrappers;
  11. import lombok.extern.slf4j.Slf4j;
  12. import org.springblade.core.mp.support.Condition;
  13. import org.springblade.core.mp.support.Query;
  14. import org.springblade.core.tool.utils.ConcurrentDateFormat;
  15. import org.springblade.core.tool.utils.Func;
  16. import org.springblade.core.tool.utils.SpringUtil;
  17. import org.springblade.modules.baseinfo.org.entity.DeptRegionEntity;
  18. import org.springblade.modules.business.monitor.entity.WaterFlowRateHistoryInfoEntity;
  19. import org.springblade.modules.business.monitor.service.IWaterFlowRateHistoryService;
  20. import org.springblade.modules.etl.dto.EtlMpQrDTO;
  21. import org.springblade.modules.etl.entity.EtlMpQrEntity;
  22. import org.springblade.modules.etl.service.IEtlMpQrDataService;
  23. import java.util.Date;
  24. import java.util.List;
  25. import java.util.concurrent.Callable;
  26. /***
  27. * Date:2022/5/22
  28. * Title: 历史数据抽取
  29. * Description:
  30. * @author swp
  31. * @version 1.0
  32. * Remark:认为有必要的其他信息
  33. */
  34. @Slf4j
  35. public class EtlMpQrDataTask implements Callable<Integer> {
  36. private static IEtlMpQrDataService etlMpQrDataService;
  37. private static IEtlMpQrDataService getEtlMpQrDataService() {
  38. if (etlMpQrDataService == null) {
  39. etlMpQrDataService = SpringUtil.getBean(IEtlMpQrDataService.class);
  40. }
  41. return etlMpQrDataService;
  42. }
  43. private static IWaterFlowRateHistoryService waterFlowRateHistoryService;
  44. private static IWaterFlowRateHistoryService getWaterFlowRateHistoryService() {
  45. if (waterFlowRateHistoryService == null) {
  46. waterFlowRateHistoryService = SpringUtil.getBean(IWaterFlowRateHistoryService.class);
  47. }
  48. return waterFlowRateHistoryService;
  49. }
  50. private String year;
  51. public EtlMpQrDataTask(String year) {
  52. this.year = year;
  53. }
  54. @Override
  55. public Integer call() {
  56. try {
  57. log.info("{}年历史数据导入任务开始执行*********************", year);
  58. EtlMpQrDTO dto = new EtlMpQrDTO();
  59. String stStr = year + "0101000000";
  60. Date st = Func.parse(stStr, ConcurrentDateFormat.of("yyyyMMddHHmmss"));
  61. String etStr = Func.toStr(Func.toInt(year) + 1) + "0101000000";
  62. Date et = Func.parse(etStr, ConcurrentDateFormat.of("yyyyMMddHHmmss"));
  63. log.info("查询时间 {} {}", Func.formatDateTime(st), Func.formatDateTime(et));
  64. dto.setYear(year);
  65. int index = 1;
  66. while (true) {
  67. try {
  68. Query query = new Query();
  69. query.setSize(100);
  70. query.setCurrent(index);
  71. IPage<EtlMpQrEntity> pages = getEtlMpQrDataService().selectPage(Condition.getPage(query), dto);
  72. List<EtlMpQrEntity> list = pages.getRecords();
  73. if (Func.isNull(list)) {
  74. break;
  75. }
  76. if (list.size() == 0) {
  77. break;
  78. }
  79. for (EtlMpQrEntity etlMpQrEntity : list) {
  80. log.info("点位 {} 时间 {}", etlMpQrEntity.getMpCd(), etlMpQrEntity.getTm());
  81. if (Func.notNull(etlMpQrEntity.getMpCd()) && Func.notNull(etlMpQrEntity.getTm())) {
  82. LambdaQueryWrapper<WaterFlowRateHistoryInfoEntity> wrapper = Wrappers.<WaterFlowRateHistoryInfoEntity>query().lambda();
  83. wrapper.eq(WaterFlowRateHistoryInfoEntity::getMpcd, etlMpQrEntity.getMpCd());
  84. wrapper.eq(WaterFlowRateHistoryInfoEntity::getTm, Func.parse(etlMpQrEntity.getTm(), ConcurrentDateFormat.of("yyyyMMddHHmmss")));
  85. long count = getWaterFlowRateHistoryService().count(wrapper);
  86. if (count == 0) {
  87. WaterFlowRateHistoryInfoEntity entity = new WaterFlowRateHistoryInfoEntity();
  88. entity.setMpcd(etlMpQrEntity.getMpCd());
  89. String tm = etlMpQrEntity.getTm();
  90. entity.setTm(Func.parse(tm, ConcurrentDateFormat.of("yyyyMMddHHmmss")));
  91. if (Func.notNull(etlMpQrEntity.getMpq())) {
  92. entity.setMpq(Func.toDouble(etlMpQrEntity.getMpq()));
  93. }
  94. if (Func.notNull(etlMpQrEntity.getAccw())) {
  95. entity.setAccw(Func.toLong(etlMpQrEntity.getAccw()));
  96. }
  97. if (Func.notNull(etlMpQrEntity.getAccPqw())) {
  98. entity.setAccpqw(Func.toLong(etlMpQrEntity.getAccPqw()));
  99. }
  100. if (Func.notNull(etlMpQrEntity.getInStpq())) {
  101. entity.setInstpq(Func.toDouble(etlMpQrEntity.getInStpq()));
  102. }
  103. if (Func.notNull(etlMpQrEntity.getAccPqw())) {
  104. entity.setAccpqw(Func.toLong(etlMpQrEntity.getAccPqw()));
  105. }
  106. if (Func.notNull(etlMpQrEntity.getSpeRegData())) {
  107. entity.setSperegdata(etlMpQrEntity.getSpeRegData());
  108. }
  109. if (Func.notNull(etlMpQrEntity.getHourw())) {
  110. entity.setHourw(Func.toLong(etlMpQrEntity.getHourw()));
  111. }
  112. String ts = etlMpQrEntity.getTs();
  113. if (Func.notNull(ts)) {
  114. entity.setTs(Func.parse(ts, ConcurrentDateFormat.of("yyyyMMddHHmmss")));
  115. }
  116. getWaterFlowRateHistoryService().save(entity);
  117. }
  118. }
  119. }
  120. index += 1;
  121. } catch (Exception e) {
  122. e.printStackTrace();
  123. log.error(e.getMessage());
  124. }
  125. }
  126. log.info("{}年历史数据导入任务完成", year);
  127. } catch (Exception e) {
  128. e.printStackTrace();
  129. log.error("历史数据导入任务异常 {}", e.getMessage());
  130. }
  131. return 0;
  132. }
  133. }