| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146 |
- /**
- * Copyright
- * All rights reserved.
- * 项目名称:运维系统
- * 创建日期:2022/5/22
- */
- package org.springblade.modules.etl.task;
- import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
- import com.baomidou.mybatisplus.core.metadata.IPage;
- import com.baomidou.mybatisplus.core.toolkit.Wrappers;
- import lombok.extern.slf4j.Slf4j;
- import org.springblade.core.mp.support.Condition;
- import org.springblade.core.mp.support.Query;
- import org.springblade.core.tool.utils.ConcurrentDateFormat;
- import org.springblade.core.tool.utils.Func;
- import org.springblade.core.tool.utils.SpringUtil;
- import org.springblade.modules.baseinfo.org.entity.DeptRegionEntity;
- import org.springblade.modules.business.monitor.entity.WaterFlowRateHistoryInfoEntity;
- import org.springblade.modules.business.monitor.service.IWaterFlowRateHistoryService;
- import org.springblade.modules.etl.dto.EtlMpQrDTO;
- import org.springblade.modules.etl.entity.EtlMpQrEntity;
- import org.springblade.modules.etl.service.IEtlMpQrDataService;
- import java.util.Date;
- import java.util.List;
- import java.util.concurrent.Callable;
- /***
- * Date:2022/5/22
- * Title: 历史数据抽取
- * Description:
- * @author swp
- * @version 1.0
- * Remark:认为有必要的其他信息
- */
- @Slf4j
- public class EtlMpQrDataTask implements Callable<Integer> {
- private static IEtlMpQrDataService etlMpQrDataService;
- private static IEtlMpQrDataService getEtlMpQrDataService() {
- if (etlMpQrDataService == null) {
- etlMpQrDataService = SpringUtil.getBean(IEtlMpQrDataService.class);
- }
- return etlMpQrDataService;
- }
- private static IWaterFlowRateHistoryService waterFlowRateHistoryService;
- private static IWaterFlowRateHistoryService getWaterFlowRateHistoryService() {
- if (waterFlowRateHistoryService == null) {
- waterFlowRateHistoryService = SpringUtil.getBean(IWaterFlowRateHistoryService.class);
- }
- return waterFlowRateHistoryService;
- }
- private String year;
- public EtlMpQrDataTask(String year) {
- this.year = year;
- }
- @Override
- public Integer call() {
- try {
- log.info("{}年历史数据导入任务开始执行*********************", year);
- EtlMpQrDTO dto = new EtlMpQrDTO();
- String stStr = year + "0101000000";
- Date st = Func.parse(stStr, ConcurrentDateFormat.of("yyyyMMddHHmmss"));
- String etStr = Func.toStr(Func.toInt(year) + 1) + "0101000000";
- Date et = Func.parse(etStr, ConcurrentDateFormat.of("yyyyMMddHHmmss"));
- log.info("查询时间 {} {}", Func.formatDateTime(st), Func.formatDateTime(et));
- dto.setYear(year);
- int index = 1;
- while (true) {
- try {
- Query query = new Query();
- query.setSize(100);
- query.setCurrent(index);
- IPage<EtlMpQrEntity> pages = getEtlMpQrDataService().selectPage(Condition.getPage(query), dto);
- List<EtlMpQrEntity> list = pages.getRecords();
- if (Func.isNull(list)) {
- break;
- }
- if (list.size() == 0) {
- break;
- }
- for (EtlMpQrEntity etlMpQrEntity : list) {
- log.info("点位 {} 时间 {}", etlMpQrEntity.getMpCd(), etlMpQrEntity.getTm());
- if (Func.notNull(etlMpQrEntity.getMpCd()) && Func.notNull(etlMpQrEntity.getTm())) {
- LambdaQueryWrapper<WaterFlowRateHistoryInfoEntity> wrapper = Wrappers.<WaterFlowRateHistoryInfoEntity>query().lambda();
- wrapper.eq(WaterFlowRateHistoryInfoEntity::getMpcd, etlMpQrEntity.getMpCd());
- wrapper.eq(WaterFlowRateHistoryInfoEntity::getTm, Func.parse(etlMpQrEntity.getTm(), ConcurrentDateFormat.of("yyyyMMddHHmmss")));
- long count = getWaterFlowRateHistoryService().count(wrapper);
- if (count == 0) {
- WaterFlowRateHistoryInfoEntity entity = new WaterFlowRateHistoryInfoEntity();
- entity.setMpcd(etlMpQrEntity.getMpCd());
- String tm = etlMpQrEntity.getTm();
- entity.setTm(Func.parse(tm, ConcurrentDateFormat.of("yyyyMMddHHmmss")));
- if (Func.notNull(etlMpQrEntity.getMpq())) {
- entity.setMpq(Func.toDouble(etlMpQrEntity.getMpq()));
- }
- if (Func.notNull(etlMpQrEntity.getAccw())) {
- entity.setAccw(Func.toLong(etlMpQrEntity.getAccw()));
- }
- if (Func.notNull(etlMpQrEntity.getAccPqw())) {
- entity.setAccpqw(Func.toLong(etlMpQrEntity.getAccPqw()));
- }
- if (Func.notNull(etlMpQrEntity.getInStpq())) {
- entity.setInstpq(Func.toDouble(etlMpQrEntity.getInStpq()));
- }
- if (Func.notNull(etlMpQrEntity.getAccPqw())) {
- entity.setAccpqw(Func.toLong(etlMpQrEntity.getAccPqw()));
- }
- if (Func.notNull(etlMpQrEntity.getSpeRegData())) {
- entity.setSperegdata(etlMpQrEntity.getSpeRegData());
- }
- if (Func.notNull(etlMpQrEntity.getHourw())) {
- entity.setHourw(Func.toLong(etlMpQrEntity.getHourw()));
- }
- String ts = etlMpQrEntity.getTs();
- if (Func.notNull(ts)) {
- entity.setTs(Func.parse(ts, ConcurrentDateFormat.of("yyyyMMddHHmmss")));
- }
- getWaterFlowRateHistoryService().save(entity);
- }
- }
- }
- index += 1;
- } catch (Exception e) {
- e.printStackTrace();
- log.error(e.getMessage());
- }
- }
- log.info("{}年历史数据导入任务完成", year);
- } catch (Exception e) {
- e.printStackTrace();
- log.error("历史数据导入任务异常 {}", e.getMessage());
- }
- return 0;
- }
- }
|