Mqtt.java 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107
  1. package org.ts.ddcs.mqtt;
  2. import org.ts.ddcs.log.LogHelper;
  3. import org.eclipse.paho.client.mqttv3.MqttClient;
  4. import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
  5. import org.eclipse.paho.client.mqttv3.MqttException;
  6. import org.eclipse.paho.client.mqttv3.MqttMessage;
  7. import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
  8. import org.springframework.beans.factory.annotation.Value;
  9. import org.springframework.stereotype.Component;
  10. import java.util.List;
  11. /**
  12. * @author dylan
  13. */
  14. @Component
  15. public class Mqtt {
  16. @Value("${run-config.mqtt.mqttSubTopic}")
  17. public String subTopic;
  18. @Value("${run-config.mqtt.mqttPubTopic}")
  19. public String pubTopic;
  20. @Value("${run-config.mqtt.mqttBroker}")
  21. public String mqttBroker;
  22. @Value("${run-config.server.serverId}")
  23. public String clientId;
  24. public void send(String jsonStr) {
  25. try {
  26. int qos = 0;
  27. MemoryPersistence persistence = new MemoryPersistence();
  28. MqttClient client = new MqttClient(mqttBroker, clientId, persistence);
  29. // MQTT 连接选项
  30. MqttConnectOptions connOpts = new MqttConnectOptions();
  31. connOpts.setUserName("collect");
  32. connOpts.setPassword("123".toCharArray());
  33. // 保留会话
  34. connOpts.setCleanSession(true);
  35. // 设置回调
  36. client.setCallback(new OnMessageCallback());
  37. // 建立连接
  38. client.connect(connOpts);
  39. // 订阅
  40. //client.subscribe(subTopic);
  41. // 消息发布所需参数
  42. MqttMessage message = new MqttMessage(jsonStr.getBytes());
  43. message.setQos(qos);
  44. client.publish(pubTopic, message);
  45. Thread.sleep(100);
  46. client.disconnect();
  47. client.close();
  48. } catch (MqttException me) {
  49. LogHelper.error("reason " + me.getReasonCode());
  50. LogHelper.error("msg " + me.getMessage());
  51. LogHelper.error("loc " + me.getLocalizedMessage());
  52. LogHelper.error("cause " + me.getCause());
  53. LogHelper.error("excep " + me);
  54. LogHelper.error(me);
  55. } catch (InterruptedException e) {
  56. LogHelper.error(e);
  57. }
  58. }
  59. public void send(List<String> jsonStrList) {
  60. try {
  61. int qos = 0;
  62. MemoryPersistence persistence = new MemoryPersistence();
  63. MqttClient client = new MqttClient(mqttBroker, clientId, persistence);
  64. // MQTT 连接选项
  65. MqttConnectOptions connOpts = new MqttConnectOptions();
  66. connOpts.setUserName("collect");
  67. connOpts.setPassword("123".toCharArray());
  68. // 保留会话
  69. connOpts.setCleanSession(true);
  70. // 设置回调
  71. client.setCallback(new OnMessageCallback());
  72. // 建立连接
  73. client.connect(connOpts);
  74. // 订阅
  75. //client.subscribe(subTopic);
  76. // 消息发布所需参数
  77. for (String jsonStr : jsonStrList) {
  78. MqttMessage message = new MqttMessage(jsonStr.getBytes());
  79. message.setQos(qos);
  80. client.publish(pubTopic, message);
  81. Thread.sleep(200);
  82. }
  83. Thread.sleep(100);
  84. client.disconnect();
  85. client.close();
  86. } catch (MqttException me) {
  87. LogHelper.error("reason " + me.getReasonCode());
  88. LogHelper.error("msg " + me.getMessage());
  89. LogHelper.error("loc " + me.getLocalizedMessage());
  90. LogHelper.error("cause " + me.getCause());
  91. LogHelper.error("excep " + me);
  92. LogHelper.error(me);
  93. } catch (InterruptedException e) {
  94. LogHelper.error(e);
  95. }
  96. }
  97. }