Browse Source

新增消息、修改消息

zhangnaiwen 2 years ago
parent
commit
087fbd62a8

+ 6 - 0
conf/uwsgi.ini

@@ -59,6 +59,12 @@ if-env = UWSGI_HARAKIRI
 harakiri = $(UWSGI_HARAKIRI)
 endif =
 
+# 连接时间
+http-timeout = 60
+if-env = UWSGI_TIMEOUT
+http-timeout = $(UWSGI_TIMEOUT)
+endif =
+
 # 打开http body缓冲
 if-env = UWSGI_POST_BUFFERING
 post-buffering = $(UWSGI_POST_BUFFERING)

+ 7 - 0
config/config.yml

@@ -18,6 +18,13 @@ common:
   UNDERLYING_SYSTEM_PICTURE_PATH: /data/underlying_system
   UNDERLYING_SYSTEM_PICTURE_URL: http://127.0.0.1/underlying_system/
 
+RabbitMq:
+  HOST: 8.130.177.22 # ip地址
+  POST: 5672 # 端口
+  USER: admin # 用户名
+  PASSWORD: aaaaaa  # 密码
+  QUEUE: message # 队列
+
 #redis:
 #  # redis配置
 #  HOST: 192.168.2.35 # 使用容器运行服务时,该HOST信息无用,容器内部会自动获取redis容器的ip

+ 3 - 2
docker-compose-v3.yml

@@ -3,7 +3,7 @@ services:
   sky-gistools-server:
       container_name: operation_management_center
 
-      image: operation_management_center:latest
+      image: operation_management_center:1.0.0
 
       ports:
         - "5000:5000"
@@ -14,11 +14,12 @@ services:
         - UWSGI_HTTP_PROCESSES=4
         - UWSGI_HTTP_KEEPALIVE=false
         - UWSGI_HARAKIRI=600
+        - UWSGI_TIMEOUT=600
         - UWSGI_PY_OPTIMIZE=0
 
       volumes:
         - /data/logs:/data/logs # 日志
-        - /data/company_logo:data/company_logo  # 公司商标存储地址
+        - /data/company_logo:/data/company_logo  # 公司商标存储地址
         - /data/template:/data/template  # 模版存储地址
         - /data/company:/data/company  # 公司图片存醋地址
         - /data/building:/data/building  # 楼宇图片存醋地址

+ 2 - 1
docker-compose.yml

@@ -12,11 +12,12 @@ service:
     - UWSGI_HTTP_PROCESSES=4
     - UWSGI_HTTP_KEEPALIVE=false
     - UWSGI_HARAKIRI=600
+    - UWSGI_TIMEOUT=600
     - UWSGI_PY_OPTIMIZE=0
 
   volumes:
     - /data/logs:/data/logs # 日志
-    - /data/company_logo:data/company_logo  # 公司商标存储地址
+    - /data/company_logo:/data/company_logo  # 公司商标存储地址
     - /data/template:/data/template  # 模版存储地址
     - /data/company:/data/company  # 公司图片存醋地址
     - /data/building:/data/building  # 楼宇图片存醋地址

+ 16 - 0
src/app/__init__.py

@@ -1,6 +1,7 @@
 import os
 
 from flask import Flask
+from flask_apscheduler import APScheduler
 from flask_cors import CORS
 
 from sqlalchemy import create_engine
@@ -12,6 +13,15 @@ manage_path = os.path.dirname(os.path.abspath(__file__))
 config_yml_path = os.path.join(os.path.dirname(os.path.dirname(manage_path)), 'config', 'config.yml')
 
 
+class ApschedulerConfig(object):
+    # 开关
+    SCHEDULER_API_ENABLED = True
+
+    SCHEDULER_EXECUTORS = {
+        'default': {'type': 'threadpool', 'max_workers': 20}
+    }
+
+
 def create_app():
     """创建app并初始化相关配置参数"""
 
@@ -20,6 +30,8 @@ def create_app():
 
     app = Flask(__name__)
 
+    app.config.from_object(ApschedulerConfig())
+
     CORS(app)
 
     request_handlers.configure(app)
@@ -29,4 +41,8 @@ def create_app():
 
     api.init_app(app)
 
+    app.scheduler = APScheduler()
+    app.scheduler.init_app(app)
+    app.scheduler.start()
+
     return app

+ 184 - 4
src/app/api/message.py

@@ -1,11 +1,17 @@
+import datetime
+import json
+
 from flask import request, jsonify, current_app
 from flask_restx import Resource, Namespace, reqparse
-from sqlalchemy import select
+from sqlalchemy import select, insert, update
 from sqlalchemy.orm import Session
 
 from app.defines import StatesCode, Module, OperationType
 from app.modle.message import Message
+from app.modle.users import User
+from app.utils.jwt_util import login_required
 from app.utils.save_log import save_log
