Эх сурвалжийг харах

人员管理 增加检索条件 设备告警 对接mqtt

ZhangManMan 2 жил өмнө
parent
commit
fe2beb4909

+ 11 - 0
pom.xml

@@ -218,6 +218,17 @@
             <artifactId>spring-boot-starter-amqp</artifactId>
         </dependency>
 
+        <!-- mqtt -->
+        <dependency>
+            <groupId>org.springframework.integration</groupId>
+            <artifactId>spring-integration-stream</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.integration</groupId>
+            <artifactId>spring-integration-mqtt</artifactId>
+        </dependency>
+
+
     </dependencies>
 
     <build>

+ 18 - 0
src/main/java/com/sky/ioc/SkyIocApplication.java

@@ -1,10 +1,15 @@
 package com.sky.ioc;
 
+import com.sky.ioc.message.MQTTSubsribe;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.boot.builder.SpringApplicationBuilder;
 import org.springframework.boot.web.servlet.support.SpringBootServletInitializer;
 
+import javax.annotation.PostConstruct;
+
 @SpringBootApplication
 public class SkyIocApplication extends SpringBootServletInitializer {
 
@@ -16,4 +21,17 @@ public class SkyIocApplication extends SpringBootServletInitializer {
     public static void main(String[] args) {
         SpringApplication.run(SkyIocApplication.class, args);
     }
+
+    @Autowired
+    private MQTTSubsribe mqttSubsribe;
+
+    /**
+     * 接受订阅的接口和消息,mqtt消费端
+     */
+    @PostConstruct
+    public void consumeMqttClient() throws MqttException {
+
+       // mqttSubsribe.start("notice/device/status/safety/camera,notice/device/status/energy/device");
+    }
+
 }

+ 17 - 4
src/main/java/com/sky/ioc/controller/life/LifeController.java

@@ -1,10 +1,13 @@
 package com.sky.ioc.controller.life;
 
+import com.sky.ioc.message.MQTTSubsribe;
+import com.sky.ioc.tool.ReturnMsg;
+import io.swagger.annotations.ApiOperation;
 import lombok.extern.slf4j.Slf4j;
-import org.springframework.web.bind.annotation.PostMapping;
-import org.springframework.web.bind.annotation.RequestMapping;
-import org.springframework.web.bind.annotation.RequestMethod;
-import org.springframework.web.bind.annotation.RestController;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.*;
+
+import javax.xml.transform.Result;
 
 /** 智享生活
  * @author LunCe*/
@@ -13,6 +16,16 @@ import org.springframework.web.bind.annotation.RestController;
 @RequestMapping("/life")
 public class LifeController {
 
+    @Autowired
+    MQTTSubsribe mqtt;
+
+
+  //  @ApiOperation(value = "订阅mqtt---测试")
+    @GetMapping("mqtt")
+    public ReturnMsg insertByMq(){
+        mqtt.start("notice/device/status/safety/camera");
+        return ReturnMsg.ok();
+    }
 
 
 //

+ 5 - 1
src/main/java/com/sky/ioc/controller/person/PersonController.java

@@ -28,6 +28,8 @@ public class PersonController {
             @ApiImplicitParam(name = "name", value = "姓名", dataType = "String"),
             @ApiImplicitParam(name = "companyName", value = "公司", dataType = "String"),
             @ApiImplicitParam(name = "deptName", value = "部门", dataType = "String"),
+            @ApiImplicitParam(name = "type", value = "人员类型", dataType = "Integer"),
+            @ApiImplicitParam(name = "status", value = "人员状态", dataType = "Integer"),
             @ApiImplicitParam(name = "page", value = "页码",defaultValue="1", dataType = "Integer"),
             @ApiImplicitParam(name = "pageSize", value = "页大小", defaultValue="10",dataType = "Integer")
     })
@@ -35,9 +37,11 @@ public class PersonController {
     public ReturnMsg getPersonList(@RequestParam(name = "name", required = false) String name,
                                 @RequestParam(name = "companyName", required = false) String companyName,
                                 @RequestParam(name = "deptName", required = false) String deptName,
+                                @RequestParam(name = "type", required = false) Integer type,
+                                @RequestParam(name = "status", required = false) Integer status,
                                 @RequestParam(name = "page", required = false,defaultValue = "1") Integer page,
                                 @RequestParam(name = "pageSize", required = false,defaultValue = "10") Integer pageSize) {
-        return  personService.pageList(name,companyName,deptName,page,pageSize);
+        return  personService.pageList(name,companyName,deptName,type,status,page,pageSize);
     }
 
     @ApiOperation("新建人员")

+ 33 - 0
src/main/java/com/sky/ioc/entity/domain/energy/DeviceMessage.java

@@ -0,0 +1,33 @@
+package com.sky.ioc.entity.domain.energy;
+
+import com.baomidou.mybatisplus.annotation.IdType;
+import com.baomidou.mybatisplus.annotation.TableId;
+import lombok.Data;
+
+@Data
+public class DeviceMessage {
+
+    /** ID */
+    @TableId(type = IdType.AUTO)
+    private Integer id;
+
+    /** 设备ID */
+    private Integer deviceId;
+
+/*
+    其中service 为服务名,supplierId为服务厂商编号,apiUrl为获取摄像头列表的地址,module为模块名,id为设备id,mesaage为离线通知内容
+*/
+    /** 服务名 */
+    private String service;
+    /** 服务厂商编号 */
+    private String supplierId;
+    /** 获取摄像头列表的地址 */
+    private String apiUrl;
+    /** 模块名 */
+    private String module;
+    /** 离线通知内容 */
+    private String message;
+
+    private String createTime;
+
+}

+ 9 - 0
src/main/java/com/sky/ioc/mapper/energy/DeviceMessageMapper.java

@@ -0,0 +1,9 @@
+package com.sky.ioc.mapper.energy;
+
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+import com.sky.ioc.entity.domain.energy.DeviceMessage;
+import org.apache.ibatis.annotations.Mapper;
+
+@Mapper
+public interface DeviceMessageMapper extends BaseMapper<DeviceMessage> {
+}

+ 12 - 4
src/main/java/com/sky/ioc/mapper/person/PersonMapper.java

@@ -26,18 +26,26 @@ public interface PersonMapper extends BaseMapper<Person> {
             "SELECT * " +
             "from  person " +
             "where is_del=0 " +
-            "<if test='name!=null and name!=\"\" and name!=null'>" +
+            "<if test='name!=null and name!=\"\"'>" +
             " and name like #{name} " +
             "</if>" +
-            "<if test='companyName!=null and companyName!=\"\" and companyName!=null'>" +
+            "<if test='companyName!=null and companyName!=\"\" '>" +
             " and company_name like #{companyName} " +
             "</if>" +
-            "<if test='deptName!=null and deptName!=\"\" and deptName!=null'>" +
+            "<if test='deptName!=null and deptName!=\"\" '>" +
             " and dept_name like #{deptName} " +
             "</if>" +
+            "<if test='type!=null and type>=0 '>" +
+            " and type = #{type} " +
+            "</if>" +
+            "<if test='status!=null and status>=0 '>" +
+            " and status = #{status} " +
+            "</if>" +
             "ORDER BY id " +
             "</script>")
     List<Person> pageList(@Param("name") String name,
                           @Param("companyName") String companyName,
-                          @Param("deptName") String deptName);
+                          @Param("deptName") String deptName,
+                          @Param("type") Integer type,
+                          @Param("status") Integer status);
 }

