API安全设计原则
约 1697 字大约 6 分钟
apisecurity
2025-08-17
概述
API 是现代应用架构的核心,也是攻击者的主要目标。OWASP API Security Top 10 揭示了 API 安全的关键风险领域。本文将系统介绍 API 安全设计的核心原则,涵盖认证、授权、限流、输入验证和错误处理等方面。
API 安全威胁模型
认证方案
API Key
from fastapi import FastAPI, Depends, HTTPException, Security
from fastapi.security import APIKeyHeader
API_KEY_HEADER = APIKeyHeader(name="X-API-Key")
async def verify_api_key(api_key: str = Security(API_KEY_HEADER)):
# 从数据库或缓存验证 API Key
key_record = await db.get_api_key(api_key)
if not key_record or key_record.is_revoked:
raise HTTPException(status_code=403, detail="Invalid API key")
if key_record.is_expired:
raise HTTPException(status_code=403, detail="API key expired")
return key_record
@app.get("/api/data")
async def get_data(key: dict = Depends(verify_api_key)):
return {"data": "sensitive information"}OAuth 2.0 Bearer Token
from fastapi import Depends
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import jwt
security = HTTPBearer()
async def verify_bearer_token(
credentials: HTTPAuthorizationCredentials = Depends(security)
):
token = credentials.credentials
try:
payload = jwt.decode(
token,
PUBLIC_KEY,
algorithms=["RS256"],
audience="https://api.example.com",
issuer="https://auth.example.com",
)
return payload
except jwt.ExpiredSignatureError:
raise HTTPException(status_code=401, detail="Token expired")
except jwt.InvalidTokenError:
raise HTTPException(status_code=401, detail="Invalid token")mTLS(双向 TLS)
# Nginx mTLS 配置
server {
listen 443 ssl;
ssl_certificate /etc/nginx/certs/server.crt;
ssl_certificate_key /etc/nginx/certs/server.key;
# 要求客户端证书
ssl_client_certificate /etc/nginx/certs/ca.crt;
ssl_verify_client on;
location /api/ {
# 将客户端证书信息传递给后端
proxy_set_header X-Client-DN $ssl_client_s_dn;
proxy_set_header X-Client-Verify $ssl_client_verify;
proxy_pass http://backend:8080;
}
}授权设计
对象级授权 (BOLA/IDOR 防御)
# OWASP API Security #1: Broken Object Level Authorization
# 最常见的 API 安全漏洞
# 错误:无授权检查
@app.get("/api/orders/{order_id}")
async def get_order_bad(order_id: int):
order = await db.get_order(order_id)
return order # 任何用户都能查看任何订单
# 正确:对象级授权检查
@app.get("/api/orders/{order_id}")
async def get_order_good(order_id: int, user=Depends(get_current_user)):
order = await db.get_order(order_id)
if not order:
raise HTTPException(status_code=404)
if order.user_id != user.id and not user.is_admin:
raise HTTPException(status_code=403, detail="Access denied")
return order基于角色的访问控制 (RBAC)
from functools import wraps
from enum import Enum
class Permission(Enum):
READ_USERS = "read:users"
WRITE_USERS = "write:users"
DELETE_USERS = "delete:users"
ADMIN = "admin"
ROLE_PERMISSIONS = {
"viewer": {Permission.READ_USERS},
"editor": {Permission.READ_USERS, Permission.WRITE_USERS},
"admin": {Permission.READ_USERS, Permission.WRITE_USERS,
Permission.DELETE_USERS, Permission.ADMIN},
}
def require_permission(permission: Permission):
def decorator(func):
@wraps(func)
async def wrapper(*args, user=Depends(get_current_user), **kwargs):
user_permissions = ROLE_PERMISSIONS.get(user.role, set())
if permission not in user_permissions:
raise HTTPException(status_code=403, detail="Insufficient permissions")
return await func(*args, user=user, **kwargs)
return wrapper
return decorator
@app.delete("/api/users/{user_id}")
@require_permission(Permission.DELETE_USERS)
async def delete_user(user_id: int, user=Depends(get_current_user)):
await db.delete_user(user_id)
return {"status": "deleted"}速率限制
# 使用 Redis 实现滑动窗口限流
import redis
import time
r = redis.Redis()
def rate_limit(key: str, max_requests: int, window_seconds: int) -> bool:
"""滑动窗口限流"""
now = time.time()
pipeline = r.pipeline()
# 移除窗口外的记录
pipeline.zremrangebyscore(key, 0, now - window_seconds)
# 添加当前请求
pipeline.zadd(key, {str(now): now})
# 获取窗口内请求数
pipeline.zcard(key)
# 设置过期时间
pipeline.expire(key, window_seconds)
results = pipeline.execute()
request_count = results[2]
return request_count <= max_requests
# FastAPI 中间件
@app.middleware("http")
async def rate_limit_middleware(request, call_next):
client_ip = request.client.host
endpoint = request.url.path
# 不同端点不同限制
limits = {
"/api/login": (5, 60), # 5 次/分钟
"/api/register": (3, 300), # 3 次/5 分钟
"default": (100, 60), # 100 次/分钟
}
max_req, window = limits.get(endpoint, limits["default"])
key = f"rate_limit:{client_ip}:{endpoint}"
if not rate_limit(key, max_req, window):
return JSONResponse(
status_code=429,
content={"error": "Rate limit exceeded"},
headers={
"Retry-After": str(window),
"X-RateLimit-Limit": str(max_req),
"X-RateLimit-Remaining": "0",
}
)
response = await call_next(request)
return response输入验证
from pydantic import BaseModel, Field, validator, EmailStr
from typing import Optional
import re
class CreateUserRequest(BaseModel):
username: str = Field(..., min_length=3, max_length=30, pattern=r'^[a-zA-Z0-9_]+$')
email: EmailStr
password: str = Field(..., min_length=8, max_length=128)
age: Optional[int] = Field(None, ge=0, le=150)
bio: Optional[str] = Field(None, max_length=500)
@validator('password')
def password_strength(cls, v):
if not re.search(r'[A-Z]', v):
raise ValueError('Password must contain uppercase letter')
if not re.search(r'[a-z]', v):
raise ValueError('Password must contain lowercase letter')
if not re.search(r'[0-9]', v):
raise ValueError('Password must contain digit')
return v
@app.post("/api/users")
async def create_user(data: CreateUserRequest):
# Pydantic 自动验证,无效输入会返回 422
user = await db.create_user(**data.dict())
return {"id": user.id, "username": user.username}响应数据过滤
# 不要直接返回数据库模型(可能包含敏感字段)
# 错误:返回所有字段
@app.get("/api/users/{user_id}")
async def get_user_bad(user_id: int):
user = await db.get_user(user_id)
return user.__dict__ # 可能包含 password_hash, internal_notes 等
# 正确:使用响应模型过滤字段
class UserResponse(BaseModel):
id: int
username: str
email: str
created_at: datetime
class Config:
from_attributes = True
@app.get("/api/users/{user_id}", response_model=UserResponse)
async def get_user_good(user_id: int, user=Depends(get_current_user)):
db_user = await db.get_user(user_id)
return db_user # Pydantic 自动过滤,只返回 UserResponse 中定义的字段错误处理
# 安全的错误处理:不泄露内部信息
@app.exception_handler(Exception)
async def global_exception_handler(request, exc):
# 记录完整错误信息到日志
logger.error(f"Unhandled error: {exc}", exc_info=True, extra={
"request_id": request.state.request_id,
"path": request.url.path,
"method": request.method,
})
# 返回给用户的错误信息不包含内部细节
return JSONResponse(
status_code=500,
content={
"error": "Internal server error",
"request_id": request.state.request_id, # 用于追踪
# 不返回:堆栈跟踪、SQL 语句、文件路径、内部 IP
}
)
# 认证错误统一返回模糊信息
@app.post("/api/login")
async def login(credentials: LoginRequest):
user = await db.get_user_by_email(credentials.email)
if not user or not verify_password(credentials.password, user.password_hash):
# 不区分"用户不存在"和"密码错误"
raise HTTPException(status_code=401, detail="Invalid credentials")
return create_token(user)审计日志
import structlog
logger = structlog.get_logger()
@app.middleware("http")
async def audit_log_middleware(request, call_next):
request_id = str(uuid.uuid4())
request.state.request_id = request_id
start_time = time.time()
response = await call_next(request)
duration = time.time() - start_time
logger.info("api_request",
request_id=request_id,
method=request.method,
path=request.url.path,
status_code=response.status_code,
duration_ms=round(duration * 1000, 2),
client_ip=request.client.host,
user_agent=request.headers.get("user-agent"),
user_id=getattr(request.state, 'user_id', None),
)
response.headers["X-Request-Id"] = request_id
return responseOWASP API Security Top 10
最佳实践
- 所有 API 都需要认证,除非明确设计为公开接口
- 每个请求都进行对象级授权检查(BOLA 防御),这是最常见的 API 漏洞
- 使用响应模型过滤输出,永远不要直接返回数据库模型
- 实施速率限制,根据端点敏感度设置不同的限制
- 使用强类型输入验证(Pydantic/JSON Schema),拒绝不合规的数据
- 错误信息不泄露内部细节,使用 request_id 关联内部日志
- 记录完整的审计日志,包括请求方法、路径、响应状态、用户身份
- API 版本化管理,及时下线废弃版本
贡献者
更新日志
2026/3/14 13:09
查看所有更新日志
9f6c2-feat: organize wiki content and refresh site setup于