+from app.utils.message_mq import add_message
 from app.utils.util import to_dict
 from config import Config
 
@@ -16,6 +22,8 @@ config = Config()
 
 @ns.route('/message_list')
 class MessageTypeApi(Resource):
+    method_decorators = [login_required]
+
     @ns.doc(id='message_list', description='消息列表')
     def get(self):
         """获取消息列表"""
@@ -38,8 +46,22 @@ class MessageTypeApi(Resource):
 message_details = reqparse.RequestParser(bundle_errors=True)
 message_details.add_argument(name='message_id', type=int, location='args', required=False, help='消息id')
 
+message = reqparse.RequestParser(bundle_errors=True)
+
+message.add_argument(name='message_id', type=str, location='form', required=False, help='消息id')
+message.add_argument(name='name', type=str, location='form', required=False, help='消息名称')
+message.add_argument(name='pipeline', type=str, location='form', required=False, help='通道配置')
+message.add_argument(name='staff', type=str, location='form', required=False, help='人员配置: all role:'' company:'' ')
+message.add_argument(name='send_time', type=str, location='form', required=False, help='发送时间 immediately 2020-10-20 10:10:10')
+message.add_argument(name='title', type=str, location='form', required=False, help='标题')
+message.add_argument(name='content', type=str, location='form', required=False, help='内容')
+message.add_argument(name='style', type=str, location='form', required=False, help='样式')
+message.add_argument(name='type', type=str, location='form', required=False, help='消息类型')
+
 
+@ns.route('/message')
 class MessageApi(Resource):
+    method_decorators = [login_required]
 
     @ns.doc(id='message_details', description='获取消息详情')
     @ns.expect(message_details)
@@ -59,11 +81,169 @@ class MessageApi(Resource):
 
         return jsonify(code=StatesCode.SUCCESS, message='成功', data=to_dict(results))
 
+    @ns.doc(id='add_message', description='新增消息')
+    @ns.expect(message)
     def post(self):
         """新增消息"""
-        pass
 
+        # 添加rabbit mq ,定时发送创建定时任务发送,立即发送直接添加rabbit mq
+        name = request.form.get('name')
+        pipeline = request.form.get('pipeline')
+        staff = request.form.get('staff')
+        send_time = request.form.get('send_time')
+        title = request.form.get('title')
+        content = request.form.get('content')
+        style = request.form.get('style')
+        message_type = request.form.get('type')
+
+        if pipeline != 'instation':
+            return jsonify(code=StatesCode.UNKNOWN_ERROR, message='仅支持站内通知 instation')
+
+        staff_list = []
+
+        with Session(User) as session:
+            if staff == 'all':
+                stmt = select(User.user_name)
+                staff_list.extend(session.execute(stmt).scalars().all())
+
+            elif staff.split(':')[0] == 'company':
+                stmt = select(User.user_name).where(User.company == staff.split(':')[1])
+                staff_list.extend(session.execute(stmt).scalars().all())
+
+            elif staff.split(':')[0] == 'role':
+                stmt = select(User.user_name).where(User.role == staff.split(':')[1])
+                staff_list.extend(session.execute(stmt).scalars().all())
+
+            else:
+                return jsonify(code=StatesCode.UNKNOWN_ERROR, message='无效的人员配置')
+
+        try:
+            if send_time != 'immediately':
+                datetime.datetime.strptime(send_time, '%Y-%m-%d %H:%M:%S')
+        except Exception:
+            return jsonify(code=StatesCode.UNKNOWN_ERROR, message='时间格式错误。例:2023-10-10 12:12:12')
+
+        # 添加数据库
+        with Session(current_app.engine) as session:
+            stmt = insert(Message).values(
+                name=name,
+                pipeline=pipeline,
+                staff=staff,
+                send_time=send_time,
+                title=title,
+                content=content,
+                style=style,
+                type=message_type
+            )
+            session.execute(stmt)
+            session.commit()
+
+        message_data = json.dumps(
+            {
+                "staff": staff_list,
+                "name": name,
+                "title": title,
+                "content": content,
+                "style": style
+            }
+        )
+
+        # 判断发送时间,立即发送,或添加添加定时任务
+        if send_time == 'immediately':
+            add_message(message_data)
+
+        else:
+            current_app.scheduler.add_job(id=name,
+                                          func=add_message,
+                                          trigger='date',
+                                          run_date=datetime.datetime.strptime(send_time, '%Y-%m-%d %H:%M:%S'),
+                                          args=[message_data]
+                                          )
+
+        save_log(request, Module.MESSAGE, OperationType.ADD, StatesCode.SUCCESS)
+
+        return jsonify(code=StatesCode.SUCCESS, message='成功')
+
+    @ns.doc(id='update_message', description='修改消息')
+    @ns.expect(message)
     def put(self):
         """修改消息"""
