Python 开发篇——如何在 Flask 下编写 JWT 登录

首先,HTTP 是无状态的协议(对于事务处理没有记忆能力,每次客户端和服务端会话完成时,服务端不会保存任何会话信息)——每个请求都是完全独立的,服务端无法确认当前访问者的身份信息,无法分辨上一次的请求发送者和这一次的发送者是不是同一个人。所以服务器与浏览器为了进行会话跟踪(知道是谁在访问自己),就必须主动的去维护一个状态,这个状态用于告知服务端前后两个请求是否来自同一浏览器。为此,前端开发者便加入了 Cookie 来实现有状态的 HTTP 连接。而后实现授权的方式就有 cookie、session、token 和 JWT。

什么是 JWT?

JWT.IO 解释:JSON Web Token (JWT) 是一个开放标准 ( RFC 7519 ),它定义了一种紧凑且自包含的方式,用于在各方之间作为 JSON 对象安全地传输信息。该信息可以被验证和信任,因为它是经过数字签名的。JWT 可以使用秘密(使用 HMAC 算法)或使用 RSA 或 ECDSA 的公钥/私钥对进行签名。

案例

由于网上许多案例都为 HS256(对称加密),所以这里我使用 RSA256(非对称加密)作为补充。

  1. 首先需要生成私钥和公钥

ssh-keygen -t rsa -b 4096 -m PEM -f jwtRS256.key# Don't add passphraseopenssl rsa -in jwtRS256.key -pubout -outform PEM -out jwtRS256.key.pubcat jwtRS256.keycat jwtRS256.key.pub

复制代码

  1. 选择 Python 的 JWT 库,我这里选择了两个库

  • PyJWT(需要 cryptography 库)

 >>> import jwt >>> with open('jwtRS256.key', 'rb') as f: ...    private_key = f.read() ... >>> with open('jwtRS256.key.pub', 'rb') as f: ...    public_key = f.read() ... >>> print(encoded) eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzb21lIjoicGF5bG9hZCJ9.4twFt5NiznN84AWoo1d7KO1T_yoc0Z6XOpOVswacPZg >>> decoded = jwt.decode(encoded, public_key, algorithms=["RS256"]) {'some': 'payload'}

复制代码

  >>> from authlib.jose import jwt  >>> header = {'alg': 'RS256'}  >>> payload = {'iss': 'Authlib', 'sub': '123', ...}  >>> with open('jwtRS256.key', 'rb') as f:  ...    private_key = f.read()  ...  >>> s = jwt.encode(header, payload, private_key)  >>> with open('jwtRS256.key.pub', 'rb') as f:  ...    public_key = f.read()  ...  >>> claims = jwt.decode(s, public_key)  >>> print(claims)  {'iss': 'Authlib', 'sub': '123', ...}  >>> print(claims.header)  {'alg': 'RS256', 'typ': 'JWT'}  >>> claims.validate()3. 

复制代码

  1. 工作原理 

Using JWT for user authentication in Flask》中的代码参考:

