|
- import base64
- import json
- from io import BytesIO
- from flask import request, jsonify, Response, current_app
- from flask_restx import Resource, Namespace, reqparse
- from sqlalchemy import insert, select, update, delete, func
- from sqlalchemy.orm import Session
- from werkzeug.datastructures import FileStorage
- from app.defines import StatesCode, Module, OperationType
- from app.modle.users import User
- from app.utils.jwt_util import login_required
- from app.utils.save_log import save_log
- from app.utils.util import to_dict, MyXlwt
# Namespace under which every user-management endpoint of this module is registered.
ns = Namespace('users', description='用户管理接口')

# Query-string arguments documented for GET /user_list.
get_users = reqparse.RequestParser(bundle_errors=True)
# Account status is an integer flag (the ``users_status`` form parser in this
# file declares it as int as well), so it is declared as int here, not float.
get_users.add_argument(name='status', type=int, required=True, location='args', help='用户状态')
get_users.add_argument(name='page_size', type=int, location='args', required=False, help='每页记录数量,默认:20')
get_users.add_argument(name='page', type=int, location='args', required=False, help='第几页')
@ns.route('/user_list')
class GetUserListApi(Resource):
    """Paginated listing of users filtered by account status."""

    method_decorators = [login_required]

    @ns.doc(id='get_users_list', description='获取用户列表')
    @ns.expect(get_users)
    def get(self):
        """Return one page of users whose account status equals ``status``.

        Reads ``status`` (required), ``page_size`` (default 20) and ``page``
        (default 1) from the query string. Responds with a JSON body holding
        ``code``, ``message`` and, on success, ``total`` (number of matching
        users) and ``data`` (the requested page).
        """
        status = request.args.get('status')
        if status is None:
            return jsonify(code=StatesCode.PARA_ERROR, message='用户状态不能为空')

        # Malformed paging values are a client error: answer with PARA_ERROR
        # instead of letting int() raise outside of any handler.
        try:
            page_size = int(request.args.get('page_size', 20))
            page = int(request.args.get('page', 1))
        except ValueError:
            return jsonify(code=StatesCode.PARA_ERROR, message='page和page_size必须为整数')
        # page < 1 would yield a negative OFFSET and a negative page_size a
        # negative LIMIT; reject both before touching the database.
        if page < 1 or page_size < 0:
            return jsonify(code=StatesCode.PARA_ERROR, message='page必须大于等于1,page_size不能为负数')

        try:
            with Session(current_app.engine) as session:
                # The same filter drives both the total count and the page query.
                status_filter = User.account_status == status
                total = session.execute(
                    select(func.count(User.id)).where(status_filter)
                ).scalar()
                page_stmt = (
                    select(User)
                    .where(status_filter)
                    .offset(page_size * (page - 1))
                    .limit(page_size)
                )
                users = session.execute(page_stmt).scalars().all()
                save_log(request, Module.USER, OperationType.INQUIRE, StatesCode.SUCCESS)
                return jsonify(code=StatesCode.SUCCESS, message="成功", total=total, data=to_dict(users))
        except Exception as e:
            save_log(request, Module.USER, OperationType.INQUIRE, StatesCode.UNKNOWN_ERROR)
            return jsonify(code=StatesCode.UNKNOWN_ERROR, message=str(e))
# Multipart form fields documented for creating (POST) and updating (PUT) a user.
add_user = reqparse.RequestParser(bundle_errors=True)
add_user.add_argument('id', location='form', type=str, required=False, help='用户id')
add_user.add_argument('username', location='form', type=str, required=True, help='用户名')
add_user.add_argument('password', location='form', type=str, required=True, help='密码')
# The photograph is the only field that arrives as an uploaded file.
add_user.add_argument('photograph', location='files', type=FileStorage, required=False, help='照片')
add_user.add_argument('name', location='form', type=str, required=False, help='姓名')
add_user.add_argument('phone', location='form', type=str, required=False, help='电话')
add_user.add_argument('email', location='form', type=str, required=False, help='邮箱')
add_user.add_argument('company', location='form', type=str, required=False, help='公司')
add_user.add_argument('department', location='form', type=str, required=False, help='部门')
add_user.add_argument('role', location='form', type=str, required=False, help='角色')
add_user.add_argument('permission', location='form', type=str, required=False, help='权限')
add_user.add_argument('on_job_status', location='form', type=str, required=False, help='在职状态')
add_user.add_argument('duty', location='form', type=str, required=False, help='职务')
add_user.add_argument('nationality', location='form', type=str, required=False, help='国籍')

