package org.ts.ddcs.mqtt;

import org.ts.ddcs.log.LogHelper;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;

/**
 * Publishes JSON payloads to the configured MQTT topic.
 *
 * <p>Every {@code send} call creates its own {@link MqttClient}, connects, publishes,
 * and then disconnects and closes the client again; no connection is kept between calls.
 *
 * <p>NOTE(review): every call connects with the same {@link #clientId}. Brokers normally
 * allow only one live connection per client id, so overlapping calls may disconnect each
 * other - confirm whether callers can invoke {@code send} concurrently.
 *
 * @author dylan
 */
@Component
public class Mqtt {

    /** Quality-of-service level applied to every published message. */
    private static final int QOS = 0;

    // NOTE(review): broker credentials are hard-coded in source; consider moving them to
    // externalized configuration alongside the other run-config.mqtt.* properties.
    private static final String USER_NAME = "collect";
    private static final String PASSWORD = "123";

    /** Pause after each message of a batch, in milliseconds. */
    private static final long BATCH_MESSAGE_DELAY_MS = 200L;

    /** Pause between the last publish and the disconnect, in milliseconds. */
    private static final long PRE_DISCONNECT_DELAY_MS = 100L;

    /** Subscription topic from configuration; not used by the publish path in this class. */
    @Value("${run-config.mqtt.mqttSubTopic}")
    public String subTopic;

    /** Topic that every message is published to. */
    @Value("${run-config.mqtt.mqttPubTopic}")
    public String pubTopic;

    /** Broker URI handed to the {@link MqttClient} constructor. */
    @Value("${run-config.mqtt.mqttBroker}")
    public String mqttBroker;

    /** MQTT client identifier; taken from the configured server id. */
    @Value("${run-config.server.serverId}")
    public String clientId;

    /**
     * Publishes a single JSON string to {@link #pubTopic}.
     *
     * <p>MQTT failures and thread interruption are logged, not thrown. A {@code null}
     * argument results in a {@link NullPointerException}.
     *
     * @param jsonStr the payload; encoded as UTF-8 before publishing
     */
    public void send(String jsonStr) {
        publishAll(Collections.singletonList(jsonStr), 0L);
    }

    /**
     * Publishes each JSON string in the list to {@link #pubTopic} over one connection,
     * pausing {@value #BATCH_MESSAGE_DELAY_MS} ms after every message.
     *
     * <p>MQTT failures and thread interruption are logged, not thrown. If publishing fails
     * part-way through, the remaining messages are not sent.
     *
     * @param jsonStrList the payloads in publish order; a {@code null} or empty list is a
     *                    no-op and no connection is opened
     */
    public void send(List<String> jsonStrList) {
        if (jsonStrList == null || jsonStrList.isEmpty()) {
            return;
        }
        publishAll(jsonStrList, BATCH_MESSAGE_DELAY_MS);
    }

    /**
     * Connects to the broker, publishes every payload in order, and always releases the client.
     *
     * @param payloads         messages to publish
     * @param delayAfterEachMs pause after each publish in milliseconds; {@code 0} disables it
     */
    private void publishAll(List<String> payloads, long delayAfterEachMs) {
        try {
            MqttClient client = new MqttClient(mqttBroker, clientId, new MemoryPersistence());
            try {
                MqttConnectOptions connOpts = new MqttConnectOptions();
                connOpts.setUserName(USER_NAME);
                connOpts.setPassword(PASSWORD.toCharArray());
                // Clean session: no session state is kept between connections.
                connOpts.setCleanSession(true);

                client.setCallback(new OnMessageCallback());
                client.connect(connOpts);

                for (String payload : payloads) {
                    // Explicit UTF-8 so the bytes on the wire do not depend on the JVM default charset.
                    MqttMessage message = new MqttMessage(payload.getBytes(StandardCharsets.UTF_8));
                    message.setQos(QOS);
                    client.publish(pubTopic, message);
                    if (delayAfterEachMs > 0) {
                        Thread.sleep(delayAfterEachMs);
                    }
                }

                // Pause kept from the original implementation; presumably it gives the last
                // QoS 0 message time to leave the client - confirm before removing.
                Thread.sleep(PRE_DISCONNECT_DELAY_MS);
            } finally {
                // Runs on success and on failure so the client is never left open.
                release(client);
            }
        } catch (MqttException me) {
            logMqttFailure(me);
        } catch (InterruptedException e) {
            LogHelper.error(e);
            // Restore the interrupt status (after cleanup has run) so callers can observe it.
            Thread.currentThread().interrupt();
        }
    }

    /** Disconnects the client if it is connected, then closes it; failures are only logged. */
    private static void release(MqttClient client) {
        try {
            if (client.isConnected()) {
                client.disconnect();
            }
        } catch (MqttException me) {
            logMqttFailure(me);
        }
        try {
            client.close();
        } catch (MqttException me) {
            logMqttFailure(me);
        }
    }

    /** Logs the reason code, messages, cause and the exception itself for an MQTT failure. */
    private static void logMqttFailure(MqttException me) {
        LogHelper.error("reason " + me.getReasonCode());
        LogHelper.error("msg " + me.getMessage());
        LogHelper.error("loc " + me.getLocalizedMessage());
        LogHelper.error("cause " + me.getCause());
        LogHelper.error("excep " + me);
        LogHelper.error(me);
    }
}