package org.ts.ddcs.dataStream;

import com.alibaba.fastjson.JSONObject;
import org.ts.ddcs.common.enums.AgreementEnum;
import org.ts.ddcs.datatemplate.DataTemplateManageHolder;
import org.ts.ddcs.datatemplate.Template;
import org.ts.ddcs.datatemplate.TemplateLineNode;
import org.ts.ddcs.kafka.KafkaClient;
import org.ts.ddcs.log.LogHelper;
import org.ts.ddcs.net.DataChannelHolder;

import java.io.File;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;

/**
 * Main module of the data stream.
 *
 * <p>Keeps a registry of connection adapters, forwards inbound byte blocks to Kafka
 * ({@link #sourceEvent(String, byte[])}) and hands response blocks back to the adapter
 * that owns the connection ({@link #responseEvent(String, byte[])}).
 *
 * <p>The in-process batch pipeline that used to live here (block queue, per-line pressure
 * counters, scheduled batch task) is disabled; {@link #init(String)} and
 * {@link #closeConnect(DataStreamAdapter)} are currently no-ops.
 *
 * @author dylan
 */
public class DataStreamManager {

    /**
     * Connection adapters, keyed by adapter key.
     */
    private final Map<String, DataStreamAdapter> dataStreamAdapterCache = new ConcurrentHashMap<>(1024);

    /**
     * Template processor chains.
     *
     * <p>NOTE(review): nothing in the live code reads or writes this map any more, and its
     * key/value types are not visible from the live code, so it is left untyped. Confirm
     * whether it can be removed.
     */
    private final Map streamLineCache = new ConcurrentHashMap<>();

    /**
     * Initializes the data stream manager.
     *
     * <p>Currently a no-op: the per-template line-manager setup and the scheduled batch
     * task that used to be started here are disabled, so {@code templates} is ignored.
     *
     * @param templates template list; unused at present
     */
    public void init(String templates) {
        // Intentionally empty: template line managers and the batch scheduler are disabled.
    }

    /**
     * Registers a connection adapter under its adapter key, replacing any adapter that was
     * previously registered under the same key.
     *
     * <p>Failures (for example a {@code null} adapter or key) are logged and not
     * propagated to the caller.
     *
     * @param adapter the adapter to register
     */
    void regConnect(DataStreamAdapter adapter) {
        try {
            dataStreamAdapterCache.put(adapter.getAdapterKey(), adapter);
        } catch (Exception e) {
            // Route the failure through the project logger instead of stderr.
            LogHelper.error(e);
        }
    }

    /**
     * Closes a connection.
     *
     * <p>Currently a no-op: the cleanup that used to run here is disabled.
     *
     * <p>NOTE(review): because nothing is removed here, every adapter registered through
     * {@code regConnect} stays in the adapter cache for the lifetime of this manager.
     * Confirm whether eviction is handled elsewhere or should be restored.
     *
     * @param adapter the adapter whose connection is being closed; unused at present
     */
    public void closeConnect(DataStreamAdapter adapter) {
        // Intentionally empty: connection cleanup is disabled.
    }

    /**
     * Delivers a response block to the adapter that owns the given connection.
     *
     * <p>Nothing happens when {@code key} was never registered. When it was registered but
     * the data channel no longer returns an adapter for it, the situation is logged and the
     * block is dropped.
     *
     * @param key       adapter key of the target connection
     * @param respBlock response bytes to write out
     */
    public void responseEvent(String key, byte[] respBlock) {
        if (!this.dataStreamAdapterCache.containsKey(key)) {
            return;
        }
        // The live adapter is resolved through the data channel, not taken from the local cache.
        DataStreamAdapter target =
                DataChannelHolder.newInstance().dataChannel.getDataTemplateAdapterByKey(key);
        if (target == null) {
            LogHelper.info("运行错误,链接已经提前关闭:" + key);
            return;
        }
        target.output(respBlock);
    }

    /**
     * Publishes an inbound data block to Kafka.
     *
     * <p>The message is a JSON object with the fields {@code adapterKey}, {@code block} and
     * {@code tm} (the current time in milliseconds), submitted under
     * {@code "kafka-data-stream-source-event"} (presumably the topic name; confirm against
     * {@code KafkaClient.messageSubmit}).
     *
     * @param adapterKey adapter key of the connection the block came from
     * @param byteBuff   raw bytes received on that connection
     */
    public void sourceEvent(String adapterKey, byte[] byteBuff) {
        JSONObject message = new JSONObject();
        message.put("adapterKey", adapterKey);
        message.put("block", byteBuff);
        message.put("tm", System.currentTimeMillis());
        KafkaClient.messageSubmit("kafka-data-stream-source-event", message.toJSONString());
    }

    /**
     * Initializes the data stream processors.
     *
     * <p>Currently a no-op and not called from anywhere in this class: the per-thread
     * line-manager setup it used to perform is disabled.
     */
    private void initDataStream() {
        // Intentionally empty: processor-chain setup is disabled.
    }
}