# flask importsfrom flask import Flask, request, jsonify, make_responsefrom flask_sqlalchemy import SQLAlchemyimport uuid # for public idfrom  werkzeug.security import generate_password_hash, check_password_hash# imports for PyJWT authenticationimport jwtfrom datetime import datetime, timedeltafrom functools import wraps
# creates Flask objectapp = Flask(__name__)# configuration# NEVER HARDCODE YOUR CONFIGURATION IN YOUR CODE# INSTEAD CREATE A .env FILE AND STORE IN ITapp.config['SECRET_KEY'] = 'your secret key'# database nameapp.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///Database.db'app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True# creates SQLALCHEMY objectdb = SQLAlchemy(app)
# Database ORMsclass User(db.Model): id = db.Column(db.Integer, primary_key = True) public_id = db.Column(db.String(50), unique = True) name = db.Column(db.String(100)) email = db.Column(db.String(70), unique = True) password = db.Column(db.String(80))
# decorator for verifying the JWTdef token_required(f): @wraps(f) def decorated(*args, **kwargs): token = None # jwt is passed in the request header if 'x-access-token' in request.headers: token = request.headers['x-access-token'] # return 401 if token is not passed if not token: return jsonify({'message' : 'Token is missing !!'}), 401
try: # decoding the payload to fetch the stored details data = jwt.decode(token, app.config['SECRET_KEY']) current_user = User.query\ .filter_by(public_id = data['public_id'])\ .first() except: return jsonify({ 'message' : 'Token is invalid !!' }), 401 # returns the current logged in users contex to the routes return f(current_user, *args, **kwargs)
return decorated
# User Database Route# this route sends back list of users users@app.route('/user', methods =['GET'])@token_requireddef get_all_users(current_user): # querying the database # for all the entries in it users = User.query.all() # converting the query objects # to list of jsons output = [] for user in users: # appending the user data json # to the response list output.append({ 'public_id': user.public_id, 'name' : user.name, 'email' : user.email })
return jsonify({'users': output})
# route for loging user in@app.route('/login', methods =['POST'])def login(): # creates dictionary of form data auth = request.form
if not auth or not auth.get('email') or not auth.get('password'): # returns 401 if any email or / and password is missing return make_response( 'Could not verify', 401, {'WWW-Authenticate' : 'Basic realm ="Login required !!"'} )
user = User.query\ .filter_by(email = auth.get('email'))\ .first()
if not user: # returns 401 if user does not exist return make_response( 'Could not verify', 401, {'WWW-Authenticate' : 'Basic realm ="User does not exist !!"'} )
if check_password_hash(user.password, auth.get('password')): # generates the JWT Token token = jwt.encode({ 'public_id': user.public_id, 'exp' : datetime.utcnow() + timedelta(minutes = 30) }, app.config['SECRET_KEY'])
return make_response(jsonify({'token' : token.decode('UTF-8')}), 201) # returns 403 if password is wrong return make_response( 'Could not verify', 403, {'WWW-Authenticate' : 'Basic realm ="Wrong Password !!"'} )
# signup route@app.route('/signup', methods =['POST'])def signup(): # creates a dictionary of the form data data = request.form
# gets name, email and password name, email = data.get('name'), data.get('email') password = data.get('password')
# checking for existing user user = User.query\ .filter_by(email = email)\ .first() if not user: # database ORM object user = User( public_id = str(uuid.uuid4()), name = name, email = email, password = generate_password_hash(password) ) # insert user db.session.add(user) db.session.commit()
return make_response('Successfully registered.', 201) else: # returns 202 if user already exists return make_response('User already exists. Please Log in.', 202)
if __name__ == "__main__": # setting debug to True enables hot reload # and also provides a debuger shell # if you hit an error while running the server app.run(debug = True)

复制代码

注意:上面的 wraps 写法存在错误,会产生'_io.BufferedReader' object is not callable

import functools...class token_required(object):
def __init__(self, func): self.func = func functools.update_wrapper(self, func)
def __call__(self, *args, **kwargs): token = None # jwt is passed in the request header if 'x-access-token' in request.headers: token = request.headers['x-access-token'] # return 401 if token is not passed if not token: return jsonify({'message' : 'Token is missing !!'}), 401
try: with open('jwtRS256.key.pub', 'rb') as f: public_key = f.read() # decoding the payload to fetch the stored details data = jwt.decode(token, public_key) current_user = get_user_by_id(data['public_id']) except: return jsonify({ 'message' : 'Token is invalid !!' }), 401 # returns the current logged in users contex to the routes result = self.func(current_user, *args, **kwargs) return result

复制代码

总结

大部分语言都已经支持了 JWT,这里可以从jwt.io的类库中可以看出。目前 JWT 主要运用于 OAuth1、OAuth2 和 OpenID 等单点登录功能,而且将来会有更多的企业和系统开发需要使用 JWT 技术。而且我也非常感谢本文中引用的原作者提供了相关的材料,便于我们学习。