首先,HTTP 是无状态的协议(对于事务处理没有记忆能力,每次客户端和服务端会话完成时,服务端不会保存任何会话信息)——每个请求都是完全独立的,服务端无法确认当前访问者的身份信息,无法分辨上一次的请求发送者和这一次的发送者是不是同一个人。所以服务器与浏览器为了进行会话跟踪(知道是谁在访问自己),就必须主动的去维护一个状态,这个状态用于告知服务端前后两个请求是否来自同一浏览器。为此,前端开发者便加入了 Cookie 来实现有状态的 HTTP 连接。而后实现授权的方式就有 cookie、session、token 和 JWT。
什么是 JWT?
JWT.IO 解释:JSON Web Token (JWT) 是一个开放标准 ( RFC 7519 ),它定义了一种紧凑且自包含的方式,用于在各方之间作为 JSON 对象安全地传输信息。该信息可以被验证和信任,因为它是经过数字签名的。JWT 可以使用秘密(使用 HMAC 算法)或使用 RSA 或 ECDSA 的公钥/私钥对进行签名。
案例
由于网上许多案例都为 HS256(对称加密),所以这里我使用 RSA256(非对称加密)作为补充。
-
首先需要生成私钥和公钥
-
查阅《Generate OpenSSL RSA Key Pair using genpkey》得到了带密码的 pem 文件, 但是在使用中会出现
TypeError: Password was not given but private key is encrypted
的错误。 -
从《How to generate JWT RS256 key》找到了解决办法
ssh-keygen -t rsa -b 4096 -m PEM -f jwtRS256.key
# Don't add passphrase
openssl rsa -in jwtRS256.key -pubout -outform PEM -out jwtRS256.key.pub
cat jwtRS256.key
cat jwtRS256.key.pub
复制代码
-
选择 Python 的 JWT 库,我这里选择了两个库
-
PyJWT(需要 cryptography 库)
>>> import jwt
>>> with open('jwtRS256.key', 'rb') as f:
... private_key = f.read()
...
>>> with open('jwtRS256.key.pub', 'rb') as f:
... public_key = f.read()
...
>>> print(encoded)
eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzb21lIjoicGF5bG9hZCJ9.4twFt5NiznN84AWoo1d7KO1T_yoc0Z6XOpOVswacPZg
>>> decoded = jwt.decode(encoded, public_key, algorithms=["RS256"])
{'some': 'payload'}
复制代码
>>> from authlib.jose import jwt
>>> header = {'alg': 'RS256'}
>>> payload = {'iss': 'Authlib', 'sub': '123', ...}
>>> with open('jwtRS256.key', 'rb') as f:
... private_key = f.read()
...
>>> s = jwt.encode(header, payload, private_key)
>>> with open('jwtRS256.key.pub', 'rb') as f:
... public_key = f.read()
...
>>> claims = jwt.decode(s, public_key)
>>> print(claims)
{'iss': 'Authlib', 'sub': '123', ...}
>>> print(claims.header)
{'alg': 'RS256', 'typ': 'JWT'}
>>> claims.validate()3.
复制代码
-
工作原理
《Using JWT for user authentication in Flask》中的代码参考:
# flask imports
from flask import Flask, request, jsonify, make_response
from flask_sqlalchemy import SQLAlchemy
import uuid # for public id
from werkzeug.security import generate_password_hash, check_password_hash
# imports for PyJWT authentication
import jwt
from datetime import datetime, timedelta
from functools import wraps
# creates Flask object
app = Flask(__name__)
# configuration
# NEVER HARDCODE YOUR CONFIGURATION IN YOUR CODE
# INSTEAD CREATE A .env FILE AND STORE IN IT
app.config['SECRET_KEY'] = 'your secret key'
# database name
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///Database.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
# creates SQLALCHEMY object
db = SQLAlchemy(app)
# Database ORMs
class User(db.Model):
id = db.Column(db.Integer, primary_key = True)
public_id = db.Column(db.String(50), unique = True)
name = db.Column(db.String(100))
email = db.Column(db.String(70), unique = True)
password = db.Column(db.String(80))
# decorator for verifying the JWT
def token_required(f):
@wraps(f)
def decorated(*args, **kwargs):
token = None
# jwt is passed in the request header
if 'x-access-token' in request.headers:
token = request.headers['x-access-token']
# return 401 if token is not passed
if not token:
return jsonify({'message' : 'Token is missing !!'}), 401
try:
# decoding the payload to fetch the stored details
data = jwt.decode(token, app.config['SECRET_KEY'])
current_user = User.query\
.filter_by(public_id = data['public_id'])\
.first()
except:
return jsonify({
'message' : 'Token is invalid !!'
}), 401
# returns the current logged in users contex to the routes
return f(current_user, *args, **kwargs)
return decorated
# User Database Route
# this route sends back list of users users
@app.route('/user', methods =['GET'])
@token_required
def get_all_users(current_user):
# querying the database
# for all the entries in it
users = User.query.all()
# converting the query objects
# to list of jsons
output = []
for user in users:
# appending the user data json
# to the response list
output.append({
'public_id': user.public_id,
'name' : user.name,
'email' : user.email
})
return jsonify({'users': output})
# route for loging user in
@app.route('/login', methods =['POST'])
def login():
# creates dictionary of form data
auth = request.form
if not auth or not auth.get('email') or not auth.get('password'):
# returns 401 if any email or / and password is missing
return make_response(
'Could not verify',
401,
{'WWW-Authenticate' : 'Basic realm ="Login required !!"'}
)
user = User.query\
.filter_by(email = auth.get('email'))\
.first()
if not user:
# returns 401 if user does not exist
return make_response(
'Could not verify',
401,
{'WWW-Authenticate' : 'Basic realm ="User does not exist !!"'}
)
if check_password_hash(user.password, auth.get('password')):
# generates the JWT Token
token = jwt.encode({
'public_id': user.public_id,
'exp' : datetime.utcnow() + timedelta(minutes = 30)
}, app.config['SECRET_KEY'])
return make_response(jsonify({'token' : token.decode('UTF-8')}), 201)
# returns 403 if password is wrong
return make_response(
'Could not verify',
403,
{'WWW-Authenticate' : 'Basic realm ="Wrong Password !!"'}
)
# signup route
@app.route('/signup', methods =['POST'])
def signup():
# creates a dictionary of the form data
data = request.form
# gets name, email and password
name, email = data.get('name'), data.get('email')
password = data.get('password')
# checking for existing user
user = User.query\
.filter_by(email = email)\
.first()
if not user:
# database ORM object
user = User(
public_id = str(uuid.uuid4()),
name = name,
email = email,
password = generate_password_hash(password)
)
# insert user
db.session.add(user)
db.session.commit()
return make_response('Successfully registered.', 201)
else:
# returns 202 if user already exists
return make_response('User already exists. Please Log in.', 202)
if __name__ == "__main__":
# setting debug to True enables hot reload
# and also provides a debuger shell
# if you hit an error while running the server
app.run(debug = True)
复制代码
注意:上面的 wraps 写法存在错误,会产生'_io.BufferedReader' object is not callable
import functools
...
class token_required(object):
def __init__(self, func):
self.func = func
functools.update_wrapper(self, func)
def __call__(self, *args, **kwargs):
token = None
# jwt is passed in the request header
if 'x-access-token' in request.headers:
token = request.headers['x-access-token']
# return 401 if token is not passed
if not token:
return jsonify({'message' : 'Token is missing !!'}), 401
try:
with open('jwtRS256.key.pub', 'rb') as f:
public_key = f.read()
# decoding the payload to fetch the stored details
data = jwt.decode(token, public_key)
current_user = get_user_by_id(data['public_id'])
except:
return jsonify({
'message' : 'Token is invalid !!'
}), 401
# returns the current logged in users contex to the routes
result = self.func(current_user, *args, **kwargs)
return result
复制代码
总结
大部分语言都已经支持了 JWT,这里可以从jwt.io
的类库中可以看出。目前 JWT 主要运用于 OAuth1、OAuth2 和 OpenID 等单点登录功能,而且将来会有更多的企业和系统开发需要使用 JWT 技术。而且我也非常感谢本文中引用的原作者提供了相关的材料,便于我们学习。