/**
 * Copyright
 * All rights reserved.
 * Project name:
 * Created: 2022/5/22
 */
package org.springblade.etl;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import lombok.extern.slf4j.Slf4j;
import org.springblade.core.tool.utils.ConcurrentDateFormat;
import org.springblade.core.tool.utils.Func;
import org.springblade.etl.entity.EtlWarningInfoEntity;
import org.springblade.etl.service.IEtlWarningService;
import org.springblade.modules.business.warning.entity.RtuWarningInfoEntity;
import org.springblade.modules.business.warning.service.IRtuWarningService;
import org.springframework.data.redis.core.HashOperations;
import org.springframework.data.redis.core.RedisTemplate;

import java.util.Date;
import java.util.List;
import java.util.concurrent.Callable;

/**
 * Date: 2022/5/22
 * Title: Warning data extraction
 * Description: Reads warning rows from the ETL warning service that are newer than a
 * watermark kept in the Redis hash {@code etl} (field {@code warn_update_time}), saves one
 * {@link RtuWarningInfoEntity} per row through the RTU warning service, and then stores the
 * new watermark back into Redis.
 *
 * @author swp
 * @version 1.0
 */
@Slf4j
public class EtlWarningTask implements Callable<Integer> {

    /** Redis hash that holds the ETL bookkeeping values. */
    private static final String ETL_HASH_KEY = "etl";

    /** Field inside {@link #ETL_HASH_KEY} that stores the warning extraction watermark. */
    private static final String WATERMARK_FIELD = "warn_update_time";

    /** Text format of the stored watermark. */
    private static final String WATERMARK_PATTERN = "yyyy-MM-dd HH:mm:ss";

    // Deliberately kept as a raw type: the constructor signature must keep accepting whatever
    // RedisTemplate parameterization existing callers pass in.
    @SuppressWarnings("rawtypes")
    private final RedisTemplate redisTemplate;
    private final IEtlWarningService etlWarningService;
    private final IRtuWarningService rtuWarningService;

    /**
     * Creates the task.
     *
     * @param redisTemplate     template used to read and write the watermark hash entry
     * @param etlWarningService source service queried for new warning rows
     * @param rtuWarningService target service that receives the saved entities
     */
    @SuppressWarnings("rawtypes")
    public EtlWarningTask(RedisTemplate redisTemplate,
                          IEtlWarningService etlWarningService,
                          IRtuWarningService rtuWarningService) {
        this.redisTemplate = redisTemplate;
        this.etlWarningService = etlWarningService;
        this.rtuWarningService = rtuWarningService;
    }

    /**
     * Runs one extraction pass.
     *
     * <p>Any exception is caught and logged (with its stack trace); the watermark is only
     * written after every selected row has been saved, so a failed pass leaves it untouched.
     *
     * @return always {@code 0}, whether or not the pass succeeded
     */
    @Override
    public Integer call() {
        try {
            log.info("预警数据抽取开始**************");

            // The template field is raw, so this assignment is an unchecked conversion.
            // NOTE(review): assumes the hash values are (de)serialized as Strings — confirm
            // against the RedisTemplate configuration.
            @SuppressWarnings("unchecked")
            HashOperations<String, String, String> opsHash = redisTemplate.opsForHash();

            // With no stored watermark the current time is used as the baseline.
            // A single get() replaces the former hasKey()+get() pair (one round trip, no race).
            Date watermark = new Date();
            String stored = opsHash.get(ETL_HASH_KEY, WATERMARK_FIELD);
            if (stored != null) {
                watermark = Func.parse(stored, ConcurrentDateFormat.of(WATERMARK_PATTERN));
            }

            LambdaQueryWrapper<EtlWarningInfoEntity> wrapper =
                    Wrappers.<EtlWarningInfoEntity>query().lambda();
            wrapper.gt(EtlWarningInfoEntity::getOriginalCreateTime, watermark);
            // Ascending order makes the last row the newest one, which becomes the new watermark.
            wrapper.orderByAsc(EtlWarningInfoEntity::getOriginalCreateTime);
            List<EtlWarningInfoEntity> list = etlWarningService.list(wrapper);

            String newWatermark = Func.formatDateTime(watermark);
            if (list != null && !list.isEmpty()) {
                for (EtlWarningInfoEntity warningInfo : list) {
                    // NOTE(review): the target entity is saved without any field copied from
                    // warningInfo; the source-to-target mapping is not visible here and looks
                    // unfinished — confirm the intended mapping and populate it.
                    RtuWarningInfoEntity rtuWarningInfoEntity = new RtuWarningInfoEntity();
                    rtuWarningService.save(rtuWarningInfoEntity);
                }

                // Advance the watermark to the newest processed row. Previously the value read
                // at the start was written back unchanged, so every run re-selected and
                // re-saved the same rows.
                // NOTE(review): assumes getOriginalCreateTime() returns a java.util.Date or a
                // java.time temporal accepted by Func.formatDateTime — confirm.
                EtlWarningInfoEntity newest = list.get(list.size() - 1);
                if (newest.getOriginalCreateTime() != null) {
                    newWatermark = Func.formatDateTime(newest.getOriginalCreateTime());
                }
            }

            opsHash.put(ETL_HASH_KEY, WATERMARK_FIELD, newWatermark);
        } catch (Exception e) {
            // Trailing throwable argument makes SLF4J log the stack trace as well.
            log.error("数据任务异常 {}", e.getMessage(), e);
        }
        return 0;
    }
}