MQTTSubsribe.java 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
  1. package com.sky.ioc.message;
  2. import org.eclipse.paho.client.mqttv3.MqttClient;
  3. import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
  4. import org.eclipse.paho.client.mqttv3.MqttException;
  5. import org.eclipse.paho.client.mqttv3.MqttSecurityException;
  6. import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
  7. import org.springframework.beans.factory.annotation.Autowired;
  8. import org.springframework.beans.factory.annotation.Value;
  9. import org.springframework.stereotype.Component;
  10. import java.util.Arrays;
  11. import java.util.List;
  12. import java.util.concurrent.Executors;
  13. import java.util.concurrent.ScheduledExecutorService;
  14. import java.util.concurrent.TimeUnit;
  15. @Component
  16. public class MQTTSubsribe {
  17. @Value("${spring.mqtt.password}")
  18. private String password;
  19. @Value("${spring.mqtt.username}")
  20. private String username;
  21. @Value("${spring.mqtt.url}")
  22. private String url;
  23. //配置中的topic
  24. @Value("${spring.mqtt.topic}")
  25. private String defaultTopic;
  26. // 连接超时时间
  27. @Value("${spring.mqtt.completionTimeout}")
  28. private int completionTimeout;
  29. private String[] topics;
  30. private MqttClient client;
  31. private MqttConnectOptions mqttConnectOptions;
  32. @Autowired
  33. private PushCallback pushCallback;
  34. private ScheduledExecutorService scheduled;
  35. public void startReconnect() {
  36. this.scheduled = Executors.newSingleThreadScheduledExecutor();
  37. // 定时任务——重新连接mqtt服务器
  38. this.scheduled.scheduleAtFixedRate(new Runnable() {
  39. public void run() {
  40. if (!MQTTSubsribe.this.client.isConnected()) {
  41. try {
  42. MQTTSubsribe.this.client.connect(MQTTSubsribe.this.mqttConnectOptions);
  43. System.out.println("---mqtt已经重新连接上---");
  44. int[] Qos = new int[]{1};
  45. MQTTSubsribe.this.client.subscribe(MQTTSubsribe.this.topics, Qos);
  46. } catch (MqttSecurityException var2) {
  47. var2.printStackTrace();
  48. } catch (MqttException var3) {
  49. var3.printStackTrace();
  50. }
  51. }
  52. }
  53. }, 5000L, 10000L, TimeUnit.MILLISECONDS);
  54. }
  55. // 对mqttConnectOptions对象的常规设置
  56. public MqttConnectOptions getMqttConnectOptions() {
  57. this.mqttConnectOptions = new MqttConnectOptions();
  58. this.mqttConnectOptions.setCleanSession(true);
  59. this.mqttConnectOptions.setUserName(username);
  60. this.mqttConnectOptions.setPassword(password.toCharArray());
  61. this.mqttConnectOptions.setServerURIs(new String[]{url});
  62. this.mqttConnectOptions.setConnectionTimeout(completionTimeout);
  63. this.mqttConnectOptions.setKeepAliveInterval(2000);
  64. return mqttConnectOptions;
  65. }
  66. // 连接mqtt服务器订阅信息方法
  67. // topic也可作为参数传入
  68. public void start(String topic) {
  69. try {
  70. this.client = new MqttClient(url, getClientId(), new MemoryPersistence());
  71. this.getMqttConnectOptions();
  72. this.client.setCallback(this.pushCallback);
  73. // MqttTopic topicMq = client.getTopic(defaultTopic);
  74. this.client.connect(this.mqttConnectOptions);
  75. // topics = topic.toString().split(",");
  76. //订阅消息
  77. int[] Qos = {1,1};
  78. // 可将订阅的一个或多个topic都存入数组中,同时订阅
  79. // String[] topic1 = {defaultTopic};
  80. List<String> topicList = Arrays.asList(defaultTopic.trim().split(","));
  81. String[] topics = new String[topicList.size()];
  82. topicList.toArray(topics);
  83. this.client.subscribe(topics, Qos);
  84. boolean connected = this.client.isConnected();
  85. System.out.println("设备mqtt连接状态为:" + connected);
  86. String flag = connected ? "成功" : "失败";
  87. } catch (MqttException e) {
  88. e.printStackTrace();
  89. }
  90. }
  91. // 随机生成唯一client.id方法
  92. public String getClientId() {
  93. String nums = "";
  94. String[] codeChars = {"0", "1", "2", "3", "4", "5", "6", "7", "8", "9",
  95. "a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o", "p", "q", "r", "s", "t", "u", "v", "w", "x", "y", "z",
  96. "A", "B", "C", "D", "E", "F", "G", "H", "I", "J", "K", "L", "M", "N", "O", "P", "Q", "R", "S", "T", "U", "V", "W", "X", "Y", "Z"};
  97. for (int i = 0; i < 23; i++) {
  98. int charNum = (int) Math.floor(Math.random() * codeChars.length);
  99. nums = nums + codeChars[charNum];
  100. }
  101. return nums;
  102. }
  103. }