فهرست منبع

数据接收简单版本开发

dylan 2 سال پیش
والد
کامیت
58c3e15994
16فایلهای تغییر یافته به همراه95 افزوده شده و 79 حذف شده
  1. 1 2
      tsddcs-datacollect-simple/pom.xml
  2. 4 3
      tsddcs-datacollect-simple/src/main/java/org/ts/ddcs/dataStream/DataItemTemplateCache.java
  3. 2 0
      tsddcs-datacollect-simple/src/main/java/org/ts/ddcs/dataStream/DataStream.java
  4. 1 1
      tsddcs-datacollect-simple/src/main/java/org/ts/ddcs/dataStream/DataStreamManageHolder.java
  5. 11 9
      tsddcs-datacollect-simple/src/main/java/org/ts/ddcs/dataStream/DataStreamManager.java
  6. 3 3
      tsddcs-datacollect-simple/src/main/java/org/ts/ddcs/dataStream/TcpDataStreamAdapter.java
  7. 14 14
      tsddcs-datacollect-simple/src/main/java/org/ts/ddcs/dataStream/processor/sw/DataStreamSwHourDatagramProcessor.java
  8. 7 8
      tsddcs-datacollect-simple/src/main/java/org/ts/ddcs/dataStream/processor/sw/DataStreamSwImageDatagramProcessor.java
  9. 7 8
      tsddcs-datacollect-simple/src/main/java/org/ts/ddcs/dataStream/processor/sw/DataStreamSwLinkHeartProcessor.java
  10. 1 2
      tsddcs-datacollect-simple/src/main/java/org/ts/ddcs/dataStream/sink/DataStreamKafkaSink.java
  11. 1 2
      tsddcs-datacollect-simple/src/main/java/org/ts/ddcs/dataStream/sink/DataStreamSwJpzKafkaSink.java
  12. 21 2
      tsddcs-datacollect-simple/src/main/java/org/ts/ddcs/main/MainProcessor.java
  13. 6 22
      tsddcs-datacollect-simple/src/main/resources/application-dev.yml
  14. 1 1
      tsddcs-datacollect-simple/src/main/resources/application.yml
  15. 8 2
      tsddcs-datacollect-simple/src/main/resources/templates/sw2014.xml
  16. 7 0
      tsddcs-datacollect-simple/src/main/resources/templates/szy2012.xml

+ 1 - 2
tsddcs-datacollect-simple/pom.xml

@@ -10,7 +10,7 @@
     <modelVersion>4.0.0</modelVersion>
     <modelVersion>4.0.0</modelVersion>
 
 
     <groupId>org.ts.ddcs</groupId>
     <groupId>org.ts.ddcs</groupId>
-    <artifactId>datacollect-simple</artifactId>
+    <artifactId>tsddcs-datacollect-simple</artifactId>
     <name>${project.artifactId}</name>
     <name>${project.artifactId}</name>
     <version>1.0.0.RELEASE</version>
     <version>1.0.0.RELEASE</version>
     <packaging>jar</packaging>
     <packaging>jar</packaging>
@@ -70,5 +70,4 @@
         <!--<artifactId>spring-boot-starter-data-redis</artifactId>-->
         <!--<artifactId>spring-boot-starter-data-redis</artifactId>-->
         <!--</dependency>-->
         <!--</dependency>-->
     </dependencies>
     </dependencies>
-
 </project>
 </project>

+ 4 - 3
tsddcs-datacollect-simple/src/main/java/org/ts/ddcs/dataStream/DataItemTemplateCache.java

@@ -46,7 +46,7 @@ public class DataItemTemplateCache {
         this.readSize = this.readSize + readSize;
         this.readSize = this.readSize + readSize;
     }
     }
 
 
-    public void resetStore(){
+    public void resetStore() {
         this.keyStore.clear();
         this.keyStore.clear();
     }
     }
 
 
@@ -81,7 +81,8 @@ public class DataItemTemplateCache {
     public Object getValue(String key) {
     public Object getValue(String key) {
         return this.keyStore.get(key);
         return this.keyStore.get(key);
     }
     }
-    public boolean hasValue(String key){
-        return  this.keyStore.containsKey(key);
+
+    public boolean hasValue(String key) {
+        return this.keyStore.containsKey(key);
     }
     }
 }
 }

+ 2 - 0
tsddcs-datacollect-simple/src/main/java/org/ts/ddcs/dataStream/DataStream.java

@@ -1,10 +1,12 @@
 package org.ts.ddcs.dataStream;
 package org.ts.ddcs.dataStream;
+
 import lombok.Data;
 import lombok.Data;
 import org.ts.ddcs.datatemplate.TestState;
 import org.ts.ddcs.datatemplate.TestState;
 
 
 
 
 /**
 /**
  * 数据流
  * 数据流
+ *
  * @author dylan
  * @author dylan
  */
  */
 @Data
 @Data

+ 1 - 1
tsddcs-datacollect-simple/src/main/java/org/ts/ddcs/dataStream/DataStreamManageHolder.java