+ 126 - 0
src/main/java/com/sky/ioc/message/MQTTSubsribe.java

@@ -0,0 +1,126 @@
+package com.sky.ioc.message;
+
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.MqttSecurityException;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+@Component
+public class MQTTSubsribe {
+
+    @Value("${spring.mqtt.password}")
+    private String password;
+
+    @Value("${spring.mqtt.username}")
+    private String username;
+
+    @Value("${spring.mqtt.url}")
+    private String url;
+    //配置中的topic
+    @Value("${spring.mqtt.topic}")
+    private String defaultTopic;
+
+    // 连接超时时间
+    @Value("${spring.mqtt.completionTimeout}")
+    private int completionTimeout;
+
+    private String[] topics;
+
+    private MqttClient client;
+
+    private MqttConnectOptions mqttConnectOptions;
+
+    @Autowired
+    private PushCallback pushCallback;
+
+    private ScheduledExecutorService scheduled;
+
+    public void startReconnect() {
+        this.scheduled = Executors.newSingleThreadScheduledExecutor();
+        //    定时任务——重新连接mqtt服务器
+        this.scheduled.scheduleAtFixedRate(new Runnable() {
+            public void run() {
+                if (!MQTTSubsribe.this.client.isConnected()) {
+                    try {
+                        MQTTSubsribe.this.client.connect(MQTTSubsribe.this.mqttConnectOptions);
+                        System.out.println("---mqtt已经重新连接上---");
+                        int[] Qos = new int[]{1};
+                        MQTTSubsribe.this.client.subscribe(MQTTSubsribe.this.topics, Qos);
+                    } catch (MqttSecurityException var2) {
+                        var2.printStackTrace();
+                    } catch (MqttException var3) {
+                        var3.printStackTrace();
+                    }
+                }
+
+            }
+        }, 5000L, 10000L, TimeUnit.MILLISECONDS);
+    }
+
+
+    //	对mqttConnectOptions对象的常规设置
+    public MqttConnectOptions getMqttConnectOptions() {
+        this.mqttConnectOptions = new MqttConnectOptions();
+        this.mqttConnectOptions.setCleanSession(true);
+        this.mqttConnectOptions.setUserName(username);
+        this.mqttConnectOptions.setPassword(password.toCharArray());
+        this.mqttConnectOptions.setServerURIs(new String[]{url});
+        this.mqttConnectOptions.setConnectionTimeout(completionTimeout);
+        this.mqttConnectOptions.setKeepAliveInterval(2000);
+
+        return mqttConnectOptions;
+    }
+
+    //	连接mqtt服务器订阅信息方法
+//	topic也可作为参数传入
+    public void start(String topic) {
+
+        try {
+            this.client = new MqttClient(url, getClientId(), new MemoryPersistence());
+            this.getMqttConnectOptions();
+            this.client.setCallback(this.pushCallback);
+//            MqttTopic topicMq = client.getTopic(defaultTopic);
+            this.client.connect(this.mqttConnectOptions);
+//            topics = topic.toString().split(",");
+            //订阅消息
+            int[] Qos = {1,1};
+//            可将订阅的一个或多个topic都存入数组中,同时订阅
+//            String[] topic1 = {defaultTopic};
+            List<String> topicList = Arrays.asList(defaultTopic.trim().split(","));
+            String[] topics = new String[topicList.size()];
+            topicList.toArray(topics);
+            this.client.subscribe(topics, Qos);
+
+            boolean connected = this.client.isConnected();
+            System.out.println("设备mqtt连接状态为:" + connected);
+            String flag = connected ? "成功" : "失败";
+        } catch (MqttException e) {
+            e.printStackTrace();
+        }
+    }
+
+    //	随机生成唯一client.id方法
+    public String getClientId() {
+        String nums = "";
+        String[] codeChars = {"0", "1", "2", "3", "4", "5", "6", "7", "8", "9",
+                "a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o", "p", "q", "r", "s", "t", "u", "v", "w", "x", "y", "z",
+                "A", "B", "C", "D", "E", "F", "G", "H", "I", "J", "K", "L", "M", "N", "O", "P", "Q", "R", "S", "T", "U", "V", "W", "X", "Y", "Z"};
+        for (int i = 0; i < 23; i++) {
+            int charNum = (int) Math.floor(Math.random() * codeChars.length);
+            nums = nums + codeChars[charNum];
+        }
+        return nums;
+    }
+}
+
+

