| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255 |
- package org.ts.ddcs.dataStream;
- import com.alibaba.fastjson.JSONObject;
- import org.ts.ddcs.common.enums.AgreementEnum;
- import org.ts.ddcs.datatemplate.DataTemplateManageHolder;
- import org.ts.ddcs.datatemplate.Template;
- import org.ts.ddcs.datatemplate.TemplateLineNode;
- import org.ts.ddcs.kafka.KafkaClient;
- import org.ts.ddcs.log.LogHelper;
- import org.ts.ddcs.net.DataChannelHolder;
- import java.io.File;
- import java.util.List;
- import java.util.Map;
- import java.util.concurrent.*;
- /**
- * 数据流主模块
- *
- * @author dylan
- */
- public class DataStreamManager {
- /**
- * 数据流工作线程数量
- */
- // private static final int STREAM_THREAD_NUM = 4;
- /**
- * 数据块队列
- */
- // private LinkedBlockingQueue<Map<String, Object>> blockQueue = new LinkedBlockingQueue<Map<String, Object>>(1024);
- /**
- * 连接适配器
- */
- private Map<String, DataStreamAdapter> dataStreamAdapterCache = new ConcurrentHashMap<>(1024);
- /**
- * 数据链压力记录
- */
- // private Map<String, Integer> dataStreamStreesCache = new ConcurrentHashMap<>(STREAM_THREAD_NUM);
- /**
- * 模板处理器链
- */
- private Map<Integer, DataStreamLineManager> streamLineCache = new ConcurrentHashMap<>();
- /**
- * 公共线程池
- **/
- // private static ThreadFactory batchThreadFactory = new ThreadFactoryBuilder().setNameFormat("batch-thread-pool-%d").build();
- // private static ExecutorService batchThreadPool = new ThreadPoolExecutor(1, 1,
- // 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(1024), batchThreadFactory, new ThreadPoolExecutor.AbortPolicy());
- /**
- * 任务定时器
- */
- // private ScheduledExecutorService scheduleExecutor = Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "batch-stream-task-" + r.hashCode()));
- /**
- * 数据流管理器初始化
- */
- public void init(String templates) {
- // String[] list = templates.split(",");
- // if (list.length > 0) {
- // for (String t : list) {
- // DataStreamLineManager lineManager = new DataStreamLineManager(AgreementEnum.getCode(t));
- // List<TemplateLineNode> nodes = DataTemplateManageHolder.newInstance().dataTemplateManager.cloneTemplateLineNode(AgreementEnum.getCode(t));
- // for (TemplateLineNode node : nodes
- // ) {
- // lineManager.addProcessor(node.dataStreamProcessor, node);
- // }
- // streamLineCache.put(AgreementEnum.getCode(t), lineManager);
- // }
- // }
- // try {
- //this.initDataStream();
- // this.scheduleExecutor.scheduleWithFixedDelay(() -> {
- // try {
- // long tm = System.currentTimeMillis();
- // BatchDataStreamTask task = new BatchDataStreamTask(tm);
- // FutureTask<Integer> futureTask = new FutureTask<>(task);
- // batchThreadPool.execute(futureTask);
- // } catch (Exception e) {
- // LogHelper.error(e);
- // }
- // }, 0, 100, TimeUnit.MILLISECONDS);
- // } catch (Exception e) {
- // LogHelper.error(e);
- // throw e;
- // }
- }
- // private class BatchDataStreamTask implements Callable<Integer> {
- //
- // private long batchTm;
- //
- // BatchDataStreamTask(long batchTm) {
- // this.batchTm = batchTm;
- // }
- //
- // @Override
- // public Integer call() {
- // try {
- // Map<String, String> streamMap = new HashMap<>();
- // for (int i = 1; i <= STREAM_THREAD_NUM; i++) {
- // if (blockQueue.isEmpty()) {
- // break;
- // }
- // Map<String, Object> block = blockQueue.poll();
- // String adapterKey = (String) block.get("adapterKey");
- // String filterKey = (String) block.get("filterKey");
- // dataStreamStreesCache.put(filterKey, dataStreamStreesCache.get(filterKey) + 1);
- // DataStreamLineManager lineManager = streamLineCache.get(filterKey);
- // DataStreamAdapter dataStreamAdapter = dataStreamAdapterCache.get(adapterKey);
- // lineManager.source(dataStreamAdapter);
- // streamMap.put(dataStreamAdapter.getAdapterKey(), dataStreamAdapter.getAdapterKey());
- // }
- // Set<String> keySet = dataStreamAdapterCache.keySet();
- //
- // for (String key : keySet) {
- // if (!streamMap.containsKey(key)) {
- // DataStreamAdapter dataStreamAdapter = dataStreamAdapterCache.get(key);
- // if (!dataStreamAdapter.processComplete()) {
- // String filterKey = dataStreamAdapter.getDataStreamLineKey();
- // dataStreamStreesCache.put(filterKey, dataStreamStreesCache.get(filterKey) + 1);
- // DataStreamLineManager lineManager = streamLineCache.get(filterKey);
- // lineManager.source(dataStreamAdapter);
- // }
- // }
- // }
- //
- //
- // long ut = System.currentTimeMillis() - batchTm;
- // if (ut > 100) {
- // LogHelper.info("task time too long !!!!!!!!!!!!!!!");
- // }
- // } catch (Exception e) {
- // LogHelper.error(e);
- // }
- // return 0;
- // }
- // }
- /**
- * 注册连接
- *
- * @param adapter
- */
- void regConnect(DataStreamAdapter adapter) {
- try {
- // String allocateFilterKey = "01";
- // int minStreesValue = dataStreamStreesCache.get("01");
- // for (int i = 2; i <= STREAM_THREAD_NUM; i++) {
- // String filterKey = String.format("%02d", i);
- // int tmpStreesValue = dataStreamStreesCache.get(filterKey);
- // if (tmpStreesValue < minStreesValue) {
- // minStreesValue = tmpStreesValue;
- // allocateFilterKey = filterKey;
- // }
- // }
- // adapter.setDataStreamLineKey(allocateFilterKey);
- //if (!dataStreamAdapterCache.containsKey(adapter.getAdapterKey())) {
- dataStreamAdapterCache.put(adapter.getAdapterKey(), adapter);
- //}
- }catch (Exception e){
- e.printStackTrace();
- }
- }
- /**
- * 关闭连接
- *
- * @param adapter
- */
- public void closeConnect(DataStreamAdapter adapter) {
- // try {
- // KafkaStream.clearKeyValue(adapter.getAdapterKey());
- //if (dataStreamAdapterCache.containsKey(adapter.getAdapterKey())) {
- // dataStreamAdapterCache.remove(adapter.getAdapterKey());
- // waittingCloseAdapterCache.put(adapter.getAdapterKey(), adapter);
- // }
- // } catch (RocksDBException e) {
- // e.printStackTrace();
- // }
- }
- // public void sourceEvent(String filterKey, String key) {
- // Map<String, Object> block = new HashMap<>();
- // block.put("filterKey", filterKey);
- // block.put("adapterKey", key);
- // block.put("tm", System.currentTimeMillis());
- // blockQueue.offer(block);
- // }
- public void responseEvent(String key,byte[] respBlock) {
- if (this.dataStreamAdapterCache.containsKey(key)){
- DataStreamAdapter dataStreamAdapter= DataChannelHolder.newInstance().dataChannel.getDataTemplateAdapterByKey(key);
- if (null != dataStreamAdapter){
- dataStreamAdapter.output(respBlock);
- }else{
- LogHelper.info("运行错误,链接已经提前关闭:"+key);
- }
- }
- }
- public void sourceEvent(String adapterKey, byte[] byteBuff) {
- JSONObject block = new JSONObject();
- block.put("adapterKey", adapterKey);
- block.put("block", byteBuff);
- block.put("tm", System.currentTimeMillis());
- KafkaClient.messageSubmit("kafka-data-stream-source-event", block.toJSONString());
- }
- /**
- * 初始化数据流处理器
- */
- private void initDataStream() {
- try {
- // for (int i = 0; i < STREAM_THREAD_NUM; i++) {
- // DataStreamLineManager lineManager = new DataStreamLineManager();
- // List<TemplateLineNode> nodes = DataTemplateManageHolder.newInstance().dataTemplateManager.cloneTemplateLineNode();
- // for (TemplateLineNode node : nodes
- // ) {
- // lineManager.addProcessor(node.dataStreamProcessor, node);
- // }
- // String lineKey = String.format("%02d", (i + 1));
- // streamLineCache.put(lineKey, lineManager);
- // dataStreamStreesCache.put(lineKey, 1);
- // }
- } catch (Exception e) {
- LogHelper.error("初始化数据流失败!");
- throw e;
- }
- }
- }
|