# Query-string argument documented for fetching a single user's details.
user_details = reqparse.RequestParser(bundle_errors=True)
user_details.add_argument('id', location='args', type=int, required=True, help='用户id')
@ns.route('/user')
class UsersApi(Resource):
    """Read, create, update and delete a single user record."""

    method_decorators = [login_required]

    # Optional profile fields copied from the form onto the User row; the form
    # key and the column keyword are identical for each of them.
    _PROFILE_FIELDS = (
        'name', 'phone', 'email', 'company', 'department', 'role',
        'permission', 'on_job_status', 'duty', 'nationality',
    )

    @staticmethod
    def _photograph_data_uri():
        """Return the uploaded ``photograph`` as a base64 data URI, or None when no file was sent."""
        photograph = request.files.get('photograph')
        if not photograph:
            return None
        # The text after the last dot of the file name is used as the image subtype.
        ext = photograph.filename.split('.')[-1]
        encoded = base64.b64encode(photograph.read()).decode('utf-8')
        return 'data:image/%s;base64,%s' % (ext, encoded)

    @ns.doc(id='get_users_details', description='获取用户详情')
    @ns.expect(user_details)
    def get(self):
        """Return the user whose id equals the ``id`` query parameter.

        Responds with a JSON body holding ``code``, ``message`` and, on
        success, ``data`` with the matching rows.
        """
        user_id = request.args.get('id')
        if user_id is None:
            return jsonify(code=StatesCode.PARA_ERROR, message="用户id不能为空")
        try:
            with Session(current_app.engine) as session:
                stmt = select(User).where(User.id == user_id)
                results = session.execute(stmt).scalars().all()
                save_log(request, Module.USER, OperationType.INQUIRE, StatesCode.SUCCESS)
                return jsonify(code=StatesCode.SUCCESS, message='成功', data=to_dict(results))
        except Exception as e:
            save_log(request, Module.USER, OperationType.INQUIRE, StatesCode.UNKNOWN_ERROR)
            return jsonify(code=StatesCode.UNKNOWN_ERROR, message=str(e))

    @ns.doc(id='add_users', description='添加用户')
    @ns.expect(add_user)
    def post(self):
        """Create a user from the submitted form.

        ``username`` and ``password`` are mandatory and must be non-empty; the
        remaining profile fields and the photograph are optional. The request
        is rejected when a user with the same ``username`` already exists.
        """
        username = request.form.get('username')
        password = request.form.get('password')
        # An empty string is as unusable as a missing value, so reject both.
        if not username or not password:
            return jsonify(code=StatesCode.PARA_ERROR, message="用户名或密码不能为空")

        profile = {field: request.form.get(field) for field in self._PROFILE_FIELDS}
        photograph = self._photograph_data_uri()
        try:
            with Session(current_app.engine) as session:
                # Refuse to create a second account with the same user name.
                existing = session.execute(
                    select(User).where(User.user_name == username)
                ).scalars().first()
                if existing:
                    return jsonify(code=StatesCode.UNKNOWN_ERROR, message="用户已存在")
                # Insert the new user; the password goes through the model's own
                # generate_password() before being stored.
                session.execute(
                    insert(User).values(
                        user_name=username,
                        password=User().generate_password(password),
                        photograph=photograph,
                        **profile,
                    )
                )
                session.commit()
                save_log(request, Module.USER, OperationType.ADD, StatesCode.SUCCESS)
                return jsonify(code=StatesCode.SUCCESS, message="添加用户成功")
        except Exception as e:
            save_log(request, Module.USER, OperationType.ADD, StatesCode.UNKNOWN_ERROR)
            return jsonify(code=StatesCode.UNKNOWN_ERROR, message=str(e))

    @ns.doc(id='update_users_details', description='更新用户详情')
    @ns.expect(add_user)
    def put(self):
        """Update the user identified by the ``id`` form field.

        Only fields present in the request are written: an omitted field keeps
        its stored value, the password is re-hashed only when a new non-empty
        one is supplied, and the photograph is replaced only when a new file is
        uploaded.
        """
        user_id = request.form.get('id')
        if user_id is None:
            return jsonify(code=StatesCode.PARA_ERROR, message='用户id不能为空')

        # Collect only what the client actually sent, so a partial form does
        # not overwrite stored columns with NULL.
        changes = {}
        for field in self._PROFILE_FIELDS:
            supplied = request.form.get(field)
            if supplied is not None:
                changes[field] = supplied
        username = request.form.get('username')
        if username:
            changes['user_name'] = username
        password = request.form.get('password')
        if password:
            # Hash only a real password; a missing one must not be passed to
            # generate_password().
            changes['password'] = User().generate_password(password)
        photograph = self._photograph_data_uri()
        if photograph is not None:
            changes['photograph'] = photograph
        if not changes:
            return jsonify(code=StatesCode.PARA_ERROR, message='没有需要更新的字段')

        try:
            with Session(current_app.engine) as session:
                session.execute(update(User).where(User.id == user_id).values(**changes))
                session.commit()
                save_log(request, Module.USER, OperationType.UPDATE, StatesCode.SUCCESS)
                return jsonify(code=StatesCode.SUCCESS, message='修改成功')
        except Exception as e:
            save_log(request, Module.USER, OperationType.UPDATE, StatesCode.UNKNOWN_ERROR)
            return jsonify(code=StatesCode.UNKNOWN_ERROR, message=str(e))

    # NOTE(review): the add_user parser documents username/password as required,
    # but this handler only reads ``id`` - consider a dedicated parser.
    @ns.doc(id='delete_users', description='删除用户')
    @ns.expect(add_user)
    def delete(self):
        """Delete the user identified by the ``id`` form field."""
        user_id = request.form.get('id')
        if user_id is None:
            return jsonify(code=StatesCode.PARA_ERROR, message="用户id不能为空")
        try:
            with Session(current_app.engine) as session:
                session.execute(delete(User).where(User.id == user_id))
                session.commit()
                save_log(request, Module.USER, OperationType.DELETE, StatesCode.SUCCESS)
                return jsonify(code=StatesCode.SUCCESS, message="删除成功")
        except Exception as e:
            save_log(request, Module.USER, OperationType.DELETE, StatesCode.UNKNOWN_ERROR)
            return jsonify(code=StatesCode.UNKNOWN_ERROR, message=str(e))
