| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107 |
- package org.ts.ddcs.mqtt;
- import org.ts.ddcs.log.LogHelper;
- import org.eclipse.paho.client.mqttv3.MqttClient;
- import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
- import org.eclipse.paho.client.mqttv3.MqttException;
- import org.eclipse.paho.client.mqttv3.MqttMessage;
- import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.stereotype.Component;
- import java.util.List;
- /**
- * @author dylan
- */
- @Component
- public class Mqtt {
- @Value("${run-config.mqtt.mqttSubTopic}")
- public String subTopic;
- @Value("${run-config.mqtt.mqttPubTopic}")
- public String pubTopic;
- @Value("${run-config.mqtt.mqttBroker}")
- public String mqttBroker;
- @Value("${run-config.server.serverId}")
- public String clientId;
- public void send(String jsonStr) {
- try {
- int qos = 0;
- MemoryPersistence persistence = new MemoryPersistence();
- MqttClient client = new MqttClient(mqttBroker, clientId, persistence);
- // MQTT 连接选项
- MqttConnectOptions connOpts = new MqttConnectOptions();
- connOpts.setUserName("collect");
- connOpts.setPassword("123".toCharArray());
- // 保留会话
- connOpts.setCleanSession(true);
- // 设置回调
- client.setCallback(new OnMessageCallback());
- // 建立连接
- client.connect(connOpts);
- // 订阅
- //client.subscribe(subTopic);
- // 消息发布所需参数
- MqttMessage message = new MqttMessage(jsonStr.getBytes());
- message.setQos(qos);
- client.publish(pubTopic, message);
- Thread.sleep(100);
- client.disconnect();
- client.close();
- } catch (MqttException me) {
- LogHelper.error("reason " + me.getReasonCode());
- LogHelper.error("msg " + me.getMessage());
- LogHelper.error("loc " + me.getLocalizedMessage());
- LogHelper.error("cause " + me.getCause());
- LogHelper.error("excep " + me);
- LogHelper.error(me);
- } catch (InterruptedException e) {
- LogHelper.error(e);
- }
- }
- public void send(List<String> jsonStrList) {
- try {
- int qos = 0;
- MemoryPersistence persistence = new MemoryPersistence();
- MqttClient client = new MqttClient(mqttBroker, clientId, persistence);
- // MQTT 连接选项
- MqttConnectOptions connOpts = new MqttConnectOptions();
- connOpts.setUserName("collect");
- connOpts.setPassword("123".toCharArray());
- // 保留会话
- connOpts.setCleanSession(true);
- // 设置回调
- client.setCallback(new OnMessageCallback());
- // 建立连接
- client.connect(connOpts);
- // 订阅
- //client.subscribe(subTopic);
- // 消息发布所需参数
- for (String jsonStr : jsonStrList) {
- MqttMessage message = new MqttMessage(jsonStr.getBytes());
- message.setQos(qos);
- client.publish(pubTopic, message);
- Thread.sleep(200);
- }
- Thread.sleep(100);
- client.disconnect();
- client.close();
- } catch (MqttException me) {
- LogHelper.error("reason " + me.getReasonCode());
- LogHelper.error("msg " + me.getMessage());
- LogHelper.error("loc " + me.getLocalizedMessage());
- LogHelper.error("cause " + me.getCause());
- LogHelper.error("excep " + me);
- LogHelper.error(me);
- } catch (InterruptedException e) {
- LogHelper.error(e);
- }
- }
- }
|