EtlWarningTask.java 2.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. /**
  2. * Copyright
  3. * All rights reserved.
  4. * 项目名称:
  5. * 创建日期:2022/5/22
  6. */
  7. package org.springblade.etl;
  8. import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
  9. import com.baomidou.mybatisplus.core.toolkit.Wrappers;
  10. import lombok.extern.slf4j.Slf4j;
  11. import org.springblade.core.tool.utils.ConcurrentDateFormat;
  12. import org.springblade.core.tool.utils.Func;
  13. import org.springblade.etl.entity.EtlWarningInfoEntity;
  14. import org.springblade.etl.service.IEtlWarningService;
  15. import org.springblade.modules.business.warning.entity.RtuWarningInfoEntity;
  16. import org.springblade.modules.business.warning.service.IRtuWarningService;
  17. import org.springframework.data.redis.core.HashOperations;
  18. import org.springframework.data.redis.core.RedisTemplate;
  19. import java.util.Date;
  20. import java.util.List;
  21. import java.util.concurrent.Callable;
  22. /***
  23. * Date:2022/5/22
  24. * Title: 预警数据抽取
  25. * Description:
  26. * @author swp
  27. * @version 1.0
  28. * Remark:认为有必要的其他信息
  29. */
  30. @Slf4j
  31. public class EtlWarningTask implements Callable<Integer> {
  32. private RedisTemplate redisTemplate;
  33. private IEtlWarningService etlWarningService;
  34. private IRtuWarningService rtuWarningService;
  35. public EtlWarningTask(RedisTemplate redisTemplate, IEtlWarningService etlWarningService, IRtuWarningService rtuWarningService){
  36. this.redisTemplate=redisTemplate;
  37. this.etlWarningService=etlWarningService;
  38. this.rtuWarningService=rtuWarningService;
  39. }
  40. @Override
  41. public Integer call() {
  42. try {
  43. log.info("预警数据抽取开始**************");
  44. HashOperations<String, String, String> opsHash = redisTemplate.opsForHash();
  45. Date date = new Date();
  46. if (opsHash.hasKey("etl", "warn_update_time")) {
  47. String dt = opsHash.get("etl", "warn_update_time");
  48. date = Func.parse(dt, ConcurrentDateFormat.of("yyyy-MM-dd HH:mm:ss"));
  49. }
  50. LambdaQueryWrapper<EtlWarningInfoEntity> wrapper = Wrappers.<EtlWarningInfoEntity>query().lambda();
  51. wrapper.gt(EtlWarningInfoEntity::getOriginalCreateTime,date);
  52. wrapper.orderByAsc(EtlWarningInfoEntity::getOriginalCreateTime);
  53. List<EtlWarningInfoEntity> list = etlWarningService.list(wrapper);
  54. if (null != list && list.size() > 0) {
  55. for (EtlWarningInfoEntity warningInfo : list) {
  56. RtuWarningInfoEntity rtuWarningInfoEntity=new RtuWarningInfoEntity();
  57. rtuWarningService.save(rtuWarningInfoEntity);
  58. }
  59. }
  60. String dt = Func.formatDateTime(date);
  61. opsHash.put("etl", "warn_update_time", dt);
  62. } catch (Exception e) {
  63. log.error("数据任务异常 {}", e.getMessage());
  64. }
  65. return 0;
  66. }
  67. }