+ 69 - 0
src/main/java/com/sky/ioc/message/PushCallback.java

@@ -0,0 +1,69 @@
+package com.sky.ioc.message;
+
+import com.alibaba.fastjson.JSONObject;
+import com.sky.ioc.entity.domain.energy.DeviceMessage;
+import com.sky.ioc.mapper.energy.DeviceMessageMapper;
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+import org.eclipse.paho.client.mqttv3.MqttCallback;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.data.redis.core.RedisTemplate;
+
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+@Configuration
+public class PushCallback implements MqttCallback {
+
+    @Autowired
+    private MQTTSubsribe mqttSubsribe;
+
+    @Autowired
+    private RedisTemplate redisTemplate;
+
+    //    定时任务——定时缓存查询的数据
+    private ScheduledExecutorService scheduled;
+
+    @Autowired
+    DeviceMessageMapper deviceMessageMapper;
+
+
+    @Override
+    public void connectionLost(Throwable throwable) {
+        // 连接丢失后,一般在这里面进行重连
+        System.out.println("连接断开,可以做重连");
+        this.mqttSubsribe.startReconnect();
+    }
+
+    @Override
+
+    public void messageArrived(String topic, MqttMessage message) throws Exception {
+        // subscribe后得到的消息会执行到这里面
+        System.out.println("接收消息主题 : " + topic);
+        System.out.println("接收消息Qos : " + message.getQos());
+        System.out.println("接收消息内容 : " + new String(message.getPayload()));
+        String value = new String(message.getPayload());
+        JSONObject jsonObject = JSONObject.parseObject(value);
+        DeviceMessage deviceMessage = new DeviceMessage();
+        deviceMessage.setMessage(jsonObject.getString("message"));
+        deviceMessage.setService(jsonObject.getString("service"));
+        deviceMessage.setModule(jsonObject.getString("module"));
+        deviceMessage.setApiUrl(jsonObject.getString("apiUrl"));
+        deviceMessage.setSupplierId(jsonObject.getString("supplierId"));
+        deviceMessage.setDeviceId(jsonObject.getInteger("id"));
+        deviceMessage.setCreateTime(new Date()+"");
+        deviceMessageMapper.insert(deviceMessage);
+
+    }
+
+    @Override
+    public void deliveryComplete(IMqttDeliveryToken token) {
+        System.out.println("deliveryComplete---------" + token.isComplete());
+    }
+}
+

