| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367 |
- /**
- * Copyright 2019 DH
- * All rights reserved.
- * 项目名称: 运维系统
- * 创建日期:2023/11/16
- */
- package org.springblade.jobtask;
- import com.alibaba.fastjson.JSONObject;
- import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
- import com.baomidou.mybatisplus.core.toolkit.Wrappers;
- import lombok.extern.slf4j.Slf4j;
- import org.springblade.constant.BusinessConstant;
- import org.springblade.core.tool.utils.Func;
- import org.springblade.enums.IsValidEnum;
- import org.springblade.modules.baseinfo.monitorpoint.entity.MonitorPointRelEntity;
- import org.springblade.modules.baseinfo.monitorpoint.service.IMonitorPointRelService;
- import org.springblade.modules.baseinfo.monitorsite.dto.MonitorSiteInfoDTO;
- import org.springblade.modules.baseinfo.monitorsite.entity.MonitorSiteInfoEntity;
- import org.springblade.modules.baseinfo.monitorsite.entity.MonitorSiteRelEntity;
- import org.springblade.modules.baseinfo.monitorsite.service.IMonitorSiteRelService;
- import org.springblade.modules.baseinfo.monitorsite.service.IMonitorSiteService;
- import org.springblade.modules.baseinfo.wateruseconsumer.entity.WaterUseConsumerInfoEntity;
- import org.springblade.modules.baseinfo.wateruseconsumer.service.IWaterUseConsumerService;
- import org.springblade.modules.business.monitor.entity.WaterFlowRateInfoEntity;
- import org.springblade.modules.business.monitor.service.IWaterFlowRateHistoryService;
- import org.springblade.modules.business.monitor.service.IWaterFlowRateService;
- import org.springblade.modules.share.entity.DataShareLogEntity;
- import org.springblade.modules.share.service.IDataShareLogServcie;
- import org.springblade.utils.HttpRequestUtil;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.data.redis.core.HashOperations;
- import org.springframework.data.redis.core.RedisTemplate;
- import org.springframework.scheduling.annotation.EnableScheduling;
- import org.springframework.scheduling.annotation.Scheduled;
- import org.springframework.stereotype.Component;
- import javax.annotation.Resource;
- import java.util.*;
- import java.util.concurrent.Callable;
- import java.util.concurrent.FutureTask;
- /***
- * Date:2023/11/16
- * Title:文件所属模块(必须填写)
- * Description:对本文件的详细描述,原则上不能少于30字
- * @author dylan
- * @version 1.0
- * Remark:认为有必要的其他信息
- */
- @Slf4j
- @Component
- @EnableScheduling
- public class DataShareProcessor {
- @Resource
- private TaskManager taskManager;
- @Resource
- private IMonitorSiteService monitorSiteService;
- @Resource
- private IMonitorPointRelService monitorPointRelService;
- @Resource
- private IMonitorSiteRelService monitorSiteRelService;
- @Resource
- private IWaterUseConsumerService waterUseConsumerService;
- @Resource
- private IWaterFlowRateService waterFlowRateService;
- @Resource
- private IWaterFlowRateHistoryService waterFlowRateHistoryService;
- @Resource
- private IDataShareLogServcie dataShareLogServcie;
- @Resource
- private RedisTemplate<String, String> redisTemplate;
- @Value("${spring.task-config.data-share-server}")
- private String dataShareServer;
- /**
- * 超时时间,单位:毫秒
- */
- private int nTimeOut = 20 * 1000;
- /**
- * 计量设施数据同步,每天执行一次
- */
- @Scheduled(cron = "10 0 0 * * ?")
- public void siteShareDataTaskScheduled() {
- try {
- Date dt = new Date();
- log.info("任务创建 {}", Func.formatDateTime(dt));
- SiteShareDataTask task = new SiteShareDataTask(dt);
- FutureTask<Integer> futureTask = new FutureTask<>(task);
- taskManager.submitTask(futureTask);
- } catch (Exception e) {
- log.error("{}", e.getMessage());
- }
- }
- /**
- * 实时流量数据同步,每小时执行一次
- */
- @Scheduled(cron = "30 0 0/1 * * ?")
- public void waterFlowRateShareDataTaskScheduled() {
- try {
- Date dt = new Date();
- log.info("任务创建 {}", Func.formatDateTime(dt));
- WaterFlowRateShareDataTask task = new WaterFlowRateShareDataTask(dt);
- FutureTask<Integer> futureTask = new FutureTask<>(task);
- taskManager.submitTask(futureTask);
- } catch (Exception e) {
- log.error("{}", e.getMessage());
- }
- }
- private class SiteShareDataTask implements Callable<Integer> {
- private Date countTime;
- public SiteShareDataTask(Date countTime) {
- this.countTime = countTime;
- }
- @Override
- public Integer call() throws Exception {
- try {
- log.info("计量设施同步任务开始执行 {}", Func.formatDateTime(countTime));
- List<JSONObject> datas = new ArrayList<>();
- MonitorSiteInfoDTO dto = new MonitorSiteInfoDTO();
- dto.setDataShareActive(IsValidEnum.VALID_2.getCode());
- List<MonitorSiteInfoEntity> list = monitorSiteService.selectAll(dto);
- if (Func.notNull(list)) {
- for (MonitorSiteInfoEntity entity : list) {
- LambdaQueryWrapper<MonitorSiteRelEntity> wrapper = Wrappers.<MonitorSiteRelEntity>query().lambda();
- wrapper.eq(MonitorSiteRelEntity::getWiustCd, entity.getWiustCd());
- MonitorSiteRelEntity siteRelEntity = monitorSiteRelService.getOne(wrapper);
- if (Func.notNull(siteRelEntity)) {
- LambdaQueryWrapper<WaterUseConsumerInfoEntity> useWrapper = Wrappers.<WaterUseConsumerInfoEntity>query().lambda();
- useWrapper.eq(WaterUseConsumerInfoEntity::getWiuCd, siteRelEntity.getWiuCd());
- WaterUseConsumerInfoEntity waterUseConsumerInfoEntity = waterUseConsumerService.getOne(useWrapper);
- if (Func.notNull(waterUseConsumerInfoEntity) && Func.notNull(waterUseConsumerInfoEntity.getAdlCd())) {
- JSONObject data = new JSONObject();
- data.put("id", BusinessConstant.DATA_SHARE_AREA + "_" + entity.getWiustCd());
- data.put("measName", entity.getWiustNm());
- //计量方式代码,参考输入参数说明下的字典项(以电折水计量涉及的样本井、普通井选用其他计量-以电量、柴油和其他动力消耗折算水量)
- data.put("measTypeCode", "24");
- //计量方式
- data.put("measType", "7");
- //使用状态 5、正常;4故障;3停用
- data.put("status", "5");
- //是否有效字段,1有效,0无效(删除的数据)新增时不传默认为1
- data.put("flagValid", "1");
- //传输方式代码,1、在线计量;2、非在线计量样本井的话传1,普通井传2
- data.put("transModeCode", "1");
- //取水地址行政编码,县级行政区划代码
- data.put("intLocDivCode", waterUseConsumerInfoEntity.getAdlCd());
- //是否修改行政区划代码 ,修改时,默认传1,不传不修改行政区划。
- data.put("isAddvcd", "1");
- //首次发证证件编号/证照编号
- data.put("ecBaseInfoId", entity.getSiteCertId());
- //数据接收路径 ,1.部级 2.省级(默认省级)(传编码)
- data.put("dataReceivePath", "2");
- //数据来源类型,1、国控转计量; 2、用户安装的计量设施;3、中央专项资金;
- data.put("source", entity.getSiteOwner());
- //数据来源类型,如要修改已传输的计量设施数据来源,维护该字段
- data.put("updateSource", entity.getSiteOwner());
- datas.add(data);
- }
- }
- }
- }
- if (datas.size() > 0) {
- HashOperations<String, String, String> hashops = redisTemplate.opsForHash();
- String date = hashops.get("datashare", "date");
- int dsBatchCount = 1;
- if (Func.isNull(date)) {
- date = Func.formatDate(countTime);
- } else {
- String today = Func.formatDate(countTime);
- if (!today.equals(date)) {
- date = today;
- dsBatchCount = 1;
- } else {
- String dsBatch = hashops.get("datashare", "dsBatch");
- if (Func.isNull(dsBatch)) {
- dsBatchCount = 1;
- } else {
- dsBatchCount = Func.toInt(dsBatch) + 1;
- }
- }
- }
- String dsBatch = BusinessConstant.DATA_SHARE_AREA + "-" + date + "-" + String.format("%04d", dsBatchCount);
- JSONObject post = new JSONObject();
- post.put("dsBatch", dsBatch);
- post.put("timestamp", countTime);
- post.put("data", datas);
- log.info("json post {}", post.toJSONString());
- String url = dataShareServer + "/wr_web_manager/api/wr/datasync/fac/infoinsertOrUpdate";
- // 请求头
- Map<String, String> headers = new HashMap<>();
- headers.put("area", BusinessConstant.DATA_SHARE_AREA);
- headers.put("key", BusinessConstant.DATA_SHARE_KEY);
- JSONObject res = HttpRequestUtil.doHttpPostRequest("计量设施数据同步", url, headers, post.toJSONString(), nTimeOut);
- if (Func.notNull(res)) {
- log.info("接口返回 {}", res.toJSONString());
- }
- //更新同步批次
- hashops.put("datashare", "date", date);
- hashops.put("datashare", "dsBatch", Func.toStr(dsBatchCount));
- //记录请求日志
- DataShareLogEntity logEntity = new DataShareLogEntity();
- logEntity.setRequestDesc("计量设施数据同步");
- logEntity.setCountTime(countTime);
- logEntity.setPostUrl(url);
- if (Func.notNull(res)) {
- logEntity.setRequestResult(res.toJSONString());
- }
- int timeLong = (int) (System.currentTimeMillis() - countTime.getTime());
- logEntity.setTaskTimeLong(timeLong);
- dataShareLogServcie.save(logEntity);
- }
- } catch (Exception e) {
- e.printStackTrace();
- log.error("计量设施同步任务异常 {}", e.getMessage());
- } finally {
- log.info("计量设施同步任务完成 {} 毫秒", System.currentTimeMillis() - countTime.getTime());
- }
- return 0;
- }
- }
- private class WaterFlowRateShareDataTask implements Callable<Integer> {
- private Date countTime;
- public WaterFlowRateShareDataTask(Date countTime) {
- this.countTime = countTime;
- }
- @Override
- public Integer call() throws Exception {
- try {
- log.info("实时流量同步任务开始执行 {}", Func.formatDateTime(countTime));
- List<JSONObject> datas = new ArrayList<>();
- MonitorSiteInfoDTO dto = new MonitorSiteInfoDTO();
- dto.setDataShareActive(IsValidEnum.VALID_2.getCode());
- List<MonitorSiteInfoEntity> list = monitorSiteService.selectAll(dto);
- if (Func.notNull(list)) {
- for (MonitorSiteInfoEntity entity : list) {
- LambdaQueryWrapper<MonitorPointRelEntity> wrapper = Wrappers.<MonitorPointRelEntity>query().lambda();
- wrapper.eq(MonitorPointRelEntity::getWiustCd, entity.getWiustCd());
- List<MonitorPointRelEntity> points = monitorPointRelService.list(wrapper);
- if (Func.notNull(points) && points.size() > 0) {
- LambdaQueryWrapper<WaterFlowRateInfoEntity> dataWrapper = Wrappers.<WaterFlowRateInfoEntity>query().lambda();
- dataWrapper.eq(WaterFlowRateInfoEntity::getMpcd, points.get(0).getMpCd());
- WaterFlowRateInfoEntity waterFlowRateInfoEntity = waterFlowRateService.getOne(dataWrapper);
- if (Func.notNull(waterFlowRateInfoEntity)) {
- JSONObject data = new JSONObject();
- // data.put("id", BusinessConstant.DATA_SHARE_AREA + "_" + entity.getWiustCd());
- data.put("mpCd", BusinessConstant.DATA_SHARE_AREA + "_" + entity.getWiustCd());
- double dayAccw = waterFlowRateHistoryService.getDayAccw(points.get(0).getMpCd());
- data.put("dayW", dayAccw);
- Calendar calendar = Calendar.getInstance();
- calendar.setTime(countTime);
- calendar.set(Calendar.HOUR_OF_DAY,0);
- calendar.set(Calendar.MINUTE,0);
- calendar.set(Calendar.SECOND,0);
- calendar.set(Calendar.MILLISECOND,0);
- calendar.add(Calendar.DAY_OF_MONTH,-1);
- data.put("dt", Func.formatDate(calendar.getTime()));
- data.put("inTime", Func.formatDate(calendar.getTime()));
- datas.add(data);
- } else {
- JSONObject data = new JSONObject();
- // data.put("id", BusinessConstant.DATA_SHARE_AREA + "_" + entity.getWiustCd());
- data.put("mpCd", BusinessConstant.DATA_SHARE_AREA + "_" + entity.getWiustCd());
- data.put("dayW", 0.0);
- Calendar calendar = Calendar.getInstance();
- calendar.setTime(countTime);
- calendar.set(Calendar.HOUR_OF_DAY,0);
- calendar.set(Calendar.MINUTE,0);
- calendar.set(Calendar.SECOND,0);
- calendar.set(Calendar.MILLISECOND,0);
- calendar.add(Calendar.DAY_OF_MONTH,-1);
- data.put("dt", Func.formatDate(calendar.getTime()));
- data.put("inTime", Func.formatDate(calendar.getTime()));
- datas.add(data);
- }
- }
- }
- }
- if (datas.size() > 0) {
- HashOperations<String, String, String> hashops = redisTemplate.opsForHash();
- String date = hashops.get("datashare", "date");
- int dsBatchCount = 1;
- if (Func.isNull(date)) {
- date = Func.formatDate(countTime);
- } else {
- String today = Func.formatDate(countTime);
- if (!today.equals(date)) {
- date = today;
- dsBatchCount = 1;
- } else {
- String dsBatch = hashops.get("datashare", "dsBatch");
- if (Func.isNull(dsBatch)) {
- dsBatchCount = 1;
- } else {
- dsBatchCount = Func.toInt(dsBatch) + 1;
- }
- }
- }
- String dsBatch = BusinessConstant.DATA_SHARE_AREA + "-" + date + "-" + String.format("%04d", dsBatchCount);
- JSONObject post = new JSONObject();
- post.put("dsBatch", dsBatch);
- post.put("timestamp", countTime);
- post.put("data", datas);
- log.info("json post {}", post.toJSONString());
- String url = dataShareServer + "/wr_web_manager/api/wr/datasync/wiwrDayWr/insertOrUpdate";
- // 请求头
- Map<String, String> headers = new HashMap<>();
- headers.put("area", BusinessConstant.DATA_SHARE_AREA);
- headers.put("key", BusinessConstant.DATA_SHARE_KEY);
- JSONObject res = HttpRequestUtil.doHttpPostRequest("实时流量数据同步", url, headers, post.toJSONString(), nTimeOut);
- if (Func.notNull(res)) {
- log.info("接口返回 {}", res.toJSONString());
- }
- //更新同步批次
- hashops.put("datashare", "date", date);
- hashops.put("datashare", "dsBatch", Func.toStr(dsBatchCount));
- //记录请求日志
- DataShareLogEntity logEntity = new DataShareLogEntity();
- logEntity.setRequestDesc("实时流量数据同步");
- logEntity.setCountTime(countTime);
- logEntity.setPostUrl(url);
- if (Func.notNull(res)) {
- logEntity.setRequestResult(res.toJSONString());
- }
- int timeLong = (int) (System.currentTimeMillis() - countTime.getTime());
- logEntity.setTaskTimeLong(timeLong);
- dataShareLogServcie.save(logEntity);
- }
- } catch (Exception e) {
- // e.printStackTrace();
- log.error("实时流量同步任务异常 {}", e.getMessage());
- //记录请求日志
- DataShareLogEntity logEntity = new DataShareLogEntity();
- logEntity.setRequestDesc("实时流量数据同步");
- logEntity.setCountTime(countTime);
- logEntity.setPostUrl(dataShareServer + "/wr_web_manager/api/wr/datasync/wiwrDayWr/insertOrUpdate");
- logEntity.setRequestResult("实时流量同步任务异常 "+e.getMessage());
- int timeLong = (int) (System.currentTimeMillis() - countTime.getTime());
- logEntity.setTaskTimeLong(timeLong);
- dataShareLogServcie.save(logEntity);
- } finally {
- log.info("实时流量同步任务完成 {} 毫秒", System.currentTimeMillis() - countTime.getTime());
- }
- return 0;
- }
- }
- }
|