dylan 2 лет назад
Родитель
Сommit
e7330b1156
21 измененных файлов с 933 добавлено и 221 удалено
  1. 0 2
      TSDDCS.iml
  2. 22 0
      pom.xml
  3. 8 0
      tsddcs-commonservice/pom.xml
  4. 2 0
      tsddcs-commonservice/src/main/java/org/ts/ddcs/common/constant/KafkaConstants.java
  5. 49 0
      tsddcs-datacollect/pom.xml
  6. 2 0
      tsddcs-datacollect/src/main/java/org/ts/ddcs/constant/BusinessConstant.java
  7. 37 4
      tsddcs-datacollect/src/main/java/org/ts/ddcs/controller/TestController.java
  8. 45 183
      tsddcs-datacollect/src/main/java/org/ts/ddcs/dataStream/DataStreamManager.java
  9. 1 0
      tsddcs-datacollect/src/main/java/org/ts/ddcs/dataStream/processor/sw/DataStreamSwExtraDatagramProcessor.java
  10. 4 8
      tsddcs-datacollect/src/main/java/org/ts/ddcs/dataStream/processor/sw/DataStreamSwHourDatagramProcessor.java
  11. 9 12
      tsddcs-datacollect/src/main/java/org/ts/ddcs/dataStream/processor/sw/DataStreamSwTimeIntervalDatagramProcessor.java
  12. 55 4
      tsddcs-datacollect/src/main/java/org/ts/ddcs/dataStream/processor/szy/DataStreamSzyFixedTimeDatagramProcessor.java
  13. 8 3
      tsddcs-datacollect/src/main/java/org/ts/ddcs/dataStream/sink/DataStreamKafkaSink.java
  14. 64 0
      tsddcs-datacollect/src/main/java/org/ts/ddcs/hbase/HBaseConfig.java
  15. 549 0
      tsddcs-datacollect/src/main/java/org/ts/ddcs/hbase/HBaseService.java
  16. 0 1
      tsddcs-datacollect/src/main/java/org/ts/ddcs/itemdatatemplate/sw/DataItemTemplateTT.java
  17. 0 1
      tsddcs-datacollect/src/main/java/org/ts/ddcs/itemdatatemplate/szy/DataItemTemplateSzyTT.java
  18. 57 1
      tsddcs-datacollect/src/main/java/org/ts/ddcs/kafka/MessageHandler.java
  19. 7 1
      tsddcs-datacollect/src/main/java/org/ts/ddcs/main/MainProcessor.java
  20. 10 0
      tsddcs-datacollect/src/main/resources/application-dev.yml
  21. 4 1
      tsddcs-datacollect/src/main/resources/application.yml

+ 0 - 2
TSDDCS.iml

@@ -1,2 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<module type="JAVA_MODULE" version="4" />

+ 22 - 0
pom.xml

@@ -33,6 +33,10 @@
         <spring.platform.version>Cairo-SR8</spring.platform.version>
 
         <maven.plugin.version>3.8.1</maven.plugin.version>
+
+        <!-- hutool工具集版本 -->
+        <hutool.version>5.7.18</hutool.version>
+        <hutool.bcprov.version>1.70</hutool.bcprov.version>
     </properties>
 
     <modules>
@@ -203,6 +207,24 @@
                 <artifactId>jackson-core</artifactId>
                 <version>RELEASE</version>
             </dependency>
