DataStreamManager.java 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255
  1. package org.ts.ddcs.dataStream;
  2. import com.alibaba.fastjson.JSONObject;
  3. import org.ts.ddcs.common.enums.AgreementEnum;
  4. import org.ts.ddcs.datatemplate.DataTemplateManageHolder;
  5. import org.ts.ddcs.datatemplate.Template;
  6. import org.ts.ddcs.datatemplate.TemplateLineNode;
  7. import org.ts.ddcs.kafka.KafkaClient;
  8. import org.ts.ddcs.log.LogHelper;
  9. import org.ts.ddcs.net.DataChannelHolder;
  10. import java.io.File;
  11. import java.util.List;
  12. import java.util.Map;
  13. import java.util.concurrent.*;
  14. /**
  15. * 数据流主模块
  16. *
  17. * @author dylan
  18. */
  19. public class DataStreamManager {
  20. /**
  21. * 数据流工作线程数量
  22. */
  23. // private static final int STREAM_THREAD_NUM = 4;
  24. /**
  25. * 数据块队列
  26. */
  27. // private LinkedBlockingQueue<Map<String, Object>> blockQueue = new LinkedBlockingQueue<Map<String, Object>>(1024);
  28. /**
  29. * 连接适配器
  30. */
  31. private Map<String, DataStreamAdapter> dataStreamAdapterCache = new ConcurrentHashMap<>(1024);
  32. /**
  33. * 数据链压力记录
  34. */
  35. // private Map<String, Integer> dataStreamStreesCache = new ConcurrentHashMap<>(STREAM_THREAD_NUM);
  36. /**
  37. * 模板处理器链
  38. */
  39. private Map<Integer, DataStreamLineManager> streamLineCache = new ConcurrentHashMap<>();
  40. /**
  41. * 公共线程池
  42. **/
  43. // private static ThreadFactory batchThreadFactory = new ThreadFactoryBuilder().setNameFormat("batch-thread-pool-%d").build();
  44. // private static ExecutorService batchThreadPool = new ThreadPoolExecutor(1, 1,
  45. // 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(1024), batchThreadFactory, new ThreadPoolExecutor.AbortPolicy());
  46. /**
  47. * 任务定时器
  48. */
  49. // private ScheduledExecutorService scheduleExecutor = Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "batch-stream-task-" + r.hashCode()));
  50. /**
  51. * 数据流管理器初始化
  52. */
  53. public void init(String templates) {
  54. // String[] list = templates.split(",");
  55. // if (list.length > 0) {
  56. // for (String t : list) {
  57. // DataStreamLineManager lineManager = new DataStreamLineManager(AgreementEnum.getCode(t));
  58. // List<TemplateLineNode> nodes = DataTemplateManageHolder.newInstance().dataTemplateManager.cloneTemplateLineNode(AgreementEnum.getCode(t));
  59. // for (TemplateLineNode node : nodes
  60. // ) {
  61. // lineManager.addProcessor(node.dataStreamProcessor, node);
  62. // }
  63. // streamLineCache.put(AgreementEnum.getCode(t), lineManager);
  64. // }
  65. // }
  66. // try {
  67. //this.initDataStream();
  68. // this.scheduleExecutor.scheduleWithFixedDelay(() -> {
  69. // try {
  70. // long tm = System.currentTimeMillis();
  71. // BatchDataStreamTask task = new BatchDataStreamTask(tm);
  72. // FutureTask<Integer> futureTask = new FutureTask<>(task);
  73. // batchThreadPool.execute(futureTask);
  74. // } catch (Exception e) {
  75. // LogHelper.error(e);
  76. // }
  77. // }, 0, 100, TimeUnit.MILLISECONDS);
  78. // } catch (Exception e) {
  79. // LogHelper.error(e);
  80. // throw e;
  81. // }
  82. }
  83. // private class BatchDataStreamTask implements Callable<Integer> {
  84. //
  85. // private long batchTm;
  86. //
  87. // BatchDataStreamTask(long batchTm) {
  88. // this.batchTm = batchTm;
  89. // }
  90. //
  91. // @Override
  92. // public Integer call() {
  93. // try {
  94. // Map<String, String> streamMap = new HashMap<>();
  95. // for (int i = 1; i <= STREAM_THREAD_NUM; i++) {
  96. // if (blockQueue.isEmpty()) {
  97. // break;
  98. // }
  99. // Map<String, Object> block = blockQueue.poll();
  100. // String adapterKey = (String) block.get("adapterKey");
  101. // String filterKey = (String) block.get("filterKey");
  102. // dataStreamStreesCache.put(filterKey, dataStreamStreesCache.get(filterKey) + 1);
  103. // DataStreamLineManager lineManager = streamLineCache.get(filterKey);
  104. // DataStreamAdapter dataStreamAdapter = dataStreamAdapterCache.get(adapterKey);
  105. // lineManager.source(dataStreamAdapter);
  106. // streamMap.put(dataStreamAdapter.getAdapterKey(), dataStreamAdapter.getAdapterKey());
  107. // }
  108. // Set<String> keySet = dataStreamAdapterCache.keySet();
  109. //
  110. // for (String key : keySet) {
  111. // if (!streamMap.containsKey(key)) {
  112. // DataStreamAdapter dataStreamAdapter = dataStreamAdapterCache.get(key);
  113. // if (!dataStreamAdapter.processComplete()) {
  114. // String filterKey = dataStreamAdapter.getDataStreamLineKey();
  115. // dataStreamStreesCache.put(filterKey, dataStreamStreesCache.get(filterKey) + 1);
  116. // DataStreamLineManager lineManager = streamLineCache.get(filterKey);
  117. // lineManager.source(dataStreamAdapter);
  118. // }
  119. // }
  120. // }
  121. //
  122. //
  123. // long ut = System.currentTimeMillis() - batchTm;
  124. // if (ut > 100) {
  125. // LogHelper.info("task time too long !!!!!!!!!!!!!!!");
  126. // }
  127. // } catch (Exception e) {
  128. // LogHelper.error(e);
  129. // }
  130. // return 0;
  131. // }
  132. // }
  133. /**
  134. * 注册连接
  135. *
  136. * @param adapter
  137. */
  138. void regConnect(DataStreamAdapter adapter) {
  139. try {
  140. // String allocateFilterKey = "01";
  141. // int minStreesValue = dataStreamStreesCache.get("01");
  142. // for (int i = 2; i <= STREAM_THREAD_NUM; i++) {
  143. // String filterKey = String.format("%02d", i);
  144. // int tmpStreesValue = dataStreamStreesCache.get(filterKey);
  145. // if (tmpStreesValue < minStreesValue) {
  146. // minStreesValue = tmpStreesValue;
  147. // allocateFilterKey = filterKey;
  148. // }
  149. // }
  150. // adapter.setDataStreamLineKey(allocateFilterKey);
  151. //if (!dataStreamAdapterCache.containsKey(adapter.getAdapterKey())) {
  152. dataStreamAdapterCache.put(adapter.getAdapterKey(), adapter);
  153. //}
  154. }catch (Exception e){
  155. e.printStackTrace();
  156. }
  157. }
  158. /**
  159. * 关闭连接
  160. *
  161. * @param adapter
  162. */
  163. public void closeConnect(DataStreamAdapter adapter) {
  164. // try {
  165. // KafkaStream.clearKeyValue(adapter.getAdapterKey());
  166. //if (dataStreamAdapterCache.containsKey(adapter.getAdapterKey())) {
  167. // dataStreamAdapterCache.remove(adapter.getAdapterKey());
  168. // waittingCloseAdapterCache.put(adapter.getAdapterKey(), adapter);
  169. // }
  170. // } catch (RocksDBException e) {
  171. // e.printStackTrace();
  172. // }
  173. }
  174. // public void sourceEvent(String filterKey, String key) {
  175. // Map<String, Object> block = new HashMap<>();
  176. // block.put("filterKey", filterKey);
  177. // block.put("adapterKey", key);
  178. // block.put("tm", System.currentTimeMillis());
  179. // blockQueue.offer(block);
  180. // }
  181. public void responseEvent(String key,byte[] respBlock) {
  182. if (this.dataStreamAdapterCache.containsKey(key)){
  183. DataStreamAdapter dataStreamAdapter= DataChannelHolder.newInstance().dataChannel.getDataTemplateAdapterByKey(key);
  184. if (null != dataStreamAdapter){
  185. dataStreamAdapter.output(respBlock);
  186. }else{
  187. LogHelper.info("运行错误,链接已经提前关闭:"+key);
  188. }
  189. }
  190. }
  191. public void sourceEvent(String adapterKey, byte[] byteBuff) {
  192. JSONObject block = new JSONObject();
  193. block.put("adapterKey", adapterKey);
  194. block.put("block", byteBuff);
  195. block.put("tm", System.currentTimeMillis());
  196. KafkaClient.messageSubmit("kafka-data-stream-source-event", block.toJSONString());
  197. }
  198. /**
  199. * 初始化数据流处理器
  200. */
  201. private void initDataStream() {
  202. try {
  203. // for (int i = 0; i < STREAM_THREAD_NUM; i++) {
  204. // DataStreamLineManager lineManager = new DataStreamLineManager();
  205. // List<TemplateLineNode> nodes = DataTemplateManageHolder.newInstance().dataTemplateManager.cloneTemplateLineNode();
  206. // for (TemplateLineNode node : nodes
  207. // ) {
  208. // lineManager.addProcessor(node.dataStreamProcessor, node);
  209. // }
  210. // String lineKey = String.format("%02d", (i + 1));
  211. // streamLineCache.put(lineKey, lineManager);
  212. // dataStreamStreesCache.put(lineKey, 1);
  213. // }
  214. } catch (Exception e) {
  215. LogHelper.error("初始化数据流失败!");
  216. throw e;
  217. }
  218. }
  219. }