Selaa lähdekoodia

兴安盟水资源运维系统-增加数据同步数据 ,推送到金水平台

dylan 2 vuotta sitten
vanhempi
commit
d6805529e9
20 muutettua tiedostoa jossa 1104 lisäystä ja 4 poistoa
  1. 6 0
      src/main/java/org/springblade/constant/BusinessConstant.java
  2. 100 0
      src/main/java/org/springblade/enums/SiteOwnerTypeEnum.java
  3. 95 0
      src/main/java/org/springblade/enums/SzyWaterLevelMetadataEnum.java
  4. 367 0
      src/main/java/org/springblade/jobtask/DataShareProcessor.java
  5. 1 1
      src/main/java/org/springblade/jobtask/TaskManager.java
  6. 1 0
      src/main/java/org/springblade/modules/baseinfo/monitorpoint/service/IMonitorPointService.java
  7. 18 0
      src/main/java/org/springblade/modules/baseinfo/monitorsite/entity/MonitorSiteInfoEntity.java
  8. 10 0
      src/main/java/org/springblade/modules/baseinfo/monitorsite/mapper/MonitorSiteMapper.xml
  9. 4 1
      src/main/java/org/springblade/modules/baseinfo/monitorsite/service/IMonitorSiteService.java
  10. 1 1
      src/main/java/org/springblade/modules/business/monitor/mapper/WaterFlowRateMapper.xml
  11. 3 0
      src/main/java/org/springblade/modules/business/monitor/service/IWaterFlowRateHistoryService.java
  12. 59 0
      src/main/java/org/springblade/modules/business/monitor/service/impl/WaterFlowRateHistoryServiceImpl.java
  13. 82 0
      src/main/java/org/springblade/modules/share/entity/DataShareLogEntity.java
  14. 21 0
      src/main/java/org/springblade/modules/share/mapper/DataShareLogMapper.java
  15. 15 0
      src/main/java/org/springblade/modules/share/mapper/DataShareLogMapper.xml
  16. 21 0
      src/main/java/org/springblade/modules/share/service/IDataShareLogServcie.java
  17. 25 0
      src/main/java/org/springblade/modules/share/service/impl/DataShareLogServcieImpl.java
  18. 96 1
      src/main/java/org/springblade/mq/kafka/handler/MessageHandler.java
  19. 176 0
      src/main/java/org/springblade/utils/HttpRequestUtil.java
  20. 3 0
      src/main/resources/application-dev.yml

+ 6 - 0
src/main/java/org/springblade/constant/BusinessConstant.java

@@ -76,4 +76,10 @@ public class BusinessConstant {
 	public static final Long DEPT_ROOT_ID =0L;
 
 	public static final Long DEPT_XAM_ID=1123598813738675201L;
+
+
+	//public static final String DATA_SHARE_JS_SERVER="https://waterintake.goldenwater.com.cn";
+
+	public static final String DATA_SHARE_AREA ="152200_CHANGSHANG1";
+	public static final String DATA_SHARE_KEY ="0A3F8C5809D4F72BE0630EC3010A433C";
 }

+ 100 - 0
src/main/java/org/springblade/enums/SiteOwnerTypeEnum.java