-
-        return jsonify(code=StatesCode.SUCCESS, message='成功', data='')
+        # 修改数据库,重新添加
+
+        message_id = request.form.get('message_id')
+        name = request.form.get('name')
+        pipeline = request.form.get('pipeline')
+        staff = request.form.get('staff')
+        send_time = request.form.get('send_time')
+        title = request.form.get('title')
+        content = request.form.get('content')
+        style = request.form.get('style')
+        message_type = request.form.get('type')
+
+        if pipeline != 'instation':
+            return jsonify(code=StatesCode.UNKNOWN_ERROR, message='仅支持站内通知 instation')
+
+        staff_list = []
+
+        with Session(User) as session:
+            if staff == 'all':
+                stmt = select(User.user_name)
+                staff_list.extend(session.execute(stmt).scalars().all())
+
+            elif staff.split(':')[0] == 'company':
+                stmt = select(User.user_name).where(User.company == staff.split(':')[1])
+                staff_list.extend(session.execute(stmt).scalars().all())
+
+            elif staff.split(':')[0] == 'role':
+                stmt = select(User.user_name).where(User.role == staff.split(':')[1])
+                staff_list.extend(session.execute(stmt).scalars().all())
+
+            else:
+                return jsonify(code=StatesCode.UNKNOWN_ERROR, message='无效的人员配置')
+
+        try:
+            if send_time != 'immediately':
+                datetime.datetime.strptime(send_time, '%Y-%m-%d %H:%M:%S')
+        except Exception:
+            return jsonify(code=StatesCode.UNKNOWN_ERROR, message='时间格式错误。例:2023-10-10 12:12:12')
+
+        # 添加数据库
+        with Session(current_app.engine) as session:
+            stmt = update(Message).where(Message.id == message_id).values(
+                name=name,
+                pipeline=pipeline,
+                staff=staff,
+                send_time=send_time,
+                title=title,
+                content=content,
+                style=style,
+                type=message_type
+            )
+            session.execute(stmt)
+            session.commit()
+
+        message_data = json.dumps(
+            {
+                "staff": staff_list,
+                "name": name,
+                "title": title,
+                "content": content,
+                "style": style
+            }
+        )
+
+        # 判断发送时间,立即发送,或添加添加定时任务
+        if send_time == 'immediately':
+            add_message(message_data)
+
+        else:
+            current_app.scheduler.add_job(id=name,
+                                          func=add_message,
+                                          trigger='date',
+                                          run_date=datetime.datetime.strptime(send_time, '%Y-%m-%d %H:%M:%S'),
+                                          args=[message_data]
+                                          )
+
+        save_log(request, Module.MESSAGE, OperationType.UPDATE, StatesCode.SUCCESS)
+
+        return jsonify(code=StatesCode.SUCCESS, message='成功')

+ 1 - 10
src/app/modle/message.py

@@ -4,7 +4,7 @@ from app.modle import Base
 
 
 class Message(Base):
-    """提醒消息表"""
+    """消息表"""
 
     __tablename__ = 'message'
 
@@ -17,12 +17,3 @@ class Message(Base):
     content = Column(String, nullable=True, unique=False, index=False, doc='内容')
     style = Column(String, nullable=True, unique=False, index=False, doc='样式')
     type = Column(String, nullable=False, unique=False, index=False, doc='消息类型')
-# user = 'mac'
-# password = ''
-# host = 'localhost'
-# port = 5432
-# database = 'postgres'
-# uri = f'postgresql+psycopg2://{user}:{password}@{host}:{port}/{database}'
-#
-# engine = create_engine(uri)
-# Base.metadata.create_all(engine)

+ 32 - 0
src/app/utils/message_mq.py

@@ -0,0 +1,32 @@
+import pika
+
+from config import Config
+
+
+def add_message(message):
+    """
+    添加消息
+    :param message: 消息内容
+    :return:
+    """
+
+    config = Config()
+
+    user_info = pika.PlainCredentials(config.RabbitMq.USER, config.RabbitMq.PASSWORD)  # 用户名和密码
+    connection = pika.BlockingConnection(
+        pika.ConnectionParameters(config.RabbitMq.HOST, config.RabbitMq.POST, '/', user_info)
+    )  # 连接服务器上的RabbitMQ服务
+
+    # 创建一个channel
+    channel = connection.channel()
+
+    # 如果指定的queue不存在,则会创建一个queue,如果已经存在 则不会做其他动作,官方推荐,每次使用时都可以加上这句
+    channel.queue_declare(queue=config.RabbitMq.QUEUE)
+
+    channel.basic_publish(exchange='',  # 当前是一个简单模式,所以这里设置为空字符串就可以了
+                          routing_key=config.RabbitMq.QUEUE,  # 指定消息要发送到哪个queue
+                          body=message  # 指定要发送的消息
+                          )
+
+    # 关闭连接
+    connection.close()