# Form fields documented for changing one user's account status.
users_status = reqparse.RequestParser(bundle_errors=True)
users_status.add_argument('id', location='form', type=int, required=True, help='用户id')
users_status.add_argument('status', location='form', type=int, required=True, help='用户状态')
@ns.route('/user_status')
class GetUsersApi(Resource):
    """Change the account status of a single user."""

    method_decorators = [login_required]

    @ns.doc(id='modify_users_status', description='修改用户状态,激活、禁用')
    @ns.expect(users_status)
    def put(self):
        """Set ``account_status`` of the user given by the ``id`` form field.

        Both ``id`` and ``status`` are required and must be integers; a missing
        or non-numeric value is answered with a PARA_ERROR response.
        """
        raw_id = request.form.get('id')
        raw_status = request.form.get('status')
        # Check for missing fields before converting: int(None) would raise
        # and the emptiness check would never be reached.
        if raw_id is None or raw_status is None:
            return jsonify(code=StatesCode.PARA_ERROR, message="用户id或用户状态不能为空")
        try:
            user_id = int(raw_id)
            account_status = int(raw_status)
        except ValueError:
            return jsonify(code=StatesCode.PARA_ERROR, message="用户id或用户状态必须为整数")

        try:
            with Session(current_app.engine) as session:
                stmt = update(User).where(User.id == user_id).values(account_status=account_status)
                session.execute(stmt)
                session.commit()
                save_log(request, Module.USER, OperationType.UPDATE, StatesCode.SUCCESS)
                return jsonify(code=StatesCode.SUCCESS, message="修改用户状态成功")
        except Exception as e:
            save_log(request, Module.USER, OperationType.UPDATE, StatesCode.UNKNOWN_ERROR)
            return jsonify(code=StatesCode.UNKNOWN_ERROR, message=str(e))
# Form field documented for batch deletion; the handler decodes ``users_id``
# with json.loads, so it is transported as a string.
batch_delete_user = reqparse.RequestParser(bundle_errors=True)
batch_delete_user.add_argument('users_id', location='form', type=str, required=True, help='用户id列表')
@ns.route('/batch_delete_user')
class BatchDeleteUserApi(Resource):
    """Delete several users in one request."""

    method_decorators = [login_required]

    @ns.doc(id='batch_delete_users', description='批量删除用户')
    @ns.expect(batch_delete_user)
    def delete(self):
        """Delete every user whose id appears in the ``users_id`` form field.

        ``users_id`` must be a JSON-encoded array of ids; a missing field,
        malformed JSON or a non-array value is answered with PARA_ERROR.
        """
        raw_ids = request.form.get('users_id')
        if raw_ids is None:
            return jsonify(code=StatesCode.PARA_ERROR, message='用户id不能为空')
        # Malformed JSON is a client error; json.JSONDecodeError is a
        # ValueError subclass, so catching ValueError covers it.
        try:
            users_id = json.loads(raw_ids)
        except ValueError:
            return jsonify(code=StatesCode.PARA_ERROR, message='users_id必须为JSON数组')
        if not isinstance(users_id, list):
            return jsonify(code=StatesCode.PARA_ERROR, message='users_id必须为JSON数组')

        try:
            with Session(current_app.engine) as session:
                # Load the rows first and delete them one by one through the
                # session rather than issuing a single bulk DELETE statement.
                stmt = select(User).where(User.id.in_(users_id))
                for user in session.execute(stmt).scalars().all():
                    session.delete(user)
                session.commit()
                save_log(request, Module.USER, OperationType.BATCH_DELETE, StatesCode.SUCCESS)
                return jsonify(code=StatesCode.SUCCESS, message='批量删除成功')
        except Exception as e:
            save_log(request, Module.USER, OperationType.BATCH_DELETE, StatesCode.UNKNOWN_ERROR)
            return jsonify(code=StatesCode.UNKNOWN_ERROR, message=str(e))
