|
|
@@ -67,91 +67,127 @@ public class EtlWarnDataTask implements Callable<Integer> {
|
|
|
taskInfo.put("taskType", "rsvr");
|
|
|
taskInfo.put("remark", "预警数据同步任务");
|
|
|
taskInfo.put("taskExecuteTime", st);
|
|
|
- //更新已抽取的数据
|
|
|
- LambdaQueryWrapper<OriginalWarningInfoEntity> wrapper = Wrappers.<OriginalWarningInfoEntity>query().lambda();
|
|
|
- wrapper.ne(OriginalWarningInfoEntity::getReviewStatus, "1");
|
|
|
- wrapper.eq(OriginalWarningInfoEntity::getType, "4");
|
|
|
- List<OriginalWarningInfoEntity> originalWarningInfoEntities = warnDataCacheService.list(wrapper);
|
|
|
- if (Func.notNull(originalWarningInfoEntities)) {
|
|
|
+ //因为预警数据会动态更新,所以需要更新已抽取的数据,只更新未关闭的预警数据
|
|
|
+ OriginalWarningInfoEntity dto = new OriginalWarningInfoEntity();
|
|
|
+ dto.setType("4");
|
|
|
+ dto.setReviewStatus("1");
|
|
|
+ List<OriginalWarningInfoEntity> originalWarningInfoEntities = warnDataCacheService.selectWarningList(dto);
|
|
|
+ if (Func.notNull(originalWarningInfoEntities) && originalWarningInfoEntities.size() > 0) {
|
|
|
for (OriginalWarningInfoEntity entity : originalWarningInfoEntities) {
|
|
|
+ //读取源数据库的预警信息
|
|
|
EtlWarningInfoDTO warningInfoDTO = new EtlWarningInfoDTO();
|
|
|
warningInfoDTO.setWarnId(entity.getWarnId());
|
|
|
EtlWarningInfoEntity etlWarningInfoEntity = etlWarningService.getByWarnId(warningInfoDTO);
|
|
|
if (Func.notNull(etlWarningInfoEntity)) {
|
|
|
+ entity.setWarnTime(etlWarningInfoEntity.getWarnTime());
|
|
|
entity.setWarnInfo(etlWarningInfoEntity.getWarnInfo());
|
|
|
entity.setWarnStatus(etlWarningInfoEntity.getWarnStatus());
|
|
|
entity.setReviewStatus(etlWarningInfoEntity.getReviewStatus());
|
|
|
- entity.setUpdateTime(etlWarningInfoEntity.getOriginalUpdateTime());
|
|
|
warnDataCacheService.updateById(entity);
|
|
|
}
|
|
|
}
|
|
|
+ log.info("更新已抽取的未关闭预警数据共{}条", originalWarningInfoEntities.size());
|
|
|
}
|
|
|
- //抽取最新数据
|
|
|
+ //抽取源数据库最新数据,默认同步所有的数据
|
|
|
+ Calendar calendar = Calendar.getInstance();
|
|
|
+ calendar.setTimeInMillis(st);
|
|
|
EtlWarningInfoDTO etlWarningInfoDTO = new EtlWarningInfoDTO();
|
|
|
+ Date etlUpdateTime = calendar.getTime();
|
|
|
ValueOperations<String, String> ops = redisTemplate.opsForValue();
|
|
|
- String key = "etl.warn.updatetime";
|
|
|
+ String key = "etl.warn.etl.updatetime";
|
|
|
if (redisTemplate.hasKey(key)) {
|
|
|
String lastUpdateTime = ops.get(key);
|
|
|
- Date ut = Func.parse(lastUpdateTime, ConcurrentDateFormat.of("yyyy-MM-dd HH:mm:ss"));
|
|
|
- etlWarningInfoDTO.setLastTime(ut);
|
|
|
- } else {
|
|
|
- //没有数据,同步最近24小时
|
|
|
- Calendar calendar = Calendar.getInstance();
|
|
|
- calendar.set(Calendar.DAY_OF_MONTH, -1);
|
|
|
- etlWarningInfoDTO.setLastTime(calendar.getTime());
|
|
|
+ if (Func.notNull(lastUpdateTime)) {
|
|
|
+ etlUpdateTime = Func.parse(lastUpdateTime, ConcurrentDateFormat.of("yyyy-MM-dd HH:mm:ss"));
|
|
|
+ etlWarningInfoDTO.setLastTime(etlUpdateTime);
|
|
|
+ }
|
|
|
}
|
|
|
etlWarningInfoDTO.setReviewStatus("1");
|
|
|
etlWarningInfoDTO.setType("4");
|
|
|
+ //只抽取新创建的数据,更新的数据不在此部分处理
|
|
|
List<EtlWarningInfoEntity> updateList = etlWarningService.selectNewList(etlWarningInfoDTO);
|
|
|
- if (Func.notNull(updateList)) {
|
|
|
+ if (Func.notNull(updateList) && updateList.size() > 0) {
|
|
|
for (EtlWarningInfoEntity entity : updateList) {
|
|
|
- OriginalWarningInfoEntity warnInfo = new OriginalWarningInfoEntity();
|
|
|
- warnInfo.setWarnId(entity.getWarnId());
|
|
|
- warnInfo.setStcd(entity.getStcd());
|
|
|
- warnInfo.setWarnName(entity.getWarnName());
|
|
|
- warnInfo.setWarnTime(entity.getWarnTime());
|
|
|
- warnInfo.setWarnLevelCode(entity.getWarnLevelCode());
|
|
|
- warnInfo.setWarnInfo(entity.getWarnInfo());
|
|
|
- warnInfo.setWarnStatus(entity.getWarnStatus());
|
|
|
- warnInfo.setReviewStatus(entity.getReviewStatus());
|
|
|
- warnInfo.setType(entity.getType());
|
|
|
- warnInfo.setRainWarnRate(entity.getRainWarnRate());
|
|
|
- warnInfo.setWarnSort(entity.getWarnSort());
|
|
|
- warnInfo.setWarnLocation(entity.getWarnLocation());
|
|
|
- warnInfo.setWarnAdcd(entity.getWarnAdcd());
|
|
|
- warnInfo.setUserAdcd(entity.getUserAdcd());
|
|
|
- warnInfo.setWarnLgtd(entity.getWarnLgtd());
|
|
|
- warnInfo.setWarnLttd(entity.getWarnLttd());
|
|
|
- warnInfo.setIntv(entity.getIntv());
|
|
|
- warnInfo.setDrp(entity.getDrp());
|
|
|
- warnInfo.setZ(entity.getZ());
|
|
|
- warnInfo.setQ(entity.getQ());
|
|
|
- warnInfo.setWptn(entity.getWptn());
|
|
|
- warnInfo.setRz(entity.getRz());
|
|
|
- warnInfo.setRwptn(entity.getRwptn());
|
|
|
- warnInfo.setInq(entity.getInq());
|
|
|
- warnInfo.setRelWarnId(entity.getRelWarnId());
|
|
|
- warnInfo.setCreateTime(entity.getOriginalCreateTime());
|
|
|
- warnInfo.setUpdateTime(entity.getOriginalUpdateTime());
|
|
|
- warnDataCacheService.save(warnInfo);
|
|
|
+ LambdaQueryWrapper<OriginalWarningInfoEntity> wrapper = Wrappers.<OriginalWarningInfoEntity>query().lambda();
|
|
|
+ wrapper.eq(OriginalWarningInfoEntity::getWarnId, entity.getWarnId());
|
|
|
+ wrapper.last("limit 1");
|
|
|
+ OriginalWarningInfoEntity detail = warnDataCacheService.getOne(wrapper);
|
|
|
+ if (Func.isNull(detail)) {
|
|
|
+ OriginalWarningInfoEntity warnInfo = new OriginalWarningInfoEntity();
|
|
|
+ warnInfo.setWarnId(entity.getWarnId());
|
|
|
+ warnInfo.setStcd(entity.getStcd());
|
|
|
+ warnInfo.setWarnName(entity.getWarnName());
|
|
|
+ warnInfo.setWarnTime(entity.getWarnTime());
|
|
|
+ warnInfo.setWarnLevelCode(entity.getWarnLevelCode());
|
|
|
+ warnInfo.setWarnInfo(entity.getWarnInfo());
|
|
|
+ warnInfo.setWarnStatus(entity.getWarnStatus());
|
|
|
+ warnInfo.setReviewStatus(entity.getReviewStatus());
|
|
|
+ warnInfo.setType(entity.getType());
|
|
|
+ warnInfo.setRainWarnRate(entity.getRainWarnRate());
|
|
|
+ warnInfo.setWarnSort(entity.getWarnSort());
|
|
|
+ warnInfo.setWarnLocation(entity.getWarnLocation());
|
|
|
+ warnInfo.setWarnAdcd(entity.getWarnAdcd());
|
|
|
+ warnInfo.setUserAdcd(entity.getUserAdcd());
|
|
|
+ warnInfo.setWarnLgtd(entity.getWarnLgtd());
|
|
|
+ warnInfo.setWarnLttd(entity.getWarnLttd());
|
|
|
+ warnInfo.setIntv(entity.getIntv());
|
|
|
+ warnInfo.setDrp(entity.getDrp());
|
|
|
+ warnInfo.setZ(entity.getZ());
|
|
|
+ warnInfo.setQ(entity.getQ());
|
|
|
+ warnInfo.setWptn(entity.getWptn());
|
|
|
+ warnInfo.setRz(entity.getRz());
|
|
|
+ warnInfo.setRwptn(entity.getRwptn());
|
|
|
+ warnInfo.setInq(entity.getInq());
|
|
|
+ warnInfo.setRelWarnId(entity.getRelWarnId());
|
|
|
+ warnDataCacheService.save(warnInfo);
|
|
|
+ } else {
|
|
|
+ detail.setWarnTime(entity.getWarnTime());
|
|
|
+ detail.setWarnInfo(entity.getWarnInfo());
|
|
|
+ detail.setWarnStatus(entity.getWarnStatus());
|
|
|
+ if(Func.isNull(entity.getReviewStatus())){
|
|
|
+ detail.setReviewStatus("0");
|
|
|
+ }else {
|
|
|
+ detail.setReviewStatus(entity.getReviewStatus());
|
|
|
+ }
|
|
|
+ warnDataCacheService.updateById(detail);
|
|
|
+ }
|
|
|
+ Date warnTime = entity.getWarnTime();
|
|
|
+ if (warnTime.after(etlUpdateTime)) {
|
|
|
+ etlUpdateTime = warnTime;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ //更新抽取同步时间
|
|
|
+ ops.set(key, Func.formatDateTime(etlUpdateTime));
|
|
|
+ log.info("抽取预警数据共{}条", updateList.size());
|
|
|
+ }
|
|
|
+ //推送到业务系统,任何有修改过的数据都需要进行同步
|
|
|
+ dto = new OriginalWarningInfoEntity();
|
|
|
+ Date ywxtUpdateTime = calendar.getTime();
|
|
|
+ key = "etl.warn.ywxt.updatetime";
|
|
|
+ if (redisTemplate.hasKey(key)) {
|
|
|
+ //同步最新有变化的数据
|
|
|
+ String lastUpdateTime = ops.get(key);
|
|
|
+ if (Func.notNull(lastUpdateTime)) {
|
|
|
+ ywxtUpdateTime = Func.parse(lastUpdateTime, ConcurrentDateFormat.of("yyyy-MM-dd HH:mm:ss"));
|
|
|
+ dto.setUpdateTime(ywxtUpdateTime);
|
|
|
}
|
|
|
}
|
|
|
- //推送到业务系统
|
|
|
- wrapper = Wrappers.<OriginalWarningInfoEntity>query().lambda();
|
|
|
- wrapper.eq(OriginalWarningInfoEntity::getIsDeleted, 0);
|
|
|
- wrapper.ne(OriginalWarningInfoEntity::getReviewStatus, "1");
|
|
|
- wrapper.eq(OriginalWarningInfoEntity::getType, "4");
|
|
|
- originalWarningInfoEntities = warnDataCacheService.list(wrapper);
|
|
|
- if (Func.notNull(originalWarningInfoEntities)) {
|
|
|
+ originalWarningInfoEntities = warnDataCacheService.selectWarningList(dto);
|
|
|
+ if (Func.notNull(originalWarningInfoEntities) && originalWarningInfoEntities.size() > 0) {
|
|
|
for (OriginalWarningInfoEntity entity : originalWarningInfoEntities) {
|
|
|
//通过KAFKA发送最新一条数据
|
|
|
String jsonText = JSONObject.toJSONString(entity);
|
|
|
- log.info("warn info {}", jsonText);
|
|
|
- // kafkaTemplate.send(topic, jsonText);
|
|
|
+ //log.info("warn info {}", jsonText);
|
|
|
+ kafkaTemplate.send(topic, jsonText);
|
|
|
+ Date updateTime = entity.getUpdateTime();
|
|
|
+ if (updateTime.after(ywxtUpdateTime)) {
|
|
|
+ ywxtUpdateTime = updateTime;
|
|
|
+ }
|
|
|
}
|
|
|
+ //更新运维系统同步时间
|
|
|
+ ops.set(key, Func.formatDateTime(ywxtUpdateTime));
|
|
|
+ log.info("推送预警数据共{}条", originalWarningInfoEntities.size());
|
|
|
}
|
|
|
- //更新缓存时间
|
|
|
- ops.set(key, Func.formatDateTime(etlWarningInfoDTO.getLastTime()));
|
|
|
//提交任务运行状态
|
|
|
long et = System.currentTimeMillis();
|
|
|
taskInfo.put("taskExecuteLong", et - st);
|