@@ -0,0 +1,100 @@
+package org.springblade.enums;
+
+public enum SiteOwnerTypeEnum {
+    /**
+     * 国控转计量
+     */
+    LEVEL_0(1, "国控转计量"),
+
+    /**
+     * 用户安装的计量设施
+     */
+    LEVEL_1(2, "用户安装的计量设施"),
+
+    /**
+     * 中央专项资金
+     */
+    LEVEL_2(3, "中央专项资金");
+
+
+    private int code;
+
+    private String name;
+
+    private SiteOwnerTypeEnum(int code, String name) {
+        this.code = code;
+        this.name = name;
+    }
+
+    /**
+     * Function:根据code来获取对应的name
+     * Author: Admin
+     * Date:2021/6/26
+     *
+     * @param code
+     * @return java.lang.String
+     * @throws Exception
+     */
+    public static String getName(int code) {
+        for (SiteOwnerTypeEnum type : SiteOwnerTypeEnum.values()) {
+            if (type.code == code) {
+                return type.name;
+            }
+        }
+        return "";
+    }
+
+    /**
+     * Function:根据code来获取对应的枚举常量
+     * Author: Admin
+     * Date:2021/6/26
+     *
+     * @param code
+     * @return com.zhgzjg.web.enums.DeleteFlag
+     * @throws Exception
+     */
+    public static SiteOwnerTypeEnum getEnumByCode(int code) {
+        for (SiteOwnerTypeEnum type : SiteOwnerTypeEnum.values()) {
+            if (type.code == code) {
+                return type;
+            }
+        }
+        return null;
+    }
+
+
+    /**
+     * Function:根据name来获取对应的枚举常量
+     * Author: Admin
+     * Date:2021/6/26
+     *
+     * @param name
+     * @return com.zhgzjg.web.enums.DeleteFlag
+     * @throws Exception
+     */
+    public static SiteOwnerTypeEnum getEnumByName(String name) {
+        for (SiteOwnerTypeEnum type : SiteOwnerTypeEnum.values()) {
+            if (type.name.equals(name)) {
+                return type;
+            }
+        }
+        return null;
+    }
+
+
+    public int getCode() {
+        return code;
+    }
+
+    public void setCode(int code) {
+        this.code = code;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+}

+ 95 - 0
src/main/java/org/springblade/enums/SzyWaterLevelMetadataEnum.java

@@ -0,0 +1,95 @@
+/**
+ * Copyright 2019 DH
+ * All right reserved.
+ * 项目名称: 大恒泰山系统
+ * 创建日期:2023/10/31
+ */
+package org.springblade.enums;
+
+/*** 
+ * Date:2023/10/31
+ * Title:文件所属模块(必须填写)
+ * Description:对本文件的详细描述,原则上不能少于30字
+ * @author dylan
+ * @version 1.0
+ * Remark:认为有必要的其他信息
+ */
+public enum SzyWaterLevelMetadataEnum {
+    /**
+     * 接收时间
+     */
+    METADATA_FROM_TIME(0, "from_time"),
+
+    /**
+     * 采集时间
+     */
+    METADATA_COLLECT_TIME(1, "tm"),
+
+    /**
+     * 上报时间
+     */
+    METADATA_UP_TIME(2, "up_time"),
+    /**
+     * 测站编码
+     */
+    METADATA_RTU_CODE(3, "rtu_code"),
+    /**
+     * 水位
+     */
+    METADATA_CODE_LEVEL(4, "water_level")
+    ;
+
+    private int code;
+
+    private String name;
+
+    SzyWaterLevelMetadataEnum(int code, String name) {
+        this.code = code;
+        this.name = name;
+    }
+
+    public int getCode() {
+        return code;
+    }
+
+    public void setCode(int code) {
+        this.code = code;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    /**
+     * 根据CODE查名称
+     *
+     * @param code
+     * @return
+     */
+    public static String getName(int code) {
+        for (SzyWaterLevelMetadataEnum point : SzyWaterLevelMetadataEnum.values()) {
+            if (point.code == code) {
+                return point.name;
+            }
+        }
+        return "";
+    }
+
+    /**
+     * 根据名称查代码
+     * @param name
+     * @return
+     */
+    public static int getCode(String name) {
+        for (SzyWaterLevelMetadataEnum point : SzyWaterLevelMetadataEnum.values()) {
+            if (point.name.equals(name)) {
+                return point.code;
+            }
+        }
+        return -1;
+    }
+}

+ 367 - 0
src/main/java/org/springblade/jobtask/DataShareProcessor.java

@@ -0,0 +1,367 @@
+/**
+ * Copyright 2019 DH
+ * All right reserved.
+ * 项目名称: 大恒泰山系统
+ * 创建日期:2023/11/16
+ */
+package org.springblade.jobtask;
+
+import com.alibaba.fastjson.JSONObject;
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import com.baomidou.mybatisplus.core.toolkit.Wrappers;
+import lombok.extern.slf4j.Slf4j;
+import org.springblade.constant.BusinessConstant;
+import org.springblade.core.tool.utils.Func;
+import org.springblade.enums.IsValidEnum;
+import org.springblade.modules.baseinfo.monitorpoint.entity.MonitorPointRelEntity;
+import org.springblade.modules.baseinfo.monitorpoint.service.IMonitorPointRelService;
+import org.springblade.modules.baseinfo.monitorsite.dto.MonitorSiteInfoDTO;
+import org.springblade.modules.baseinfo.monitorsite.entity.MonitorSiteInfoEntity;
+import org.springblade.modules.baseinfo.monitorsite.entity.MonitorSiteRelEntity;
+import org.springblade.modules.baseinfo.monitorsite.service.IMonitorSiteRelService;
+import org.springblade.modules.baseinfo.monitorsite.service.IMonitorSiteService;
+import org.springblade.modules.baseinfo.wateruseconsumer.entity.WaterUseConsumerInfoEntity;
+import org.springblade.modules.baseinfo.wateruseconsumer.service.IWaterUseConsumerService;
+import org.springblade.modules.business.monitor.entity.WaterFlowRateInfoEntity;
+import org.springblade.modules.business.monitor.service.IWaterFlowRateHistoryService;
+import org.springblade.modules.business.monitor.service.IWaterFlowRateService;
+import org.springblade.modules.share.entity.DataShareLogEntity;
+import org.springblade.modules.share.service.IDataShareLogServcie;
+import org.springblade.utils.HttpRequestUtil;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.data.redis.core.HashOperations;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.scheduling.annotation.EnableScheduling;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.Resource;
+import java.util.*;
+import java.util.concurrent.Callable;
+import java.util.concurrent.FutureTask;
+
+/***
+ * Date:2023/11/16
+ * Title:文件所属模块(必须填写)
+ * Description:对本文件的详细描述,原则上不能少于30字
+ * @author dylan
+ * @version 1.0
+ * Remark:认为有必要的其他信息
+ */
+@Slf4j
+@Component
+@EnableScheduling
+public class DataShareProcessor {
+    @Resource
+    private TaskManager taskManager;
+
+    @Resource
+    private IMonitorSiteService monitorSiteService;
+
+    @Resource
+    private IMonitorPointRelService monitorPointRelService;
+
+    @Resource
+    private IMonitorSiteRelService monitorSiteRelService;
+
+    @Resource
+    private IWaterUseConsumerService waterUseConsumerService;
+
+    @Resource
+    private IWaterFlowRateService waterFlowRateService;
+
+    @Resource
+    private IWaterFlowRateHistoryService waterFlowRateHistoryService;
+
+    @Resource
+    private IDataShareLogServcie dataShareLogServcie;
+
+    @Resource
+    private RedisTemplate<String, String> redisTemplate;
+
+    @Value("${spring.task-config.data-share-server}")
+    private String dataShareServer;
+
+    /**
+     * 超时时间,单位:毫秒
+     */
+    private int nTimeOut = 20 * 1000;
+
+    /**
+     * 计量设施数据同步
+     */
+    @Scheduled(cron = "10 0 0/1 * * * ")
+    public void siteShareDataTaskScheduled() {
+        try {
+            Date dt = new Date();
+            log.info("任务创建 {}", Func.formatDateTime(dt));
+            SiteShareDataTask task = new SiteShareDataTask(dt);
+            FutureTask<Integer> futureTask = new FutureTask<>(task);
+            taskManager.submitTask(futureTask);
+        } catch (Exception e) {
+            log.error("{}", e.getMessage());
+        }
+    }
+
+    /**
+     * 实时流量数据同步
+     */
+    @Scheduled(cron = "30 0/20 * * * * ")
+    public void waterFlowRateShareDataTaskScheduled() {
+        try {
+            Date dt = new Date();
+            log.info("任务创建 {}", Func.formatDateTime(dt));
+            WaterFlowRateShareDataTask task = new WaterFlowRateShareDataTask(dt);
+            FutureTask<Integer> futureTask = new FutureTask<>(task);
+            taskManager.submitTask(futureTask);
+        } catch (Exception e) {
+            log.error("{}", e.getMessage());
+        }
+    }
+
+    private class SiteShareDataTask implements Callable<Integer> {
+        private Date countTime;
+
+        public SiteShareDataTask(Date countTime) {
+            this.countTime = countTime;
+        }
+
+        @Override
+        public Integer call() throws Exception {
+            try {
+                log.info("计量设施同步任务开始执行 {}", Func.formatDateTime(countTime));
+                List<JSONObject> datas = new ArrayList<>();
+                MonitorSiteInfoDTO dto = new MonitorSiteInfoDTO();
+                dto.setDataShareActive(IsValidEnum.VALID_2.getCode());
+                List<MonitorSiteInfoEntity> list = monitorSiteService.selectAll(dto);
+                if (Func.notNull(list)) {
+                    for (MonitorSiteInfoEntity entity : list) {
+                        LambdaQueryWrapper<MonitorSiteRelEntity> wrapper = Wrappers.<MonitorSiteRelEntity>query().lambda();
+                        wrapper.eq(MonitorSiteRelEntity::getWiustCd, entity.getWiustCd());
+                        MonitorSiteRelEntity siteRelEntity = monitorSiteRelService.getOne(wrapper);
+                        if (Func.notNull(siteRelEntity)) {
+                            LambdaQueryWrapper<WaterUseConsumerInfoEntity> useWrapper = Wrappers.<WaterUseConsumerInfoEntity>query().lambda();
+                            useWrapper.eq(WaterUseConsumerInfoEntity::getWiuCd, siteRelEntity.getWiuCd());
+                            WaterUseConsumerInfoEntity waterUseConsumerInfoEntity = waterUseConsumerService.getOne(useWrapper);
+                            if (Func.notNull(waterUseConsumerInfoEntity) && Func.notNull(waterUseConsumerInfoEntity.getAdlCd())) {
+                                JSONObject data = new JSONObject();
+                                data.put("id", BusinessConstant.DATA_SHARE_AREA + "_" + entity.getWiustCd());
+                                data.put("measName", entity.getWiustNm());
+                                //计量方式代码,参考输入参数说明下的字典项(以电折水计量涉及的样本井、普通井选用其他计量-以电量、柴油和其他动力消耗折算水量)
+                                data.put("measTypeCode", "24");
+                                //计量方式
+                                data.put("measType", "7");
+                                //使用状态 5、正常;4故障;3停用
+                                data.put("status", "5");
+                                //是否有效字段,1有效,0无效(删除的数据)新增时不传默认为1
+                                data.put("flagValid", "1");
+                                //传输方式代码,1、在线计量;2、非在线计量样本井的话传1,普通井传2
+                                data.put("transModeCode", "1");
+                                //取水地址行政编码,县级行政区划代码
+                                data.put("intLocDivCode", waterUseConsumerInfoEntity.getAdlCd());
+                                //是否修改行政区划代码 ,修改时,默认传1,不传不修改行政区划。
+                                data.put("isAddvcd", "1");
+                                //首次发证证件编号/证照编号
+                                data.put("ecBaseInfoId", entity.getSiteCertId());
+                                //数据接收路径 ,1.部级 2.省级(默认省级)(传编码)
+                                data.put("dataReceivePath", "2");
+                                //数据来源类型,1、国控转计量; 2、用户安装的计量设施;3、中央专项资金;
+                                data.put("source", entity.getSiteOwner());
+                                //数据来源类型,如要修改已传输的计量设施数据来源,维护该字段
+                                data.put("updateSource", entity.getSiteOwner());
+                                datas.add(data);
+                            }
+                        }
+                    }
+                }
+                if (datas.size() > 0) {
+                    HashOperations<String, String, String> hashops = redisTemplate.opsForHash();
+                    String date = hashops.get("datashare", "date");
+                    int dsBatchCount = 1;
+                    if (Func.isNull(date)) {
+                        date = Func.formatDate(countTime);
+                    } else {
+                        String today = Func.formatDate(countTime);
+                        if (!today.equals(date)) {
+                            date = today;
+                            dsBatchCount = 1;
+                        } else {
+                            String dsBatch = hashops.get("datashare", "dsBatch");
+                            if (Func.isNull(dsBatch)) {
+                                dsBatchCount = 1;
+                            } else {
+                                dsBatchCount = Func.toInt(dsBatch) + 1;
+                            }
+                        }
+                    }
+                    String dsBatch = BusinessConstant.DATA_SHARE_AREA + "-" + date + "-" + String.format("%04d", dsBatchCount);
+                    JSONObject post = new JSONObject();
+                    post.put("dsBatch", dsBatch);
+                    post.put("timestamp", countTime);
+                    post.put("data", datas);
+                    log.info("json post {}", post.toJSONString());
+                    String url = dataShareServer + "/wr_web_manager/api/wr/datasync/fac/infoinsertOrUpdate";
+                    // 请求头
+                    Map<String, String> headers = new HashMap<>();
+                    headers.put("area", BusinessConstant.DATA_SHARE_AREA);
+                    headers.put("key", BusinessConstant.DATA_SHARE_KEY);
+                    JSONObject res = HttpRequestUtil.doHttpPostRequest("计量设施数据同步", url, headers, post.toJSONString(), nTimeOut);
+                    if (Func.notNull(res)) {
+                        log.info("接口返回 {}", res.toJSONString());
+                    }
+                    //更新同步批次
+                    hashops.put("datashare", "date", date);
+                    hashops.put("datashare", "dsBatch", Func.toStr(dsBatchCount));
+                    //记录请求日志
+                    DataShareLogEntity logEntity = new DataShareLogEntity();
+                    logEntity.setRequestDesc("计量设施数据同步");
+                    logEntity.setCountTime(countTime);
+                    logEntity.setPostUrl(url);
+                    if (Func.notNull(res)) {
+                        logEntity.setRequestResult(res.toJSONString());
+                    }
+                    int timeLong = (int) (System.currentTimeMillis() - countTime.getTime());
+                    logEntity.setTaskTimeLong(timeLong);
+                    dataShareLogServcie.save(logEntity);
+                }
+            } catch (Exception e) {
+                e.printStackTrace();
+                log.error("计量设施同步任务异常 {}", e.getMessage());
+            } finally {
+                log.info("计量设施同步任务完成 {} 毫秒", System.currentTimeMillis() - countTime.getTime());
+            }
+            return 0;
+        }
+    }
+
+    private class WaterFlowRateShareDataTask implements Callable<Integer> {
+
+        private Date countTime;
+
+        public WaterFlowRateShareDataTask(Date countTime) {
+            this.countTime = countTime;
+        }
+
+        @Override
+        public Integer call() throws Exception {
+            try {
+                log.info("实时流量同步任务开始执行 {}", Func.formatDateTime(countTime));
+                List<JSONObject> datas = new ArrayList<>();
+                MonitorSiteInfoDTO dto = new MonitorSiteInfoDTO();
+                dto.setDataShareActive(IsValidEnum.VALID_2.getCode());
+                List<MonitorSiteInfoEntity> list = monitorSiteService.selectAll(dto);
+                if (Func.notNull(list)) {
+                    for (MonitorSiteInfoEntity entity : list) {
+                        LambdaQueryWrapper<MonitorPointRelEntity> wrapper = Wrappers.<MonitorPointRelEntity>query().lambda();
+                        wrapper.eq(MonitorPointRelEntity::getWiustCd, entity.getWiustCd());
+                        List<MonitorPointRelEntity> points = monitorPointRelService.list(wrapper);
+                        if (Func.notNull(points) && points.size() > 0) {
+                            LambdaQueryWrapper<WaterFlowRateInfoEntity> dataWrapper = Wrappers.<WaterFlowRateInfoEntity>query().lambda();
+                            dataWrapper.eq(WaterFlowRateInfoEntity::getMpcd, points.get(0).getMpCd());
+                            WaterFlowRateInfoEntity waterFlowRateInfoEntity = waterFlowRateService.getOne(dataWrapper);
+                            if (Func.notNull(waterFlowRateInfoEntity)) {
+                                JSONObject data = new JSONObject();
+//                                data.put("id", BusinessConstant.DATA_SHARE_AREA + "_" + entity.getWiustCd());
+                                data.put("mpCd", BusinessConstant.DATA_SHARE_AREA + "_" + entity.getWiustCd());
+                                double dayAccw = waterFlowRateHistoryService.getDayAccw(points.get(0).getMpCd());
+                                data.put("dayW", dayAccw);
+                                Calendar calendar = Calendar.getInstance();
+                                calendar.setTime(countTime);
+                                calendar.set(Calendar.HOUR_OF_DAY,0);
+                                calendar.set(Calendar.MINUTE,0);
+                                calendar.set(Calendar.SECOND,0);
+                                calendar.set(Calendar.MILLISECOND,0);
+                                calendar.add(Calendar.DAY_OF_MONTH,-1);
+
+                                data.put("dt", Func.formatDate(calendar.getTime()));
+                                data.put("inTime", Func.formatDate(calendar.getTime()));
+                                datas.add(data);
+                            } else {
+                                JSONObject data = new JSONObject();
+//                                data.put("id", BusinessConstant.DATA_SHARE_AREA + "_" + entity.getWiustCd());
+                                data.put("mpCd", BusinessConstant.DATA_SHARE_AREA + "_" + entity.getWiustCd());
+                                data.put("dayW", 0.0);
+                                Calendar calendar = Calendar.getInstance();
+                                calendar.setTime(countTime);
+                                calendar.set(Calendar.HOUR_OF_DAY,0);
+                                calendar.set(Calendar.MINUTE,0);
+                                calendar.set(Calendar.SECOND,0);
+                                calendar.set(Calendar.MILLISECOND,0);
+                                calendar.add(Calendar.DAY_OF_MONTH,-1);
+
+                                data.put("dt", Func.formatDate(calendar.getTime()));
+                                data.put("inTime", Func.formatDate(calendar.getTime()));
+                                datas.add(data);
+                            }
+                        }
+                    }
+                }
+                if (datas.size() > 0) {
+                    HashOperations<String, String, String> hashops = redisTemplate.opsForHash();
+                    String date = hashops.get("datashare", "date");
+                    int dsBatchCount = 1;
+                    if (Func.isNull(date)) {
+                        date = Func.formatDate(countTime);
+                    } else {
+                        String today = Func.formatDate(countTime);
+                        if (!today.equals(date)) {
+                            date = today;
+                            dsBatchCount = 1;
+                        } else {
+                            String dsBatch = hashops.get("datashare", "dsBatch");
+                            if (Func.isNull(dsBatch)) {
+                                dsBatchCount = 1;
+                            } else {
+                                dsBatchCount = Func.toInt(dsBatch) + 1;
+                            }
+                        }
+                    }
+                    String dsBatch = BusinessConstant.DATA_SHARE_AREA + "-" + date + "-" + String.format("%04d", dsBatchCount);
+                    JSONObject post = new JSONObject();
+                    post.put("dsBatch", dsBatch);
+                    post.put("timestamp", countTime);
+                    post.put("data", datas);
+                    log.info("json post {}", post.toJSONString());
+                    String url = dataShareServer + "/wr_web_manager/api/wr/datasync/wiwrDayWr/insertOrUpdate";
+                    // 请求头
+                    Map<String, String> headers = new HashMap<>();
+                    headers.put("area", BusinessConstant.DATA_SHARE_AREA);
+                    headers.put("key", BusinessConstant.DATA_SHARE_KEY);
+                    JSONObject res = HttpRequestUtil.doHttpPostRequest("实时流量数据同步", url, headers, post.toJSONString(), nTimeOut);
+                    if (Func.notNull(res)) {
+                        log.info("接口返回 {}", res.toJSONString());
+                    }
+                    //更新同步批次
+                    hashops.put("datashare", "date", date);
+                    hashops.put("datashare", "dsBatch", Func.toStr(dsBatchCount));
+                    //记录请求日志
+                    DataShareLogEntity logEntity = new DataShareLogEntity();
+                    logEntity.setRequestDesc("实时流量数据同步");
+                    logEntity.setCountTime(countTime);
+                    logEntity.setPostUrl(url);
+                    if (Func.notNull(res)) {
+                        logEntity.setRequestResult(res.toJSONString());
+                   }
+                    int timeLong = (int) (System.currentTimeMillis() - countTime.getTime());
+                    logEntity.setTaskTimeLong(timeLong);
+                    dataShareLogServcie.save(logEntity);
+                }
+            } catch (Exception e) {
+               // e.printStackTrace();
+                log.error("实时流量同步任务异常 {}", e.getMessage());
+                //记录请求日志
+                DataShareLogEntity logEntity = new DataShareLogEntity();
+                logEntity.setRequestDesc("实时流量数据同步");
+                logEntity.setCountTime(countTime);
+                logEntity.setPostUrl(dataShareServer + "/wr_web_manager/api/wr/datasync/wiwrDayWr/insertOrUpdate");
+                logEntity.setRequestResult("实时流量同步任务异常 "+e.getMessage());
+                int timeLong = (int) (System.currentTimeMillis() - countTime.getTime());
+                logEntity.setTaskTimeLong(timeLong);
+                dataShareLogServcie.save(logEntity);
+            } finally {
+                log.info("实时流量同步任务完成 {} 毫秒", System.currentTimeMillis() - countTime.getTime());
+            }
+            return 0;
+        }
+    }
+}

+ 1 - 1
src/main/java/org/springblade/jobtask/TaskManager.java

@@ -31,7 +31,7 @@ public class TaskManager {
 	 * 公共线程池
 	 **/
 	private static ThreadFactory publicThreadFactory = new ThreadFactoryBuilder().setNameFormat("task-thread-pool-%d").build();
-	private static ExecutorService publicThreadPool = new ThreadPoolExecutor(1, 2,
+	private static ExecutorService publicThreadPool = new ThreadPoolExecutor(2, 4,
 		0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(1024), publicThreadFactory, new ThreadPoolExecutor.AbortPolicy());
 
 	public void submitTask(FutureTask<Integer> futureTask) {

+ 1 - 0
src/main/java/org/springblade/modules/baseinfo/monitorpoint/service/IMonitorPointService.java

@@ -27,4 +27,5 @@ public interface IMonitorPointService extends BaseService<MonitorPointInfoEntity
     IPage<MonitorPointInfoVO> selectPage(IPage<MonitorPointInfoVO> page, MonitorPointInfoEntity dto);
 
     long gtMonitorPointCount(MonitorPointInfoDTO dto);
+
 }

+ 18 - 0
src/main/java/org/springblade/modules/baseinfo/monitorsite/entity/MonitorSiteInfoEntity.java

@@ -143,4 +143,22 @@ public class MonitorSiteInfoEntity extends BaseEntity {
      */
     @ApiModelProperty(value = "用户类型")
     private String useType;
+
+    /**
+     * 证照编号
+     */
+    @ApiModelProperty(value = "证照编号")
+    private String siteCertId;
+
+    /**
+     * 0 国控转计量,1用户安装的计量设施,2 中央专项资金
+     */
+    @ApiModelProperty(value = "站点建设来源类型")
+    private String siteOwner;
+
+    /**
+     * 是否同步到金水平台
+     */
+    @ApiModelProperty(value = "是否同步到金水平台")
+    private Integer dataShareActive;
 }

+ 10 - 0
src/main/java/org/springblade/modules/baseinfo/monitorsite/mapper/MonitorSiteMapper.xml

@@ -23,6 +23,9 @@
         <result column="use_type" property="useType"/>
         <result column="ts" property="ts"/>
         <result column="nt" property="nt"/>
+        <result column="site_cert_id" property="siteCertId"/>
+        <result column="site_owner" property="siteOwner"/>
+        <result column="data_share_active" property="dataShareActive"/>
 
     </resultMap>
 
@@ -46,10 +49,14 @@
         <result column="use_type" property="useType"/>
         <result column="ts" property="ts"/>
         <result column="nt" property="nt"/>
+        <result column="site_cert_id" property="siteCertId"/>
+        <result column="site_owner" property="siteOwner"/>
+        <result column="data_share_active" property="dataShareActive"/>
         <result column="wiu_cd" property="wiuCd"/>
         <result column="wiu_nm" property="wiuNm"/>
         <result column="org_id" property="orgId"/>
         <result column="dept_name" property="orgName"/>
+
     </resultMap>
 
     <select id="selectPage" resultMap="monitorSiteInfoVOResultMap">
@@ -90,6 +97,9 @@
         <if test="dto.wiustCd!=null">
             and n.wiust_cd like concat(concat('%', #{dto.wiustCd}), '%')
         </if>
+        <if test="dto.dataShareActive!=null">
+            and n.data_share_active = #{dto.dataShareActive}
+        </if>
         order by n.wiust_cd
     </select>
 

+ 4 - 1
src/main/java/org/springblade/modules/baseinfo/monitorsite/service/IMonitorSiteService.java

@@ -27,6 +27,9 @@ import java.util.List;
  */
 public interface IMonitorSiteService extends BaseService<MonitorSiteInfoEntity> {
     IPage<MonitorSiteInfoVO> selectPage(IPage<MonitorSiteInfoVO> page, MonitorSiteInfoEntity dto);
+
     List<MonitorSiteInfoEntity> selectAll(MonitorSiteInfoEntity dto);
-    Long  getMonitorSiteCount(MonitorSiteInfoDTO dto);
+
+    Long getMonitorSiteCount(MonitorSiteInfoDTO dto);
+
 }

+ 1 - 1
src/main/java/org/springblade/modules/business/monitor/mapper/WaterFlowRateMapper.xml

@@ -63,7 +63,7 @@
             and i.mp_nm LIKE concat(concat('%', #{dto.mpnm}), '%')
         </if>
         <if test="dto.mpcd!=null">
-            and n.mp_cd like concat(concat('%', #{dto.mpcd}), '%')
+            and i.mp_cd like concat(concat('%', #{dto.mpcd}), '%')
         </if>
         <if test="dto.orgId!=null">
             and (r.org_id =#{dto.orgId} or  FIND_IN_SET(#{dto.orgId} ,d.ancestors) >0 )

+ 3 - 0
src/main/java/org/springblade/modules/business/monitor/service/IWaterFlowRateHistoryService.java

@@ -56,4 +56,7 @@ public interface IWaterFlowRateHistoryService extends BaseService<WaterFlowRateH
      * @return
      */
     List<ConsumerWaterFlowCountInfoVO> consumerWaterUseCount(WaterFlowRateHistoryInfoDTO dto);
+
+
+    double getDayAccw(String mpcd);
 }

+ 59 - 0
src/main/java/org/springblade/modules/business/monitor/service/impl/WaterFlowRateHistoryServiceImpl.java

@@ -6,8 +6,12 @@
  */
 package org.springblade.modules.business.monitor.service.impl;
 
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import com.baomidou.mybatisplus.core.metadata.IPage;
+import com.baomidou.mybatisplus.core.toolkit.Wrappers;
 import org.springblade.core.mp.base.BaseServiceImpl;
+import org.springblade.core.tool.utils.Func;
+import org.springblade.modules.baseinfo.monitorpoint.entity.MonitorPointRelEntity;
 import org.springblade.modules.business.monitor.dto.WaterFlowRateHistoryInfoDTO;
 import org.springblade.modules.business.monitor.entity.WaterFlowRateHistoryInfoEntity;
 import org.springblade.modules.business.monitor.mapper.WaterFlowRateHistoryMapper;
@@ -17,6 +21,8 @@ import org.springblade.modules.business.monitor.vo.OrgWaterFlowCountInfoVO;
 import org.springblade.modules.business.monitor.vo.WaterFlowRateHistoryInfoVO;
 import org.springframework.stereotype.Service;
 
+import java.util.Calendar;
+import java.util.Date;
 import java.util.List;
 
 
@@ -51,4 +57,57 @@ public class WaterFlowRateHistoryServiceImpl extends BaseServiceImpl<WaterFlowRa
         return baseMapper.consumerWaterUseCount(dto);
     }
 
+    @Override
+    public double getDayAccw(String mpcd) {
+
+        Date tm = new Date();
+        Calendar calendar = Calendar.getInstance();
+        calendar.setTime(tm);
+        calendar.set(Calendar.HOUR_OF_DAY,0);
+        calendar.set(Calendar.MINUTE,0);
+        calendar.set(Calendar.SECOND,0);
+        calendar.set(Calendar.MILLISECOND,0);
+        Date et =  calendar.getTime();
+
+        calendar.add(Calendar.DAY_OF_MONTH,-1);
+        Date st = calendar.getTime();
+
+        LambdaQueryWrapper<WaterFlowRateHistoryInfoEntity> wrapper = Wrappers.<WaterFlowRateHistoryInfoEntity>query().lambda();
+        wrapper.ge(WaterFlowRateHistoryInfoEntity::getTm,st);
+       // wrapper.lt(WaterFlowRateHistoryInfoEntity::getTm,et);
+        wrapper.orderByAsc(WaterFlowRateHistoryInfoEntity::getTm);
+        wrapper.last("limit 1");
+
+        WaterFlowRateHistoryInfoEntity first = this.getOne(wrapper);
+        if (Func.notNull(first)){
+          //  calendar.setTime(tm);
+          //  calendar.set(Calendar.HOUR_OF_DAY,0);
+          //  calendar.set(Calendar.MINUTE,0);
+         //   calendar.set(Calendar.SECOND,0);
+         //   calendar.set(Calendar.MILLISECOND,0);
+          //   et =  calendar.getTime();
+
+         //   calendar.add(Calendar.DAY_OF_MONTH,-1);
+           //  st = calendar.getTime();
+            wrapper = Wrappers.<WaterFlowRateHistoryInfoEntity>query().lambda();
+            wrapper.ge(WaterFlowRateHistoryInfoEntity::getTm,et);
+         //   wrapper.lt(WaterFlowRateHistoryInfoEntity::getTm,et);
+            wrapper.orderByAsc(WaterFlowRateHistoryInfoEntity::getTm);
+            wrapper.last("limit 1");
+
+            WaterFlowRateHistoryInfoEntity last = this.getOne(wrapper);
+            if (Func.notNull(last)){
+                if (Func.notNull(first.getAccw()) && Func.notNull(last.getAccw())){
+                    return (double) last.getAccw().longValue()-(double) first.getAccw().longValue();
+                }
+                //else{
+                   // return 0;
+            //    }
+            }
+        }
+
+
+        return 0;
+    }
+
 }

+ 82 - 0
src/main/java/org/springblade/modules/share/entity/DataShareLogEntity.java

@@ -0,0 +1,82 @@
+/**
+ * Copyright 2019 DH
+ * All right reserved.
+ * 项目名称: 大恒泰山系统
+ * 创建日期:2023/11/17
+ */
+package org.springblade.modules.share.entity;
+
+import com.baomidou.mybatisplus.annotation.TableName;
+import com.fasterxml.jackson.annotation.JsonFormat;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import org.springblade.core.mp.base.BaseEntity;
+import org.springframework.format.annotation.DateTimeFormat;
+
+import java.util.Date;
+
+/***
+ * Date:2023/11/17
+ * Title:文件所属模块(必须填写)
+ * Description:对本文件的详细描述,原则上不能少于30字
+ * @author dylan
+ * @version 1.0
+ * Remark:认为有必要的其他信息
+ */
+@Data
+@EqualsAndHashCode(callSuper = true)
+@TableName("data_share_log")
+public class DataShareLogEntity extends BaseEntity {
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * 租户ID
+     */
+    @ApiModelProperty("租户ID")
+    private String tenantId;
+
+    /**
+     * 测站编码
+     */
+    @ApiModelProperty("测站编码")
+    private String rtuCode;
+
+    /**
+     * 请求说明
+     */
+    @ApiModelProperty("请求说明")
+    private String requestDesc;
+
+    /**
+     * 请求服务URL
+     */
+    @ApiModelProperty("URL")
+    private String postUrl;
+
+    /**
+     * 请求回应
+     */
+    @ApiModelProperty("请求回应")
+    private String requestResult;
+
+    /**
+     * 时间
+     */
+    @ApiModelProperty("时间")
+    @DateTimeFormat(
+            pattern = "yyyy-MM-dd HH:mm:ss"
+    )
+    @JsonFormat(
+            pattern = "yyyy-MM-dd HH:mm:ss",
+            timezone = "GMT+8"
+    )
+    private Date countTime;
+
+    /**
+     * 任务运行时间,毫秒
+     */
+    @ApiModelProperty("任务运行时间")
+    private Integer taskTimeLong;
+
+}

+ 21 - 0
src/main/java/org/springblade/modules/share/mapper/DataShareLogMapper.java

@@ -0,0 +1,21 @@
+/**
+ * Copyright 2019 DH
+ * All right reserved.
+ * 项目名称: 大恒泰山系统
+ * 创建日期:2022/8/24
+ */
+package org.springblade.modules.share.mapper;
+
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+import org.springblade.modules.share.entity.DataShareLogEntity;
+
+/***
+ * Date:2022/8/24
+ * Title:文件所属模块(必须填写)
+ * Description:对本文件的详细描述,原则上不能少于30字
+ * @author dylan
+ * @version 1.0
+ * Remark:认为有必要的其他信息
+ */
+public interface DataShareLogMapper extends BaseMapper<DataShareLogEntity> {
+}

+ 15 - 0
src/main/java/org/springblade/modules/share/mapper/DataShareLogMapper.xml

@@ -0,0 +1,15 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
+<mapper namespace="org.springblade.modules.share.mapper.DataShareLogMapper">
+
+    <!-- 通用查询映射结果 -->
+    <resultMap id="rtuWarningInfoEntityResultMap" type="org.springblade.modules.share.entity.DataShareLogEntity">
+        <result column="rtu_code" property="rtuCode"/>
+        <result column="request_desc" property="requestDesc"/>
+        <result column="post_url" property="postUrl"/>
+        <result column="request_result" property="requestResult"/>
+        <result column="count_time" property="countTime"/>
+        <result column="task_time_long" property="taskTimeLong"/>
+    </resultMap>
+
+</mapper>

+ 21 - 0
src/main/java/org/springblade/modules/share/service/IDataShareLogServcie.java

@@ -0,0 +1,21 @@
+/**
+ * Copyright 2019 DH
+ * All right reserved.
+ * 项目名称: 大恒泰山系统
+ * 创建日期:2023/11/17
+ */
+package org.springblade.modules.share.service;
+
+import org.springblade.core.mp.base.BaseService;
+import org.springblade.modules.share.entity.DataShareLogEntity;
+
+/***
+ * Date:2023/11/17
+ * Title:文件所属模块(必须填写)
+ * Description:对本文件的详细描述,原则上不能少于30字
+ * @author dylan
+ * @version 1.0
+ * Remark:认为有必要的其他信息
+ */
+public interface IDataShareLogServcie extends BaseService<DataShareLogEntity> {
+}

+ 25 - 0
src/main/java/org/springblade/modules/share/service/impl/DataShareLogServcieImpl.java

@@ -0,0 +1,25 @@
+/**
+ * Copyright 2019 DH
+ * All right reserved.
+ * 项目名称: 大恒泰山系统
+ * 创建日期:2023/11/17
+ */
+package org.springblade.modules.share.service.impl;
+
+import org.springblade.core.mp.base.BaseServiceImpl;
+import org.springblade.modules.share.entity.DataShareLogEntity;
+import org.springblade.modules.share.mapper.DataShareLogMapper;
+import org.springblade.modules.share.service.IDataShareLogServcie;
+import org.springframework.stereotype.Service;
+
+/***
+ * Date:2023/11/17
+ * Title:文件所属模块(必须填写)
+ * Description:对本文件的详细描述,原则上不能少于30字
+ * @author dylan
+ * @version 1.0
+ * Remark:认为有必要的其他信息
+ */
+@Service
+public class DataShareLogServcieImpl extends BaseServiceImpl<DataShareLogMapper, DataShareLogEntity> implements IDataShareLogServcie {
+}

+ 96 - 1
src/main/java/org/springblade/mq/kafka/handler/MessageHandler.java

@@ -76,7 +76,7 @@ public class MessageHandler {
 
     /**
      * 报文
-     *
+     * 流量数据
      * @param record
      * @param acknowledgment
      */
@@ -154,6 +154,101 @@ public class MessageHandler {
                 statusEntity.setIsRiver(1);
                 statusEntity.setFromDate(entity.getTs());
                 kafkaTemplate.send(topicYwxtRtuStatus, JsonUtil.toJson(statusEntity));
+
+            } else {
+                log.info("monitor site dont find {}", rtuCode);
+            }
+        } catch (Exception e) {
+            log.error(e.getMessage(), e);
+        } finally {
+            // 手动提交 offset
+            acknowledgment.acknowledge();
+        }
+    }
+
+    /**
+     * 报文
+     * 水位数据
+     * @param record
+     * @param acknowledgment
+     */
+    @KafkaListener(groupId = "${spring.kafka.consumer.group-id}", topics = "${spring.mq-topic.szy-water-level}", containerFactory = "ackContainerFactory")
+    public void waterLevelMessage(ConsumerRecord record, Acknowledgment acknowledgment) {
+        try {
+            String message = (String) record.value();
+            log.info("{} ,{}", "tddcs water level info", message);
+            JSONObject body = JSONObject.parseObject(message);
+
+            WaterFlowRateHistoryInfoEntity historyInfoEntity = new WaterFlowRateHistoryInfoEntity();
+            String rtuCode = body.getString(SzyFlowRateMetadataEnum.METADATA_RTU_CODE.getName());
+            //水资源测站编码-高位在前
+            String rtutail = rtuCode.substring(rtuCode.length() - 4, rtuCode.length());
+            byte[] rtubuff = BytesHelp.hexStr2Bytes(rtutail);
+            int rtuvalue = BytesHelp.bytes2short(rtubuff);
+            if (rtuvalue >= 0 && rtuvalue <= 9999) {
+                rtutail = String.format("%04d", rtuvalue);
+                rtuCode = rtuCode.substring(0, 6) + rtutail;
+                historyInfoEntity.setMpcd(rtuCode + "001");
+            } else {
+                historyInfoEntity.setMpcd(rtuCode + "001");
+            }
+            LambdaQueryWrapper<MonitorSiteInfoEntity> siteInfoWrapper = Wrappers.<MonitorSiteInfoEntity>query().lambda();
+            siteInfoWrapper.eq(MonitorSiteInfoEntity::getIsDeleted, 0);
+            siteInfoWrapper.eq(MonitorSiteInfoEntity::getWiustCd, rtuCode);
+            long siteCount = monitorSiteService.count(siteInfoWrapper);
+            if (siteCount > 0) {
+                double waterLevel = body.getDouble(SzyWaterLevelMetadataEnum.METADATA_CODE_LEVEL.getName());
+                historyInfoEntity.setHourw(0L);
+                historyInfoEntity.setMpq(waterLevel);
+                historyInfoEntity.setAccw(0L);
+                historyInfoEntity.setTs(Func.parse(body.getString(SzyFlowRateMetadataEnum.METADATA_UP_TIME.getName()), ConcurrentDateFormat.of("yyyyMMddHHmmss")));
+                historyInfoEntity.setTm(Func.parse(body.getString(SzyFlowRateMetadataEnum.METADATA_COLLECT_TIME.getName()), ConcurrentDateFormat.of("yyyyMMddHHmmss")));
+                waterFlowRateHistoryService.save(historyInfoEntity);
+                LambdaQueryWrapper<WaterFlowRateInfoEntity> wrapper = Wrappers.<WaterFlowRateInfoEntity>query().lambda();
+                wrapper.eq(WaterFlowRateInfoEntity::getMpcd, historyInfoEntity.getMpcd());
+                WaterFlowRateInfoEntity entity = waterFlowRateService.getOne(wrapper);
+                if (null == entity) {
+                    entity = new WaterFlowRateInfoEntity();
+                    entity.setMpcd(historyInfoEntity.getMpcd());
+                }
+                entity.setMpq(historyInfoEntity.getMpq());
+                entity.setAccw(historyInfoEntity.getAccw());
+                entity.setHourw(historyInfoEntity.getHourw());
+                entity.setTs(historyInfoEntity.getTs());
+                entity.setTm(historyInfoEntity.getTm());
+                waterFlowRateService.saveOrUpdate(entity);
+
+
+                //更新测站最新上报时间
+                HashOperations<String, String, String> hashops = redisTemplate.opsForHash();
+                String key = RedisBusinessConstant.KEY_RTU_RUN_INFO + rtuCode;
+                String lastDatetimeText = hashops.get(key, RedisBusinessConstant.KEY_RTU_RUN_INFO_LAST_TIME);
+                if (null != lastDatetimeText && lastDatetimeText.length() > 0) {
+                    LocalDateTime lastDateTime = LocalDateTime.parse(lastDatetimeText, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
+                    LocalDateTime tm = LocalDateTime.ofInstant(entity.getTm().toInstant(), ZoneId.systemDefault());
+                    if (tm.isAfter(lastDateTime)) {
+                        lastDatetimeText = Func.formatDateTime(entity.getTm());
+                        hashops.put(key, RedisBusinessConstant.KEY_RTU_RUN_INFO_LAST_TIME, lastDatetimeText);
+                    }
+                } else {
+                    lastDatetimeText = Func.formatDateTime(entity.getTm());
+                    hashops.put(key, RedisBusinessConstant.KEY_RTU_RUN_INFO_LAST_TIME, lastDatetimeText);
+                }
+
+                //测站状态检测
+                RtuStatusDTO statusEntity = new RtuStatusDTO();
+                if (null != entity.getTm()) {
+                    statusEntity.setLastUpTime(entity.getTm());
+                } else if (null != entity.getTs()) {
+                    statusEntity.setLastUpTime(entity.getTs());
+                } else {
+                    statusEntity.setLastUpTime(new Date());
+                }
+                statusEntity.setRtuCode(rtuCode);
+                statusEntity.setIsRiver(1);
+                statusEntity.setFromDate(entity.getTs());
+                kafkaTemplate.send(topicYwxtRtuStatus, JsonUtil.toJson(statusEntity));
+
             } else {
                 log.info("monitor site dont find {}", rtuCode);
             }

+ 176 - 0
src/main/java/org/springblade/utils/HttpRequestUtil.java

@@ -0,0 +1,176 @@
+/**
+ * Copyright
+ * All right reserved.
+ * 项目名称:
+ * 创建日期: 2022/11/9
+ */
+package org.springblade.utils;
+
+import cn.hutool.core.util.CharsetUtil;
+import cn.hutool.http.ContentType;
+import cn.hutool.http.HttpRequest;
+import cn.hutool.http.HttpResponse;
+import cn.hutool.http.HttpStatus;
+import com.alibaba.fastjson.JSONObject;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.springblade.core.log.exception.ServiceException;
+
+import java.util.Map;
+
+/**
+ * 创建日期: 2022/11/9
+ * Title: 文件所属模块(必须填写)
+ * Description:对本文件的详细描述,原则上不能少于30字
+ * @author DSH
+ * @mender:(文件的修改者,文件创建者之外的人)
+ * @version 1.0
+ * Remark:认为有必要的其他信息
+ */
+@Slf4j
+public class HttpRequestUtil {
+
+	/**
+	 * 功能: GET请求
+	 * 作者: DSH
+	 * 创建日期: 2022/11/09
+	 * @param interfaceName 接口名称
+	 * @param url 接口地址
+	 * @param headers 请求头
+	 * @param paramMap 接口入参
+	 * @param nTimeOut 超时时间,单位:毫秒
+	 * @return: com.alibaba.fastjson.JSONObject
+	 */
+	public static JSONObject doHttpGetRequest(String interfaceName, String url, Map<String, String> headers, Map<String, Object> paramMap, int nTimeOut) {
+		log.info("【{}】接口地址:{},参数:{}", interfaceName, url, paramMap);
+		HttpResponse response = HttpRequest.get(url)
+			.form(paramMap)
+			.addHeaders(headers)
+			.timeout(nTimeOut)
+			.execute();
+		if (response.getStatus() == HttpStatus.HTTP_OK) {
+			String body = response.body();
+			log.info("【{}】接口响应:{}", interfaceName, body);
+			if (StringUtils.isNotBlank(body)) {
+				JSONObject result = JSONObject.parseObject(body);
+				if (result.getIntValue("code") != HttpStatus.HTTP_OK) {
+					throw new ServiceException(result.getString("msg"));
+				}
+				return result;
+			}
+		} else {
+			log.error("【{}】请求异常,Http状态码:{}", interfaceName, response.getStatus());
+			throw new ServiceException("请求异常,Http状态码:"+ response.getStatus());
+		}
+		return null;
+	}
+
+	/**
+	 * 功能: POST请求
+	 * 作者: DSH
+	 * 创建日期: 2022/11/09
+	 * @param interfaceName 接口名称
+	 * @param url 接口地址
+	 * @param headers 请求头
+	 * @param paramMap 接口入参
+	 * @param nTimeOut 超时时间,单位:毫秒
+	 * @return: com.alibaba.fastjson.JSONObject
+	 */
+	public static JSONObject doHttpPostRequest(String interfaceName, String url, Map<String, String> headers, Map<String, Object> paramMap, int nTimeOut) {
+		log.info("【{}】接口地址:{},参数:{}", interfaceName, url, paramMap);
+		HttpResponse response = HttpRequest.post(url)
+			//.contentType("application/json;charset=utf-8")
+			.contentType(ContentType.JSON.toString(CharsetUtil.CHARSET_UTF_8))
+			.body(paramMap == null ? null : JSONObject.toJSONString(paramMap))
+			.addHeaders(headers)
+			.timeout(nTimeOut)
+			.execute();
+		if (response.getStatus() == HttpStatus.HTTP_OK) {
+			String body = response.body();
+			log.info("【{}】接口响应:{}", interfaceName, body);
+			if (StringUtils.isNotBlank(body)) {
+				JSONObject result = JSONObject.parseObject(body);
+				if (result.getIntValue("code") != HttpStatus.HTTP_OK) {
+					throw new ServiceException(result.getString("msg"));
+				}
+				return result;
+			}
+		} else {
+			log.error("【{}】请求异常,Http状态码:{}", interfaceName, response.getStatus());
+			throw new ServiceException("请求异常,Http状态码:"+ response.getStatus());
+		}
+		return null;
+	}
+
+	/**
+	 *  POST请求 ,按JSON字符串传入BODY
+	 * @param interfaceName
+	 * @param url
+	 * @param headers
+	 * @param paramMap
+	 * @param nTimeOut
+	 * @return
+	 */
+	public static JSONObject doHttpPostRequest(String interfaceName, String url, Map<String, String> headers, String paramMap, int nTimeOut) {
+		log.info("【{}】接口地址:{},参数:{}", interfaceName, url, paramMap);
+		HttpResponse response = HttpRequest.post(url)
+//			.contentType("application/json;charset=utf-8")
+			.contentType(ContentType.JSON.toString(CharsetUtil.CHARSET_UTF_8))
+			.body(paramMap)
+			.addHeaders(headers)
+			.timeout(nTimeOut)
+			.execute();
+		if (response.getStatus() == HttpStatus.HTTP_OK) {
+			String body = response.body();
+			log.info("【{}】接口响应:{}", interfaceName, body);
+			if (StringUtils.isNotBlank(body)) {
+				JSONObject result = JSONObject.parseObject(body);
+//				if (result.getIntValue("code") != HttpStatus.HTTP_OK) {
+//					throw new ServiceException(result.getString("msg"));
+//				}
+				return result;
+			}
+		} else {
+			log.error("【{}】请求异常,Http状态码:{}", interfaceName, response.getStatus());
+			throw new ServiceException("请求异常,Http状态码:"+ response.getStatus());
+		}
+		return null;
+	}
+
+
+	/**
+	 * 功能: POST请求
+	 * 作者: DSH
+	 * 创建日期: 2022/11/09
+	 * @param interfaceName 接口名称
+	 * @param url 接口地址
+	 * @param headers 请求头
+	 * @param paramMap 接口入参
+	 * @param nTimeOut 超时时间,单位:毫秒
+	 * @return: com.alibaba.fastjson.JSONObject
+	 */
+	public static JSONObject doHttpPostSubmitForm(String interfaceName, String url, Map<String, String> headers, Map<String, Object> paramMap, int nTimeOut) {
+		log.info("【{}】接口地址:{},参数:{}", interfaceName, url, paramMap);
+		HttpResponse response = HttpRequest.post(url)
+			.form(paramMap)
+			.addHeaders(headers)
+			.timeout(nTimeOut)
+			.execute();
+		if (response.getStatus() == HttpStatus.HTTP_OK) {
+			String body = response.body();
+			log.info("【{}】接口响应:{}", interfaceName, body);
+			if (StringUtils.isNotBlank(body)) {
+				JSONObject result = JSONObject.parseObject(body);
+				if (result.getIntValue("code") != HttpStatus.HTTP_OK) {
+					throw new ServiceException(result.getString("msg"));
+				}
+				return result;
+			}
+		} else {
+			log.error("【{}】请求异常,Http状态码:{}", interfaceName, response.getStatus());
+			throw new ServiceException("请求异常,Http状态码:"+ response.getStatus());
+		}
+		return null;
+	}
+
+}

+ 3 - 0
src/main/resources/application-dev.yml

@@ -101,6 +101,7 @@ spring:
   mq-topic:
     ywxt-datagram-message: topic-ywxt-datagram-message
     szy-water-flow-rate: topic-szy-water-flow-rate
+    szy-water-level: topic-szy-water-level
     szy-rtu-status: topic-szy-rtu-status
     szy-rtu-warn: topic-szy-rtu-warn
     ywxt-warning: topic-ywxt-warning-message
@@ -116,6 +117,8 @@ spring:
     surface-rain-task: false
     report-task: false
     rtu-datagram-up-task: false
+    data-share-server: https://waterintake.goldenwater.com.cn
+    #data-share-server: https://qysgl.mwr.cn
   warn-config:
     delay-duration: 120
     delay-min-duration: 10