123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249 |
- import datetime
- import json
- from flask import request, jsonify, current_app
- from flask_restx import Resource, Namespace, reqparse
- from sqlalchemy import select, insert, update
- from sqlalchemy.orm import Session
- from app.defines import StatesCode, Module, OperationType
- from app.modle.message import Message
- from app.modle.users import User
- from app.utils.jwt_util import login_required
- from app.utils.save_log import save_log
- from app.utils.message_mq import add_message
- from app.utils.util import to_dict
- from config import Config
- ns = Namespace('message', description='消息管理接口')
- config = Config()
- @ns.route('/message_list')
- class MessageTypeApi(Resource):
- method_decorators = [login_required]
- @ns.doc(id='message_list', description='消息列表')
- def get(self):
- """获取消息列表"""
- data = {}
- for message in config.common.MESSAGR_TYPE:
- with Session(current_app.engine) as session:
- stmt = select(Message.id, Message.name).where(Message.type == message)
- for row in session.execute(stmt):
- if data.get(message):
- data[message].append({"id": row.id, "name": row.name})
- else:
- data[message] = [{"id": row.id, "name": row.name}]
- save_log(request, Module.MESSAGE, OperationType.INQUIRE, StatesCode.SUCCESS)
- return jsonify(code=StatesCode.SUCCESS, message='成功', data=data)
- message_details = reqparse.RequestParser(bundle_errors=True)
- message_details.add_argument(name='message_id', type=int, location='args', required=False, help='消息id')
- message = reqparse.RequestParser(bundle_errors=True)
- message.add_argument(name='message_id', type=str, location='form', required=False, help='消息id')
- message.add_argument(name='name', type=str, location='form', required=False, help='消息名称')
- message.add_argument(name='pipeline', type=str, location='form', required=False, help='通道配置')
- message.add_argument(name='staff', type=str, location='form', required=False, help='人员配置: all role:'' company:'' ')
- message.add_argument(name='send_time', type=str, location='form', required=False, help='发送时间 immediately 2020-10-20 10:10:10')
- message.add_argument(name='title', type=str, location='form', required=False, help='标题')
- message.add_argument(name='content', type=str, location='form', required=False, help='内容')
- message.add_argument(name='style', type=str, location='form', required=False, help='样式')
- message.add_argument(name='type', type=str, location='form', required=False, help='消息类型')
- @ns.route('/message')
- class MessageApi(Resource):
- method_decorators = [login_required]
- @ns.doc(id='message_details', description='获取消息详情')
- @ns.expect(message_details)
- def get(self):
- """获取消息详情"""
- message_id = request.args.get('message_id')
- if message_id is None:
- return jsonify(code=StatesCode.UNKNOWN_ERROR, message='消息id不能为空')
- with Session(current_app.engine) as session:
- stmt = select(Message).where(Message.id == message_id)
- results = session.execute(stmt).scalars().all()
- save_log(request, Module.MESSAGE, OperationType.INQUIRE, StatesCode.SUCCESS)
- return jsonify(code=StatesCode.SUCCESS, message='成功', data=to_dict(results))
- @ns.doc(id='add_message', description='新增消息')
- @ns.expect(message)
- def post(self):
- """新增消息"""
- # 添加rabbit mq ,定时发送创建定时任务发送,立即发送直接添加rabbit mq
- name = request.form.get('name')
- pipeline = request.form.get('pipeline')
- staff = request.form.get('staff')
- send_time = request.form.get('send_time')
- title = request.form.get('title')
- content = request.form.get('content')
- style = request.form.get('style')
- message_type = request.form.get('type')
- if pipeline != 'instation':
- return jsonify(code=StatesCode.UNKNOWN_ERROR, message='仅支持站内通知 instation')
- staff_list = []
- with Session(User) as session:
- if staff == 'all':
- stmt = select(User.user_name)
- staff_list.extend(session.execute(stmt).scalars().all())
- elif staff.split(':')[0] == 'company':
- stmt = select(User.user_name).where(User.company == staff.split(':')[1])
- staff_list.extend(session.execute(stmt).scalars().all())
- elif staff.split(':')[0] == 'role':
- stmt = select(User.user_name).where(User.role == staff.split(':')[1])
- staff_list.extend(session.execute(stmt).scalars().all())
- else:
- return jsonify(code=StatesCode.UNKNOWN_ERROR, message='无效的人员配置')
- try:
- if send_time != 'immediately':
- datetime.datetime.strptime(send_time, '%Y-%m-%d %H:%M:%S')
- except Exception:
- return jsonify(code=StatesCode.UNKNOWN_ERROR, message='时间格式错误。例:2023-10-10 12:12:12')
- # 添加数据库
- with Session(current_app.engine) as session:
- stmt = insert(Message).values(
- name=name,
- pipeline=pipeline,
- staff=staff,
- send_time=send_time,
- title=title,
- content=content,
- style=style,
- type=message_type
- )
- session.execute(stmt)
- session.commit()
- message_data = json.dumps(
- {
- "staff": staff_list,
- "name": name,
- "title": title,
- "content": content,
- "style": style
- }
- )
- # 判断发送时间,立即发送,或添加添加定时任务
- if send_time == 'immediately':
- add_message(message_data)
- else:
- current_app.scheduler.add_job(id=name,
- func=add_message,
- trigger='date',
- run_date=datetime.datetime.strptime(send_time, '%Y-%m-%d %H:%M:%S'),
- args=[message_data]
- )
- save_log(request, Module.MESSAGE, OperationType.ADD, StatesCode.SUCCESS)
- return jsonify(code=StatesCode.SUCCESS, message='成功')
- @ns.doc(id='update_message', description='修改消息')
- @ns.expect(message)
- def put(self):
- """修改消息"""
- # 修改数据库,重新添加
- message_id = request.form.get('message_id')
- name = request.form.get('name')
- pipeline = request.form.get('pipeline')
- staff = request.form.get('staff')
- send_time = request.form.get('send_time')
- title = request.form.get('title')
- content = request.form.get('content')
- style = request.form.get('style')
- message_type = request.form.get('type')
- if pipeline != 'instation':
- return jsonify(code=StatesCode.UNKNOWN_ERROR, message='仅支持站内通知 instation')
- staff_list = []
- with Session(User) as session:
- if staff == 'all':
- stmt = select(User.user_name)
- staff_list.extend(session.execute(stmt).scalars().all())
- elif staff.split(':')[0] == 'company':
- stmt = select(User.user_name).where(User.company == staff.split(':')[1])
- staff_list.extend(session.execute(stmt).scalars().all())
- elif staff.split(':')[0] == 'role':
- stmt = select(User.user_name).where(User.role == staff.split(':')[1])
- staff_list.extend(session.execute(stmt).scalars().all())
- else:
- return jsonify(code=StatesCode.UNKNOWN_ERROR, message='无效的人员配置')
- try:
- if send_time != 'immediately':
- datetime.datetime.strptime(send_time, '%Y-%m-%d %H:%M:%S')
- except Exception:
- return jsonify(code=StatesCode.UNKNOWN_ERROR, message='时间格式错误。例:2023-10-10 12:12:12')
- # 添加数据库
- with Session(current_app.engine) as session:
- stmt = update(Message).where(Message.id == message_id).values(
- name=name,
- pipeline=pipeline,
- staff=staff,
- send_time=send_time,
- title=title,
- content=content,
- style=style,
- type=message_type
- )
- session.execute(stmt)
- session.commit()
- message_data = json.dumps(
- {
- "staff": staff_list,
- "name": name,
- "title": title,
- "content": content,
- "style": style
- }
- )
- # 判断发送时间,立即发送,或添加添加定时任务
- if send_time == 'immediately':
- add_message(message_data)
- else:
- current_app.scheduler.add_job(id=name,
- func=add_message,
- trigger='date',
- run_date=datetime.datetime.strptime(send_time, '%Y-%m-%d %H:%M:%S'),
- args=[message_data]
- )
- save_log(request, Module.MESSAGE, OperationType.UPDATE, StatesCode.SUCCESS)
- return jsonify(code=StatesCode.SUCCESS, message='成功')
|