+            <dependency>
+                <groupId>com.google.protobuf</groupId>
+                <artifactId>protobuf-java</artifactId>
+                <version>2.5.0</version>
+            </dependency>
+
+            <!-- hutool工具类集.start -->
+            <dependency>
+                <groupId>cn.hutool</groupId>
+                <artifactId>hutool-all</artifactId>
+                <version>${hutool.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.bouncycastle</groupId>
+                <artifactId>bcprov-jdk15to18</artifactId>
+                <version>${hutool.bcprov.version}</version>
+            </dependency>
+            <!-- hutool工具类集.end -->
         </dependencies>
     </dependencyManagement>
 

+ 8 - 0
tsddcs-commonservice/pom.xml

@@ -40,6 +40,14 @@
             <artifactId>validation-api</artifactId>
             <version>2.0.0.Final</version>
         </dependency>
+        <dependency>
+            <groupId>cn.hutool</groupId>
+            <artifactId>hutool-all</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.bouncycastle</groupId>
+            <artifactId>bcprov-jdk15to18</artifactId>
+        </dependency>
     </dependencies>
 
     <build>

+ 2 - 0
tsddcs-commonservice/src/main/java/org/ts/ddcs/common/constant/KafkaConstants.java

@@ -18,4 +18,6 @@ public class KafkaConstants {
     public final static String DATA_STREAM_SOURCE_EVENT = "data_stream_source_event";
     public final static String TOPIC_TSYWXT_DATAGRAM_MESSAGE = "topic-tsywxt-datagram-message";
     public final static String TOPIC_TS_V1_DATAGRAM = "queue-waiting-process-datagram";
+
+    public final static String TOPIC_TS_HBASE ="topic-tsywxt-hbase";
 }

+ 49 - 0
tsddcs-datacollect/pom.xml

@@ -73,6 +73,55 @@
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-data-redis</artifactId>
         </dependency>
+
+        <!--<dependency>-->
+            <!--<groupId>org.springframework.data</groupId>-->
+            <!--<artifactId>spring-data-hadoop</artifactId>-->
+            <!--<version>2.5.0.RELEASE</version>-->
+            <!--<exclusions>-->
+                <!--<exclusion>-->
+                    <!--<groupId>com.sun.xml.bind</groupId>-->
+                    <!--<artifactId>jaxb-impl</artifactId>-->
+                <!--</exclusion>-->
+            <!--</exclusions>-->
+        <!--</dependency>-->
+
+        <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-client</artifactId>
+            <version>2.5.3</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.mortbay.jetty</groupId>
+                    <artifactId>servlet-api-2.5</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-reload4j</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>cn.hutool</groupId>
+            <artifactId>hutool-all</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.bouncycastle</groupId>
+            <artifactId>bcprov-jdk15to18</artifactId>
+        </dependency>
     </dependencies>
 
 </project>

+ 2 - 0
tsddcs-datacollect/src/main/java/org/ts/ddcs/constant/BusinessConstant.java

@@ -75,4 +75,6 @@ public class BusinessConstant {
 	 * 数据流工作线程数量
 	 */
 	public static final int STREAM_THREAD_NUM = 4;
+
+	public static final String STORE_CACHE_KEY_RTU_INLINE_COUNT = "store.cache.key.rtu.inline.count";
 }

+ 37 - 4
tsddcs-datacollect/src/main/java/org/ts/ddcs/controller/TestController.java

@@ -6,13 +6,32 @@
  */
 package org.ts.ddcs.controller;
 
+import com.alibaba.fastjson.JSONObject;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.*;
+
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.web.bind.annotation.GetMapping;
 import org.springframework.web.bind.annotation.RestController;
+import org.ts.ddcs.common.enums.AgreementEnum;
+import org.ts.ddcs.common.metadata.TaishanV1JsonPackMetadataEnum;
 import org.ts.ddcs.common.util.BytesUtil;
+import org.ts.ddcs.hbase.HBaseService;
 
+
+import javax.annotation.Resource;
+import java.io.IOException;
 import java.net.DatagramPacket;
 import java.net.DatagramSocket;
 import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
 
 /***
  * Date:2022/10/27
@@ -24,6 +43,20 @@ import java.net.InetAddress;
  */
 @RestController
 public class TestController {
+    @Resource
+    private HBaseService hBaseService;
+
+    @GetMapping("/hbase/test")
+    public JSONObject hbaseTest() {
+        JSONObject data = new JSONObject();
+        String tableName = AgreementEnum.AGREEMENT_SW.getName() + "_" + "datagram";
+
+       List<Map<String,Object>> list = hBaseService.getData(tableName.toUpperCase());
+
+        data.put("data",list);
+        data.put("code",1);
+        return data;
+    }
 
 
     public static byte[] hexStr2Bytes(String hexString) {
@@ -137,8 +170,8 @@ public class TestController {
 
             // 4.客户端启动成功,输出提示信息
             System.out.println("****客户端启动成功****");
-            String msg ="7E7E01005128290300002F20080230522230271439160396987E7E01005128290300002F0008023052221027144016034F497E7E01005128290300002F0008023052221027144116038F187E7E01005128290300002F0008023052221027144216038FE87E7E01005128290300002F0008023052221027144316034FB97E7E01005128290300002F0008023052221027144416038E087E7E01005128290300002F0008023052221027144516034E597E7E01005128290300002F0008023052221027144616034EA97E7E01005128290300002F0008023052221027144716038EF8";
-            byte[] buffer= BytesUtil.hexStr2Bytes(msg);
+            String msg = "7E7E01005128290300002F20080230522230271439160396987E7E01005128290300002F0008023052221027144016034F497E7E01005128290300002F0008023052221027144116038F187E7E01005128290300002F0008023052221027144216038FE87E7E01005128290300002F0008023052221027144316034FB97E7E01005128290300002F0008023052221027144416038E087E7E01005128290300002F0008023052221027144516034E597E7E01005128290300002F0008023052221027144616034EA97E7E01005128290300002F0008023052221027144716038EF8";
+            byte[] buffer = BytesUtil.hexStr2Bytes(msg);
 
             DatagramPacket packets = new DatagramPacket(buffer, buffer.length, address, port);
             // 5.4 发送数据出去
@@ -151,14 +184,14 @@ public class TestController {
                 socket.receive(packet);
                 // 6.3 取出数据
                 int len = packet.getLength();
-                if (len >0) {
+                if (len > 0) {
                     String rs = new String(buf, 0, len);
                     System.out.println("收到了ip为:" + packet.getAddress() + " 端口号为:" + packet.getPort() + "的消息:" + rs);
                     break;
                 }
             }
             socket.close();
-        }catch (Exception e){
+        } catch (Exception e) {
             e.printStackTrace();
         }
         return "ok";

+ 45 - 183
tsddcs-datacollect/src/main/java/org/ts/ddcs/dataStream/DataStreamManager.java

@@ -27,29 +27,21 @@ public class DataStreamManager {
 
     private RocksDB storeCache;
 
-
-//    private int taskIdelCount = 0;
-
     /**
      * 连接适配器
      */
     private Map<String, DataStreamAdapter> dataStreamAdapterCache = new ConcurrentHashMap<>(2048);
 
-
     /**
      * 数据流队列
      */
-//    private int currentQueue=0;
     private LinkedBlockingQueue<String> streamQueue = new LinkedBlockingQueue<>(2048);
-
     private LinkedBlockingQueue<String> joinQueue = new LinkedBlockingQueue<>(2048);
-
     private LinkedBlockingQueue<String> runningCacheQueue = new LinkedBlockingQueue<>(2048);
-
     private LinkedBlockingQueue<String> removeCacheQueue = new LinkedBlockingQueue<>(2048);
 
     /**
-     * 模板处理器链
+     * 数据处理器,每个处理器支持对所有规约报文进行解析
      */
     private Map<Integer, DataStreamLineManager> streamLineCache = new ConcurrentHashMap<>(BusinessConstant.STREAM_THREAD_NUM);
 
@@ -65,14 +57,6 @@ public class DataStreamManager {
      */
     private ScheduledExecutorService scheduleExecutor = Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "batch-stream-task-" + r.hashCode()));
 
-//    private void putAdapter(String key) {
-//        this.readQueue.offer(key);
-//    }
-//
-//    private String getNextAdapter() {
-//        return readQueue.poll();
-//    }
-
     /**
      * 创建数据流处理链
      *
@@ -86,39 +70,20 @@ public class DataStreamManager {
             String[] list = templates.split(",");
             if (list.length > 0) {
                 for (String templateName : list) {
-                    if (AgreementEnum.AGREEMENT_SW.getName().equals(templateName)) {
-                        List<Template> templateList = DataTemplateManageHolder.newInstance().dataTemplateManager.cloneTemplateLine(AgreementEnum.AGREEMENT_SW.getName());
-                        if (null != templateList && templateList.size() > 0) {
-                            for (Template template : templateList) {
-                                if (BusinessConstant.TEMPLATE_TYPE_PROCESSOR.equals(template.getType())) {
-                                    lineManager.addProcessor(AgreementEnum.AGREEMENT_SW.getName(), template);
-                                } else if (BusinessConstant.TEMPLATE_TYPE_SINK.equals(template.getType())) {
-                                    lineManager.addSink(AgreementEnum.AGREEMENT_SW.getName(), template);
-                                }
+                    List<Template> templateList = DataTemplateManageHolder.newInstance().dataTemplateManager.cloneTemplateLine(templateName);
+                    if (null != templateList && templateList.size() > 0) {
+                        for (Template template : templateList) {
+                            if (BusinessConstant.TEMPLATE_TYPE_PROCESSOR.equals(template.getType())) {
+                                lineManager.addProcessor(templateName, template);
+                            } else if (BusinessConstant.TEMPLATE_TYPE_SINK.equals(template.getType())) {
+                                lineManager.addSink(templateName, template);
                             }
                         }
-                        List<ItemTemplate> itemTemplateList = DataTemplateManageHolder.newInstance().dataTemplateManager.cloneItemTemplateLine(AgreementEnum.AGREEMENT_SW.getName());
-                        if (null != itemTemplateList && itemTemplateList.size() > 0) {
-                            for (ItemTemplate itemTemplate : itemTemplateList) {
-                                lineManager.addItemTemplates(AgreementEnum.AGREEMENT_SW.getName(), itemTemplate);
-                            }
-                        }
-                    } else if (AgreementEnum.AGREEMENT_SZY.getName().equals(templateName)) {
-                        List<Template> templateList = DataTemplateManageHolder.newInstance().dataTemplateManager.cloneTemplateLine(AgreementEnum.AGREEMENT_SZY.getName());
-                        if (null != templateList && templateList.size() > 0) {
-                            for (Template template : templateList) {
-                                if (BusinessConstant.TEMPLATE_TYPE_PROCESSOR.equals(template.getType())) {
-                                    lineManager.addProcessor(AgreementEnum.AGREEMENT_SZY.getName(), template);
-                                } else if (BusinessConstant.TEMPLATE_TYPE_SINK.equals(template.getType())) {
-                                    lineManager.addSink(AgreementEnum.AGREEMENT_SZY.getName(), template);
-                                }
-                            }
-                        }
-                        List<ItemTemplate> itemTemplateList = DataTemplateManageHolder.newInstance().dataTemplateManager.cloneItemTemplateLine(AgreementEnum.AGREEMENT_SZY.getName());
-                        if (null != itemTemplateList && itemTemplateList.size() > 0) {
-                            for (ItemTemplate itemTemplate : itemTemplateList) {
-                                lineManager.addItemTemplates(AgreementEnum.AGREEMENT_SZY.getName(), itemTemplate);
-                            }
+                    }
+                    List<ItemTemplate> itemTemplateList = DataTemplateManageHolder.newInstance().dataTemplateManager.cloneItemTemplateLine(templateName);
+                    if (null != itemTemplateList && itemTemplateList.size() > 0) {
+                        for (ItemTemplate itemTemplate : itemTemplateList) {
+                            lineManager.addItemTemplates(templateName, itemTemplate);
                         }
                     }
                 }
@@ -131,7 +96,6 @@ public class DataStreamManager {
      * 数据流管理器初始化
      */
     public void init(String templates, RocksDB storeCache) throws IllegalAccessException, ClassNotFoundException, InstantiationException {
-        //  try {
         //数据缓存器
         this.storeCache = storeCache;
         //初始化数据流处理器
@@ -193,101 +157,24 @@ public class DataStreamManager {
                     log.info("数据流处理器性能故障!!");
                     //报警
                 }
-
-
-//                    boolean hasTask = false;
-//                    for (int i = 1; i <= STREAM_THREAD_NUM; i++) {
-//                        DataStreamLineManager lineManager = streamLineCache.get(i);
-//                        if (!lineManager.lock()) {
-//                            //有空闲处理器
-//                            String key = readQueue.poll();
-//                            if (null != key) {
-//                                DataStreamAdapter dataStreamAdapter = dataStreamAdapterCache.get(key);
-//                                if (null != dataStreamAdapter) {
-//                                    if (!dataStreamAdapter.isLock()) {
-//                                        //数据流适配器空闲
-//                                        if (dataStreamAdapter.remaining()) {
-//                                            lineManager.lock(true);
-//                                            dataStreamAdapter.setLock(true);
-//                                            //创建数据处理任务
-//                                            long tm = System.currentTimeMillis();
-//                                            BatchDataStreamTask task = new BatchDataStreamTask(tm, lineManager, dataStreamAdapter);
-//                                            FutureTask<Integer> futureTask = new FutureTask<>(task);
-//                                            batchThreadPool.execute(futureTask);
-//                                            hasTask = true;
-//                                        }
-//                                    }
-//                                } else {
-//                                    log.info("找不到适配器 {}", key);
-//                                }
-//                            }
-//                            idel = true;
-//                        } else {
-//                            log.info("数据流处理器已被锁住 {}", i);
-//                        }
-//                    }
-//                    if (!idel) {
-//                        log.info("任务处理器性能故障");
-//                    }
-//                    if (!hasTask) {
-//                        taskIdelCount += 1;
-//                    } else {
-//                        taskIdelCount = 0;
-//                    }
-                //log.info("task idel count {}",taskIdelCount);
-//                    if (taskIdelCount >= 10) {
-//                        //没有任务
-//                        Set<String> keySet = dataStreamAdapterCache.keySet();
-//                        for (String key : keySet) {
-//                            DataStreamAdapter dataStreamAdapter = dataStreamAdapterCache.get(key);
-//                            if (!dataStreamAdapter.isLock()) {
-//                                //数据流适配器空闲
-//                                if (dataStreamAdapter.remaining()) {
-//                                    idel = false;
-//                                    //有待处理数据
-//                                    for (int i = 1; i <= STREAM_THREAD_NUM; i++) {
-//                                        DataStreamLineManager lineManager = streamLineCache.get(i);
-//                                        if (!lineManager.lock()) {
-//                                            //有空闲处理器
-//                                            dataStreamAdapter.setLock(true);
-//                                            lineManager.lock(true);
-//                                            //创建数据处理任务
-//                                            long tm = System.currentTimeMillis();
-//                                            BatchDataStreamTask task = new BatchDataStreamTask(tm, lineManager, dataStreamAdapter);
-//                                            FutureTask<Integer> futureTask = new FutureTask<>(task);
-//                                            batchThreadPool.execute(futureTask);
-//                                            idel = true;
-//                                            break;
-//                                        } else {
-//                                            log.info("数据流处理器已被锁住 {}", i);
-//                                        }
-//                                    }
-//                                    if (!idel) {
-//                                        log.info("任务处理器性能故障");
-//                                        break;
-//                                    }
-//                                }
-//                            }
-//                        }
-//                        taskIdelCount = 0;
-//                    }
-
             } catch (Exception e) {
                 log.error(this.getClass().getName(), e);
             }
         }, 0, 100, TimeUnit.MILLISECONDS);
-        // } catch (Exception e) {
-        //   log.error(this.getClass().getName(), e);
-        //  throw e;
-        //   }
     }
 
     private class BatchDataStreamTask implements Callable<Integer> {
-
+        /**
+         * 任务提交时间
+         */
         private long batchTime;
-
+        /**
+         * 数据流连接器
+         */
         private DataStreamAdapter dataStreamAdapter;
-
+        /**
+         * 数据流处理器
+         */
         private DataStreamLineManager dataStreamLineManager;
 
         BatchDataStreamTask(long batchTime, DataStreamLineManager dataStreamLineManager, DataStreamAdapter dataStreamAdapter) {
@@ -300,15 +187,18 @@ public class DataStreamManager {
         public Integer call() {
             try {
                 dataStreamLineManager.stream(dataStreamAdapter);
-                SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                 String rtuCode = dataStreamAdapter.getRtuCode();
-                HashOperations<String, String, String> ops = MainProcessorHolder.newInstance().dataProcess.redisTemplate.opsForHash();
-                ops.put(rtuCode, "last_connect_id", dataStreamAdapter.getAdapterKey());
-                ops.put(rtuCode, "last_connect_time", sdf.format(new Date(batchTime)));
-                dataStreamAdapter.setLock(false);
-                dataStreamLineManager.lock(false);
+                if (null != rtuCode && rtuCode.length() > 0) {
+                    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+                    HashOperations<String, String, String> ops = MainProcessorHolder.newInstance().dataProcess.redisTemplate.opsForHash();
+                    ops.put(rtuCode, "last_connect_id", dataStreamAdapter.getAdapterKey());
+                    ops.put(rtuCode, "last_connect_time", sdf.format(new Date(batchTime)));
+                }
             } catch (Exception e) {
                 log.error(this.getClass().getName(), e);
+            } finally {
+                dataStreamAdapter.setLock(false);
+                dataStreamLineManager.lock(false);
             }
             return 0;
         }
@@ -323,52 +213,24 @@ public class DataStreamManager {
     public void regConnect(DataStreamAdapter adapter) {
         this.dataStreamAdapterCache.put(adapter.getAdapterKey(), adapter);
         this.joinQueue.offer(adapter.getAdapterKey());
-    }
-
-//    /**
-//     * 关闭连接
-//     *
-//     * @param adapter
-//     */
-//    public void closeConnect(DataStreamAdapter adapter) {
-//        if (dataStreamAdapterCache.containsKey(adapter.getAdapterKey())) {
-//            if (!adapter.isLock()) {
-//                byte[] block = adapter.getBlock();
-//                if (null == block || block.length == 0) {
-//                    dataStreamAdapterCache.remove(adapter.getAdapterKey());
-//                }
-//            }
-//        }
-//    }
 
-//    public void readEvent(DataStreamAdapter adapter) {
-//        readQueue.offer(adapter.getAdapterKey());
-//    }
+    }
 
+    /**
+     * 写报文到测站
+     *
+     * @param key
+     * @param respBlock
+     * @return
+     */
     public boolean responseEvent(String key, byte[] respBlock) {
-        if (this.dataStreamAdapterCache.containsKey(key)) {
-            DataStreamAdapter dataStreamAdapter = this.dataStreamAdapterCache.get(key);
-            if (!dataStreamAdapter.isClose()) {
-                dataStreamAdapter.output(respBlock);
-                return true;
-            } else {
-                log.info("运行错误,链接已经提前关闭: {}", key);
-            }
+        DataStreamAdapter dataStreamAdapter = this.dataStreamAdapterCache.get(key);
+        if (null != dataStreamAdapter && !dataStreamAdapter.isClose()) {
+            dataStreamAdapter.output(respBlock);
+            return true;
+        } else {
+            log.info("运行错误,链接已经关闭: {}", key);
         }
         return false;
     }
-
-//    public DataStreamAdapter getAdapter(String rtuCode) {
-//        Set<String> keySet = dataStreamAdapterCache.keySet();
-//        for (String key : keySet) {
-//            DataStreamAdapter dataStreamAdapter = dataStreamAdapterCache.get(key);
-//            if (!dataStreamAdapter.isClose()) {
-//                String stcd = dataStreamAdapter.getRtuCode();
-//                if (null != stcd && stcd.equals(rtuCode)) {
-//                    return dataStreamAdapter;
-//                }
-//            }
-//        }
-//        return null;
-//    }
 }

+ 1 - 0
tsddcs-datacollect/src/main/java/org/ts/ddcs/dataStream/processor/sw/DataStreamSwExtraDatagramProcessor.java

@@ -64,6 +64,7 @@ public class DataStreamSwExtraDatagramProcessor extends DataStreamProcessor {
             byte[] sendPacketTime = BytesHelp.subBytes(dataArea, count, 6);
             dataStreamCache.putValue(DataStreamCacheMetadata.METADATA_UP_TIME_BYTE.getName(), sendPacketTime);
             count += 6;
+            dataStreamCache.putValue(DataStreamCacheMetadata.METADATA_UP_TIME.getName(), "20" + BytesHelp.byte2HexStr(sendPacketTime));
             //遥测站地址
             DataItemTemplate itemTemplateSt = itemTemplateMap.get(SwElementCodeEnum.CODE_ST.getCode());
             int readSize = itemTemplateSt.stream(dataArea, count);

+ 4 - 8
tsddcs-datacollect/src/main/java/org/ts/ddcs/dataStream/processor/sw/DataStreamSwHourDatagramProcessor.java

@@ -60,17 +60,13 @@ public class DataStreamSwHourDatagramProcessor extends DataStreamProcessor {
             byte[] serialNo = BytesHelp.subBytes(dataArea, count, 2);
             dataStreamCache.putValue(DataStreamCacheMetadata.METADATA_SERIAL_NO.getName(), serialNo);
             count += 2;
-            //String serialNo = Sw2014DatagramTemplateHelp.toSerialNo(dataArea, count);
-            // dataStreamCache.putValue(DatagramConstant.serialNo, serialNo);
             // 发报时间
             byte[] sendPacketTime = BytesHelp.subBytes(dataArea, count, 6);
             dataStreamCache.putValue(DataStreamCacheMetadata.METADATA_UP_TIME_BYTE.getName(), sendPacketTime);
             count += 6;
-            //String sendPacketTime = Sw2014DatagramTemplateHelp.toSendPacketTime(dataArea, count);
-            // dataStreamCache.putValue(DatagramConstant.sendPacketTime, sendPacketTime);
+            dataStreamCache.putValue(DataStreamCacheMetadata.METADATA_UP_TIME.getName(), "20" + BytesHelp.byte2HexStr(sendPacketTime));
             //遥测站地址
-            DataItemTemplate itemTemplateSt = itemTemplateMap.get(SwElementCodeEnum.CODE_ST.getName());
-         //   itemTemplateSt.clear();
+            DataItemTemplate itemTemplateSt = itemTemplateMap.get(SwElementCodeEnum.CODE_ST.getCode());
             int readSize = itemTemplateSt.stream(dataArea, count);
             if (itemTemplateSt.isMacth()) {
                 itemTemplateSt.saveValue(this.keyStore);
@@ -81,11 +77,11 @@ public class DataStreamSwHourDatagramProcessor extends DataStreamProcessor {
             count += 1;
             dataStreamCache.putValue(DataStreamCacheMetadata.METADATA_SITE_KIND.getName(), BytesHelp.byte2HexStr(siteKind));
             //采集时间
-            DataItemTemplate itemTemplateTt = itemTemplateMap.get(SwElementCodeEnum.CODE_TT.getName());
-          // itemTemplateSt.clear();
+            DataItemTemplate itemTemplateTt = itemTemplateMap.get(SwElementCodeEnum.CODE_TT.getCode());
             readSize = itemTemplateTt.stream(dataArea, count);
             if (itemTemplateTt.isMacth()) {
                 itemTemplateTt.saveValue(this.keyStore);
+                dataStreamCache.putValue(DataStreamCacheMetadata.METADATA_COLLECT_TIME.getName(), itemTemplateTt.getStringValue());
             }
             count += readSize;
             //解析观测要素

+ 9 - 12
tsddcs-datacollect/src/main/java/org/ts/ddcs/dataStream/processor/sw/DataStreamSwTimeIntervalDatagramProcessor.java

@@ -36,7 +36,7 @@ import java.util.*;
 public class DataStreamSwTimeIntervalDatagramProcessor extends DataStreamProcessor {
 
     private DataStreamCache dataStreamCache;
-    private Map<String, List<Map<String,Object>>> keyStore = new HashMap<>();
+    private Map<String, List<Map<String, Object>>> keyStore = new HashMap<>();
 
     @Override
     public void init(DataStreamLineManager context) {
@@ -61,14 +61,11 @@ public class DataStreamSwTimeIntervalDatagramProcessor extends DataStreamProcess
             byte[] serialNo = BytesHelp.subBytes(dataArea, count, 2);
             dataStreamCache.putValue(DataStreamCacheMetadata.METADATA_SERIAL_NO.getName(), serialNo);
             count += 2;
-            //String serialNo = Sw2014DatagramTemplateHelp.toSerialNo(dataArea, count);
-            // dataStreamCache.putValue(DatagramConstant.serialNo, serialNo);
             // 发报时间
             byte[] sendPacketTime = BytesHelp.subBytes(dataArea, count, 6);
             dataStreamCache.putValue(DataStreamCacheMetadata.METADATA_UP_TIME_BYTE.getName(), sendPacketTime);
             count += 6;
-            //String sendPacketTime = Sw2014DatagramTemplateHelp.toSendPacketTime(dataArea, count);
-            // dataStreamCache.putValue(DatagramConstant.sendPacketTime, sendPacketTime);
+            dataStreamCache.putValue(DataStreamCacheMetadata.METADATA_UP_TIME.getName(), "20" + BytesHelp.byte2HexStr(sendPacketTime));
             //遥测站地址
             DataItemTemplate itemTemplateSt = itemTemplateMap.get(SwElementCodeEnum.CODE_ST.getCode());
             int readSize = itemTemplateSt.stream(dataArea, count);
@@ -85,9 +82,9 @@ public class DataStreamSwTimeIntervalDatagramProcessor extends DataStreamProcess
             readSize = itemTemplateTt.stream(dataArea, count);
             if (itemTemplateTt.isMacth()) {
                 itemTemplateTt.saveValue(this.keyStore);
+                dataStreamCache.putValue(DataStreamCacheMetadata.METADATA_COLLECT_TIME.getName(), itemTemplateTt.getStringValue());
             }
             count += readSize;
-
             int mValue = 0;
             DataItemTemplate dataItemTemplateDRxnn = itemTemplateMap.get(SwElementCodeEnum.CODE_DRxnn.getCode());
             readSize = dataItemTemplateDRxnn.stream(dataArea, count);
@@ -146,9 +143,9 @@ public class DataStreamSwTimeIntervalDatagramProcessor extends DataStreamProcess
     public void batchComplete(DataStreamAdapter dataStreamAdapter) {
         Set<String> keys = this.keyStore.keySet();
         for (String key : keys) {
-            List<Map<String,Object>> list = this.keyStore.get(key);
+            List<Map<String, Object>> list = this.keyStore.get(key);
             if (null != list) {
-                for (Map<String,Object> jsonObject : list) {
+                for (Map<String, Object> jsonObject : list) {
                     log.info("item {} {}", key, JSONObject.toJSONString(jsonObject));
                 }
             }
@@ -162,7 +159,7 @@ public class DataStreamSwTimeIntervalDatagramProcessor extends DataStreamProcess
         byte[] rtuCodeByte = this.dataStreamCache.getBytesValue(DataStreamCacheMetadata.METADATA_RTU_CODE.getName());
         respPacket = BytesHelp.arrayApend(respPacket, rtuCodeByte);
         //中心站地址
-        byte[] centerStationByte =  this.dataStreamCache.getBytesValue(DataStreamCacheMetadata.METADATA_CENTER_STATION_CODE.getName());
+        byte[] centerStationByte = this.dataStreamCache.getBytesValue(DataStreamCacheMetadata.METADATA_CENTER_STATION_CODE.getName());
         respPacket = BytesHelp.arrayApend(respPacket, centerStationByte);
         //密码
         byte[] rtuPwByte = this.dataStreamCache.getBytesValue(DataStreamCacheMetadata.METADATA_SW_PW.getName());
@@ -173,7 +170,7 @@ public class DataStreamSwTimeIntervalDatagramProcessor extends DataStreamProcess
         //报文下行标识及长度
         respPacket = BytesHelp.arrayApend(respPacket, BytesHelp.hexStr2Bytes("8008"));
         //报文起始符
-        byte[] dataAreaStartFlagByte =this.dataStreamCache.getBytesValue(DataStreamCacheMetadata.METADATA_DATA_AREA_START_FLAG.getName());
+        byte[] dataAreaStartFlagByte = this.dataStreamCache.getBytesValue(DataStreamCacheMetadata.METADATA_DATA_AREA_START_FLAG.getName());
         respPacket = BytesHelp.arrayApend(respPacket, dataAreaStartFlagByte);
         //流水号
         byte[] serialNoByte = this.dataStreamCache.getBytesValue(DataStreamCacheMetadata.METADATA_SERIAL_NO.getName());
@@ -210,8 +207,8 @@ public class DataStreamSwTimeIntervalDatagramProcessor extends DataStreamProcess
                 //  data.put(SinkJsonPackMetadataEnum.PACK_METADATA_COLLECT_TIME.getName(), this.dataStreamCache.getStringValue(DatagramConstant.collectTime));
                 //   data.put(JsonPackMetadataEnum.PACK_METADATA_UP_TIME.getName(), this.dataStreamCache.getStringValue(DatagramConstant.sendPacketTime));
                 data.put(DatagramMetadataEnum.CACHE_METADATA_ELEMENT_CODE.getName(), key);
-                List<Map<String,Object>> elements = this.keyStore.get(key);
-                for (Map<String,Object> jsonObject : elements) {
+                List<Map<String, Object>> elements = this.keyStore.get(key);
+                for (Map<String, Object> jsonObject : elements) {
                     log.info("element {}", JSONObject.toJSONString(jsonObject));
                 }
                 data.put(DatagramMetadataEnum.CACHE_METADATA_DATA.getName(), elements);

+ 55 - 4
tsddcs-datacollect/src/main/java/org/ts/ddcs/dataStream/processor/szy/DataStreamSzyFixedTimeDatagramProcessor.java

@@ -20,6 +20,7 @@ import org.ts.ddcs.metadata.DataStreamCacheMetadata;
 
 import java.io.UnsupportedEncodingException;
 import java.time.Duration;
+import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.format.DateTimeFormatter;
 import java.util.*;
@@ -105,7 +106,20 @@ public class DataStreamSzyFixedTimeDatagramProcessor extends DataStreamProcessor
                     readSize = itemTemplateTt.stream(dataArea, count);
                     if (itemTemplateTt.isMacth()) {
                         String ct = itemTemplateTt.getStringValue();
-                        LocalDateTime tm = now.withDayOfMonth(Integer.valueOf(ct.substring(0, 2))).withHour(Integer.valueOf(ct.substring(2, 4))).withMinute(Integer.valueOf(ct.substring(4, 6))).withSecond(0).withNano(0);
+                        int day =Integer.valueOf(ct.substring(0, 2));
+                        LocalDate localDate= LocalDate.now();
+                        int month = localDate.getMonthValue();
+                        int maxDay= localDate.lengthOfMonth();
+                        if (day>maxDay){
+                            LocalDate preDate= LocalDate.now().minusMonths(1);
+                            int preMaxDay= preDate.lengthOfMonth();
+                            if (day<=preMaxDay){
+                                month = preDate.getMonthValue();
+                            }
+                        }
+                        LocalDateTime tm = now.withMonth(month).withDayOfMonth(Integer.valueOf(ct.substring(0, 2))).withHour(Integer.valueOf(ct.substring(2, 4))).withMinute(Integer.valueOf(ct.substring(4, 6))).withSecond(0).withNano(0);
+
+                      //  LocalDateTime tm = now.withDayOfMonth(Integer.valueOf(ct.substring(0, 2))).withHour(Integer.valueOf(ct.substring(2, 4))).withMinute(Integer.valueOf(ct.substring(4, 6))).withSecond(0).withNano(0);
                         if (tm.isBefore(now)) {
                             this.dataStreamCache.putValue(DataStreamCacheMetadata.METADATA_COLLECT_TIME.getName(), tm.format(timeFormat));
                         } else if (tm.isAfter(now)) {
@@ -185,7 +199,20 @@ public class DataStreamSzyFixedTimeDatagramProcessor extends DataStreamProcessor
                     readSize = itemTemplateTt.stream(dataArea, count);
                     if (itemTemplateTt.isMacth()) {
                         String ct = itemTemplateTt.getStringValue();
-                        LocalDateTime tm = now.withDayOfMonth(Integer.valueOf(ct.substring(0, 2))).withHour(Integer.valueOf(ct.substring(2, 4))).withMinute(Integer.valueOf(ct.substring(4, 6))).withSecond(0).withNano(0);
+                        int day =Integer.valueOf(ct.substring(0, 2));
+                        LocalDate localDate= LocalDate.now();
+                        int month = localDate.getMonthValue();
+                        int maxDay= localDate.lengthOfMonth();
+                        if (day>maxDay){
+                            LocalDate preDate= LocalDate.now().minusMonths(1);
+                            int preMaxDay= preDate.lengthOfMonth();
+                            if (day<=preMaxDay){
+                                month = preDate.getMonthValue();
+                            }
+                        }
+                        LocalDateTime tm = now.withMonth(month).withDayOfMonth(Integer.valueOf(ct.substring(0, 2))).withHour(Integer.valueOf(ct.substring(2, 4))).withMinute(Integer.valueOf(ct.substring(4, 6))).withSecond(0).withNano(0);
+
+                        // LocalDateTime tm = now.withDayOfMonth(Integer.valueOf(ct.substring(0, 2))).withHour(Integer.valueOf(ct.substring(2, 4))).withMinute(Integer.valueOf(ct.substring(4, 6))).withSecond(0).withNano(0);
                         if (tm.isBefore(now)) {
                             this.dataStreamCache.putValue(DataStreamCacheMetadata.METADATA_COLLECT_TIME.getName(), tm.format(timeFormat));
                         } else if (tm.isAfter(now)) {
@@ -258,7 +285,20 @@ public class DataStreamSzyFixedTimeDatagramProcessor extends DataStreamProcessor
                     readSize = itemTemplateTt.stream(dataArea, count);
                     if (itemTemplateTt.isMacth()) {
                         String ct = itemTemplateTt.getStringValue();
-                        LocalDateTime tm = now.withDayOfMonth(Integer.valueOf(ct.substring(0, 2))).withHour(Integer.valueOf(ct.substring(2, 4))).withMinute(Integer.valueOf(ct.substring(4, 6))).withSecond(0).withNano(0);
+                        int day =Integer.valueOf(ct.substring(0, 2));
+                        LocalDate localDate= LocalDate.now();
+                        int month = localDate.getMonthValue();
+                        int maxDay= localDate.lengthOfMonth();
+                        if (day>maxDay){
+                            LocalDate preDate= LocalDate.now().minusMonths(1);
+                            int preMaxDay= preDate.lengthOfMonth();
+                            if (day<=preMaxDay){
+                                month = preDate.getMonthValue();
+                            }
+                        }
+                        LocalDateTime tm = now.withMonth(month).withDayOfMonth(Integer.valueOf(ct.substring(0, 2))).withHour(Integer.valueOf(ct.substring(2, 4))).withMinute(Integer.valueOf(ct.substring(4, 6))).withSecond(0).withNano(0);
+
+                        //  LocalDateTime tm = now.withDayOfMonth(Integer.valueOf(ct.substring(0, 2))).withHour(Integer.valueOf(ct.substring(2, 4))).withMinute(Integer.valueOf(ct.substring(4, 6))).withSecond(0).withNano(0);
                         if (tm.isBefore(now)) {
                             this.dataStreamCache.putValue(DataStreamCacheMetadata.METADATA_COLLECT_TIME.getName(), tm.format(timeFormat));
                         } else if (tm.isAfter(now)) {
@@ -338,7 +378,18 @@ public class DataStreamSzyFixedTimeDatagramProcessor extends DataStreamProcessor
                     readSize = itemTemplateTt.stream(dataArea, count);
                     if (itemTemplateTt.isMacth()) {
                         String ct = itemTemplateTt.getStringValue();
-                        LocalDateTime tm = now.withDayOfMonth(Integer.valueOf(ct.substring(0, 2))).withHour(Integer.valueOf(ct.substring(2, 4))).withMinute(Integer.valueOf(ct.substring(4, 6))).withSecond(0).withNano(0);
+                        int day =Integer.valueOf(ct.substring(0, 2));
+                        LocalDate localDate= LocalDate.now();
+                        int month = localDate.getMonthValue();
+                        int maxDay= localDate.lengthOfMonth();
+                        if (day>maxDay){
+                            LocalDate preDate= LocalDate.now().minusMonths(1);
+                            int preMaxDay= preDate.lengthOfMonth();
+                            if (day<=preMaxDay){
+                                month = preDate.getMonthValue();
+                            }
+                        }
+                        LocalDateTime tm = now.withMonth(month).withDayOfMonth(Integer.valueOf(ct.substring(0, 2))).withHour(Integer.valueOf(ct.substring(2, 4))).withMinute(Integer.valueOf(ct.substring(4, 6))).withSecond(0).withNano(0);
                         if (tm.isBefore(now)) {
                             this.dataStreamCache.putValue(DataStreamCacheMetadata.METADATA_COLLECT_TIME.getName(), tm.format(timeFormat));
                         } else if (tm.isAfter(now)) {

+ 8 - 3
tsddcs-datacollect/src/main/java/org/ts/ddcs/dataStream/sink/DataStreamKafkaSink.java

@@ -55,13 +55,18 @@ public class DataStreamKafkaSink extends DataStreamSink {
         Date dt = Date.from(this.dataStreamCache.getPickPacketTime().atZone(ZoneId.systemDefault()).toInstant());
         body.put(TaishanV1JsonPackMetadataEnum.PACK_METADATA_FROM_TIME.getName(), timeFormat.format(dt));
         body.put(TaishanV1JsonPackMetadataEnum.PACK_METADATA_DATAGRAM_TYPE.getName(), this.dataStreamCache.getDatagramCode());
+
         log.info("data item :{}", body.toJSONString());
-        String key = "topic."+KafkaConstants.TOPIC_TS_V1_DATAGRAM;
+        String key = "topic." + KafkaConstants.TOPIC_TS_V1_DATAGRAM;
         try {
             byte[] topicBuff = MainProcessorHolder.newInstance().dataProcess.getStoreCache(key);
-            String topic = new String(topicBuff,"utf-8");
-            log.info("topic {}",topic);
+            String topic = new String(topicBuff, "utf-8");
+            log.info("topic {}", topic);
             KafkaClient.messageSubmit(KafkaConstants.TOPIC_TS_V1_DATAGRAM, body.toJSONString());
+
+            body.put("collectTime", this.dataStreamCache.getValue(DataStreamCacheMetadata.METADATA_UP_TIME.getName()));
+            KafkaClient.messageSubmit(KafkaConstants.TOPIC_TS_HBASE, body.toJSONString());
+
         } catch (UnsupportedEncodingException e) {
             e.printStackTrace();
         } catch (RocksDBException e) {

+ 64 - 0
tsddcs-datacollect/src/main/java/org/ts/ddcs/hbase/HBaseConfig.java

@@ -0,0 +1,64 @@
+/**
+ * Copyright 2019 DH
+ * All right reserved.
+ * 项目名称: 大恒泰山系统
+ * 创建日期:2024/3/29
+ */
+package org.ts.ddcs.hbase;
+
+
+import lombok.Data;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+
+
+import java.io.IOException;
+
+
+/***
+ * Date:2024/3/29
+ * Title:文件所属模块(必须填写)
+ * Description:对本文件的详细描述,原则上不能少于30字
+ * @author dylan
+ * @version 1.0
+ * Remark:认为有必要的其他信息
+ */
+@Data
+@Configuration
+public class HBaseConfig {
+
+    @Value("${hbase.enable}")
+    private Boolean hbaseEnable;
+
+    @Value("${hbase.zookeeper.quorum}")
+    private String quorum;
+
+    @Value("${hbase.zookeeper.property.clientPort}")
+    private String clientPort;
+
+    @Value("${hbase.zookeeper.znode.parent}")
+    private String znodeParent;
+
+//    @Bean
+//    public org.apache.hadoop.conf.Configuration configuration() {
+//        org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
+//        conf.set("hbase.zookeeper.quorum", quorum);
+//        conf.set("hbase.zookeeper.property.clientPort", clientPort);
+//        conf.set("zookeeper.znode.parent", znodeParent);
+//        return conf;
+//    }
+
+    @Bean
+    public HBaseService getHbaseService() {
+        org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
+        conf.set("hbase.zookeeper.quorum", quorum);
+        conf.set("hbase.zookeeper.property.clientPort", clientPort);
+        conf.set("zookeeper.znode.parent", znodeParent);
+        return new HBaseService(conf, hbaseEnable);
+    }
+}

+ 549 - 0
tsddcs-datacollect/src/main/java/org/ts/ddcs/hbase/HBaseService.java

@@ -0,0 +1,549 @@
+/**
+ * 项目名称:
+ * 创建日期: 2023/4/26
+ */
+package org.ts.ddcs.hbase;
+
+import cn.hutool.core.util.IdUtil;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.io.IOException;
+import java.text.MessageFormat;
+import java.util.*;
+
+/**
+ * 创建日期: 2023/4/26
+ * Title: 文件所属模块(必须填写)
+ * Description:对本文件的详细描述,原则上不能少于30字
+ * @author DSH
+ * @version 1.0
+ * Remark:认为有必要的其他信息
+ * @mender:(文件的修改者,文件创建者之外的人)
+ */
+@Slf4j
+public class HBaseService {
+
+	/** 默认列 - 数据上报时间 */
+	private final String COLUMN_REPORT_TIME = "report_time";
+
+	/** 默认列 - 数据创建时间 */
+	private final String COLUMN_CREATE_TIME = "create_time";
+
+	/** 默认列 - 数据创建时间 */
+	private final String COLUMN_CREATE_TIME_STR = "create_time_str";
+
+	/**
+	 * 声明静态配置
+	 */
+	private Configuration conf = null;
+	private Connection connection = null;
+
+	public HBaseService(Configuration conf, boolean enable) {
+		if (enable) {
+			this.conf = conf;
+			try {
+				log.info("HBase连接初始化...");
+				connection = ConnectionFactory.createConnection(this.conf);
+			} catch (IOException e) {
+				log.error("获取HBase连接失败");
+			}
+		} else {
+			log.info("HBase连接未初始化!");
+		}
+	}
+
+	/**
+	 * 功能: 判断表是否存在
+	 * 创建日期: 2023/6/7
+	 * @param tableName 表名
+	 * @return: boolean
+	 */
+	public boolean isTableExists(String tableName) {
+		boolean exists = false;
+		try {
+			TableName table = TableName.valueOf(tableName);
+			exists = connection.getAdmin().tableExists(table);
+		} catch (IOException e) {
+			e.printStackTrace();
+		}
+		return exists;
+	}
+
+	/**
+	 * 功能: 创建表
+	 * 创建日期: 2023/6/7
+	 * @param tableName 表名
+	 * @param columnFamily 列族
+	 * @return: boolean
+	 */
+	public boolean createTable(String tableName, List<String> columnFamily) {
+		return createTable(tableName, columnFamily, null);
+	}
+
+	/**
+	 * 功能: 预分区创建表
+	 * 创建日期: 2023/6/7
+	 * @param tableName 表名
+	 * @param columnFamily 列族
+	 * @param keys 分区集合
+	 * @return: boolean
+	 */
+	public boolean createTable(String tableName, List<String> columnFamily, List<String> keys) {
+		if (isTableExists(tableName)) {
+			log.debug("HBase中表 {} 已存在。", tableName);
+			return false;
+		}
+		Admin admin = null;
+		try {
+			/*TableName table = TableName.valueOf(tableName);
+			HTableDescriptor desc = new HTableDescriptor(table);
+			for (String cf : columnFamily) {
+				desc.addFamily(new HColumnDescriptor(cf));
+			}*/
+			admin = connection.getAdmin();
+			List<ColumnFamilyDescriptor> familyDescriptors = new ArrayList<>(columnFamily.size());
+			for (String column : columnFamily) {
+				familyDescriptors.add(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(column)).build());
+			}
+			TableDescriptor desc = TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)).setColumnFamilies(familyDescriptors).build();
+			if (keys == null) {
+				admin.createTable(desc);
+			} else {
+				byte[][] splitKeys = getSplitKeys(keys);
+				admin.createTable(desc, splitKeys);
+			}
+			return true;
+		} catch (IOException e) {
+			e.printStackTrace();
+		} finally {
+			close(admin, null, null);
+		}
+		return false;
+	}
+
+	/**
+	 * 功能: 删除表
+	 * 创建日期: 2023/6/7
+	 * @param tableName 表名
+	 * @return: boolean
+	 */
+	public boolean dropTable(String tableName) {
+		Admin admin = null;
+		try {
+			admin = connection.getAdmin();
+			if (admin.tableExists(TableName.valueOf(tableName))) {
+				admin.disableTable(TableName.valueOf(tableName));
+				admin.deleteTable(TableName.valueOf(tableName));
+				log.debug("表【" + tableName + "】删除成功!");
+			}
+		} catch (IOException e) {
+			log.error(MessageFormat.format("表【{0}】删除失败。", tableName), e);
+			return false;
+		} finally {
+			close(admin, null, null);
+		}
+		return true;
+	}
+
+	/**
+	 * 功能: 插入数据(单条)
+	 * 创建日期: 2023/6/7
+	 * @param tableName 表名
+	 * @param rowKey rowKey
+	 * @param columnFamily 列族
+	 * @param column 列
+	 * @param value 值
+	 * @param reportTime 数据填报时间,毫秒
+	 * @return: boolean
+	 */
+	public boolean putData(String tableName, String rowKey, String columnFamily, String column, String value, Long reportTime) {
+		return putData(tableName, rowKey, columnFamily, Arrays.asList(column), Arrays.asList(value), reportTime);
+	}
+
+	/**
+	 * 功能: 插入数据(批量)
+	 * 创建日期: 2023/6/7
+	 * @param tableName 表名
+	 * @param rowKey rowKey
+	 * @param columnFamily 列族
+	 * @param columns 列
+	 * @param values 值
+	 * @return: boolean
+	 */
+	public boolean putData(String tableName, String rowKey, String columnFamily, List<String> columns, List<String> values, Long reportTime) {
+		Table table = null;
+		try {
+			table = connection.getTable(TableName.valueOf(tableName));
+			Put put = new Put(Bytes.toBytes(rowKey));
+			for (int i = 0; i < columns.size(); i++) {
+				if (columns.get(i) != null && values.get(i) != null) {
+					put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columns.get(i)), Bytes.toBytes(values.get(i)));
+				} else {
+					throw new NullPointerException(MessageFormat.format("列名和列数据都不能为空,column:{0},value:{1}" , columns.get(i), values.get(i)));
+				}
+			}
+			if (reportTime != null) {
+				put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(COLUMN_REPORT_TIME), Bytes.toBytes(reportTime.longValue())); // 填报时间,可能不同于创建时间,可能用于排序、过滤查询
+			}
+			put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(COLUMN_CREATE_TIME), Bytes.toBytes(System.currentTimeMillis())); // 创建时间
+//			put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(COLUMN_CREATE_TIME_STR), Bytes.toBytes(Long.toString(System.currentTimeMillis()))); // 创建时间字符串
+			table.put(put);
+			table.close();
+			return true;
+		} catch (IOException e) {
+			log.error(MessageFormat.format("插入数据失败,tableName:{0},rowKey:{1},columnFamily:{2}", tableName, rowKey, columnFamily), e);
+			e.printStackTrace();
+			return false;
+		} finally {
+			close(null, null, table);
+		}
+	}
+
+
+	public boolean putData(String tableName, String rowKey, String columnFamily, List<String> columns, List<String> values) {
+		Table table = null;
+		try {
+			table = connection.getTable(TableName.valueOf(tableName));
+			Put put = new Put(Bytes.toBytes(rowKey));
+			for (int i = 0; i < columns.size(); i++) {
+				if (columns.get(i) != null && values.get(i) != null) {
+					put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columns.get(i)), Bytes.toBytes(values.get(i)));
+				} else {
+					throw new NullPointerException(MessageFormat.format("列名和列数据都不能为空,column:{0},value:{1}" , columns.get(i), values.get(i)));
+				}
+			}
+//			if (reportTime != null) {
+//				put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(COLUMN_REPORT_TIME), Bytes.toBytes(reportTime.longValue())); // 填报时间,可能不同于创建时间,可能用于排序、过滤查询
+//			}
+//			put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(COLUMN_CREATE_TIME), Bytes.toBytes(System.currentTimeMillis())); // 创建时间
+//			put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(COLUMN_CREATE_TIME_STR), Bytes.toBytes(Long.toString(System.currentTimeMillis()))); // 创建时间字符串
+			table.put(put);
+			table.close();
+			return true;
+		} catch (IOException e) {
+			log.error(MessageFormat.format("插入数据失败,tableName:{0},rowKey:{1},columnFamily:{2}", tableName, rowKey, columnFamily), e);
+			e.printStackTrace();
+			return false;
+		} finally {
+			close(null, null, table);
+		}
+	}
+
+	/**
+	 * 功能: 获取数据(全表数据)
+	 * 创建日期: 2023/6/7
+	 * @param tableName 表名
+	 * @return: java.util.List<java.util.Map<java.lang.String,java.lang.String>>
+	 */
+	public List<Map<String, Object>> getData(String tableName) {
+		List<Map<String, Object>> list = new ArrayList<>();
+		Table table = null;
+		ResultScanner rs = null;
+		try {
+			table = connection.getTable(TableName.valueOf(tableName));
+			Scan scan = new Scan();
+			rs = table.getScanner(scan);
+			for (Result result : rs) {
+				HashMap<String, Object> map = new HashMap<>();
+				// rowkey
+				String row = Bytes.toString(result.getRow());
+				map.put("row", row);
+				for (Cell cell : result.listCells()) {
+					// 列族
+					String family = Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
+					// 列
+					String qualifier = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
+					// 值
+					Object data = null;
+					if (COLUMN_CREATE_TIME.equals(qualifier) || COLUMN_REPORT_TIME.equals(qualifier)) {
+						data = Bytes.toLong(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
+					} else {
+						data = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
+					}
+					// Timestamp
+					long timestamp = cell.getTimestamp();
+					map.put(family + ":" + qualifier, data);
+				}
+				list.add(map);
+			}
+			table.close();
+		} catch (IOException e) {
+			log.error(MessageFormat.format("获取数据(全表数据)失败,tableName:{0}", tableName), e);
+			e.printStackTrace();
+		} finally {
+			close(null, rs, table);
+		}
+		return list;
+	}
+
+	public List<Map<String, Object>> getData(String tableName, Scan scan) {
+		List<Map<String, Object>> list = new ArrayList<>();
+		Table table = null;
+		ResultScanner rs = null;
+		try {
+
+			table = connection.getTable(TableName.valueOf(tableName));
+			if (scan == null) {
+				scan = new Scan();
+			}
+			rs = table.getScanner(scan);
+			for (Result result : rs) {
+				HashMap<String, Object> map = new HashMap<>();
+				// rowkey
+				String row = Bytes.toString(result.getRow());
+				map.put("row", row);
+				for (Cell cell : result.listCells()) {
+					// 列族
+					String family = Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
+					// 列
+					String qualifier = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
+					// 值
+					Object data = null;
+					if (COLUMN_CREATE_TIME.equals(qualifier) || COLUMN_REPORT_TIME.equals(qualifier)) {
+						data = Bytes.toLong(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
+					} else {
+						data = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
+					}
+					// Timestamp
+					long timestamp = cell.getTimestamp();
+					map.put(family + ":" + qualifier, data);
+				}
+				list.add(map);
+			}
+			table.close();
+		} catch (IOException e) {
+			log.error(MessageFormat.format("获取数据(全表数据)失败,tableName:{0}", tableName), e);
+			e.printStackTrace();
+		} finally {
+			close(null, rs, table);
+		}
+		return list;
+	}
+
+	/**
+	 * 功能: 获取数据(根据rowkey)
+	 * 创建日期: 2023/6/7
+	 * @param tableName 表名
+	 * @param rowKey rowKey
+	 * @return: java.util.Map<java.lang.String,java.lang.String>
+	 */
+	public Map<String, Object> getData(String tableName, String rowKey) {
+		HashMap<String, Object> map = new HashMap<>();
+		try {
+			Table table = connection.getTable(TableName.valueOf(tableName));
+			Get get = new Get(Bytes.toBytes(rowKey));
+			Result result = table.get(get);
+			if (result != null && !result.isEmpty()) {
+				map.put("row", rowKey);
+				for (Cell cell : result.listCells()) {
+					// 列族
+					String family = Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
+					// 列
+					String qualifier = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
+					// 值
+					Object data = null;
+					if (COLUMN_CREATE_TIME.equals(qualifier) || COLUMN_REPORT_TIME.equals(qualifier)) {
+						data = Bytes.toLong(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
+					} else {
+						data = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
+					}
+					map.put(family + ":" + qualifier, data);
+				}
+			}
+			table.close();
+		} catch (IOException e) {
+			e.printStackTrace();
+		}
+		return map;
+	}
+
+	/**
+	 * 功能: 获取数据(根据rowkey,列族,列)
+	 * 创建日期: 2023/6/7
+	 * @param tableName 表名
+	 * @param rowKey
+	 * @param columnFamily 列族
+	 * @param columnQualifier 列
+	 * @return: java.lang.String
+	 */
+	public String getData(String tableName, String rowKey, String columnFamily, String columnQualifier) {
+		String data = "";
+		try {
+			Table table = connection.getTable(TableName.valueOf(tableName));
+			Get get = new Get(Bytes.toBytes(rowKey));
+			get.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columnQualifier));
+			Result result = table.get(get);
+			if (result != null && !result.isEmpty()) {
+				Cell cell = result.listCells().get(0);
+
+				// 列族
+				String family = Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
+				// 列
+				String qualifier = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
+				if (COLUMN_CREATE_TIME.equals(qualifier) || COLUMN_REPORT_TIME.equals(qualifier)) {
+					data = Long.toString(Bytes.toLong(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
+				} else {
+					data = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
+				}
+			}
+			table.close();
+		} catch (IOException e) {
+			e.printStackTrace();
+		}
+		return data;
+	}
+
+	/**
+	 * 功能: 删除数据(根据rowkey)
+	 * 创建日期: 2023/6/7
+	 * @param tableName 表名
+	 * @param rowKey
+	 * @return: void
+	 */
+	public void deleteData(String tableName, String rowKey) throws IOException {
+		Table table = connection.getTable(TableName.valueOf(tableName));
+		Delete delete = new Delete(Bytes.toBytes(rowKey));
+		table.delete(delete);
+		table.close();
+	}
+
+	/**
+	 * 功能: 删除数据(根据rowkey,列族)
+	 * 创建日期: 2023/6/7
+	 * @param tableName 表名
+	 * @param rowKey
+	 * @param columnFamily 列族
+	 * @return: void
+	 */
+	public void deleteData(String tableName, String rowKey, String columnFamily) throws IOException {
+		Table table = connection.getTable(TableName.valueOf(tableName));
+		Delete delete = new Delete(Bytes.toBytes(rowKey));
+		delete.addFamily(columnFamily.getBytes());
+		table.delete(delete);
+		table.close();
+	}
+
+	/**
+	 * 功能: 删除数据(根据rowkey,列族)
+	 * 创建日期: 2023/6/7
+	 * @param tableName 表名
+	 * @param rowKey
+	 * @param columnFamily 列族
+	 * @param column 列
+	 * @return: void
+	 */
+	public void deleteData(String tableName, String rowKey, String columnFamily, String column) throws IOException {
+		Table table = connection.getTable(TableName.valueOf(tableName));
+		Delete delete = new Delete(Bytes.toBytes(rowKey));
+		delete.addColumn(columnFamily.getBytes(), column.getBytes());
+		table.delete(delete);
+		table.close();
+	}
+
+	/**
+	 * 功能: 删除数据(多行)
+	 * 创建日期: 2023/6/7
+	 * @param tableName 表名
+	 * @param rowKeys rowKey集合
+	 * @return: void
+	 */
+	public void deleteData(String tableName, List<String> rowKeys) throws IOException {
+		Table table = connection.getTable(TableName.valueOf(tableName));
+		List<Delete> deleteList = new ArrayList<>();
+		for (String row : rowKeys) {
+			Delete delete = new Delete(Bytes.toBytes(row));
+			deleteList.add(delete);
+		}
+		table.delete(deleteList);
+		table.close();
+	}
+
+	/**
+	 * 功能: 分区【10, 20, 30】 -> ( ,10] (10,20] (20,30] (30, )
+	 * 创建日期: 2023/6/7
+	 * @param keys 分区集合[10, 20, 30]
+	 * @return: byte[][]
+	 */
+	private byte[][] getSplitKeys(List<String> keys) {
+		byte[][] splitKeys = new byte[keys.size()][];
+		TreeSet<byte[]> rows = new TreeSet<>(Bytes.BYTES_COMPARATOR);
+		for (String key : keys) {
+			rows.add(Bytes.toBytes(key));
+		}
+		int i = 0;
+		for (byte[] row : rows) {
+			splitKeys[i] = row;
+			i++;
+		}
+		return splitKeys;
+	}
+
+	/**
+	 * 功能: 查询库中所有表的表名
+	 * 创建日期: 2023/6/8
+	 * @param
+	 * @return: java.util.List<java.lang.String>
+	 */
+	public List<String> getAllTableNames() {
+		List<String> result = new ArrayList<>();
+		Admin admin = null;
+		try {
+			admin = connection.getAdmin();
+			TableName[] tableNames = admin.listTableNames();
+			for (TableName tableName : tableNames) {
+				result.add(tableName.getNameAsString());
+			}
+		} catch (IOException e) {
+			log.error("获取所有表的表名失败", e);
+		} finally {
+			close(admin, null, null);
+		}
+		return result;
+	}
+
+	/**
+	 * 功能: 关闭流
+	 * 创建日期: 2023/6/8
+	 * @param admin
+	 * @param rs
+	 * @param table
+	 * @return: void
+	 */
+	private void close(Admin admin, ResultScanner rs, Table table) {
+		if (admin != null) {
+			try {
+				admin.close();
+			} catch (IOException e) {
+				log.error("关闭Admin失败", e);
+			}
+		}
+
+		if (rs != null) {
+			rs.close();
+		}
+
+		if (table != null) {
+			try {
+				table.close();
+			} catch (IOException e) {
+				log.error("关闭Table失败", e);
+			}
+		}
+	}
+
+	/**
+	 * 功能: 创建一个新的rowKey(方便排序和查找最新记录)
+	 * 创建日期: 2023/9/20
+	 * @param
+	 * @return: java.lang.String
+	 */
+	public String createRowKey() {
+		return String.valueOf(Long.MAX_VALUE - System.currentTimeMillis()) + "-" + IdUtil.fastSimpleUUID();
+	}
+
+}

+ 0 - 1
tsddcs-datacollect/src/main/java/org/ts/ddcs/itemdatatemplate/sw/DataItemTemplateTT.java

@@ -1,6 +1,5 @@
 package org.ts.ddcs.itemdatatemplate.sw;
 
-import com.alibaba.fastjson.JSONObject;
 import org.ts.ddcs.common.util.BytesHelp;
 import org.ts.ddcs.itemdatatemplate.DataItemTemplate;
 

+ 0 - 1
tsddcs-datacollect/src/main/java/org/ts/ddcs/itemdatatemplate/szy/DataItemTemplateSzyTT.java

@@ -1,6 +1,5 @@
 package org.ts.ddcs.itemdatatemplate.szy;
 
-import com.alibaba.fastjson.JSONObject;
 import org.ts.ddcs.common.util.BytesHelp;
 import org.ts.ddcs.itemdatatemplate.DataItemTemplate;
 

+ 57 - 1
tsddcs-datacollect/src/main/java/org/ts/ddcs/kafka/MessageHandler.java

@@ -1,6 +1,7 @@
 package org.ts.ddcs.kafka;
 
 
+import cn.hutool.core.util.IdUtil;
 import com.alibaba.fastjson.JSONObject;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -8,10 +9,15 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.springframework.kafka.annotation.KafkaListener;
 import org.springframework.kafka.support.Acknowledgment;
 import org.springframework.stereotype.Component;
+import org.ts.ddcs.common.metadata.TaishanV1JsonPackMetadataEnum;
 import org.ts.ddcs.constant.BusinessConstant;
+import org.ts.ddcs.hbase.HBaseService;
 import org.ts.ddcs.main.MainProcessorHolder;
 import org.ts.ddcs.task.DownloadDatagramTask;
 
+import javax.annotation.Resource;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.FutureTask;
 
 
@@ -23,6 +29,8 @@ import java.util.concurrent.FutureTask;
 @Slf4j
 @Component
 public class MessageHandler {
+    @Resource
+    private HBaseService hBaseService;
 
     /**
      * 下发报文到测站
@@ -34,7 +42,7 @@ public class MessageHandler {
     public void toRtu(ConsumerRecord record, Acknowledgment acknowledgment) {
         try {
             String message = (String) record.value();
-            log.info("kafka handler {}",message);
+            log.info("kafka handler {}", message);
             if (null != message) {
                 JSONObject data = JSONObject.parseObject(message);
                 String rtuCode = data.getString(BusinessConstant.BASE_KEY_RTU);
@@ -52,4 +60,52 @@ public class MessageHandler {
         }
     }
 
+
+    @KafkaListener(groupId = "${spring.kafka.consumer.group-id}", topics = "${mq-topic.topic-tsywxt-hbase}", containerFactory = "ackContainerFactory")
+    public void toHbase(ConsumerRecord record, Acknowledgment acknowledgment) {
+        try {
+            String message = (String) record.value();
+            log.info("kafka handler {}", message);
+            if (null != message) {
+                JSONObject data = JSONObject.parseObject(message);
+                List<String> columnFamily = new ArrayList<>(1);
+                columnFamily.add("datagram");
+                String tableName = data.getString(TaishanV1JsonPackMetadataEnum.PACK_METADATA_AGREEMENT_TYPE.getName()) + "_" + "datagram";
+                hBaseService.createTable(tableName.toUpperCase(), columnFamily);
+
+                String rtuCode = data.getString(TaishanV1JsonPackMetadataEnum.PACK_METADATA_RTU_CODE.getName());
+                String rowKey = rtuCode + "-" + IdUtil.fastSimpleUUID() + "-" + String.valueOf(Long.MAX_VALUE - System.currentTimeMillis());
+
+                List<String> columns = new ArrayList<>();
+                List<String> values = new ArrayList<>();
+
+                columns.add(TaishanV1JsonPackMetadataEnum.PACK_METADATA_RTU_CODE.getName());
+                values.add(rtuCode);
+
+                columns.add(TaishanV1JsonPackMetadataEnum.PACK_METADATA_FROM_TIME.getName());
+                values.add(data.getString(TaishanV1JsonPackMetadataEnum.PACK_METADATA_FROM_TIME.getName()));
+
+                columns.add(TaishanV1JsonPackMetadataEnum.PACK_METADATA_DATAGRAM_TYPE.getName());
+                values.add(data.getString(TaishanV1JsonPackMetadataEnum.PACK_METADATA_DATAGRAM_TYPE.getName()));
+
+                columns.add(TaishanV1JsonPackMetadataEnum.PACK_METADATA_AGREEMENT_TYPE.getName());
+                values.add(data.getString(TaishanV1JsonPackMetadataEnum.PACK_METADATA_AGREEMENT_TYPE.getName()));
+
+                columns.add(TaishanV1JsonPackMetadataEnum.PACK_METADATA_DATAGRAM.getName());
+                values.add(data.getString(TaishanV1JsonPackMetadataEnum.PACK_METADATA_DATAGRAM.getName()));
+
+                columns.add("collectTime");
+                values.add(data.getString("collectTime"));
+
+                hBaseService.putData(tableName.toUpperCase(), rowKey, "datagram", columns, values);
+
+            }
+        } catch (Exception e) {
+            log.error(e.getMessage(), e);
+        } finally {
+            // 手动提交 offset
+            acknowledgment.acknowledge();
+        }
+    }
+
 }

+ 7 - 1
tsddcs-datacollect/src/main/java/org/ts/ddcs/main/MainProcessor.java

@@ -1,16 +1,21 @@
 package org.ts.ddcs.main;
 
 
+import com.alibaba.fastjson.JSONObject;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
 import org.glassfish.jersey.internal.guava.ThreadFactoryBuilder;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.data.redis.core.RedisTemplate;
 import org.springframework.data.redis.serializer.StringRedisSerializer;
 import org.springframework.stereotype.Component;
 
 
+import org.springframework.web.bind.annotation.GetMapping;
 import org.ts.ddcs.common.constant.KafkaConstants;
 import org.ts.ddcs.constant.NetPortStatusConstant;
 import org.ts.ddcs.dataStream.DataStreamManageHolder;
@@ -24,6 +29,7 @@ import javax.annotation.PostConstruct;
 import javax.annotation.Resource;
 import java.io.UnsupportedEncodingException;
 import java.nio.charset.Charset;
+import java.util.List;
 import java.util.concurrent.*;
 
 
@@ -90,7 +96,7 @@ public class MainProcessor {
             //写入运行参数
             String key = "topic." + KafkaConstants.TOPIC_TS_V1_DATAGRAM;
             this.putStoreCache(key, KafkaConstants.TOPIC_TS_V1_DATAGRAM);
-            //
+
             redisTemplate.setKeySerializer(new StringRedisSerializer());
             redisTemplate.setValueSerializer(new StringRedisSerializer());
             redisTemplate.setHashKeySerializer(new StringRedisSerializer());

+ 10 - 0
tsddcs-datacollect/src/main/resources/application-dev.yml

@@ -34,11 +34,13 @@ spring:
       concurrency: 5
       # 手动提交
       ack-mode: manual_immediate
+
 rockdb-config:
   save-path: D:\\tmp\\rockdb
 mq-topic:
   topic-queue-to-rtu-datagram: queue-to-rtu-datagram
   topic-queue-waiting-process-datagram: queue-waiting-process-datagram
+  topic-tsywxt-hbase: topic-tsywxt-hbase
 tsywxt-config:
   nio:
     listener:
@@ -47,4 +49,12 @@ tsywxt-config:
   template:
     path: "E:\\dh_svn\\Project\\DHRJ-TSSYSTEM\\SERVER\\TSDDCS\\tsddcs-datacollect\\src\\main\\resources\\templates"
     templates: SW2014,SZY2012
+hbase:
+  enable: true
+  zookeeper:
+    quorum: 172.9.0.190
+    property:
+      clientPort: 2181
+    znode:
+      parent: /hbase
 

+ 4 - 1
tsddcs-datacollect/src/main/resources/application.yml

@@ -1,2 +1,5 @@
 server:
-  port: 9999
+  port: 9999
+spring:
+  profiles:
+    active: dev