@@ -1,4 +1,4 @@
-package  org.ts.ddcs.dataStream;
+package org.ts.ddcs.dataStream;
 
 
 
 
 /**
 /**

+ 11 - 9
tsddcs-datacollect-simple/src/main/java/org/ts/ddcs/dataStream/DataStreamManager.java

@@ -24,7 +24,7 @@ import java.util.concurrent.*;
 @Slf4j
 @Slf4j
 public class DataStreamManager {
 public class DataStreamManager {
 
 
-    private  RocksDB rocksDB;
+    private RocksDB rocksDB;
     /**
     /**
      * 数据流工作线程数量
      * 数据流工作线程数量
      */
      */
@@ -116,7 +116,7 @@ public class DataStreamManager {
     /**
     /**
      * 数据流管理器初始化
      * 数据流管理器初始化
      */
      */
-    public void init(String templates,RocksDB rocksDB) throws IllegalAccessException, ClassNotFoundException, InstantiationException {
+    public void init(String templates, RocksDB rocksDB) throws IllegalAccessException, ClassNotFoundException, InstantiationException {
         try {
         try {
             this.rocksDB = rocksDB;
             this.rocksDB = rocksDB;
             //初始化数据流处理器
             //初始化数据流处理器
@@ -152,9 +152,10 @@ public class DataStreamManager {
                                 }
                                 }
                             }
                             }
                             idel = true;
                             idel = true;
-                        } else {
-                            log.info("数据流处理器已被锁住 {}", i);
                         }
                         }
+//                        else {
+//                            log.info("数据流处理器已被锁住 {}", i);
+//                        }
                     }
                     }
                     if (!idel) {
                     if (!idel) {
                         log.info("任务处理器性能故障");
                         log.info("任务处理器性能故障");
@@ -188,9 +189,10 @@ public class DataStreamManager {
                                             batchThreadPool.execute(futureTask);
                                             batchThreadPool.execute(futureTask);
                                             idel = true;
                                             idel = true;
                                             break;
                                             break;
-                                        } else {
-                                            log.info("数据流处理器已被锁住 {}", i);
                                         }
                                         }
+//                                        else {
+//                                            log.info("数据流处理器已被锁住 {}", i);
+//                                        }
                                     }
                                     }
                                     if (!idel) {
                                     if (!idel) {
                                         log.info("任务处理器性能故障");
                                         log.info("任务处理器性能故障");
@@ -285,12 +287,12 @@ public class DataStreamManager {
         }
         }
     }
     }
 
 
-    public DataStreamAdapter getAdapter(String rtuCode){
+    public DataStreamAdapter getAdapter(String rtuCode) {
         Set<String> keySet = dataStreamAdapterCache.keySet();
         Set<String> keySet = dataStreamAdapterCache.keySet();
         for (String key : keySet) {
         for (String key : keySet) {
             DataStreamAdapter dataStreamAdapter = dataStreamAdapterCache.get(key);
             DataStreamAdapter dataStreamAdapter = dataStreamAdapterCache.get(key);
-            String stcd =dataStreamAdapter.getRtuCode();
-            if (null != stcd && stcd.equals(rtuCode)){
+            String stcd = dataStreamAdapter.getRtuCode();
+            if (null != stcd && stcd.equals(rtuCode)) {
                 return dataStreamAdapter;
                 return dataStreamAdapter;
             }
             }
         }
         }

+ 3 - 3
tsddcs-datacollect-simple/src/main/java/org/ts/ddcs/dataStream/TcpDataStreamAdapter.java

@@ -37,17 +37,17 @@ public class TcpDataStreamAdapter extends DataStreamAdapter {
                             log.info("{} tcp writeAndFlush is successful", adapterKey);
                             log.info("{} tcp writeAndFlush is successful", adapterKey);
                         } else {
                         } else {
                             //记录错误
                             //记录错误
-                            log.error("{} tcp writeAndFlush is error {}",adapterKey, channelFuture.cause().getMessage());
+                            log.error("{} tcp writeAndFlush is error {}", adapterKey, channelFuture.cause().getMessage());
                         }
                         }
                     }
                     }
                 });
                 });
             } catch (Exception e) {
             } catch (Exception e) {
                 log.error(this.getClass().getName(), e);
                 log.error(this.getClass().getName(), e);
             } finally {
             } finally {
-                log.info("{} tcp writeAndFlush to rtuCode {}",adapterKey ,rtuCode);
+                log.info("{} tcp writeAndFlush to rtuCode {}", adapterKey, rtuCode);
             }
             }
         } else {
         } else {
-            log.info("{} tcp net channel close",adapterKey );
+            log.info("{} tcp net channel close", adapterKey);
         }
         }
     }
     }
 
 

+ 14 - 14
tsddcs-datacollect-simple/src/main/java/org/ts/ddcs/dataStream/processor/sw/DataStreamSwHourDatagramProcessor.java

