DataShareProcessor.java 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367
/**
 * Copyright 2019 DH
 * All rights reserved.
 * Project: Operations and maintenance system
 * Created: 2023/11/16
 */
  7. package org.springblade.jobtask;
  8. import com.alibaba.fastjson.JSONObject;
  9. import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
  10. import com.baomidou.mybatisplus.core.toolkit.Wrappers;
  11. import lombok.extern.slf4j.Slf4j;
  12. import org.springblade.constant.BusinessConstant;
  13. import org.springblade.core.tool.utils.Func;
  14. import org.springblade.enums.IsValidEnum;
  15. import org.springblade.modules.baseinfo.monitorpoint.entity.MonitorPointRelEntity;
  16. import org.springblade.modules.baseinfo.monitorpoint.service.IMonitorPointRelService;
  17. import org.springblade.modules.baseinfo.monitorsite.dto.MonitorSiteInfoDTO;
  18. import org.springblade.modules.baseinfo.monitorsite.entity.MonitorSiteInfoEntity;
  19. import org.springblade.modules.baseinfo.monitorsite.entity.MonitorSiteRelEntity;
  20. import org.springblade.modules.baseinfo.monitorsite.service.IMonitorSiteRelService;
  21. import org.springblade.modules.baseinfo.monitorsite.service.IMonitorSiteService;
  22. import org.springblade.modules.baseinfo.wateruseconsumer.entity.WaterUseConsumerInfoEntity;
  23. import org.springblade.modules.baseinfo.wateruseconsumer.service.IWaterUseConsumerService;
  24. import org.springblade.modules.business.monitor.entity.WaterFlowRateInfoEntity;
  25. import org.springblade.modules.business.monitor.service.IWaterFlowRateHistoryService;
  26. import org.springblade.modules.business.monitor.service.IWaterFlowRateService;
  27. import org.springblade.modules.share.entity.DataShareLogEntity;
  28. import org.springblade.modules.share.service.IDataShareLogServcie;
  29. import org.springblade.utils.HttpRequestUtil;
  30. import org.springframework.beans.factory.annotation.Value;
  31. import org.springframework.data.redis.core.HashOperations;
  32. import org.springframework.data.redis.core.RedisTemplate;
  33. import org.springframework.scheduling.annotation.EnableScheduling;
  34. import org.springframework.scheduling.annotation.Scheduled;
  35. import org.springframework.stereotype.Component;
  36. import javax.annotation.Resource;
  37. import java.util.*;
  38. import java.util.concurrent.Callable;
  39. import java.util.concurrent.FutureTask;
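/**
 * Scheduled data-share jobs that push local monitoring data to the data-share server configured
 * by {@code spring.task-config.data-share-server}.
 *
 * <p>Two jobs are defined: a daily push of metering-facility records and an hourly push of the
 * per-facility daily water volume. Each run is wrapped in a {@code FutureTask} and handed to
 * {@code TaskManager}; every request is recorded through {@code IDataShareLogServcie}.
 *
 * <p>Date: 2023/11/16
 *
 * @author dylan
 * @version 1.0
 */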
  48. @Slf4j
  49. @Component
  50. @EnableScheduling
  51. public class DataShareProcessor {
  52. @Resource
  53. private TaskManager taskManager;
  54. @Resource
  55. private IMonitorSiteService monitorSiteService;
  56. @Resource
  57. private IMonitorPointRelService monitorPointRelService;
  58. @Resource
  59. private IMonitorSiteRelService monitorSiteRelService;
  60. @Resource
  61. private IWaterUseConsumerService waterUseConsumerService;
  62. @Resource
  63. private IWaterFlowRateService waterFlowRateService;
  64. @Resource
  65. private IWaterFlowRateHistoryService waterFlowRateHistoryService;
  66. @Resource
  67. private IDataShareLogServcie dataShareLogServcie;
  68. @Resource
  69. private RedisTemplate<String, String> redisTemplate;
  70. @Value("${spring.task-config.data-share-server}")
  71. private String dataShareServer;
  72. /**
  73. * 超时时间,单位:毫秒
  74. */
  75. private int nTimeOut = 20 * 1000;
  76. /**
  77. * 计量设施数据同步,每天执行一次
  78. */
  79. @Scheduled(cron = "10 0 0 * * ?")
  80. public void siteShareDataTaskScheduled() {
  81. try {
  82. Date dt = new Date();
  83. log.info("任务创建 {}", Func.formatDateTime(dt));
  84. SiteShareDataTask task = new SiteShareDataTask(dt);
  85. FutureTask<Integer> futureTask = new FutureTask<>(task);
  86. taskManager.submitTask(futureTask);
  87. } catch (Exception e) {
  88. log.error("{}", e.getMessage());
  89. }
  90. }
  91. /**
  92. * 实时流量数据同步,每小时执行一次
  93. */
  94. @Scheduled(cron = "30 0 0/1 * * ?")
  95. public void waterFlowRateShareDataTaskScheduled() {
  96. try {
  97. Date dt = new Date();
  98. log.info("任务创建 {}", Func.formatDateTime(dt));
  99. WaterFlowRateShareDataTask task = new WaterFlowRateShareDataTask(dt);
  100. FutureTask<Integer> futureTask = new FutureTask<>(task);
  101. taskManager.submitTask(futureTask);
  102. } catch (Exception e) {
  103. log.error("{}", e.getMessage());
  104. }
  105. }
  106. private class SiteShareDataTask implements Callable<Integer> {
  107. private Date countTime;
  108. public SiteShareDataTask(Date countTime) {
  109. this.countTime = countTime;
  110. }
  111. @Override
  112. public Integer call() throws Exception {
  113. try {
  114. log.info("计量设施同步任务开始执行 {}", Func.formatDateTime(countTime));
  115. List<JSONObject> datas = new ArrayList<>();
  116. MonitorSiteInfoDTO dto = new MonitorSiteInfoDTO();
  117. dto.setDataShareActive(IsValidEnum.VALID_2.getCode());
  118. List<MonitorSiteInfoEntity> list = monitorSiteService.selectAll(dto);
  119. if (Func.notNull(list)) {
  120. for (MonitorSiteInfoEntity entity : list) {
  121. LambdaQueryWrapper<MonitorSiteRelEntity> wrapper = Wrappers.<MonitorSiteRelEntity>query().lambda();
  122. wrapper.eq(MonitorSiteRelEntity::getWiustCd, entity.getWiustCd());
  123. MonitorSiteRelEntity siteRelEntity = monitorSiteRelService.getOne(wrapper);
  124. if (Func.notNull(siteRelEntity)) {
  125. LambdaQueryWrapper<WaterUseConsumerInfoEntity> useWrapper = Wrappers.<WaterUseConsumerInfoEntity>query().lambda();
  126. useWrapper.eq(WaterUseConsumerInfoEntity::getWiuCd, siteRelEntity.getWiuCd());
  127. WaterUseConsumerInfoEntity waterUseConsumerInfoEntity = waterUseConsumerService.getOne(useWrapper);
  128. if (Func.notNull(waterUseConsumerInfoEntity) && Func.notNull(waterUseConsumerInfoEntity.getAdlCd())) {
  129. JSONObject data = new JSONObject();
  130. data.put("id", BusinessConstant.DATA_SHARE_AREA + "_" + entity.getWiustCd());
  131. data.put("measName", entity.getWiustNm());
  132. //计量方式代码,参考输入参数说明下的字典项(以电折水计量涉及的样本井、普通井选用其他计量-以电量、柴油和其他动力消耗折算水量)
  133. data.put("measTypeCode", "24");
  134. //计量方式
  135. data.put("measType", "7");
  136. //使用状态 5、正常;4故障;3停用
  137. data.put("status", "5");
  138. //是否有效字段,1有效,0无效(删除的数据)新增时不传默认为1
  139. data.put("flagValid", "1");
  140. //传输方式代码,1、在线计量;2、非在线计量样本井的话传1,普通井传2
  141. data.put("transModeCode", "1");
  142. //取水地址行政编码,县级行政区划代码
  143. data.put("intLocDivCode", waterUseConsumerInfoEntity.getAdlCd());
  144. //是否修改行政区划代码 ,修改时,默认传1,不传不修改行政区划。
  145. data.put("isAddvcd", "1");
  146. //首次发证证件编号/证照编号
  147. data.put("ecBaseInfoId", entity.getSiteCertId());
  148. //数据接收路径 ,1.部级 2.省级(默认省级)(传编码)
  149. data.put("dataReceivePath", "2");
  150. //数据来源类型,1、国控转计量; 2、用户安装的计量设施;3、中央专项资金;
  151. data.put("source", entity.getSiteOwner());
  152. //数据来源类型,如要修改已传输的计量设施数据来源,维护该字段
  153. data.put("updateSource", entity.getSiteOwner());
  154. datas.add(data);
  155. }
  156. }
  157. }
  158. }
  159. if (datas.size() > 0) {
  160. HashOperations<String, String, String> hashops = redisTemplate.opsForHash();
  161. String date = hashops.get("datashare", "date");
  162. int dsBatchCount = 1;
  163. if (Func.isNull(date)) {
  164. date = Func.formatDate(countTime);
  165. } else {
  166. String today = Func.formatDate(countTime);
  167. if (!today.equals(date)) {
  168. date = today;
  169. dsBatchCount = 1;
  170. } else {
  171. String dsBatch = hashops.get("datashare", "dsBatch");
  172. if (Func.isNull(dsBatch)) {
  173. dsBatchCount = 1;
  174. } else {
  175. dsBatchCount = Func.toInt(dsBatch) + 1;
  176. }
  177. }
  178. }
  179. String dsBatch = BusinessConstant.DATA_SHARE_AREA + "-" + date + "-" + String.format("%04d", dsBatchCount);
  180. JSONObject post = new JSONObject();
  181. post.put("dsBatch", dsBatch);
  182. post.put("timestamp", countTime);
  183. post.put("data", datas);
  184. log.info("json post {}", post.toJSONString());
  185. String url = dataShareServer + "/wr_web_manager/api/wr/datasync/fac/infoinsertOrUpdate";
  186. // 请求头
  187. Map<String, String> headers = new HashMap<>();
  188. headers.put("area", BusinessConstant.DATA_SHARE_AREA);
  189. headers.put("key", BusinessConstant.DATA_SHARE_KEY);
  190. JSONObject res = HttpRequestUtil.doHttpPostRequest("计量设施数据同步", url, headers, post.toJSONString(), nTimeOut);
  191. if (Func.notNull(res)) {
  192. log.info("接口返回 {}", res.toJSONString());
  193. }
  194. //更新同步批次
  195. hashops.put("datashare", "date", date);
  196. hashops.put("datashare", "dsBatch", Func.toStr(dsBatchCount));
  197. //记录请求日志
  198. DataShareLogEntity logEntity = new DataShareLogEntity();
  199. logEntity.setRequestDesc("计量设施数据同步");
  200. logEntity.setCountTime(countTime);
  201. logEntity.setPostUrl(url);
  202. if (Func.notNull(res)) {
  203. logEntity.setRequestResult(res.toJSONString());
  204. }
  205. int timeLong = (int) (System.currentTimeMillis() - countTime.getTime());
  206. logEntity.setTaskTimeLong(timeLong);
  207. dataShareLogServcie.save(logEntity);
  208. }
  209. } catch (Exception e) {
  210. e.printStackTrace();
  211. log.error("计量设施同步任务异常 {}", e.getMessage());
  212. } finally {
  213. log.info("计量设施同步任务完成 {} 毫秒", System.currentTimeMillis() - countTime.getTime());
  214. }
  215. return 0;
  216. }
  217. }
  218. private class WaterFlowRateShareDataTask implements Callable<Integer> {
  219. private Date countTime;
  220. public WaterFlowRateShareDataTask(Date countTime) {
  221. this.countTime = countTime;
  222. }
  223. @Override
  224. public Integer call() throws Exception {
  225. try {
  226. log.info("实时流量同步任务开始执行 {}", Func.formatDateTime(countTime));
  227. List<JSONObject> datas = new ArrayList<>();
  228. MonitorSiteInfoDTO dto = new MonitorSiteInfoDTO();
  229. dto.setDataShareActive(IsValidEnum.VALID_2.getCode());
  230. List<MonitorSiteInfoEntity> list = monitorSiteService.selectAll(dto);
  231. if (Func.notNull(list)) {
  232. for (MonitorSiteInfoEntity entity : list) {
  233. LambdaQueryWrapper<MonitorPointRelEntity> wrapper = Wrappers.<MonitorPointRelEntity>query().lambda();
  234. wrapper.eq(MonitorPointRelEntity::getWiustCd, entity.getWiustCd());
  235. List<MonitorPointRelEntity> points = monitorPointRelService.list(wrapper);
  236. if (Func.notNull(points) && points.size() > 0) {
  237. LambdaQueryWrapper<WaterFlowRateInfoEntity> dataWrapper = Wrappers.<WaterFlowRateInfoEntity>query().lambda();
  238. dataWrapper.eq(WaterFlowRateInfoEntity::getMpcd, points.get(0).getMpCd());
  239. WaterFlowRateInfoEntity waterFlowRateInfoEntity = waterFlowRateService.getOne(dataWrapper);
  240. if (Func.notNull(waterFlowRateInfoEntity)) {
  241. JSONObject data = new JSONObject();
  242. // data.put("id", BusinessConstant.DATA_SHARE_AREA + "_" + entity.getWiustCd());
  243. data.put("mpCd", BusinessConstant.DATA_SHARE_AREA + "_" + entity.getWiustCd());
  244. double dayAccw = waterFlowRateHistoryService.getDayAccw(points.get(0).getMpCd());
  245. data.put("dayW", dayAccw);
  246. Calendar calendar = Calendar.getInstance();
  247. calendar.setTime(countTime);
  248. calendar.set(Calendar.HOUR_OF_DAY,0);
  249. calendar.set(Calendar.MINUTE,0);
  250. calendar.set(Calendar.SECOND,0);
  251. calendar.set(Calendar.MILLISECOND,0);
  252. calendar.add(Calendar.DAY_OF_MONTH,-1);
  253. data.put("dt", Func.formatDate(calendar.getTime()));
  254. data.put("inTime", Func.formatDate(calendar.getTime()));
  255. datas.add(data);
  256. } else {
  257. JSONObject data = new JSONObject();
  258. // data.put("id", BusinessConstant.DATA_SHARE_AREA + "_" + entity.getWiustCd());
  259. data.put("mpCd", BusinessConstant.DATA_SHARE_AREA + "_" + entity.getWiustCd());
  260. data.put("dayW", 0.0);
  261. Calendar calendar = Calendar.getInstance();
  262. calendar.setTime(countTime);
  263. calendar.set(Calendar.HOUR_OF_DAY,0);
  264. calendar.set(Calendar.MINUTE,0);
  265. calendar.set(Calendar.SECOND,0);
  266. calendar.set(Calendar.MILLISECOND,0);
  267. calendar.add(Calendar.DAY_OF_MONTH,-1);
  268. data.put("dt", Func.formatDate(calendar.getTime()));
  269. data.put("inTime", Func.formatDate(calendar.getTime()));
  270. datas.add(data);
  271. }
  272. }
  273. }
  274. }
  275. if (datas.size() > 0) {
  276. HashOperations<String, String, String> hashops = redisTemplate.opsForHash();
  277. String date = hashops.get("datashare", "date");
  278. int dsBatchCount = 1;
  279. if (Func.isNull(date)) {
  280. date = Func.formatDate(countTime);
  281. } else {
  282. String today = Func.formatDate(countTime);
  283. if (!today.equals(date)) {
  284. date = today;
  285. dsBatchCount = 1;
  286. } else {
  287. String dsBatch = hashops.get("datashare", "dsBatch");
  288. if (Func.isNull(dsBatch)) {
  289. dsBatchCount = 1;
  290. } else {
  291. dsBatchCount = Func.toInt(dsBatch) + 1;
  292. }
  293. }
  294. }
  295. String dsBatch = BusinessConstant.DATA_SHARE_AREA + "-" + date + "-" + String.format("%04d", dsBatchCount);
  296. JSONObject post = new JSONObject();
  297. post.put("dsBatch", dsBatch);
  298. post.put("timestamp", countTime);
  299. post.put("data", datas);
  300. log.info("json post {}", post.toJSONString());
  301. String url = dataShareServer + "/wr_web_manager/api/wr/datasync/wiwrDayWr/insertOrUpdate";
  302. // 请求头
  303. Map<String, String> headers = new HashMap<>();
  304. headers.put("area", BusinessConstant.DATA_SHARE_AREA);
  305. headers.put("key", BusinessConstant.DATA_SHARE_KEY);
  306. JSONObject res = HttpRequestUtil.doHttpPostRequest("实时流量数据同步", url, headers, post.toJSONString(), nTimeOut);
  307. if (Func.notNull(res)) {
  308. log.info("接口返回 {}", res.toJSONString());
  309. }
  310. //更新同步批次
  311. hashops.put("datashare", "date", date);
  312. hashops.put("datashare", "dsBatch", Func.toStr(dsBatchCount));
  313. //记录请求日志
  314. DataShareLogEntity logEntity = new DataShareLogEntity();
  315. logEntity.setRequestDesc("实时流量数据同步");
  316. logEntity.setCountTime(countTime);
  317. logEntity.setPostUrl(url);
  318. if (Func.notNull(res)) {
  319. logEntity.setRequestResult(res.toJSONString());
  320. }
  321. int timeLong = (int) (System.currentTimeMillis() - countTime.getTime());
  322. logEntity.setTaskTimeLong(timeLong);
  323. dataShareLogServcie.save(logEntity);
  324. }
  325. } catch (Exception e) {
  326. // e.printStackTrace();
  327. log.error("实时流量同步任务异常 {}", e.getMessage());
  328. //记录请求日志
  329. DataShareLogEntity logEntity = new DataShareLogEntity();
  330. logEntity.setRequestDesc("实时流量数据同步");
  331. logEntity.setCountTime(countTime);
  332. logEntity.setPostUrl(dataShareServer + "/wr_web_manager/api/wr/datasync/wiwrDayWr/insertOrUpdate");
  333. logEntity.setRequestResult("实时流量同步任务异常 "+e.getMessage());
  334. int timeLong = (int) (System.currentTimeMillis() - countTime.getTime());
  335. logEntity.setTaskTimeLong(timeLong);
  336. dataShareLogServcie.save(logEntity);
  337. } finally {
  338. log.info("实时流量同步任务完成 {} 毫秒", System.currentTimeMillis() - countTime.getTime());
  339. }
  340. return 0;
  341. }
  342. }
  343. }