123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126 |
- package com.sky.ioc.message;
- import org.eclipse.paho.client.mqttv3.MqttClient;
- import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
- import org.eclipse.paho.client.mqttv3.MqttException;
- import org.eclipse.paho.client.mqttv3.MqttSecurityException;
- import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.stereotype.Component;
- import java.util.Arrays;
- import java.util.List;
- import java.util.concurrent.Executors;
- import java.util.concurrent.ScheduledExecutorService;
- import java.util.concurrent.TimeUnit;
- @Component
- public class MQTTSubsribe {
- @Value("${spring.mqtt.password}")
- private String password;
- @Value("${spring.mqtt.username}")
- private String username;
- @Value("${spring.mqtt.url}")
- private String url;
- //配置中的topic
- @Value("${spring.mqtt.topic}")
- private String defaultTopic;
- // 连接超时时间
- @Value("${spring.mqtt.completionTimeout}")
- private int completionTimeout;
- private String[] topics;
- private MqttClient client;
- private MqttConnectOptions mqttConnectOptions;
- @Autowired
- private PushCallback pushCallback;
- private ScheduledExecutorService scheduled;
- public void startReconnect() {
- this.scheduled = Executors.newSingleThreadScheduledExecutor();
- // 定时任务——重新连接mqtt服务器
- this.scheduled.scheduleAtFixedRate(new Runnable() {
- public void run() {
- if (!MQTTSubsribe.this.client.isConnected()) {
- try {
- MQTTSubsribe.this.client.connect(MQTTSubsribe.this.mqttConnectOptions);
- System.out.println("---mqtt已经重新连接上---");
- int[] Qos = new int[]{1};
- MQTTSubsribe.this.client.subscribe(MQTTSubsribe.this.topics, Qos);
- } catch (MqttSecurityException var2) {
- var2.printStackTrace();
- } catch (MqttException var3) {
- var3.printStackTrace();
- }
- }
- }
- }, 5000L, 10000L, TimeUnit.MILLISECONDS);
- }
- // 对mqttConnectOptions对象的常规设置
- public MqttConnectOptions getMqttConnectOptions() {
- this.mqttConnectOptions = new MqttConnectOptions();
- this.mqttConnectOptions.setCleanSession(true);
- this.mqttConnectOptions.setUserName(username);
- this.mqttConnectOptions.setPassword(password.toCharArray());
- this.mqttConnectOptions.setServerURIs(new String[]{url});
- this.mqttConnectOptions.setConnectionTimeout(completionTimeout);
- this.mqttConnectOptions.setKeepAliveInterval(2000);
- return mqttConnectOptions;
- }
- // 连接mqtt服务器订阅信息方法
- // topic也可作为参数传入
- public void start(String topic) {
- try {
- this.client = new MqttClient(url, getClientId(), new MemoryPersistence());
- this.getMqttConnectOptions();
- this.client.setCallback(this.pushCallback);
- // MqttTopic topicMq = client.getTopic(defaultTopic);
- this.client.connect(this.mqttConnectOptions);
- // topics = topic.toString().split(",");
- //订阅消息
- int[] Qos = {1,1};
- // 可将订阅的一个或多个topic都存入数组中,同时订阅
- // String[] topic1 = {defaultTopic};
- List<String> topicList = Arrays.asList(defaultTopic.trim().split(","));
- String[] topics = new String[topicList.size()];
- topicList.toArray(topics);
- this.client.subscribe(topics, Qos);
- boolean connected = this.client.isConnected();
- System.out.println("设备mqtt连接状态为:" + connected);
- String flag = connected ? "成功" : "失败";
- } catch (MqttException e) {
- e.printStackTrace();
- }
- }
- // 随机生成唯一client.id方法
- public String getClientId() {
- String nums = "";
- String[] codeChars = {"0", "1", "2", "3", "4", "5", "6", "7", "8", "9",
- "a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o", "p", "q", "r", "s", "t", "u", "v", "w", "x", "y", "z",
- "A", "B", "C", "D", "E", "F", "G", "H", "I", "J", "K", "L", "M", "N", "O", "P", "Q", "R", "S", "T", "U", "V", "W", "X", "Y", "Z"};
- for (int i = 0; i < 23; i++) {
- int charNum = (int) Math.floor(Math.random() * codeChars.length);
- nums = nums + codeChars[charNum];
- }
- return nums;
- }
- }
|