message.py 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249
  1. import datetime
  2. import json
  3. from flask import request, jsonify, current_app
  4. from flask_restx import Resource, Namespace, reqparse
  5. from sqlalchemy import select, insert, update
  6. from sqlalchemy.orm import Session
  7. from app.defines import StatesCode, Module, OperationType
  8. from app.modle.message import Message
  9. from app.modle.users import User
  10. from app.utils.jwt_util import login_required
  11. from app.utils.save_log import save_log
  12. from app.utils.message_mq import add_message
  13. from app.utils.util import to_dict
  14. from config import Config
  # Namespace under which every endpoint declared in this module is registered.
  ns = Namespace(
      'message',
      description='消息管理接口',
  )

  # Module-wide configuration instance; the handlers read
  # ``config.common.MESSAGR_TYPE`` from it.
  config = Config()
  @ns.route('/message_list')
  class MessageTypeApi(Resource):
      """Endpoint that lists stored messages grouped by message type."""

      method_decorators = [login_required]

      @ns.doc(id='message_list', description='消息列表')
      def get(self):
          """Return the id and name of every stored message, grouped by type.

          The types come from ``config.common.MESSAGR_TYPE``. A type that has
          no stored message is left out of the result instead of being mapped
          to an empty list.

          Returns:
              A JSON response whose ``data`` maps each message type to a list
              of ``{"id": ..., "name": ...}`` entries.
          """
          data = {}
          # A single session serves all per-type queries; opening and closing
          # one per loop iteration only added connection churn.
          with Session(current_app.engine) as session:
              for message_type in config.common.MESSAGR_TYPE:
                  stmt = select(Message.id, Message.name).where(Message.type == message_type)
                  for row in session.execute(stmt):
                      # The key is created on the first row only, so types
                      # without rows never appear in the response.
                      data.setdefault(message_type, []).append({"id": row.id, "name": row.name})
          save_log(request, Module.MESSAGE, OperationType.INQUIRE, StatesCode.SUCCESS)
          return jsonify(code=StatesCode.SUCCESS, message='成功', data=data)
  # Query-string parser used by the message detail lookup (GET /message).
  message_details = reqparse.RequestParser(bundle_errors=True)
  message_details.add_argument(
      name='message_id',
      type=int,
      location='args',
      required=False,
      help='消息id',
  )

  # Form parser shared by the create (POST) and update (PUT) handlers.
  # Every field is an optional string read from the form body, so the fields
  # are declared as (name, help text) pairs and registered in this order.
  message = reqparse.RequestParser(bundle_errors=True)
  for _field, _help_text in (
      ('message_id', '消息id'),
      ('name', '消息名称'),
      ('pipeline', '通道配置'),
      # Adjacent string literals are concatenated by Python, so this help
      # text contains no quote characters at runtime.
      ('staff', '人员配置: all role:'' company:'' '),
      ('send_time', '发送时间 immediately 2020-10-20 10:10:10'),
      ('title', '标题'),
      ('content', '内容'),
      ('style', '样式'),
      ('type', '消息类型'),
  ):
      message.add_argument(
          name=_field,
          type=str,
          location='form',
          required=False,
          help=_help_text,
      )
  # Keep the loop temporaries out of the module namespace.
  del _field, _help_text
  @ns.route('/message')
  class MessageApi(Resource):
      """Endpoints that read, create and update messages and queue them for delivery."""

      method_decorators = [login_required]

      # Timestamp layout accepted for a scheduled ``send_time``.
      _SEND_TIME_FORMAT = '%Y-%m-%d %H:%M:%S'
      # ``send_time`` value meaning "publish right away instead of scheduling".
      _IMMEDIATELY = 'immediately'
      # Form fields persisted on a Message row; the names double as the
      # keyword names passed to ``insert(...).values`` / ``update(...).values``.
      _FORM_FIELDS = ('name', 'pipeline', 'staff', 'send_time', 'title', 'content', 'style', 'type')

      @ns.doc(id='message_details', description='获取消息详情')
      @ns.expect(message_details)
      def get(self):
          """Return the stored message(s) whose id equals the ``message_id`` query argument.

          Returns:
              A JSON error response when ``message_id`` is absent, otherwise a
              JSON success response whose ``data`` is ``to_dict`` of the
              matching rows.
          """
          message_id = request.args.get('message_id')
          if message_id is None:
              return self._error('消息id不能为空')

          with Session(current_app.engine) as session:
              stmt = select(Message).where(Message.id == message_id)
              results = session.execute(stmt).scalars().all()
              # Serialise while the session is still open so the ORM rows are
              # still attached when ``to_dict`` reads them.
              payload = to_dict(results)

          save_log(request, Module.MESSAGE, OperationType.INQUIRE, StatesCode.SUCCESS)
          return jsonify(code=StatesCode.SUCCESS, message='成功', data=payload)

      @ns.doc(id='add_message', description='新增消息')
      @ns.expect(message)
      def post(self):
          """Create a message, then publish it immediately or schedule it.

          The form is validated first (pipeline, recipients, send time); the
          row is inserted only when validation passes.

          Returns:
              A JSON error response describing the first failed validation, or
              a JSON success response.
          """
          fields = self._read_form()
          error, staff_list, run_date = self._validate(fields)
          if error is not None:
              return error

          with Session(current_app.engine) as session:
              session.execute(insert(Message).values(**fields))
              session.commit()

          self._dispatch(fields, staff_list, run_date)
          save_log(request, Module.MESSAGE, OperationType.ADD, StatesCode.SUCCESS)
          return jsonify(code=StatesCode.SUCCESS, message='成功')

      @ns.doc(id='update_message', description='修改消息')
      @ns.expect(message)
      def put(self):
          """Update the message identified by ``message_id`` and queue it again.

          Returns:
              A JSON error response when ``message_id`` is missing or the form
              fails validation, otherwise a JSON success response.
          """
          message_id = request.form.get('message_id')
          # Without an id the UPDATE would match no row, yet the message would
          # still be sent; reject the request up front instead.
          if not message_id:
              return self._error('消息id不能为空')

          fields = self._read_form()
          error, staff_list, run_date = self._validate(fields)
          if error is not None:
              return error

          with Session(current_app.engine) as session:
              stmt = update(Message).where(Message.id == message_id).values(**fields)
              session.execute(stmt)
              session.commit()

          # An earlier save may have left a pending job under the same id, so
          # ask the scheduler to replace it rather than fail on the duplicate.
          # NOTE(review): a job registered under a previous name is not
          # removed here - confirm whether renames need extra cleanup.
          self._dispatch(fields, staff_list, run_date, replace_existing=True)
          save_log(request, Module.MESSAGE, OperationType.UPDATE, StatesCode.SUCCESS)
          return jsonify(code=StatesCode.SUCCESS, message='成功')

      @staticmethod
      def _error(text):
          """Build the JSON error response used for every validation failure."""
          return jsonify(code=StatesCode.UNKNOWN_ERROR, message=text)

      @classmethod
      def _read_form(cls):
          """Collect the persisted form fields into a dict (missing fields map to None)."""
          return {field: request.form.get(field) for field in cls._FORM_FIELDS}

      @classmethod
      def _validate(cls, fields):
          """Check pipeline, recipients and send time, in that order.

          Args:
              fields: Mapping produced by ``_read_form``.

          Returns:
              ``(error_response, staff_list, run_date)``. On failure only
              ``error_response`` is set. On success it is ``None``,
              ``staff_list`` holds the recipient user names and ``run_date``
              is ``None`` for immediate delivery or the parsed ``datetime``.
          """
          if fields['pipeline'] != 'instation':
              return cls._error('仅支持站内通知 instation'), None, None

          staff_list = cls._resolve_staff(fields['staff'])
          if staff_list is None:
              return cls._error('无效的人员配置'), None, None

          try:
              run_date = cls._parse_run_date(fields['send_time'])
          except (TypeError, ValueError):
              # ValueError: text does not match the layout.
              # TypeError: the field was not submitted at all.
              return cls._error('时间格式错误。例:2023-10-10 12:12:12'), None, None

          return None, staff_list, run_date

      @staticmethod
      def _resolve_staff(staff):
          """Translate the ``staff`` selector into a list of user names.

          Accepted selectors are ``all``, ``company:<value>`` and
          ``role:<value>``.

          Returns:
              The matching user names (possibly an empty list), or ``None``
              when the selector is missing or malformed.
          """
          if not staff:
              return None

          if staff == 'all':
              stmt = select(User.user_name)
          else:
              parts = staff.split(':')
              if len(parts) < 2:
                  # "company" / "role" without a value, or an unknown keyword.
                  return None
              scope, value = parts[0], parts[1]
              if scope == 'company':
                  stmt = select(User.user_name).where(User.company == value)
              elif scope == 'role':
                  stmt = select(User.user_name).where(User.role == value)
              else:
                  return None

          # The session must be bound to the engine; binding it to the User
          # model class cannot execute statements.
          with Session(current_app.engine) as session:
              return list(session.execute(stmt).scalars().all())

      @classmethod
      def _parse_run_date(cls, send_time):
          """Return ``None`` for immediate delivery, else the parsed send time.

          Raises:
              ValueError: ``send_time`` does not match ``_SEND_TIME_FORMAT``.
              TypeError: ``send_time`` is not a string (for example ``None``).
          """
          if send_time == cls._IMMEDIATELY:
              return None
          return datetime.datetime.strptime(send_time, cls._SEND_TIME_FORMAT)

      @staticmethod
      def _dispatch(fields, staff_list, run_date, replace_existing=False):
          """Publish the message now, or register a one-off scheduler job for it.

          Args:
              fields: Mapping produced by ``_read_form``.
              staff_list: Recipient user names.
              run_date: ``None`` to publish immediately, else when to publish.
              replace_existing: When true, ask the scheduler to overwrite a
                  job that already uses the same id.
          """
          message_data = json.dumps(
              {
                  "staff": staff_list,
                  "name": fields['name'],
                  "title": fields['title'],
                  "content": fields['content'],
                  "style": fields['style'],
              }
          )

          if run_date is None:
              add_message(message_data)
              return

          # Only pass the flag when requested so the create path issues exactly
          # the same scheduler call as before.
          # NOTE(review): assumes current_app.scheduler accepts APScheduler's
          # ``replace_existing`` keyword - confirm against the app factory.
          job_options = {'replace_existing': True} if replace_existing else {}
          current_app.scheduler.add_job(
              id=fields['name'],
              func=add_message,
              trigger='date',
              run_date=run_date,
              args=[message_data],
              **job_options,
          )