12345678910111213141516171819202122232425262728293031323334353637383940414243444546 |
- from flask import request, jsonify
- from sqlalchemy import select
- from werkzeug.security import check_password_hash
- from flask_restx import Resource, Namespace, reqparse
- from sqlalchemy.orm import Session
- from app.configs.config import engine
- from app.modle.users import User
- from app.utils.jwt_util import generate_jwt
- from app.defines import StatesCode
- ns = Namespace('login', description='登入')
- login = reqparse.RequestParser(bundle_errors=True)
- login.add_argument(name='username', type=str, required=True, location='form', help='用户名')
- login.add_argument(name='password', type=str, required=True, location='form', help='密码')
- @ns.route('')
- class LoginApi(Resource):
- @ns.doc(id='login', description='登入')
- @ns.expect(login)
- def post(self):
- """登入"""
- username = request.form.get('username')
- password = request.form.get('password')
- with Session(engine) as session:
- stmt = select(User).where(User.user_name == username)
- result = session.execute(stmt).scalars().first()
- # statement = session.query(User).filter_by(user_name=username).first()
- if result is None:
- return jsonify(code=StatesCode.UNKNOWN_ERROR, message='用户不存在')
- # 是否为禁用状态
- if result.account_status:
- return jsonify(code=StatesCode.UNKNOWN_ERROR, message='用户已禁用')
- # 验证用户密码
- if not result.check_password(password):
- return jsonify(code=StatesCode.UNKNOWN_ERROR, message="密码错误")
- # 生成token
- token = generate_jwt(result.id)
- return jsonify(code=StatesCode.SUCCESS, message="登录成功", date=token)
|