# Form fields documented for changing the account status of several users.
batch_modify_user_status = reqparse.RequestParser(bundle_errors=True)
# ``users_id`` is sent as a JSON-encoded string that the handler decodes with
# json.loads; declaring it as ``list`` would split that string into single
# characters if the parser were ever used to parse the request.
batch_modify_user_status.add_argument(name='users_id', type=str, required=True, location='form', help='用户id列表')
batch_modify_user_status.add_argument(name='status', type=int, required=True, location='form',
                                      help='用户状态 0为正常,1为禁用')
@ns.route('/batch_modify_user_status')
class BatchModifyUsersStatusApi(Resource):
    """Change the account status of several users in one request."""

    method_decorators = [login_required]

    @ns.doc(id='batch_modify_user_status', description='批量修改用户状态,激活、禁用')
    @ns.expect(batch_modify_user_status)
    def put(self):
        """Set ``account_status`` to ``status`` for every id in ``users_id``.

        ``users_id`` must be a non-empty JSON-encoded array of ids and
        ``status`` an integer; anything else is answered with PARA_ERROR.
        """
        raw_ids = request.form.get('users_id')
        raw_status = request.form.get('status')
        if raw_ids is None or raw_status is None:
            return jsonify(code=StatesCode.PARA_ERROR, message='用户id和状态不能为空')
        # Both conversions raise ValueError on malformed input
        # (json.JSONDecodeError is a ValueError subclass).
        try:
            users_id = json.loads(raw_ids)
            account_status = int(raw_status)
        except ValueError:
            return jsonify(code=StatesCode.PARA_ERROR, message='users_id必须为JSON数组,status必须为整数')
        if not isinstance(users_id, list):
            return jsonify(code=StatesCode.PARA_ERROR, message='users_id必须为JSON数组,status必须为整数')
        # An empty id list leaves nothing to update; do not send an empty
        # parameter set to the database.
        if not users_id:
            return jsonify(code=StatesCode.PARA_ERROR, message='用户id和状态不能为空')

        try:
            # One parameter set per user: the id selects the row and
            # account_status is the value written to it.
            values = [{"id": user_id, "account_status": account_status} for user_id in users_id]
            with Session(current_app.engine) as session:
                session.execute(update(User), values)
                session.commit()
                save_log(request, Module.USER, OperationType.BATCH_UPDATE, StatesCode.SUCCESS)
                return jsonify(code=StatesCode.SUCCESS, message='批量修改用户状态成功')
        except Exception as e:
            save_log(request, Module.USER, OperationType.BATCH_UPDATE, StatesCode.UNKNOWN_ERROR)
            return jsonify(code=StatesCode.UNKNOWN_ERROR, message=str(e))
@ns.route('/export_data')
class ExportDataApi(Resource):
    """Download every user record as a spreadsheet attachment."""

    method_decorators = [login_required]

    @ns.doc(id='export_data', description='导出用户数据')
    def get(self):
        """Write all users into a workbook and return it as ``users.xls``.

        On failure a JSON body with ``code`` and ``message`` is returned
        instead of the file.
        """
        # Column names double as the header row and as the User attributes
        # read for each data row, in this order.
        # NOTE(review): 'password' is exported together with the other
        # columns - confirm that this is intended.
        columns = (
            'id', 'user_name', 'password', 'name', 'phone', 'email', 'company', 'department',
            'role', 'permission', 'on_job_status', 'duty', 'account_status', 'nationality',
            'register_time', 'common_menus',
        )
        try:
            with Session(current_app.engine) as session:
                users = session.execute(select(User)).scalars().all()
                workbook = MyXlwt()
                # Header goes into row 0.
                workbook.write_row(0, 0, list(columns))
                # Data rows start at row 1, one row per user.
                for row_index, user in enumerate(users, start=1):
                    workbook.write_row(row_index, 0, [getattr(user, column) for column in columns])
                buffer = BytesIO()
                workbook.save(buffer)
                response = Response(buffer.getvalue())
                response.headers['Content-Type'] = 'application/octet-stream'
                response.headers['Content-Disposition'] = 'attachment; filename=users.xls'
                save_log(request, Module.USER, OperationType.EXPORT, StatesCode.SUCCESS)
                return response
        except Exception as e:
            save_log(request, Module.USER, OperationType.EXPORT, StatesCode.UNKNOWN_ERROR)
            return jsonify(code=StatesCode.UNKNOWN_ERROR, message=str(e))
|