123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233 |
- import json
- from flask import request, jsonify, current_app
- from flask_restx import Resource, Namespace, reqparse
- from sqlalchemy import select, insert, update, delete, func
- from sqlalchemy.orm import Session
- from app.defines import StatesCode, Module, OperationType
- from app.modle.role import Role
- from app.modle.users import User
- from app.utils.jwt_util import login_required
- from app.utils.save_log import save_log
- from app.utils.util import to_dict
# Namespace under which every role-management endpoint in this module is registered.
ns = Namespace('role', description='角色管理接口')

# Query-string arguments documented for the role listing endpoint (pagination).
role_list = reqparse.RequestParser(bundle_errors=True)
role_list.add_argument(
    'page_size',
    type=int,
    location='args',
    required=False,
    help='每页记录数量,默认:20',
)
role_list.add_argument(
    'page',
    type=int,
    location='args',
    required=False,
    help='页数',
)
@ns.route('/role_list')
class GetUserListApi(Resource):
    """Paginated listing of roles.

    ``method_decorators`` applies ``login_required`` to every handler.
    """

    method_decorators = [login_required]

    @ns.doc(id='get_role_list', description='获取角色列表')
    @ns.expect(role_list)
    def get(self):
        """Return one page of roles.

        Query parameters:
            page_size: Rows per page. Falls back to 20 when missing, not an
                integer, or not positive.
            page: 1-based page number. Falls back to 1 when missing or not an
                integer, and is clamped to a minimum of 1.

        Returns:
            A JSON response with ``code``, ``message`` and ``data``, where
            ``data`` is the requested page of roles passed through ``to_dict``.
        """
        default_page_size = 20

        # ``type=int`` makes Werkzeug return the default instead of raising
        # when the query value is not a valid integer.
        page_size = request.args.get('page_size', default_page_size, type=int)
        if page_size < 1:
            page_size = default_page_size
        # Clamp so the computed OFFSET can never be negative.
        page = max(request.args.get('page', 1, type=int), 1)

        with Session(current_app.engine) as session:
            # A stable ORDER BY is required for OFFSET/LIMIT pages to be
            # deterministic across requests.
            stmt = (
                select(Role)
                .order_by(Role.id)
                .offset(page_size * (page - 1))
                .limit(page_size)
            )
            roles = session.execute(stmt).scalars().all()
            # Serialise while the session is still open.
            data = to_dict(roles)

        save_log(request, Module.ROLE, OperationType.INQUIRE, StatesCode.SUCCESS)
        return jsonify(code=StatesCode.SUCCESS, message='成功', data=data)
# Form fields documented for creating, modifying and deleting a role.
role = reqparse.RequestParser(bundle_errors=True)
role.add_argument(
    'role_id',
    type=int,
    location='form',
    required=False,
    help='角色id',
)
role.add_argument(
    'role_name',
    type=str,
    location='form',
    required=False,
    help='角色名称',
)
role.add_argument(
    'role_describe',
    type=str,
    location='form',
    required=False,
    help='角色描述',
)
role.add_argument(
    'role_permission',
    type=str,
    location='form',
    required=False,
    help='角色权限',
)

