/** * Copyright 2019 DH * All right reserved. * 项目名称: 运维系统 * 创建日期:2023/11/16 */ package org.springblade.jobtask; import com.alibaba.fastjson.JSONObject; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.toolkit.Wrappers; import lombok.extern.slf4j.Slf4j; import org.springblade.constant.BusinessConstant; import org.springblade.core.tool.utils.Func; import org.springblade.enums.IsValidEnum; import org.springblade.modules.baseinfo.monitorpoint.entity.MonitorPointRelEntity; import org.springblade.modules.baseinfo.monitorpoint.service.IMonitorPointRelService; import org.springblade.modules.baseinfo.monitorsite.dto.MonitorSiteInfoDTO; import org.springblade.modules.baseinfo.monitorsite.entity.MonitorSiteInfoEntity; import org.springblade.modules.baseinfo.monitorsite.entity.MonitorSiteRelEntity; import org.springblade.modules.baseinfo.monitorsite.service.IMonitorSiteRelService; import org.springblade.modules.baseinfo.monitorsite.service.IMonitorSiteService; import org.springblade.modules.baseinfo.wateruseconsumer.entity.WaterUseConsumerInfoEntity; import org.springblade.modules.baseinfo.wateruseconsumer.service.IWaterUseConsumerService; import org.springblade.modules.business.monitor.entity.WaterFlowRateInfoEntity; import org.springblade.modules.business.monitor.service.IWaterFlowRateHistoryService; import org.springblade.modules.business.monitor.service.IWaterFlowRateService; import org.springblade.modules.share.entity.DataShareLogEntity; import org.springblade.modules.share.service.IDataShareLogServcie; import org.springblade.utils.HttpRequestUtil; import org.springframework.beans.factory.annotation.Value; import org.springframework.data.redis.core.HashOperations; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.*; import java.util.concurrent.Callable; import java.util.concurrent.FutureTask; /*** * Date:2023/11/16 * Title:文件所属模块(必须填写) * Description:对本文件的详细描述,原则上不能少于30字 * @author dylan * @version 1.0 * Remark:认为有必要的其他信息 */ @Slf4j @Component @EnableScheduling public class DataShareProcessor { @Resource private TaskManager taskManager; @Resource private IMonitorSiteService monitorSiteService; @Resource private IMonitorPointRelService monitorPointRelService; @Resource private IMonitorSiteRelService monitorSiteRelService; @Resource private IWaterUseConsumerService waterUseConsumerService; @Resource private IWaterFlowRateService waterFlowRateService; @Resource private IWaterFlowRateHistoryService waterFlowRateHistoryService; @Resource private IDataShareLogServcie dataShareLogServcie; @Resource private RedisTemplate redisTemplate; @Value("${spring.task-config.data-share-server}") private String dataShareServer; /** * 超时时间,单位:毫秒 */ private int nTimeOut = 20 * 1000; /** * 计量设施数据同步,每天执行一次 */ @Scheduled(cron = "10 0 0 * * ?") public void siteShareDataTaskScheduled() { try { Date dt = new Date(); log.info("任务创建 {}", Func.formatDateTime(dt)); SiteShareDataTask task = new SiteShareDataTask(dt); FutureTask futureTask = new FutureTask<>(task); taskManager.submitTask(futureTask); } catch (Exception e) { log.error("{}", e.getMessage()); } } /** * 实时流量数据同步,每小时执行一次 */ @Scheduled(cron = "30 0 0/1 * * ?") public void waterFlowRateShareDataTaskScheduled() { try { Date dt = new Date(); log.info("任务创建 {}", Func.formatDateTime(dt)); WaterFlowRateShareDataTask task = new WaterFlowRateShareDataTask(dt); FutureTask futureTask = new FutureTask<>(task); taskManager.submitTask(futureTask); } catch (Exception e) { log.error("{}", e.getMessage()); } } private class SiteShareDataTask implements Callable { private Date countTime; public SiteShareDataTask(Date countTime) { this.countTime = countTime; } @Override public Integer call() throws Exception { try { log.info("计量设施同步任务开始执行 {}", Func.formatDateTime(countTime)); List datas = new ArrayList<>(); MonitorSiteInfoDTO dto = new MonitorSiteInfoDTO(); dto.setDataShareActive(IsValidEnum.VALID_2.getCode()); List list = monitorSiteService.selectAll(dto); if (Func.notNull(list)) { for (MonitorSiteInfoEntity entity : list) { LambdaQueryWrapper wrapper = Wrappers.query().lambda(); wrapper.eq(MonitorSiteRelEntity::getWiustCd, entity.getWiustCd()); MonitorSiteRelEntity siteRelEntity = monitorSiteRelService.getOne(wrapper); if (Func.notNull(siteRelEntity)) { LambdaQueryWrapper useWrapper = Wrappers.query().lambda(); useWrapper.eq(WaterUseConsumerInfoEntity::getWiuCd, siteRelEntity.getWiuCd()); WaterUseConsumerInfoEntity waterUseConsumerInfoEntity = waterUseConsumerService.getOne(useWrapper); if (Func.notNull(waterUseConsumerInfoEntity) && Func.notNull(waterUseConsumerInfoEntity.getAdlCd())) { JSONObject data = new JSONObject(); data.put("id", BusinessConstant.DATA_SHARE_AREA + "_" + entity.getWiustCd()); data.put("measName", entity.getWiustNm()); //计量方式代码,参考输入参数说明下的字典项(以电折水计量涉及的样本井、普通井选用其他计量-以电量、柴油和其他动力消耗折算水量) data.put("measTypeCode", "24"); //计量方式 data.put("measType", "7"); //使用状态 5、正常;4故障;3停用 data.put("status", "5"); //是否有效字段,1有效,0无效(删除的数据)新增时不传默认为1 data.put("flagValid", "1"); //传输方式代码,1、在线计量;2、非在线计量样本井的话传1,普通井传2 data.put("transModeCode", "1"); //取水地址行政编码,县级行政区划代码 data.put("intLocDivCode", waterUseConsumerInfoEntity.getAdlCd()); //是否修改行政区划代码 ,修改时,默认传1,不传不修改行政区划。 data.put("isAddvcd", "1"); //首次发证证件编号/证照编号 data.put("ecBaseInfoId", entity.getSiteCertId()); //数据接收路径 ,1.部级 2.省级(默认省级)(传编码) data.put("dataReceivePath", "2"); //数据来源类型,1、国控转计量; 2、用户安装的计量设施;3、中央专项资金; data.put("source", entity.getSiteOwner()); //数据来源类型,如要修改已传输的计量设施数据来源,维护该字段 data.put("updateSource", entity.getSiteOwner()); datas.add(data); } } } } if (datas.size() > 0) { HashOperations hashops = redisTemplate.opsForHash(); String date = hashops.get("datashare", "date"); int dsBatchCount = 1; if (Func.isNull(date)) { date = Func.formatDate(countTime); } else { String today = Func.formatDate(countTime); if (!today.equals(date)) { date = today; dsBatchCount = 1; } else { String dsBatch = hashops.get("datashare", "dsBatch"); if (Func.isNull(dsBatch)) { dsBatchCount = 1; } else { dsBatchCount = Func.toInt(dsBatch) + 1; } } } String dsBatch = BusinessConstant.DATA_SHARE_AREA + "-" + date + "-" + String.format("%04d", dsBatchCount); JSONObject post = new JSONObject(); post.put("dsBatch", dsBatch); post.put("timestamp", countTime); post.put("data", datas); log.info("json post {}", post.toJSONString()); String url = dataShareServer + "/wr_web_manager/api/wr/datasync/fac/infoinsertOrUpdate"; // 请求头 Map headers = new HashMap<>(); headers.put("area", BusinessConstant.DATA_SHARE_AREA); headers.put("key", BusinessConstant.DATA_SHARE_KEY); JSONObject res = HttpRequestUtil.doHttpPostRequest("计量设施数据同步", url, headers, post.toJSONString(), nTimeOut); if (Func.notNull(res)) { log.info("接口返回 {}", res.toJSONString()); } //更新同步批次 hashops.put("datashare", "date", date); hashops.put("datashare", "dsBatch", Func.toStr(dsBatchCount)); //记录请求日志 DataShareLogEntity logEntity = new DataShareLogEntity(); logEntity.setRequestDesc("计量设施数据同步"); logEntity.setCountTime(countTime); logEntity.setPostUrl(url); if (Func.notNull(res)) { logEntity.setRequestResult(res.toJSONString()); } int timeLong = (int) (System.currentTimeMillis() - countTime.getTime()); logEntity.setTaskTimeLong(timeLong); dataShareLogServcie.save(logEntity); } } catch (Exception e) { e.printStackTrace(); log.error("计量设施同步任务异常 {}", e.getMessage()); } finally { log.info("计量设施同步任务完成 {} 毫秒", System.currentTimeMillis() - countTime.getTime()); } return 0; } } private class WaterFlowRateShareDataTask implements Callable { private Date countTime; public WaterFlowRateShareDataTask(Date countTime) { this.countTime = countTime; } @Override public Integer call() throws Exception { try { log.info("实时流量同步任务开始执行 {}", Func.formatDateTime(countTime)); List datas = new ArrayList<>(); MonitorSiteInfoDTO dto = new MonitorSiteInfoDTO(); dto.setDataShareActive(IsValidEnum.VALID_2.getCode()); List list = monitorSiteService.selectAll(dto); if (Func.notNull(list)) { for (MonitorSiteInfoEntity entity : list) { LambdaQueryWrapper wrapper = Wrappers.query().lambda(); wrapper.eq(MonitorPointRelEntity::getWiustCd, entity.getWiustCd()); List points = monitorPointRelService.list(wrapper); if (Func.notNull(points) && points.size() > 0) { LambdaQueryWrapper dataWrapper = Wrappers.query().lambda(); dataWrapper.eq(WaterFlowRateInfoEntity::getMpcd, points.get(0).getMpCd()); WaterFlowRateInfoEntity waterFlowRateInfoEntity = waterFlowRateService.getOne(dataWrapper); if (Func.notNull(waterFlowRateInfoEntity)) { JSONObject data = new JSONObject(); // data.put("id", BusinessConstant.DATA_SHARE_AREA + "_" + entity.getWiustCd()); data.put("mpCd", BusinessConstant.DATA_SHARE_AREA + "_" + entity.getWiustCd()); double dayAccw = waterFlowRateHistoryService.getDayAccw(points.get(0).getMpCd()); data.put("dayW", dayAccw); Calendar calendar = Calendar.getInstance(); calendar.setTime(countTime); calendar.set(Calendar.HOUR_OF_DAY,0); calendar.set(Calendar.MINUTE,0); calendar.set(Calendar.SECOND,0); calendar.set(Calendar.MILLISECOND,0); calendar.add(Calendar.DAY_OF_MONTH,-1); data.put("dt", Func.formatDate(calendar.getTime())); data.put("inTime", Func.formatDate(calendar.getTime())); datas.add(data); } else { JSONObject data = new JSONObject(); // data.put("id", BusinessConstant.DATA_SHARE_AREA + "_" + entity.getWiustCd()); data.put("mpCd", BusinessConstant.DATA_SHARE_AREA + "_" + entity.getWiustCd()); data.put("dayW", 0.0); Calendar calendar = Calendar.getInstance(); calendar.setTime(countTime); calendar.set(Calendar.HOUR_OF_DAY,0); calendar.set(Calendar.MINUTE,0); calendar.set(Calendar.SECOND,0); calendar.set(Calendar.MILLISECOND,0); calendar.add(Calendar.DAY_OF_MONTH,-1); data.put("dt", Func.formatDate(calendar.getTime())); data.put("inTime", Func.formatDate(calendar.getTime())); datas.add(data); } } } } if (datas.size() > 0) { HashOperations hashops = redisTemplate.opsForHash(); String date = hashops.get("datashare", "date"); int dsBatchCount = 1; if (Func.isNull(date)) { date = Func.formatDate(countTime); } else { String today = Func.formatDate(countTime); if (!today.equals(date)) { date = today; dsBatchCount = 1; } else { String dsBatch = hashops.get("datashare", "dsBatch"); if (Func.isNull(dsBatch)) { dsBatchCount = 1; } else { dsBatchCount = Func.toInt(dsBatch) + 1; } } } String dsBatch = BusinessConstant.DATA_SHARE_AREA + "-" + date + "-" + String.format("%04d", dsBatchCount); JSONObject post = new JSONObject(); post.put("dsBatch", dsBatch); post.put("timestamp", countTime); post.put("data", datas); log.info("json post {}", post.toJSONString()); String url = dataShareServer + "/wr_web_manager/api/wr/datasync/wiwrDayWr/insertOrUpdate"; // 请求头 Map headers = new HashMap<>(); headers.put("area", BusinessConstant.DATA_SHARE_AREA); headers.put("key", BusinessConstant.DATA_SHARE_KEY); JSONObject res = HttpRequestUtil.doHttpPostRequest("实时流量数据同步", url, headers, post.toJSONString(), nTimeOut); if (Func.notNull(res)) { log.info("接口返回 {}", res.toJSONString()); } //更新同步批次 hashops.put("datashare", "date", date); hashops.put("datashare", "dsBatch", Func.toStr(dsBatchCount)); //记录请求日志 DataShareLogEntity logEntity = new DataShareLogEntity(); logEntity.setRequestDesc("实时流量数据同步"); logEntity.setCountTime(countTime); logEntity.setPostUrl(url); if (Func.notNull(res)) { logEntity.setRequestResult(res.toJSONString()); } int timeLong = (int) (System.currentTimeMillis() - countTime.getTime()); logEntity.setTaskTimeLong(timeLong); dataShareLogServcie.save(logEntity); } } catch (Exception e) { // e.printStackTrace(); log.error("实时流量同步任务异常 {}", e.getMessage()); //记录请求日志 DataShareLogEntity logEntity = new DataShareLogEntity(); logEntity.setRequestDesc("实时流量数据同步"); logEntity.setCountTime(countTime); logEntity.setPostUrl(dataShareServer + "/wr_web_manager/api/wr/datasync/wiwrDayWr/insertOrUpdate"); logEntity.setRequestResult("实时流量同步任务异常 "+e.getMessage()); int timeLong = (int) (System.currentTimeMillis() - countTime.getTime()); logEntity.setTaskTimeLong(timeLong); dataShareLogServcie.save(logEntity); } finally { log.info("实时流量同步任务完成 {} 毫秒", System.currentTimeMillis() - countTime.getTime()); } return 0; } } }