@@ -36,7 +36,7 @@ import java.util.*;
 public class DataStreamSwHourDatagramProcessor extends DataStreamProcessor {
 public class DataStreamSwHourDatagramProcessor extends DataStreamProcessor {
 
 
     private DataStreamCache dataStreamCache;
     private DataStreamCache dataStreamCache;
-    private Map<String, List<Map<String,Object>>> keyStore = new HashMap<>();
+    private Map<String, List<Map<String, Object>>> keyStore = new HashMap<>();
 
 
     @Override
     @Override
     public void init(DataStreamLineManager context) {
     public void init(DataStreamLineManager context) {
@@ -70,7 +70,7 @@ public class DataStreamSwHourDatagramProcessor extends DataStreamProcessor {
             // dataStreamCache.putValue(DatagramConstant.sendPacketTime, sendPacketTime);
             // dataStreamCache.putValue(DatagramConstant.sendPacketTime, sendPacketTime);
             //遥测站地址
             //遥测站地址
             DataItemTemplate itemTemplateSt = itemTemplateMap.get(SwElementCodeEnum.CODE_ST.getName());
             DataItemTemplate itemTemplateSt = itemTemplateMap.get(SwElementCodeEnum.CODE_ST.getName());
-         //   itemTemplateSt.clear();
+            //   itemTemplateSt.clear();
             int readSize = itemTemplateSt.stream(dataArea, count);
             int readSize = itemTemplateSt.stream(dataArea, count);
             if (itemTemplateSt.isMacth()) {
             if (itemTemplateSt.isMacth()) {
                 itemTemplateSt.saveValue(this.keyStore);
                 itemTemplateSt.saveValue(this.keyStore);
@@ -82,7 +82,7 @@ public class DataStreamSwHourDatagramProcessor extends DataStreamProcessor {
             dataStreamCache.putValue(DataStreamCacheMetadata.METADATA_SITE_KIND.getName(), BytesHelp.byte2HexStr(siteKind));
             dataStreamCache.putValue(DataStreamCacheMetadata.METADATA_SITE_KIND.getName(), BytesHelp.byte2HexStr(siteKind));
             //采集时间
             //采集时间
             DataItemTemplate itemTemplateTt = itemTemplateMap.get(SwElementCodeEnum.CODE_TT.getName());
             DataItemTemplate itemTemplateTt = itemTemplateMap.get(SwElementCodeEnum.CODE_TT.getName());
-          // itemTemplateSt.clear();
+            // itemTemplateSt.clear();
             readSize = itemTemplateTt.stream(dataArea, count);
             readSize = itemTemplateTt.stream(dataArea, count);
             if (itemTemplateTt.isMacth()) {
             if (itemTemplateTt.isMacth()) {
                 itemTemplateTt.saveValue(this.keyStore);
                 itemTemplateTt.saveValue(this.keyStore);
@@ -104,7 +104,7 @@ public class DataStreamSwHourDatagramProcessor extends DataStreamProcessor {
                     }
                     }
                     if (itemTemplateMap.containsKey(func)) {
                     if (itemTemplateMap.containsKey(func)) {
                         DataItemTemplate itemTemplate = itemTemplateMap.get(func);
                         DataItemTemplate itemTemplate = itemTemplateMap.get(func);
-                       // itemTemplateSt.clear();
+                        // itemTemplateSt.clear();
                         readSize = itemTemplate.stream(content, index);
                         readSize = itemTemplate.stream(content, index);
                         index += readSize;
                         index += readSize;
                         if (itemTemplate.isMacth()) {
                         if (itemTemplate.isMacth()) {
@@ -136,10 +136,10 @@ public class DataStreamSwHourDatagramProcessor extends DataStreamProcessor {
     public void batchComplete(DataStreamAdapter dataStreamAdapter) {
     public void batchComplete(DataStreamAdapter dataStreamAdapter) {
         Set<String> keys = this.keyStore.keySet();
         Set<String> keys = this.keyStore.keySet();
         for (String key : keys) {
         for (String key : keys) {
-            List<Map<String,Object>> list = this.keyStore.get(key);
-            if (null != list){
-                for (Map<String,Object> jsonObject:list){
-                    log.info("item {} {}",key,JSONObject.toJSONString(jsonObject));
+            List<Map<String, Object>> list = this.keyStore.get(key);
+            if (null != list) {
+                for (Map<String, Object> jsonObject : list) {
+                    log.info("item {} {}", key, JSONObject.toJSONString(jsonObject));
                 }
                 }
             }
             }
         }
         }
@@ -150,10 +150,10 @@ public class DataStreamSwHourDatagramProcessor extends DataStreamProcessor {
         respPacket[0] = 0x7e;
         respPacket[0] = 0x7e;
         respPacket[1] = 0x7e;
         respPacket[1] = 0x7e;
         //遥测站地址
         //遥测站地址
-        byte[] rtuCodeByte =  this.dataStreamCache.getBytesValue(DataStreamCacheMetadata.METADATA_RTU_CODE.getName());
+        byte[] rtuCodeByte = this.dataStreamCache.getBytesValue(DataStreamCacheMetadata.METADATA_RTU_CODE.getName());
         respPacket = BytesHelp.arrayApend(respPacket, rtuCodeByte);
         respPacket = BytesHelp.arrayApend(respPacket, rtuCodeByte);
         //中心站地址
         //中心站地址
-        byte[] centerStationByte =  this.dataStreamCache.getBytesValue(DataStreamCacheMetadata.METADATA_CENTER_STATION_CODE.getName());
+        byte[] centerStationByte = this.dataStreamCache.getBytesValue(DataStreamCacheMetadata.METADATA_CENTER_STATION_CODE.getName());
         respPacket = BytesHelp.arrayApend(respPacket, centerStationByte);
         respPacket = BytesHelp.arrayApend(respPacket, centerStationByte);
         //密码
         //密码
         byte[] rtuPwByte = this.dataStreamCache.getBytesValue(DataStreamCacheMetadata.METADATA_SW_PW.getName());
         byte[] rtuPwByte = this.dataStreamCache.getBytesValue(DataStreamCacheMetadata.METADATA_SW_PW.getName());
@@ -164,7 +164,7 @@ public class DataStreamSwHourDatagramProcessor extends DataStreamProcessor {
         //报文下行标识及长度
         //报文下行标识及长度
         respPacket = BytesHelp.arrayApend(respPacket, BytesHelp.hexStr2Bytes("8008"));
         respPacket = BytesHelp.arrayApend(respPacket, BytesHelp.hexStr2Bytes("8008"));
         //报文起始符
         //报文起始符
-        byte[] dataAreaStartFlagByte =this.dataStreamCache.getBytesValue(DataStreamCacheMetadata.METADATA_DATA_AREA_START_FLAG.getName());
+        byte[] dataAreaStartFlagByte = this.dataStreamCache.getBytesValue(DataStreamCacheMetadata.METADATA_DATA_AREA_START_FLAG.getName());
         respPacket = BytesHelp.arrayApend(respPacket, dataAreaStartFlagByte);
         respPacket = BytesHelp.arrayApend(respPacket, dataAreaStartFlagByte);
         //流水号
         //流水号
         byte[] serialNoByte = this.dataStreamCache.getBytesValue(DataStreamCacheMetadata.METADATA_SERIAL_NO.getName());
         byte[] serialNoByte = this.dataStreamCache.getBytesValue(DataStreamCacheMetadata.METADATA_SERIAL_NO.getName());
@@ -186,8 +186,8 @@ public class DataStreamSwHourDatagramProcessor extends DataStreamProcessor {
             Set<String> keys = this.keyStore.keySet();
             Set<String> keys = this.keyStore.keySet();
             for (String key : keys) {
             for (String key : keys) {
                 Map<String, Object> data = new HashMap<>();
                 Map<String, Object> data = new HashMap<>();
-              //  data.put(JsonPackMetadataEnum.PACK_METADATA_DATAGRAM.getName(), BytesHelp.byte2HexStr(this.dataStreamCache.getDatagramBuff()));
-               // data.put(JsonPackMetadataEnum.PACK_METADATA_FROM_TIME.getName(), this.dataStreamCache.getPickPacketTime());
+                //  data.put(JsonPackMetadataEnum.PACK_METADATA_DATAGRAM.getName(), BytesHelp.byte2HexStr(this.dataStreamCache.getDatagramBuff()));
+                // data.put(JsonPackMetadataEnum.PACK_METADATA_FROM_TIME.getName(), this.dataStreamCache.getPickPacketTime());
                 // data.put(SinkJsonPackMetadataEnum.PACK_METADATA_FROM_TIME.getName(), this.dataStreamCache.getValue(DatagramConstant.pickPacketTime));
                 // data.put(SinkJsonPackMetadataEnum.PACK_METADATA_FROM_TIME.getName(), this.dataStreamCache.getValue(DatagramConstant.pickPacketTime));
 //                data.put(JsonPackMetadataEnum.PACK_METADATA_RTU_CODE.getName(), this.dataStreamCache.getRtuCode());
 //                data.put(JsonPackMetadataEnum.PACK_METADATA_RTU_CODE.getName(), this.dataStreamCache.getRtuCode());
 //
 //
@@ -198,7 +198,7 @@ public class DataStreamSwHourDatagramProcessor extends DataStreamProcessor {
 //                    data.put(JsonPackMetadataEnum.PACK_METADATA_COLLECT_TIME.getName(), DatagramHelp.getCollectTime(this.keyStore));
 //                    data.put(JsonPackMetadataEnum.PACK_METADATA_COLLECT_TIME.getName(), DatagramHelp.getCollectTime(this.keyStore));
 //                }
 //                }
                 //  data.put(SinkJsonPackMetadataEnum.PACK_METADATA_COLLECT_TIME.getName(), this.dataStreamCache.getStringValue(DatagramConstant.collectTime));
                 //  data.put(SinkJsonPackMetadataEnum.PACK_METADATA_COLLECT_TIME.getName(), this.dataStreamCache.getStringValue(DatagramConstant.collectTime));
-              //  data.put(JsonPackMetadataEnum.PACK_METADATA_UP_TIME.getName(), this.dataStreamCache.getStringValue(DatagramConstant.sendPacketTime));
+                //  data.put(JsonPackMetadataEnum.PACK_METADATA_UP_TIME.getName(), this.dataStreamCache.getStringValue(DatagramConstant.sendPacketTime));
                 data.put(DatagramMetadataEnum.CACHE_METADATA_ELEMENT_CODE.getName(), key);
                 data.put(DatagramMetadataEnum.CACHE_METADATA_ELEMENT_CODE.getName(), key);
                 List<Map<String, Object>> elements = this.keyStore.get(key);
                 List<Map<String, Object>> elements = this.keyStore.get(key);
                 for (Map<String, Object> jsonObject : elements) {
                 for (Map<String, Object> jsonObject : elements) {

+ 7 - 8
tsddcs-datacollect-simple/src/main/java/org/ts/ddcs/dataStream/processor/sw/DataStreamSwImageDatagramProcessor.java

@@ -37,7 +37,7 @@ import java.util.*;
 public class DataStreamSwImageDatagramProcessor extends DataStreamProcessor {
 public class DataStreamSwImageDatagramProcessor extends DataStreamProcessor {
 
 
     private DataStreamCache dataStreamCache;
     private DataStreamCache dataStreamCache;
-    private Map<String, List<Map<String,Object>>> keyStore = new HashMap<>();
+    private Map<String, List<Map<String, Object>>> keyStore = new HashMap<>();
     private int packpagSize;
     private int packpagSize;
     private int packpageCount;
     private int packpageCount;
 
 
@@ -75,7 +75,7 @@ public class DataStreamSwImageDatagramProcessor extends DataStreamProcessor {
                 count += 6;
                 count += 6;
                 //遥测站地址
                 //遥测站地址
                 DataItemTemplate itemTemplateSt = itemTemplateMap.get("F1");
                 DataItemTemplate itemTemplateSt = itemTemplateMap.get("F1");
-               // itemTemplateSt.clear();
+                // itemTemplateSt.clear();
                 int readSize = itemTemplateSt.stream(dataArea, count);
                 int readSize = itemTemplateSt.stream(dataArea, count);
                 if (itemTemplateSt.isMacth()) {
                 if (itemTemplateSt.isMacth()) {
                     itemTemplateSt.saveValue(this.keyStore);
                     itemTemplateSt.saveValue(this.keyStore);
@@ -87,7 +87,7 @@ public class DataStreamSwImageDatagramProcessor extends DataStreamProcessor {
                 dataStreamCache.putValue(DataStreamCacheMetadata.METADATA_SITE_KIND.getName(), BytesHelp.byte2HexStr(siteKind));
                 dataStreamCache.putValue(DataStreamCacheMetadata.METADATA_SITE_KIND.getName(), BytesHelp.byte2HexStr(siteKind));
                 //采集时间
                 //采集时间
                 DataItemTemplate itemTemplateTt = itemTemplateMap.get("F0");
                 DataItemTemplate itemTemplateTt = itemTemplateMap.get("F0");
-               // itemTemplateSt.clear();
+                // itemTemplateSt.clear();
                 readSize = itemTemplateTt.stream(dataArea, count);
                 readSize = itemTemplateTt.stream(dataArea, count);
                 if (itemTemplateSt.isMacth()) {
                 if (itemTemplateSt.isMacth()) {
                     itemTemplateSt.saveValue(this.keyStore);
                     itemTemplateSt.saveValue(this.keyStore);
@@ -140,7 +140,7 @@ public class DataStreamSwImageDatagramProcessor extends DataStreamProcessor {
         byte[] rtuCodeByte = this.dataStreamCache.getBytesValue(DataStreamCacheMetadata.METADATA_RTU_CODE.getName());
         byte[] rtuCodeByte = this.dataStreamCache.getBytesValue(DataStreamCacheMetadata.METADATA_RTU_CODE.getName());
         respPacket = BytesHelp.arrayApend(respPacket, rtuCodeByte);
         respPacket = BytesHelp.arrayApend(respPacket, rtuCodeByte);
         //中心站地址
         //中心站地址
-        byte[] centerStationByte =  this.dataStreamCache.getBytesValue(DataStreamCacheMetadata.METADATA_CENTER_STATION_CODE.getName());
+        byte[] centerStationByte = this.dataStreamCache.getBytesValue(DataStreamCacheMetadata.METADATA_CENTER_STATION_CODE.getName());
         respPacket = BytesHelp.arrayApend(respPacket, centerStationByte);
         respPacket = BytesHelp.arrayApend(respPacket, centerStationByte);
         //密码
         //密码
         byte[] rtuPwByte = this.dataStreamCache.getBytesValue(DataStreamCacheMetadata.METADATA_SW_PW.getName());
         byte[] rtuPwByte = this.dataStreamCache.getBytesValue(DataStreamCacheMetadata.METADATA_SW_PW.getName());
@@ -155,10 +155,10 @@ public class DataStreamSwImageDatagramProcessor extends DataStreamProcessor {
         ps[0] = 0x02;
         ps[0] = 0x02;
         respPacket = BytesHelp.arrayApend(respPacket, ps);
         respPacket = BytesHelp.arrayApend(respPacket, ps);
 
 
-       // byte[] packIndex = BytesHelp.subBytes(packetContext, 14, 3);
+        // byte[] packIndex = BytesHelp.subBytes(packetContext, 14, 3);
         //int indexValue = BytesHelp.bytes2Int(packIndex, 0, 3);
         //int indexValue = BytesHelp.bytes2Int(packIndex, 0, 3);
-       // int total = (indexValue >> 12) & 0x0FFF;
-      //  int count = indexValue & 0x0FFF;
+        // int total = (indexValue >> 12) & 0x0FFF;
+        //  int count = indexValue & 0x0FFF;
         //流水号
         //流水号
         byte[] serialNoByte = this.dataStreamCache.getBytesValue(DataStreamCacheMetadata.METADATA_SERIAL_NO.getName());
         byte[] serialNoByte = this.dataStreamCache.getBytesValue(DataStreamCacheMetadata.METADATA_SERIAL_NO.getName());
         respPacket = BytesHelp.arrayApend(respPacket, serialNoByte);
         respPacket = BytesHelp.arrayApend(respPacket, serialNoByte);
@@ -188,5 +188,4 @@ public class DataStreamSwImageDatagramProcessor extends DataStreamProcessor {
     }
     }
 
 
 
 
-
 }
 }

+ 7 - 8
tsddcs-datacollect-simple/src/main/java/org/ts/ddcs/dataStream/processor/sw/DataStreamSwLinkHeartProcessor.java

@@ -7,7 +7,6 @@
 package org.ts.ddcs.dataStream.processor.sw;
 package org.ts.ddcs.dataStream.processor.sw;
 
 
 
 
-
 import lombok.extern.slf4j.Slf4j;
 import lombok.extern.slf4j.Slf4j;
 
 
 import org.ts.ddcs.enums.SwDatagramTypeCodeEnum;
 import org.ts.ddcs.enums.SwDatagramTypeCodeEnum;
@@ -70,13 +69,13 @@ public class DataStreamSwLinkHeartProcessor extends DataStreamProcessor {
 
 
     @Override
     @Override
     public void batchComplete(DataStreamAdapter dataStreamAdapter) {
     public void batchComplete(DataStreamAdapter dataStreamAdapter) {
-      //  SimpleDateFormat timeFormat = new SimpleDateFormat("yyyyMMddHHmmss");
-       // String sendPacketTime = (String) this.dataStreamCache.getValue(DatagramConstant.sendPacketTime);
-       // String pickPacketTime = (String) this.dataStreamCache.getValue(DatagramConstant.pickPacketTime);
-      //  Date st = timeFormat.parse(sendPacketTime);
-       // Date pt = timeFormat.parse(pickPacketTime);
-       // SimpleDateFormat timeFormat2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
-       // log.info("接收到2F报文 发报时间:{} 接收时间:{}", timeFormat2.format(st), timeFormat2.format(pt));
+        //  SimpleDateFormat timeFormat = new SimpleDateFormat("yyyyMMddHHmmss");
+        // String sendPacketTime = (String) this.dataStreamCache.getValue(DatagramConstant.sendPacketTime);
+        // String pickPacketTime = (String) this.dataStreamCache.getValue(DatagramConstant.pickPacketTime);
+        //  Date st = timeFormat.parse(sendPacketTime);
+        // Date pt = timeFormat.parse(pickPacketTime);
+        // SimpleDateFormat timeFormat2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+        // log.info("接收到2F报文 发报时间:{} 接收时间:{}", timeFormat2.format(st), timeFormat2.format(pt));
     }
     }
 
 
     @Override
     @Override

+ 1 - 2
tsddcs-datacollect-simple/src/main/java/org/ts/ddcs/dataStream/sink/DataStreamKafkaSink.java

@@ -14,7 +14,6 @@ import org.ts.ddcs.common.util.BytesHelp;
 import org.ts.ddcs.dataStream.DataStreamAdapter;
 import org.ts.ddcs.dataStream.DataStreamAdapter;
 import org.ts.ddcs.dataStream.DataStreamCache;
 import org.ts.ddcs.dataStream.DataStreamCache;
 import org.ts.ddcs.dataStream.DataStreamLineManager;
 import org.ts.ddcs.dataStream.DataStreamLineManager;
-import org.ts.ddcs.kafka.KafkaClient;
 import org.ts.ddcs.metadata.DataStreamCacheMetadata;
 import org.ts.ddcs.metadata.DataStreamCacheMetadata;
 
 
 import java.text.SimpleDateFormat;
 import java.text.SimpleDateFormat;
@@ -50,6 +49,6 @@ public class DataStreamKafkaSink extends DataStreamSink {
         body.put(TaishanV1JsonPackMetadataEnum.PACK_METADATA_FROM_TIME.getName(), timeFormat.format(this.dataStreamCache.getPickPacketTime()));
         body.put(TaishanV1JsonPackMetadataEnum.PACK_METADATA_FROM_TIME.getName(), timeFormat.format(this.dataStreamCache.getPickPacketTime()));
         body.put(TaishanV1JsonPackMetadataEnum.PACK_METADATA_DATAGRAM_TYPE.getName(), this.dataStreamCache.getDatagramCode());
         body.put(TaishanV1JsonPackMetadataEnum.PACK_METADATA_DATAGRAM_TYPE.getName(), this.dataStreamCache.getDatagramCode());
         log.info("data item :{}", body.toJSONString());
         log.info("data item :{}", body.toJSONString());
-        KafkaClient.messageSubmit(KafkaConstants.TOPIC_TS_V1_DATAGRAM, body.toJSONString());
+       // KafkaClient.messageSubmit(KafkaConstants.TOPIC_TS_V1_DATAGRAM, body.toJSONString());
     }
     }
 }
 }

+ 1 - 2
tsddcs-datacollect-simple/src/main/java/org/ts/ddcs/dataStream/sink/DataStreamSwJpzKafkaSink.java

@@ -18,7 +18,6 @@ import org.ts.ddcs.metadata.sw.jpz.SwJpzWeatherMetadataEnum;
 import org.ts.ddcs.dataStream.DataStreamAdapter;
 import org.ts.ddcs.dataStream.DataStreamAdapter;
 import org.ts.ddcs.dataStream.DataStreamCache;
 import org.ts.ddcs.dataStream.DataStreamCache;
 import org.ts.ddcs.dataStream.DataStreamLineManager;
 import org.ts.ddcs.dataStream.DataStreamLineManager;
-import org.ts.ddcs.kafka.KafkaClient;
 import org.ts.ddcs.metadata.DataStreamCacheMetadata;
 import org.ts.ddcs.metadata.DataStreamCacheMetadata;
 import org.ts.ddcs.tools.CloumnHelp;
 import org.ts.ddcs.tools.CloumnHelp;
 import org.ts.ddcs.tools.KeyStoreHelp;
 import org.ts.ddcs.tools.KeyStoreHelp;
@@ -268,6 +267,6 @@ public class DataStreamSwJpzKafkaSink extends DataStreamSink {
             //  }
             //  }
         }
         }
         log.info("data item :{}", jsonBody.toJSONString());
         log.info("data item :{}", jsonBody.toJSONString());
-        KafkaClient.messageSubmit(KafkaConstants.TOPIC_TSYWXT_DATAGRAM_MESSAGE, jsonBody.toJSONString());
+        //KafkaClient.messageSubmit(KafkaConstants.TOPIC_TSYWXT_DATAGRAM_MESSAGE, jsonBody.toJSONString());
     }
     }
 }
 }

+ 21 - 2
tsddcs-datacollect-simple/src/main/java/org/ts/ddcs/main/MainProcessor.java

@@ -1,6 +1,7 @@
 package org.ts.ddcs.main;
 package org.ts.ddcs.main;
 
 
 
 
+import com.alibaba.fastjson.JSONObject;
 import lombok.extern.slf4j.Slf4j;
 import lombok.extern.slf4j.Slf4j;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDB;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.beans.factory.annotation.Value;
@@ -14,8 +15,10 @@ import org.ts.ddcs.datatemplate.DataTemplateManageHolder;
 import org.ts.ddcs.datatemplate.DataTemplateManager;
 import org.ts.ddcs.datatemplate.DataTemplateManager;
 import org.ts.ddcs.net.DataChannel;
 import org.ts.ddcs.net.DataChannel;
 import org.ts.ddcs.net.DataChannelHolder;
 import org.ts.ddcs.net.DataChannelHolder;
+import org.ts.ddcs.rest.RestHttpClient;
 
 
 import javax.annotation.PostConstruct;
 import javax.annotation.PostConstruct;
+import javax.annotation.Resource;
 
 
 
 
 /**
 /**
@@ -44,9 +47,17 @@ public class MainProcessor {
     @Value("${tsywxt-config.nio.listener.udp-port}")
     @Value("${tsywxt-config.nio.listener.udp-port}")
     public String udpPort;
     public String udpPort;
 
 
-    @Value("${rockdb-config.save-path}")
+    @Value("${tsywxt-config.rockdb-config.save-path}")
     public String rockdbSavePath;
     public String rockdbSavePath;
 
 
+    @Value("${tsywxt-config.serves.sw-url}")
+    public String servesSwUrl;
+
+    @Value("${tsywxt-config.serves.szy-url}")
+    public String servesSzyUrl;
+
+    @Resource
+    private RestHttpClient restHttpClient;
 
 
     static {
     static {
         RocksDB.loadLibrary();
         RocksDB.loadLibrary();
@@ -92,7 +103,7 @@ public class MainProcessor {
             DataChannelHolder.newInstance().dataChannel = dataChannel;
             DataChannelHolder.newInstance().dataChannel = dataChannel;
             log.info("初始化数据流处理模块");
             log.info("初始化数据流处理模块");
             DataStreamManager dataStreamManager = new DataStreamManager();
             DataStreamManager dataStreamManager = new DataStreamManager();
-            dataStreamManager.init(this.templates,this.rocksDb);
+            dataStreamManager.init(this.templates, this.rocksDb);
             DataStreamManageHolder.newInstance().dataStreamManager = dataStreamManager;
             DataStreamManageHolder.newInstance().dataStreamManager = dataStreamManager;
         } catch (Exception e) {
         } catch (Exception e) {
             log.error(e.getMessage());
             log.error(e.getMessage());
@@ -112,4 +123,12 @@ public class MainProcessor {
             log.info("************************************* " + serverName + " 服务已成功运行 *************************************");
             log.info("************************************* " + serverName + " 服务已成功运行 *************************************");
         }
         }
     }
     }
+
+    public void postSwData(JSONObject data) {
+        this.restHttpClient.post(this.servesSwUrl, data);
+    }
+
+    public void postSzyData(JSONObject data) {
+        this.restHttpClient.post(this.servesSzyUrl, data);
+    }
 }
 }

+ 6 - 22
tsddcs-datacollect-simple/src/main/resources/application-dev.yml

@@ -1,28 +1,6 @@
 spring:
 spring:
   application:
   application:
     name: TsywxtDataCollect-Dev
     name: TsywxtDataCollect-Dev
-  kafka:
-    bootstrap-servers: 172.31.1.228:9092
-    producer:
-      retries: 0
-      batch-size: 16384
-      buffer-memory: 33554432
-      key-serializer: org.apache.kafka.common.serialization.StringSerializer
-      value-serializer: org.apache.kafka.common.serialization.StringSerializer
-    consumer:
-      group-id: kafka-tsywxt-collect-server
-      # 手动提交
-      enable-auto-commit: false
-      auto-offset-reset: latest
-      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
-      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
-      properties:
-        session.timeout.ms: 60000
-    listener:
-      log-container-config: false
-      concurrency: 5
-      # 手动提交
-      ack-mode: manual_immediate
 tsywxt-config:
 tsywxt-config:
   nio:
   nio:
     listener:
     listener:
@@ -31,4 +9,10 @@ tsywxt-config:
   template:
   template:
     path: "E:\\dh_svn\\Project\\DHRJ-TSSYSTEM\\SERVER\\TSDDCS\\tsddcs-datacollect-simple\\src\\main\\resources\\templates"
     path: "E:\\dh_svn\\Project\\DHRJ-TSSYSTEM\\SERVER\\TSDDCS\\tsddcs-datacollect-simple\\src\\main\\resources\\templates"
     templates: SW2014,SZY2012
     templates: SW2014,SZY2012
+  rockdb-config:
+    save-path: "D:\\tmp\\rockdb"
+  serves:
+    sw-url: "http://localhost:50004/galaxy-business/rtu/data/sw/collect"
+    szy-url: "http://localhost:50004/galaxy-business/rtu/data/szy/collect"
+
 
 

+ 1 - 1
tsddcs-datacollect-simple/src/main/resources/application.yml

@@ -1,2 +1,2 @@
 server:
 server:
-  port: 9999
+  port: 10001

+ 8 - 2
tsddcs-datacollect-simple/src/main/resources/templates/sw2014.xml

@@ -139,7 +139,7 @@
             <StreamProcessor>org.ts.ddcs.dataStream.sink.DataStreamSwJpzKafkaSink</StreamProcessor>
             <StreamProcessor>org.ts.ddcs.dataStream.sink.DataStreamSwJpzKafkaSink</StreamProcessor>
             <ProcessorName>DataStreamSwJpzKafkaSink</ProcessorName>
             <ProcessorName>DataStreamSwJpzKafkaSink</ProcessorName>
             <TemplateName>柬埔寨KAFKA消息队列输出</TemplateName>
             <TemplateName>柬埔寨KAFKA消息队列输出</TemplateName>
-            <Enable>1</Enable>
+            <Enable>0</Enable>
         </Item>
         </Item>
         <Item>
         <Item>
             <Type>sink</Type>
             <Type>sink</Type>
@@ -155,6 +155,12 @@
             <TemplateName>输出到远端RTU</TemplateName>
             <TemplateName>输出到远端RTU</TemplateName>
             <Enable>0</Enable>
             <Enable>0</Enable>
         </Item>
         </Item>
-
+        <Item>
+            <Type>sink</Type>
+            <StreamProcessor>org.ts.ddcs.dataStream.sink.DataStreamSwRestSink</StreamProcessor>
+            <ProcessorName>DataStreamSwRestSink</ProcessorName>
+            <TemplateName>水文REST HTTP 输出</TemplateName>
+            <Enable>1</Enable>
+        </Item>
     </Templates>
     </Templates>
 </DataTemplate>
 </DataTemplate>

+ 7 - 0
tsddcs-datacollect-simple/src/main/resources/templates/szy2012.xml

@@ -191,6 +191,13 @@
             <StreamProcessor>org.ts.ddcs.dataStream.sink.DataStreamSzyXamKafkaSink</StreamProcessor>
             <StreamProcessor>org.ts.ddcs.dataStream.sink.DataStreamSzyXamKafkaSink</StreamProcessor>
             <ProcessorName>DataStreamSzyXamKafkaSink</ProcessorName>
             <ProcessorName>DataStreamSzyXamKafkaSink</ProcessorName>
             <TemplateName>兴安盟KAFKA消息队列输出</TemplateName>
             <TemplateName>兴安盟KAFKA消息队列输出</TemplateName>
+            <Enable>0</Enable>
+        </Item>
+        <Item>
+            <Type>sink</Type>
+            <StreamProcessor>org.ts.ddcs.dataStream.sink.DataStreamSzyRestSink</StreamProcessor>
+            <ProcessorName>DataStreamSzyRestSink</ProcessorName>
+            <TemplateName>水资源REST HTTP 输出</TemplateName>
             <Enable>1</Enable>
             <Enable>1</Enable>
         </Item>
         </Item>
     </Templates>
     </Templates>