# Query-string argument documented for fetching a single role.
role_id = reqparse.RequestParser(bundle_errors=True)
role_id.add_argument(
    'role_id',
    type=int,
    location='args',
    required=False,
    help='角色id',
)
@ns.route('/role')
class RoleApi(Resource):
    """Read, create, modify and delete a single role.

    ``method_decorators`` applies ``login_required`` to every handler.
    """

    method_decorators = [login_required]

    @ns.doc(id='get_role', description='获取角色')
    @ns.expect(role_id)
    def get(self):
        """Fetch the role whose id equals the ``role_id`` query parameter.

        Returns:
            A JSON response with ``code``, ``message`` and ``data`` (the
            matching rows passed through ``to_dict``), or an error response
            when ``role_id`` is missing or empty.
        """
        role_id = request.args.get('role_id')
        if not role_id:
            return jsonify(code=StatesCode.UNKNOWN_ERROR, message="角色id不能为空")

        with Session(current_app.engine) as session:
            stmt = select(Role).where(Role.id == role_id)
            # Serialise while the session is still open.
            data = to_dict(session.execute(stmt).scalars().all())

        save_log(request, Module.ROLE, OperationType.INQUIRE, StatesCode.SUCCESS)
        # Use jsonify like every other handler in this module.
        return jsonify(code=StatesCode.SUCCESS, message="成功", data=data)

    @ns.doc(id='add_role', description='添加角色')
    @ns.expect(role)
    def post(self):
        """Create a role from the submitted form fields.

        Form fields:
            role_name: Required, must be non-empty.
            role_describe: Optional description, stored as received.
            role_permission: Optional JSON-encoded string; decoded before
                being stored.

        Returns:
            A JSON success response, or an error response when ``role_name``
            is missing/empty or ``role_permission`` is not valid JSON.
        """
        role_name = request.form.get('role_name')
        role_describe = request.form.get('role_describe')
        role_permission = request.form.get('role_permission')

        if not role_name:
            return jsonify(code=StatesCode.UNKNOWN_ERROR, message="角色名不能为空")
        if role_permission:
            try:
                role_permission = json.loads(role_permission)
            except ValueError:
                # json.JSONDecodeError subclasses ValueError; report bad
                # client input instead of failing with an unhandled error.
                return jsonify(code=StatesCode.UNKNOWN_ERROR, message="角色权限格式错误")

        with Session(current_app.engine) as session:
            session.execute(
                insert(Role).values(
                    role_name=role_name,
                    role_describe=role_describe,
                    role_permission=role_permission,
                )
            )
            session.commit()

        save_log(request, Module.ROLE, OperationType.ADD, StatesCode.SUCCESS)
        return jsonify(code=StatesCode.SUCCESS, message='成功')

    @ns.doc(id='modify_role', description='修改角色')
    @ns.expect(role)
    def put(self):
        """Overwrite the name, description and permission of an existing role.

        Form fields:
            role_id: Required id of the role to modify.
            role_name: Required, must be non-empty.
            role_describe: Written as received; ``None`` when the field is
                omitted.
            role_permission: Optional JSON-encoded string; decoded before
                being stored, written as received when empty or omitted.

        Returns:
            A JSON success response, or an error response when a required
            field is missing/empty or ``role_permission`` is not valid JSON.
        """
        role_id = request.form.get('role_id')
        role_name = request.form.get('role_name')
        role_describe = request.form.get('role_describe')
        role_permission = request.form.get('role_permission')

        if not role_name or not role_id:
            return jsonify(code=StatesCode.UNKNOWN_ERROR, message="角色名和角色id不能为空")
        if role_permission:
            try:
                role_permission = json.loads(role_permission)
            except ValueError:
                return jsonify(code=StatesCode.UNKNOWN_ERROR, message="角色权限格式错误")

        with Session(current_app.engine) as session:
            session.execute(
                update(Role)
                .where(Role.id == role_id)
                .values(
                    role_name=role_name,
                    role_describe=role_describe,
                    role_permission=role_permission,
                )
            )
            session.commit()

        save_log(request, Module.ROLE, OperationType.UPDATE, StatesCode.SUCCESS)
        # The original message said "role added"; this handler modifies a role.
        return jsonify(code=StatesCode.SUCCESS, message="修改角色成功")

    @ns.doc(id='delete_role', description='删除角色')
    @ns.expect(role)
    def delete(self):
        """Delete a role, refusing when any user is still assigned to it.

        Form fields:
            role_id: Required id of the role to delete.

        Returns:
            A JSON success response, or an error response when ``role_id`` is
            missing/empty or at least one user still has this role.
        """
        role_id = request.form.get('role_id')
        if not role_id:
            # Only the id is validated here, so only the id is mentioned.
            return jsonify(code=StatesCode.UNKNOWN_ERROR, message="角色id不能为空")

        with Session(current_app.engine) as session:
            # A role that still has members must not be removed.
            member_count = session.execute(
                select(func.count(User.role)).where(User.role == role_id)
            ).scalar_one()
            if member_count > 0:
                return jsonify(code=StatesCode.UNKNOWN_ERROR, message="该角色组存在用户,无法删除")

            session.execute(delete(Role).where(Role.id == role_id))
            session.commit()

        save_log(request, Module.ROLE, OperationType.DELETE, StatesCode.SUCCESS)
        return jsonify(code=StatesCode.SUCCESS, message="删除角色成功")
