users.py 17 KB


  1. import base64
  2. import json
  3. from io import BytesIO
  4. from flask import request, jsonify, Response, current_app
  5. from flask_restx import Resource, Namespace, reqparse
  6. from sqlalchemy import insert, select, update, delete, func
  7. from sqlalchemy.orm import Session
  8. from werkzeug.datastructures import FileStorage
  9. from app.defines import StatesCode, Module, OperationType
  10. from app.modle.users import User
  11. from app.utils.jwt_util import login_required
  12. from app.utils.save_log import save_log
  13. from app.utils.util import to_dict, MyXlwt
  14. ns = Namespace('users', description='用户管理接口')
  15. get_users = reqparse.RequestParser(bundle_errors=True)
  16. get_users.add_argument(name='status', type=float, required=True, location='args', help='用户状态')
  17. get_users.add_argument(name='page_size', type=int, location='args', required=False, help='每页记录数量,默认:20')
  18. get_users.add_argument(name='page', type=int, location='args', required=False, help='第几页')
  19. @ns.route('/user_list')
  20. class GetUserListApi(Resource):
  21. method_decorators = [login_required]
  22. @ns.doc(id='get_users_list', description='获取用户列表')
  23. @ns.expect(get_users)
  24. def get(self):
  25. """获取用户列表"""
  26. status = request.args.get('status')
  27. page_size = int(request.args.get('page_size', 20))
  28. page = int(request.args.get('page', 1))
  29. if status is None:
  30. return jsonify(code=StatesCode.PARA_ERROR, message='用户状态不能为空')
  31. try:
  32. with Session(current_app.engine) as session:
  33. count = select(func.count(User.id)).where(User.account_status == status)
  34. count_results = session.execute(count).scalars().first()
  35. stmt = select(User).where(User.account_status == status).offset(page_size * (page - 1)).limit(page_size)
  36. results = session.execute(stmt).scalars().all()
  37. save_log(request, Module.USER, OperationType.INQUIRE, StatesCode.SUCCESS)
  38. return jsonify(code=StatesCode.SUCCESS, message="成功", total=count_results, data=to_dict(results))
  39. except Exception as e:
  40. save_log(request, Module.USER, OperationType.INQUIRE, StatesCode.UNKNOWN_ERROR)
  41. return jsonify(code=StatesCode.UNKNOWN_ERROR, message=str(e))
  42. add_user = reqparse.RequestParser(bundle_errors=True)
  43. add_user.add_argument(name='id', type=str, required=False, location='form', help='用户id')
  44. add_user.add_argument(name='username', type=str, required=True, location='form', help='用户名')
  45. add_user.add_argument(name='password', type=str, required=True, location='form', help='密码')
  46. add_user.add_argument(name='photograph', type=FileStorage, required=False, location='files', help='照片')
  47. add_user.add_argument(name='name', type=str, required=False, location='form', help='姓名')
  48. add_user.add_argument(name='phone', type=str, required=False, location='form', help='电话')
  49. add_user.add_argument(name='email', type=str, required=False, location='form', help='邮箱')
  50. add_user.add_argument(name='company', type=str, required=False, location='form', help='公司')
  51. add_user.add_argument(name='department', type=str, required=False, location='form', help='部门')
  52. add_user.add_argument(name='role', type=str, required=False, location='form', help='角色')
  53. add_user.add_argument(name='permission', type=str, required=False, location='form', help='权限')
  54. add_user.add_argument(name='on_job_status', type=str, required=False, location='form', help='在职状态')
  55. add_user.add_argument(name='duty', type=str, required=False, location='form', help='职务')
  56. add_user.add_argument(name='nationality', type=str, required=False, location='form', help='国籍')
  57. user_details = reqparse.RequestParser(bundle_errors=True)
  58. user_details.add_argument(name='id', type=int, required=True, location='args', help='用户id')
  59. @ns.route('/user')
  60. class UsersApi(Resource):
  61. method_decorators = [login_required]
  62. @ns.doc(id='get_users_details', description='获取用户详情')
  63. @ns.expect(user_details)
  64. def get(self):
  65. """获取用户详情"""
  66. user_id = request.args.get('id')
  67. if user_id is None:
  68. return jsonify(code=StatesCode.PARA_ERROR, message="用户id不能为空")
  69. try:
  70. with Session(current_app.engine) as session:
  71. stmt = select(User).where(User.id == user_id)
  72. results = session.execute(stmt).scalars().all()
  73. save_log(request, Module.USER, OperationType.INQUIRE, StatesCode.SUCCESS)
  74. return jsonify(code=StatesCode.SUCCESS, message='成功', data=to_dict(results))
  75. except Exception as e:
  76. save_log(request, Module.USER, OperationType.INQUIRE, StatesCode.UNKNOWN_ERROR)
  77. return jsonify(code=StatesCode.UNKNOWN_ERROR, message=str(e))
  78. @ns.doc(id='add_users', description='添加用户')
  79. @ns.expect(add_user)
  80. def post(self):
  81. """添加用户"""
  82. username = request.form.get('username')
  83. password = request.form.get('password')
  84. photograph = request.files.get('photograph')
  85. name = request.form.get('name')
  86. phone = request.form.get('phone')
  87. email = request.form.get('email')
  88. company = request.form.get('company')
  89. department = request.form.get('department')
  90. role = request.form.get('role')
  91. permission = request.form.get('permission')
  92. on_job_status = request.form.get('on_job_status')
  93. duty = request.form.get('duty')
  94. nationality = request.form.get('nationality')
  95. if username is None or password is None:
  96. return jsonify(code=StatesCode.PARA_ERROR, message="用户名或密码不能为空")
  97. if photograph:
  98. ext = photograph.filename.split('.')[-1]
  99. base64_data = b'data:image/%s;base64,' % ext.encode('utf-8') + base64.b64encode(photograph.read())
  100. base64_data = base64_data.decode('utf-8')
  101. else:
  102. base64_data = None
  103. try:
  104. with Session(current_app.engine) as session:
  105. # 判断用户是否存在
  106. stmt = select(User).where(User.user_name == username)
  107. result = session.execute(stmt).scalars().first()
  108. if result:
  109. return jsonify(code=StatesCode.UNKNOWN_ERROR, message="用户已存在")
  110. # 添加用户
  111. session.execute(
  112. insert(User).values(
  113. user_name=username,
  114. password=User().generate_password(password),
  115. photograph=base64_data,
  116. name=name,
  117. phone=phone,
  118. email=email,
  119. company=company,
  120. department=department,
  121. role=role,
  122. permission=permission,
  123. on_job_status=on_job_status,
  124. duty=duty,
  125. nationality=nationality,
  126. )
  127. )
  128. session.commit()
  129. save_log(request, Module.USER, OperationType.ADD, StatesCode.SUCCESS)
  130. return jsonify(code=StatesCode.SUCCESS, message="添加用户成功")
  131. except Exception as e:
  132. save_log(request, Module.USER, OperationType.ADD, StatesCode.UNKNOWN_ERROR)
  133. return jsonify(code=StatesCode.UNKNOWN_ERROR, message=str(e))
  134. @ns.doc(id='update_users_details', description='更新用户详情')
  135. @ns.expect(add_user)
  136. def put(self):
  137. """更新用户详情"""
  138. user_id = request.form.get('id')
  139. username = request.form.get('username')
  140. password = request.form.get('password')
  141. photograph = request.files.get('photograph')
  142. name = request.form.get('name')
  143. phone = request.form.get('phone')
  144. email = request.form.get('email')
  145. company = request.form.get('company')
  146. department = request.form.get('department')
  147. role = request.form.get('role')
  148. permission = request.form.get('permission')
  149. on_job_status = request.form.get('on_job_status')
  150. duty = request.form.get('duty')
  151. nationality = request.form.get('nationality')
  152. if user_id is None:
  153. return jsonify(code=StatesCode.PARA_ERROR, message='用户id不能为空')
  154. if photograph:
  155. ext = photograph.filename.split('.')[-1]
  156. base64_data = b'data:image/%s;base64,' % ext.encode('utf-8') + base64.b64encode(photograph.read())
  157. base64_data = base64_data.decode('utf-8')
  158. else:
  159. base64_data = None
  160. try:
  161. with Session(current_app.engine) as session:
  162. stmt = update(User).where(User.id == user_id).values(
  163. user_name=username,
  164. password=User().generate_password(password),
  165. photograph=base64_data,
  166. name=name,
  167. phone=phone,
  168. email=email,
  169. company=company,
  170. department=department,
  171. role=role,
  172. permission=permission,
  173. on_job_status=on_job_status,
  174. duty=duty,
  175. nationality=nationality,
  176. )
  177. session.execute(stmt)
  178. session.commit()
  179. save_log(request, Module.USER, OperationType.UPDATE, StatesCode.SUCCESS)
  180. return jsonify(code=StatesCode.SUCCESS, message='修改成功')
  181. except Exception as e:
  182. save_log(request, Module.USER, OperationType.UPDATE, StatesCode.UNKNOWN_ERROR)
  183. return jsonify(code=StatesCode.UNKNOWN_ERROR, message=str(e))
  184. @ns.doc(id='delete_users', description='删除用户')
  185. @ns.expect(add_user)
  186. def delete(self):
  187. """删除用户"""
  188. user_id = request.form.get('id')
  189. if user_id is None:
  190. return jsonify(code=StatesCode.PARA_ERROR, message="用户id不能为空")
  191. try:
  192. with Session(current_app.engine) as session:
  193. stmt = delete(User).where(User.id == user_id)
  194. session.execute(stmt)
  195. session.commit()
  196. save_log(request, Module.USER, OperationType.DELETE, StatesCode.SUCCESS)
  197. return jsonify(code=StatesCode.SUCCESS, message="删除成功")
  198. except Exception as e:
  199. save_log(request, Module.USER, OperationType.DELETE, StatesCode.UNKNOWN_ERROR)
  200. return jsonify(code=StatesCode.UNKNOWN_ERROR, message=str(e))
  201. users_status = reqparse.RequestParser(bundle_errors=True)
  202. users_status.add_argument(name='id', type=int, required=True, location='form', help='用户id')
  203. users_status.add_argument(name='status', type=int, required=True, location='form', help='用户状态')
  204. @ns.route('/user_status')
  205. class GetUsersApi(Resource):
  206. method_decorators = [login_required]
  207. @ns.doc(id='modify_users_status', description='修改用户状态,激活、禁用')
  208. @ns.expect(users_status)
  209. def put(self):
  210. """修改用户状态,激活、禁用"""
  211. user_id = int(request.form.get('id'))
  212. account_status = int(request.form.get('status'))
  213. if user_id is None or account_status is None:
  214. return jsonify(code=StatesCode.PARA_ERROR, message="用户id或用户状态不能为空")
  215. try:
  216. with Session(current_app.engine) as session:
  217. stmt = update(User).where(User.id == user_id).values(account_status=account_status)
  218. session.execute(stmt)
  219. session.commit()
  220. save_log(request, Module.USER, OperationType.UPDATE, StatesCode.SUCCESS)
  221. return jsonify(code=StatesCode.SUCCESS, message="修改用户状态成功")
  222. except Exception as e:
  223. save_log(request, Module.USER, OperationType.UPDATE, StatesCode.UNKNOWN_ERROR)
  224. return jsonify(code=StatesCode.UNKNOWN_ERROR, message=str(e))
  225. batch_delete_user = reqparse.RequestParser(bundle_errors=True)
  226. batch_delete_user.add_argument(name='users_id', type=str, required=True, location='form', help='用户id列表')
  227. @ns.route('/batch_delete_user')
  228. class BatchDeleteUserApi(Resource):
  229. method_decorators = [login_required]
  230. @ns.doc(id='batch_delete_users', description='批量删除用户')
  231. @ns.expect(batch_delete_user)
  232. def delete(self):
  233. """批量删除用户"""
  234. users_id = request.form.get('users_id')
  235. if users_id is not None:
  236. users_id = json.loads(users_id)
  237. else:
  238. return jsonify(code=StatesCode.PARA_ERROR, message='用户id不能为空')
  239. try:
  240. with Session(current_app.engine) as session:
  241. stmt = select(User).where(User.id.in_(users_id))
  242. results = session.execute(stmt).scalars().all()
  243. for result in results:
  244. session.delete(result)
  245. session.commit()
  246. save_log(request, Module.USER, OperationType.BATCH_DELETE, StatesCode.SUCCESS)
  247. return jsonify(code=StatesCode.SUCCESS, message='批量删除成功')
  248. except Exception as e:
  249. save_log(request, Module.USER, OperationType.BATCH_DELETE, StatesCode.UNKNOWN_ERROR)
  250. return jsonify(code=StatesCode.UNKNOWN_ERROR, message=str(e))
  251. batch_modify_user_status = reqparse.RequestParser(bundle_errors=True)
  252. batch_modify_user_status.add_argument(name='users_id', type=list, required=True, location='form', help='用户id列表')
  253. batch_modify_user_status.add_argument(name='status', type=int, required=True, location='form',
  254. help='用户状态 0为正常,1为禁用')
  255. @ns.route('/batch_modify_user_status')
  256. class BatchModifyUsersStatusApi(Resource):
  257. method_decorators = [login_required]
  258. @ns.doc(id='batch_modify_user_status', description='批量修改用户状态,激活、禁用')
  259. @ns.expect(batch_modify_user_status)
  260. def put(self):
  261. """批量修改用户状态,激活、禁用"""
  262. users_id = request.form.get('users_id')
  263. account_status = request.form.get('status')
  264. if users_id is not None and account_status is not None:
  265. users_id = json.loads(users_id)
  266. else:
  267. return jsonify(code=StatesCode.PARA_ERROR, message='用户id和状态不能为空')
  268. try:
  269. values = []
  270. for user_id in users_id:
  271. values.append({"id": user_id, "account_status": account_status})
  272. with Session(current_app.engine) as session:
  273. session.execute(
  274. update(User),
  275. values
  276. )
  277. session.commit()
  278. save_log(request, Module.USER, OperationType.BATCH_UPDATE, StatesCode.SUCCESS)
  279. return jsonify(code=StatesCode.SUCCESS, message='批量修改用户状态成功')
  280. except Exception as e:
  281. save_log(request, Module.USER, OperationType.BATCH_UPDATE, StatesCode.UNKNOWN_ERROR)
  282. return jsonify(code=StatesCode.UNKNOWN_ERROR, message=str(e))
  283. @ns.route('/export_data')
  284. class ExportDataApi(Resource):
  285. method_decorators = [login_required]
  286. @ns.doc(id='export_data', description='导出用户数据')
  287. def get(self):
  288. """导出用户数据"""
  289. try:
  290. with Session(current_app.engine) as session:
  291. stmt = select(User)
  292. results = session.execute(stmt).scalars().all()
  293. my_xlwt = MyXlwt()
  294. title = ['id', 'user_name', 'password', 'name', 'phone', 'email', 'company', 'department',
  295. 'role',
  296. 'permission', 'on_job_status', 'duty', 'account_status', 'nationality', 'register_time',
  297. 'common_menus']
  298. # 写入标题
  299. my_xlwt.write_row(0, 0, title)
  300. # 写入内容
  301. start_row = 1 # 内容起始行
  302. for result in results:
  303. data = [result.id, result.user_name, result.password, result.name, result.phone,
  304. result.email, result.company, result.department, result.role, result.permission,
  305. result.on_job_status,
  306. result.duty, result.account_status, result.nationality, result.register_time,
  307. result.common_menus]
  308. my_xlwt.write_row(start_row, 0, data)
  309. start_row += 1
  310. output = BytesIO()
  311. my_xlwt.save(output)
  312. response = Response(output.getvalue())
  313. response.headers['Content-Type'] = 'application/octet-stream'
  314. response.headers['Content-Disposition'] = 'attachment; filename=%s' % 'users.xls'
  315. save_log(request, Module.USER, OperationType.EXPORT, StatesCode.SUCCESS)
  316. return response
  317. except Exception as e:
  318. save_log(request, Module.USER, OperationType.EXPORT, StatesCode.UNKNOWN_ERROR)
  319. return jsonify(code=StatesCode.UNKNOWN_ERROR, message=str(e))