login.py 1.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546
  1. from flask import request, jsonify
  2. from sqlalchemy import select
  3. from werkzeug.security import check_password_hash
  4. from flask_restx import Resource, Namespace, reqparse
  5. from sqlalchemy.orm import Session
  6. from app.configs.config import engine
  7. from app.modle.users import User
  8. from app.utils.jwt_util import generate_jwt
  9. from app.defines import StatesCode
  10. ns = Namespace('login', description='登入')
  11. login = reqparse.RequestParser(bundle_errors=True)
  12. login.add_argument(name='username', type=str, required=True, location='form', help='用户名')
  13. login.add_argument(name='password', type=str, required=True, location='form', help='密码')
  14. @ns.route('')
  15. class LoginApi(Resource):
  16. @ns.doc(id='login', description='登入')
  17. @ns.expect(login)
  18. def post(self):
  19. """登入"""
  20. username = request.form.get('username')
  21. password = request.form.get('password')
  22. with Session(engine) as session:
  23. stmt = select(User).where(User.user_name == username)
  24. result = session.execute(stmt).scalars().first()
  25. # statement = session.query(User).filter_by(user_name=username).first()
  26. if result is None:
  27. return jsonify(code=StatesCode.UNKNOWN_ERROR, message='用户不存在')
  28. # 是否为禁用状态
  29. if result.account_status:
  30. return jsonify(code=StatesCode.UNKNOWN_ERROR, message='用户已禁用')
  31. # 验证用户密码
  32. if not result.check_password(password):
  33. return jsonify(code=StatesCode.UNKNOWN_ERROR, message="密码错误")
  34. # 生成token
  35. token = generate_jwt(result.id)
  36. return jsonify(code=StatesCode.SUCCESS, message="登录成功", date=token)