# Query-string argument documented for listing the members of a role.
get_member = reqparse.RequestParser(bundle_errors=True)
get_member.add_argument(
    'role_id',
    type=int,
    location='args',
    required=False,
    help='角色id',
)

# Form field documented for removing a single member from its role.
del_member = reqparse.RequestParser(bundle_errors=True)
del_member.add_argument(
    'user_id',
    type=int,
    location='form',
    required=False,
    help='用户id',
)

# Form field documented for removing several members at once.
batch_del_member = reqparse.RequestParser(bundle_errors=True)
batch_del_member.add_argument(
    'users_id',
    type=str,
    location='form',
    required=False,
    help='用户ids',
)
@ns.route('/member')
class RoleMemberApi(Resource):
    """List the members of a role and detach users from their role.

    ``method_decorators`` applies ``login_required`` to every handler.
    """

    method_decorators = [login_required]

    @ns.doc(id='get_role_member', description='获取角色成员')
    @ns.expect(get_member)
    def get(self):
        """Return the users whose ``role`` equals the ``role_id`` query parameter.

        Returns:
            A JSON response with ``code``, ``message`` and ``data`` (the
            matching users passed through ``to_dict``), or an error response
            when ``role_id`` is missing or empty.
        """
        role_id = request.args.get('role_id')
        if not role_id:
            return jsonify(code=StatesCode.UNKNOWN_ERROR, message='角色id不能为空')

        with Session(current_app.engine) as session:
            stmt = select(User).where(User.role == role_id)
            # Serialise while the session is still open.
            data = to_dict(session.execute(stmt).scalars().all())

        save_log(request, Module.ROLE, OperationType.INQUIRE, StatesCode.SUCCESS)
        return jsonify(code=StatesCode.SUCCESS, message='成功', data=data)

    @ns.doc(id='delete_role_member', description='移除角色成员')
    @ns.expect(del_member)
    def put(self):
        """Detach one user from its role by setting ``User.role`` to ``None``.

        Form fields:
            user_id: Required id of the user to detach.

        Returns:
            A JSON success response, or an error response when ``user_id`` is
            missing or empty.
        """
        user_id = request.form.get('user_id')
        if not user_id:
            return jsonify(code=StatesCode.UNKNOWN_ERROR, message='用户id不能为空')

        with Session(current_app.engine) as session:
            session.execute(
                update(User).where(User.id == user_id).values(role=None)
            )
            session.commit()

        save_log(request, Module.ROLE, OperationType.DELETE, StatesCode.SUCCESS)
        return jsonify(code=StatesCode.SUCCESS, message="移除成功")

    @ns.doc(id='batch_delete_role_member', description='批量移除角色成员')
    @ns.expect(batch_del_member)
    def delete(self):
        """Detach several users from their roles in one request.

        Form fields:
            users_id: Required JSON-encoded, non-empty list of user ids.

        Returns:
            A JSON success response, or an error response when ``users_id``
            is missing, is not valid JSON, is not a list, or is empty.
        """
        raw_ids = request.form.get('users_id')
        if raw_ids is None:
            return jsonify(code=StatesCode.UNKNOWN_ERROR, message='用户id不能为空')

        try:
            users_id = json.loads(raw_ids)
        except ValueError:
            # json.JSONDecodeError subclasses ValueError; report bad client
            # input instead of failing with an unhandled error.
            return jsonify(code=StatesCode.UNKNOWN_ERROR, message='用户id格式错误')
        if not isinstance(users_id, list):
            # A JSON string, object or number would otherwise be iterated
            # character-by-character / key-by-key, or crash the loop.
            return jsonify(code=StatesCode.UNKNOWN_ERROR, message='用户id格式错误')
        if not users_id:
            # An empty parameter list would not be a by-primary-key bulk
            # update, so reject it before touching the database.
            return jsonify(code=StatesCode.UNKNOWN_ERROR, message='用户id不能为空')

        # One parameter set per user: matched on ``id``, sets ``role`` to NULL.
        values = [{"id": user_id, "role": None} for user_id in users_id]

        with Session(current_app.engine) as session:
            session.execute(update(User), values)
            session.commit()

        save_log(request, Module.ROLE, OperationType.BATCH_DELETE, StatesCode.SUCCESS)
        return jsonify(code=StatesCode.SUCCESS, message='批量移除成功')
|