/** * Copyright * All right reserved. * 项目名称:运维系统 * 创建日期:2022/5/22 */ package org.springblade.modules.etl.task; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.core.toolkit.Wrappers; import lombok.extern.slf4j.Slf4j; import org.springblade.core.mp.support.Condition; import org.springblade.core.mp.support.Query; import org.springblade.core.tool.utils.ConcurrentDateFormat; import org.springblade.core.tool.utils.Func; import org.springblade.core.tool.utils.SpringUtil; import org.springblade.modules.baseinfo.org.entity.DeptRegionEntity; import org.springblade.modules.business.monitor.entity.WaterFlowRateHistoryInfoEntity; import org.springblade.modules.business.monitor.service.IWaterFlowRateHistoryService; import org.springblade.modules.etl.dto.EtlMpQrDTO; import org.springblade.modules.etl.entity.EtlMpQrEntity; import org.springblade.modules.etl.service.IEtlMpQrDataService; import java.util.Date; import java.util.List; import java.util.concurrent.Callable; /*** * Date:2022/5/22 * Title: 历史数据抽取 * Description: * @author swp * @version 1.0 * Remark:认为有必要的其他信息 */ @Slf4j public class EtlMpQrDataTask implements Callable { private static IEtlMpQrDataService etlMpQrDataService; private static IEtlMpQrDataService getEtlMpQrDataService() { if (etlMpQrDataService == null) { etlMpQrDataService = SpringUtil.getBean(IEtlMpQrDataService.class); } return etlMpQrDataService; } private static IWaterFlowRateHistoryService waterFlowRateHistoryService; private static IWaterFlowRateHistoryService getWaterFlowRateHistoryService() { if (waterFlowRateHistoryService == null) { waterFlowRateHistoryService = SpringUtil.getBean(IWaterFlowRateHistoryService.class); } return waterFlowRateHistoryService; } private String year; public EtlMpQrDataTask(String year) { this.year = year; } @Override public Integer call() { try { log.info("{}年历史数据导入任务开始执行*********************", year); EtlMpQrDTO dto = new EtlMpQrDTO(); String stStr = year + "0101000000"; Date st = Func.parse(stStr, ConcurrentDateFormat.of("yyyyMMddHHmmss")); String etStr = Func.toStr(Func.toInt(year) + 1) + "0101000000"; Date et = Func.parse(etStr, ConcurrentDateFormat.of("yyyyMMddHHmmss")); log.info("查询时间 {} {}", Func.formatDateTime(st), Func.formatDateTime(et)); dto.setYear(year); int index = 1; while (true) { try { Query query = new Query(); query.setSize(100); query.setCurrent(index); IPage pages = getEtlMpQrDataService().selectPage(Condition.getPage(query), dto); List list = pages.getRecords(); if (Func.isNull(list)) { break; } if (list.size() == 0) { break; } for (EtlMpQrEntity etlMpQrEntity : list) { log.info("点位 {} 时间 {}", etlMpQrEntity.getMpCd(), etlMpQrEntity.getTm()); if (Func.notNull(etlMpQrEntity.getMpCd()) && Func.notNull(etlMpQrEntity.getTm())) { LambdaQueryWrapper wrapper = Wrappers.query().lambda(); wrapper.eq(WaterFlowRateHistoryInfoEntity::getMpcd, etlMpQrEntity.getMpCd()); wrapper.eq(WaterFlowRateHistoryInfoEntity::getTm, Func.parse(etlMpQrEntity.getTm(), ConcurrentDateFormat.of("yyyyMMddHHmmss"))); long count = getWaterFlowRateHistoryService().count(wrapper); if (count == 0) { WaterFlowRateHistoryInfoEntity entity = new WaterFlowRateHistoryInfoEntity(); entity.setMpcd(etlMpQrEntity.getMpCd()); String tm = etlMpQrEntity.getTm(); entity.setTm(Func.parse(tm, ConcurrentDateFormat.of("yyyyMMddHHmmss"))); if (Func.notNull(etlMpQrEntity.getMpq())) { entity.setMpq(Func.toDouble(etlMpQrEntity.getMpq())); } if (Func.notNull(etlMpQrEntity.getAccw())) { entity.setAccw(Func.toLong(etlMpQrEntity.getAccw())); } if (Func.notNull(etlMpQrEntity.getAccPqw())) { entity.setAccpqw(Func.toLong(etlMpQrEntity.getAccPqw())); } if (Func.notNull(etlMpQrEntity.getInStpq())) { entity.setInstpq(Func.toDouble(etlMpQrEntity.getInStpq())); } if (Func.notNull(etlMpQrEntity.getAccPqw())) { entity.setAccpqw(Func.toLong(etlMpQrEntity.getAccPqw())); } if (Func.notNull(etlMpQrEntity.getSpeRegData())) { entity.setSperegdata(etlMpQrEntity.getSpeRegData()); } if (Func.notNull(etlMpQrEntity.getHourw())) { entity.setHourw(Func.toLong(etlMpQrEntity.getHourw())); } String ts = etlMpQrEntity.getTs(); if (Func.notNull(ts)) { entity.setTs(Func.parse(ts, ConcurrentDateFormat.of("yyyyMMddHHmmss"))); } getWaterFlowRateHistoryService().save(entity); } } } index += 1; } catch (Exception e) { e.printStackTrace(); log.error(e.getMessage()); } } log.info("{}年历史数据导入任务完成", year); } catch (Exception e) { e.printStackTrace(); log.error("历史数据导入任务异常 {}", e.getMessage()); } return 0; } }