import datetime import json from flask import request, jsonify, current_app from flask_restx import Resource, Namespace, reqparse from sqlalchemy import select, insert, update from sqlalchemy.orm import Session from app.defines import StatesCode, Module, OperationType from app.modle.message import Message from app.modle.users import User from app.utils.jwt_util import login_required from app.utils.save_log import save_log from app.utils.message_mq import add_message from app.utils.util import to_dict from config import Config ns = Namespace('message', description='消息管理接口') config = Config() @ns.route('/message_list') class MessageTypeApi(Resource): method_decorators = [login_required] @ns.doc(id='message_list', description='消息列表') def get(self): """获取消息列表""" data = {} for message in config.common.MESSAGR_TYPE: with Session(current_app.engine) as session: stmt = select(Message.id, Message.name).where(Message.type == message) for row in session.execute(stmt): if data.get(message): data[message].append({"id": row.id, "name": row.name}) else: data[message] = [{"id": row.id, "name": row.name}] save_log(request, Module.MESSAGE, OperationType.INQUIRE, StatesCode.SUCCESS) return jsonify(code=StatesCode.SUCCESS, message='成功', data=data) message_details = reqparse.RequestParser(bundle_errors=True) message_details.add_argument(name='message_id', type=int, location='args', required=False, help='消息id') message = reqparse.RequestParser(bundle_errors=True) message.add_argument(name='message_id', type=str, location='form', required=False, help='消息id') message.add_argument(name='name', type=str, location='form', required=False, help='消息名称') message.add_argument(name='pipeline', type=str, location='form', required=False, help='通道配置') message.add_argument(name='staff', type=str, location='form', required=False, help='人员配置: all role:'' company:'' ') message.add_argument(name='send_time', type=str, location='form', required=False, help='发送时间 immediately 2020-10-20 10:10:10') message.add_argument(name='title', type=str, location='form', required=False, help='标题') message.add_argument(name='content', type=str, location='form', required=False, help='内容') message.add_argument(name='style', type=str, location='form', required=False, help='样式') message.add_argument(name='type', type=str, location='form', required=False, help='消息类型') @ns.route('/message') class MessageApi(Resource): method_decorators = [login_required] @ns.doc(id='message_details', description='获取消息详情') @ns.expect(message_details) def get(self): """获取消息详情""" message_id = request.args.get('message_id') if message_id is None: return jsonify(code=StatesCode.UNKNOWN_ERROR, message='消息id不能为空') with Session(current_app.engine) as session: stmt = select(Message).where(Message.id == message_id) results = session.execute(stmt).scalars().all() save_log(request, Module.MESSAGE, OperationType.INQUIRE, StatesCode.SUCCESS) return jsonify(code=StatesCode.SUCCESS, message='成功', data=to_dict(results)) @ns.doc(id='add_message', description='新增消息') @ns.expect(message) def post(self): """新增消息""" # 添加rabbit mq ,定时发送创建定时任务发送,立即发送直接添加rabbit mq name = request.form.get('name') pipeline = request.form.get('pipeline') staff = request.form.get('staff') send_time = request.form.get('send_time') title = request.form.get('title') content = request.form.get('content') style = request.form.get('style') message_type = request.form.get('type') if pipeline != 'instation': return jsonify(code=StatesCode.UNKNOWN_ERROR, message='仅支持站内通知 instation') staff_list = [] with Session(User) as session: if staff == 'all': stmt = select(User.user_name) staff_list.extend(session.execute(stmt).scalars().all()) elif staff.split(':')[0] == 'company': stmt = select(User.user_name).where(User.company == staff.split(':')[1]) staff_list.extend(session.execute(stmt).scalars().all()) elif staff.split(':')[0] == 'role': stmt = select(User.user_name).where(User.role == staff.split(':')[1]) staff_list.extend(session.execute(stmt).scalars().all()) else: return jsonify(code=StatesCode.UNKNOWN_ERROR, message='无效的人员配置') try: if send_time != 'immediately': datetime.datetime.strptime(send_time, '%Y-%m-%d %H:%M:%S') except Exception: return jsonify(code=StatesCode.UNKNOWN_ERROR, message='时间格式错误。例:2023-10-10 12:12:12') # 添加数据库 with Session(current_app.engine) as session: stmt = insert(Message).values( name=name, pipeline=pipeline, staff=staff, send_time=send_time, title=title, content=content, style=style, type=message_type ) session.execute(stmt) session.commit() message_data = json.dumps( { "staff": staff_list, "name": name, "title": title, "content": content, "style": style } ) # 判断发送时间,立即发送,或添加添加定时任务 if send_time == 'immediately': add_message(message_data) else: current_app.scheduler.add_job(id=name, func=add_message, trigger='date', run_date=datetime.datetime.strptime(send_time, '%Y-%m-%d %H:%M:%S'), args=[message_data] ) save_log(request, Module.MESSAGE, OperationType.ADD, StatesCode.SUCCESS) return jsonify(code=StatesCode.SUCCESS, message='成功') @ns.doc(id='update_message', description='修改消息') @ns.expect(message) def put(self): """修改消息""" # 修改数据库,重新添加 message_id = request.form.get('message_id') name = request.form.get('name') pipeline = request.form.get('pipeline') staff = request.form.get('staff') send_time = request.form.get('send_time') title = request.form.get('title') content = request.form.get('content') style = request.form.get('style') message_type = request.form.get('type') if pipeline != 'instation': return jsonify(code=StatesCode.UNKNOWN_ERROR, message='仅支持站内通知 instation') staff_list = [] with Session(User) as session: if staff == 'all': stmt = select(User.user_name) staff_list.extend(session.execute(stmt).scalars().all()) elif staff.split(':')[0] == 'company': stmt = select(User.user_name).where(User.company == staff.split(':')[1]) staff_list.extend(session.execute(stmt).scalars().all()) elif staff.split(':')[0] == 'role': stmt = select(User.user_name).where(User.role == staff.split(':')[1]) staff_list.extend(session.execute(stmt).scalars().all()) else: return jsonify(code=StatesCode.UNKNOWN_ERROR, message='无效的人员配置') try: if send_time != 'immediately': datetime.datetime.strptime(send_time, '%Y-%m-%d %H:%M:%S') except Exception: return jsonify(code=StatesCode.UNKNOWN_ERROR, message='时间格式错误。例:2023-10-10 12:12:12') # 添加数据库 with Session(current_app.engine) as session: stmt = update(Message).where(Message.id == message_id).values( name=name, pipeline=pipeline, staff=staff, send_time=send_time, title=title, content=content, style=style, type=message_type ) session.execute(stmt) session.commit() message_data = json.dumps( { "staff": staff_list, "name": name, "title": title, "content": content, "style": style } ) # 判断发送时间,立即发送,或添加添加定时任务 if send_time == 'immediately': add_message(message_data) else: current_app.scheduler.add_job(id=name, func=add_message, trigger='date', run_date=datetime.datetime.strptime(send_time, '%Y-%m-%d %H:%M:%S'), args=[message_data] ) save_log(request, Module.MESSAGE, OperationType.UPDATE, StatesCode.SUCCESS) return jsonify(code=StatesCode.SUCCESS, message='成功')