2025-03-09
Web 安全里有一组概念经常一起出现:`Cookie`、`Token`、`IP 检测`、`指纹检测`、`DDoS 防护`。 它们看起来分散,其实都在解决同一个问题:服务端怎么判断“这个请求是不是可信、是不是异常、是不是该继续处理”。
一个请求从浏览器进入服务端,通常会经过这些步骤:
可以简单拆成四层:
不要指望某一个点解决所有问题。
比如 Cookie 加了 HttpOnly,不代表接口不需要权限校验;Token 做了签名,不代表不用考虑泄露;IP 被标记异常,也不代表可以永远拉黑。
Cookie 的优势是浏览器原生支持。
服务端下发 Cookie,浏览器后续请求会自动带上它,所以它很适合保存 session id、刷新凭证、CSRF 相关标记。
但 Cookie 不是保险箱。
如果你把敏感信息明文塞进去,任何能拿到 Cookie 的地方都可能看到它。
from flask import Flask, jsonify, make_response
app = Flask(__name__)
@app.post("/login")
def login():
session_id = "server_generated_random_session_id"
resp = make_response(jsonify({"ok": True}))
resp.set_cookie(
"sid",
session_id,
secure=True,
httponly=True,
samesite="Lax",
path="/",
)
return resp
几个关键属性:
Secure:只通过 HTTPS 发送HttpOnly:不允许 JavaScript 读取SameSite:减少跨站请求携带 Cookie 的风险Path:限制 Cookie 被哪些路径使用OWASP Session Management Cheat Sheet 明确建议给敏感 Cookie 配置 Secure 和 HttpOnly 等属性。
这里最容易犯的错,是把用户信息、角色、权限列表直接塞进 Cookie,然后在后端“信任它”。不要这样做。
更稳的思路是:
Cookie: sid=random_id
Server: sid -> user_id / role / risk_state
Python 里可以先用字典模拟,真实项目换成 Redis 或数据库:
import secrets
SESSION_STORE = {}
def create_session(user_id: str):
session_id = secrets.token_urlsafe(32)
SESSION_STORE[session_id] = {
"user_id": user_id,
"risk": "normal",
}
return session_id
def get_session(session_id: str | None):
if not session_id:
return None
return SESSION_STORE.get(session_id)
这样即使 Cookie 被篡改,服务端也不会直接相信里面的业务状态。
很多人把“加密 token”和“JWT”混在一起说。
这两件事要先分清:
JWT 默认常见用法是签名,不是加密。
OWASP JWT Cheat Sheet 也提醒过,JWT 内容是 Base64URL 编码,不能把“看起来像乱码”当成保密。
import jwt
JWT_SECRET = "replace-with-a-strong-secret"
JWT_ALG = "HS256"
def issue_access_token(user_id: str, role: str):
payload = {
"sub": user_id,
"role": role,
"scope": ["profile:read", "order:read"],
}
return jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALG)
def verify_access_token(token: str):
return jwt.decode(
token,
JWT_SECRET,
algorithms=[JWT_ALG],
options={"require": ["sub"]},
)
重点:
algorithmsPyJWT 文档里的 decode() 示例也强调传入 algorithms。
这能减少算法混淆类问题。
如果你的确需要把一小段声明放到客户端,并且不希望客户端读取,可以用 cryptography.fernet 这类认证加密方案。
import json
from cryptography.fernet import Fernet, InvalidToken
FERNET_KEY = Fernet.generate_key()
fernet = Fernet(FERNET_KEY)
def encrypt_claims(claims: dict) -> str:
data = json.dumps(claims, ensure_ascii=False).encode("utf-8")
return fernet.encrypt(data).decode("utf-8")
def decrypt_claims(token: str) -> dict | None:
try:
raw = fernet.decrypt(token.encode("utf-8"))
return json.loads(raw)
except (InvalidToken, json.JSONDecodeError):
return None
注意几点:
如果只是普通登录态,优先考虑服务端 session。
Token 更适合 API 调用、微服务间传递声明、短链路授权等场景。
IP 地址能提供风险线索,但不能直接当用户身份。
移动网络、公司出口、代理、NAT、云服务都会让 IP 和用户之间的关系变得复杂。
Python 标准库 ipaddress 很适合做基础判断:
import ipaddress
def parse_ip(value: str):
try:
return ipaddress.ip_address(value)
except ValueError:
return None
def classify_ip(value: str):
ip = parse_ip(value)
if ip is None:
return {"valid": False}
return {
"valid": True,
"version": ip.version,
"is_private": ip.is_private,
"is_loopback": ip.is_loopback,
"is_multicast": ip.is_multicast,
"is_reserved": ip.is_reserved,
}
如果服务部署在 Nginx、负载均衡或 CDN 后面,客户端 IP 可能来自转发头。
这里最重要的原则是:只信任你控制的代理层写入的头。
from flask import request
TRUSTED_PROXY_HEADERS = ["CF-Connecting-IP", "X-Real-IP"]
def get_client_ip():
for header in TRUSTED_PROXY_HEADERS:
value = request.headers.get(header)
if value:
ip = parse_ip(value.strip())
if ip:
return str(ip)
return request.remote_addr
不要让公网客户端随便传一个 X-Forwarded-For,服务端就照单全收。
这类头只有在可信代理清洗之后才有意义。
可以把 IP 检测写成评分,而不是一刀切:
def score_ip_risk(ip_info: dict, request_count: int):
score = 0
if not ip_info.get("valid"):
score += 40
if ip_info.get("is_private") or ip_info.get("is_loopback"):
score += 10
if request_count > 100:
score += 30
if request_count > 300:
score += 40
return min(score, 100)
风控里更推荐“分层动作”:
浏览器指纹通常会组合 User-Agent、Accept-Language、屏幕信息、时区、Canvas、字体等信号。
但指纹检测天然涉及隐私,必须明确用途、减少采集、避免过度追踪。
在后端防护里,建议只做轻量指纹:
不要为了“更准”就无限采集浏览器细节。
风险识别不是越重越好,合规和用户体验也要算进去。
import hashlib
from flask import request
def build_light_fingerprint():
parts = [
request.headers.get("User-Agent", ""),
request.headers.get("Accept-Language", ""),
request.headers.get("Accept-Encoding", ""),
]
raw = "|".join(parts)
return hashlib.sha256(raw.encode("utf-8")).hexdigest()
这个哈希不能当成强身份,只能当风险信号。
比如同一个账号突然从完全不同的 IP、不同指纹、异常频率访问,就可以提高风险等级。
def compare_fingerprint(old_fp: str | None, new_fp: str):
if not old_fp:
return {"changed": False, "risk": 0}
if old_fp == new_fp:
return {"changed": False, "risk": 0}
return {"changed": True, "risk": 25}
建议把指纹变化当成“加分项”,而不是直接封禁依据。
浏览器升级、插件变化、系统设置变化都可能导致指纹改变。
DDoS 防护不能只靠 Flask 或 FastAPI。
真正的 DDoS 防护通常要从更外层开始:
Python 应用能做的是应用层防护,比如限制某个 IP、某个账号、某个接口的请求量,降低后端被打穿的概率。
它不是流量清洗中心。
下面是一个内存版示例,适合理解思路。
真实项目建议换 Redis 或网关限流。
from collections import defaultdict, deque
from time import monotonic
class SlidingLimiter:
def __init__(self, limit: int, span: float):
self.limit = limit
self.span = span
self.records = defaultdict(deque)
def allow(self, key: str):
now = monotonic()
bucket = self.records[key]
while bucket and now - bucket[0] > self.span:
bucket.popleft()
if len(bucket) >= self.limit:
return False
bucket.append(now)
return True
Flask 里使用:
from flask import Flask, jsonify, request
app = Flask(__name__)
limiter = SlidingLimiter(limit=120, span=60.0)
@app.before_request
def rate_limit():
client_ip = get_client_ip() or "unknown"
key = f"{client_ip}:{request.path}"
if not limiter.allow(key):
return jsonify({"error": "too many requests"}), 429
这段代码只适合单进程演示。
如果部署多进程、多机器,必须把计数状态放到共享存储里,或者交给网关层处理。
import redis
r = redis.Redis.from_url("redis://localhost:6379/0", decode_responses=True)
def redis_limit(key: str, limit: int, span: int):
current = r.incr(key)
if current == 1:
r.expire(key, span)
return current <= limit
应用层还可以按业务做更细的保护:
下面把 Cookie、Token、IP、指纹和限流串成一个简化版中间件。
它不是完整生产代码,但能展示架构思路。
from flask import Flask, g, jsonify, request
app = Flask(__name__)
limiter = SlidingLimiter(limit=120, span=60.0)
@app.before_request
def security_gate():
client_ip = get_client_ip() or "unknown"
ip_info = classify_ip(client_ip)
fingerprint = build_light_fingerprint()
rate_key = f"{client_ip}:{request.path}"
if not limiter.allow(rate_key):
return jsonify({"error": "rate limited"}), 429
session_id = request.cookies.get("sid")
session = get_session(session_id)
request_count = len(limiter.records[rate_key])
ip_risk = score_ip_risk(ip_info, request_count)
fp_risk = 0
if session:
fp_risk = compare_fingerprint(
session.get("fingerprint"),
fingerprint,
)["risk"]
risk_score = min(ip_risk + fp_risk, 100)
g.security = {
"client_ip": client_ip,
"fingerprint": fingerprint,
"risk_score": risk_score,
"session": session,
}
if risk_score >= 80:
return jsonify({"error": "high risk request"}), 403
业务接口可以读取 g.security:
@app.get("/api/profile")
def profile():
session = g.security.get("session")
if not session:
return jsonify({"error": "unauthorized"}), 401
return jsonify(
{
"user_id": session["user_id"],
"risk_score": g.security["risk_score"],
}
)
这套模型的重点是“分数化”和“分层处理”。
不要因为一个信号异常就直接定性,也不要因为一个信号正常就完全放行。
安全防护如果没有日志,很快就会变成黑盒。
建议记录结构化日志:
import json
import logging
logger = logging.getLogger("security")
def log_security_event(event: str, payload: dict):
logger.info(
json.dumps(
{
"event": event,
"client_ip": payload.get("client_ip"),
"path": request.path,
"risk_score": payload.get("risk_score"),
"fingerprint": payload.get("fingerprint", "")[:12],
},
ensure_ascii=False,
)
)
日志里不要放明文 Token、完整 Cookie、完整指纹、密码、身份证、手机号等敏感信息。
能脱敏就脱敏,能哈希就哈希,能少记就少记。