OAuth2.0授权流程详解
约 1617 字大约 5 分钟
oauth2authorization
2025-08-21
概述
OAuth 2.0 是一个授权框架,允许第三方应用在用户授权下访问用户在资源服务器上的受保护资源,而无需获取用户的密码。它是现代 Web 和移动应用最广泛使用的授权标准。
OAuth 2.0 角色与概念
授权码流程 (Authorization Code Flow)
最安全的流程,适用于有后端服务器的 Web 应用。
# 后端实现 (Python/Flask)
import secrets
import requests
from flask import Flask, redirect, request, session
CLIENT_ID = os.environ["OAUTH_CLIENT_ID"]
CLIENT_SECRET = os.environ["OAUTH_CLIENT_SECRET"]
AUTH_URL = "https://github.com/login/oauth/authorize"
TOKEN_URL = "https://github.com/login/oauth/access_token"
REDIRECT_URI = "https://myapp.com/callback"
@app.route('/login')
def login():
state = secrets.token_urlsafe(32)
session['oauth_state'] = state # 存储 state 用于验证
params = {
'client_id': CLIENT_ID,
'redirect_uri': REDIRECT_URI,
'scope': 'read:user user:email',
'state': state,
'response_type': 'code',
}
auth_url = f"{AUTH_URL}?{urlencode(params)}"
return redirect(auth_url)
@app.route('/callback')
def callback():
# 验证 state 防止 CSRF
if request.args.get('state') != session.pop('oauth_state', None):
abort(403, "Invalid state parameter")
code = request.args.get('code')
if not code:
abort(400, "Missing authorization code")
# 用 code 换取 token(后端到后端,client_secret 不暴露)
response = requests.post(TOKEN_URL, data={
'grant_type': 'authorization_code',
'code': code,
'client_id': CLIENT_ID,
'client_secret': CLIENT_SECRET,
'redirect_uri': REDIRECT_URI,
}, headers={'Accept': 'application/json'})
tokens = response.json()
access_token = tokens['access_token']
refresh_token = tokens.get('refresh_token')
# 使用 access_token 获取用户信息
user_info = requests.get('https://api.github.com/user',
headers={'Authorization': f'Bearer {access_token}'}
).json()
session['user'] = user_info
return redirect('/dashboard')Authorization Code + PKCE
PKCE(Proof Key for Code Exchange)是授权码流程的增强版,专为公开客户端(SPA、移动端)设计,防止授权码拦截攻击。
// SPA 实现 PKCE
// 1. 生成 code_verifier 和 code_challenge
function generateCodeVerifier() {
const array = new Uint8Array(32);
crypto.getRandomValues(array);
return base64UrlEncode(array);
}
async function generateCodeChallenge(verifier) {
const encoder = new TextEncoder();
const data = encoder.encode(verifier);
const hash = await crypto.subtle.digest('SHA-256', data);
return base64UrlEncode(new Uint8Array(hash));
}
function base64UrlEncode(buffer) {
return btoa(String.fromCharCode(...buffer))
.replace(/\+/g, '-')
.replace(/\//g, '_')
.replace(/=+$/, '');
}
// 2. 发起授权请求
async function startLogin() {
const codeVerifier = generateCodeVerifier();
const codeChallenge = await generateCodeChallenge(codeVerifier);
// 安全存储 code_verifier
sessionStorage.setItem('code_verifier', codeVerifier);
const params = new URLSearchParams({
response_type: 'code',
client_id: CLIENT_ID,
redirect_uri: REDIRECT_URI,
scope: 'openid profile email',
state: crypto.randomUUID(),
code_challenge: codeChallenge,
code_challenge_method: 'S256',
});
window.location.href = `${AUTH_URL}?${params}`;
}
// 3. 回调处理
async function handleCallback() {
const params = new URLSearchParams(window.location.search);
const code = params.get('code');
const codeVerifier = sessionStorage.getItem('code_verifier');
const response = await fetch(TOKEN_URL, {
method: 'POST',
headers: { 'Content-Type': 'application/x-www-form-urlencoded' },
body: new URLSearchParams({
grant_type: 'authorization_code',
code,
client_id: CLIENT_ID,
redirect_uri: REDIRECT_URI,
code_verifier: codeVerifier, // 发送 verifier,不需要 client_secret
}),
});
const tokens = await response.json();
// 存储 access_token 和 refresh_token
}Client Credentials Flow
服务间通信,无用户参与。
# 机器对机器认证
response = requests.post(TOKEN_URL, data={
'grant_type': 'client_credentials',
'client_id': SERVICE_CLIENT_ID,
'client_secret': SERVICE_CLIENT_SECRET,
'scope': 'api:read api:write',
})
access_token = response.json()['access_token']
# 通常没有 refresh_token,过期后重新获取Device Authorization Flow
适用于无浏览器的设备(智能电视、CLI 工具)。
Implicit Flow(已弃用)
# Implicit Flow 将 token 直接放在 URL fragment 中
# 安全问题:
# 1. token 暴露在浏览器历史记录中
# 2. token 可能通过 Referer 头泄露
# 3. 无法使用 refresh_token
# 4. 容易受到 token 注入攻击
# 已被 Authorization Code + PKCE 替代
# 不要在新项目中使用 Implicit FlowToken 类型与管理
# Access Token: 短生命周期(15 分钟 - 1 小时)
# Refresh Token: 长生命周期(7 天 - 30 天),用于获取新的 access_token
# Token 刷新流程
def refresh_access_token(refresh_token: str) -> dict:
response = requests.post(TOKEN_URL, data={
'grant_type': 'refresh_token',
'refresh_token': refresh_token,
'client_id': CLIENT_ID,
'client_secret': CLIENT_SECRET, # 机密客户端需要
})
if response.status_code != 200:
# refresh_token 可能已过期或被吊销
raise AuthenticationError("Token refresh failed")
tokens = response.json()
return {
'access_token': tokens['access_token'],
'refresh_token': tokens.get('refresh_token', refresh_token),
'expires_in': tokens['expires_in'],
}Token Introspection & Revocation
# Token 内省(验证 opaque token)
def introspect_token(token: str) -> dict:
response = requests.post(f"{AUTH_SERVER}/token/introspect", data={
'token': token,
'token_type_hint': 'access_token',
}, auth=(CLIENT_ID, CLIENT_SECRET))
result = response.json()
if not result.get('active'):
raise AuthenticationError("Token is not active")
return result
# {'active': True, 'sub': 'user123', 'scope': 'read write', 'exp': 1700000000}
# Token 吊销
def revoke_token(token: str, token_type: str = 'refresh_token'):
requests.post(f"{AUTH_SERVER}/token/revoke", data={
'token': token,
'token_type_hint': token_type,
}, auth=(CLIENT_ID, CLIENT_SECRET))Scope 设计
# 精细化的 Scope 设计
SCOPES = {
'openid': 'Access your identity',
'profile': 'Access your profile information',
'email': 'Access your email address',
'read:repos': 'Read access to repositories',
'write:repos': 'Write access to repositories',
'admin:org': 'Full admin access to organization',
}
# 最小权限:只请求需要的 scope
# 好: scope=read:repos
# 坏: scope=admin:org (权限过大)授权流程选择
最佳实践
- Web 应用使用 Authorization Code 流程,SPA/移动端加上 PKCE
- 不要使用 Implicit Flow,已被弃用,改用 Auth Code + PKCE
- access_token 有效期控制在 15 分钟,配合 refresh_token 续期
- 始终验证 state 参数,防止 CSRF 攻击
- Scope 遵循最小权限,只请求应用实际需要的权限
- 客户端密钥安全存储,永远不要在前端代码中暴露 client_secret
- 实现 token 吊销,用户注销或安全事件时立即撤销 token
- 使用 HTTPS,所有 OAuth 通信必须通过 TLS 加密
贡献者
更新日志
2026/3/14 13:09
查看所有更新日志
9f6c2-feat: organize wiki content and refresh site setup于