+ 1 - 1
src/main/java/com/sky/ioc/service/person/PersonService.java

@@ -8,7 +8,7 @@ import java.util.List;
 
 public interface PersonService {
 
-    ReturnMsg pageList(String name,String companyName,String deptName,Integer page,Integer pageSize);
+    ReturnMsg pageList(String name,String companyName,String deptName,Integer type,Integer status,Integer page,Integer pageSize);
 
     ReturnMsg addPerson(Person person);
 

+ 2 - 2
src/main/java/com/sky/ioc/service/person/impl/PersonServiceImpl.java

@@ -23,7 +23,7 @@ public class PersonServiceImpl implements PersonService {
 
 
     @Override
-    public ReturnMsg pageList(String name,String companyName,String deptName,Integer page,Integer pageSize) {
+    public ReturnMsg pageList(String name,String companyName,String deptName,Integer type,Integer status,Integer page,Integer pageSize) {
         PageHelper.startPage(page,pageSize);
         if(StringUtils.isNotBlank(name)){
             name ="%"+name+"%";
@@ -34,7 +34,7 @@ public class PersonServiceImpl implements PersonService {
         if(StringUtils.isNotBlank(deptName)){
             deptName ="%"+deptName+"%";
         }
-        List<Person> lists = personMapper.pageList(name,companyName,deptName);
+        List<Person> lists = personMapper.pageList(name,companyName,deptName,type,status);
         return ReturnMsg.ok(new PageInfo<>(lists));
     }
 

+ 7 - 0
src/main/resources/application-dev.yml

@@ -31,6 +31,13 @@ spring:
     port: 5672
     username: admin
     password: aaaaaa
+  mqtt:
+    url: tcp://10.12.242.162:1883
+    topic: notice/device/status/safety/camera,notice/device/status/energy/device
+    username: device
+    password: device123
+    completionTimeout: 3000
+
 
 
 # 日志配置

+ 6 - 0
src/main/resources/application-publish.yml

@@ -31,6 +31,12 @@ spring:
     port: 5672
     username: admin
     password: aaaaaa
+  mqtt:
+    url: tcp://10.12.242.162:1883
+    topic: notice/device/status/safety/camera,notice/device/status/energy/device
+    username: device
+    password: device123
+    completionTimeout: 3000
 
 # 日志配置
 logging: