网络安全——Python与web防护




2025-03-09

blog_main_img

Web 安全里有一组概念经常一起出现:`Cookie`、`Token`、`IP 检测`、`指纹检测`、`DDoS 防护`。 它们看起来分散,其实都在解决同一个问题:服务端怎么判断“这个请求是不是可信、是不是异常、是不是该继续处理”。

一张图先看整体链路

一个请求从浏览器进入服务端,通常会经过这些步骤:

请求防护链路

可以简单拆成四层:

  • 身份层:Cookie、Session、Access Token
  • 完整性层:签名 Token、CSRF Token、请求签名
  • 风险层:IP 信誉、频率、代理特征、设备指纹
  • 流量层:限流、排队、缓存、WAF、CDN、熔断

不要指望某一个点解决所有问题。
比如 Cookie 加了 HttpOnly,不代表接口不需要权限校验;Token 做了签名,不代表不用考虑泄露;IP 被标记异常,也不代表可以永远拉黑。

Cookie:它是会话载体,不是保险箱

Cookie 的优势是浏览器原生支持。
服务端下发 Cookie,浏览器后续请求会自动带上它,所以它很适合保存 session id、刷新凭证、CSRF 相关标记。

但 Cookie 不是保险箱。
如果你把敏感信息明文塞进去,任何能拿到 Cookie 的地方都可能看到它。

Flask 设置安全 Cookie

from flask import Flask, jsonify, make_response


app = Flask(__name__)


@app.post("/login")
def login():
    session_id = "server_generated_random_session_id"

    resp = make_response(jsonify({"ok": True}))
    resp.set_cookie(
        "sid",
        session_id,
        secure=True,
        httponly=True,
        samesite="Lax",
        path="/",
    )
    return resp

几个关键属性:

  • Secure:只通过 HTTPS 发送
  • HttpOnly:不允许 JavaScript 读取
  • SameSite:减少跨站请求携带 Cookie 的风险
  • Path:限制 Cookie 被哪些路径使用

OWASP Session Management Cheat Sheet 明确建议给敏感 Cookie 配置 SecureHttpOnly 等属性。
这里最容易犯的错,是把用户信息、角色、权限列表直接塞进 Cookie,然后在后端“信任它”。不要这样做。

Cookie 里只放索引,状态放服务端

更稳的思路是:

Cookie: sid=random_id
Server: sid -> user_id / role / risk_state

Python 里可以先用字典模拟,真实项目换成 Redis 或数据库:

import secrets


SESSION_STORE = {}


def create_session(user_id: str):
    session_id = secrets.token_urlsafe(32)
    SESSION_STORE[session_id] = {
        "user_id": user_id,
        "risk": "normal",
    }
    return session_id


def get_session(session_id: str | None):
    if not session_id:
        return None
    return SESSION_STORE.get(session_id)

这样即使 Cookie 被篡改,服务端也不会直接相信里面的业务状态。

Token:签名和加密不是一回事

很多人把“加密 token”和“JWT”混在一起说。
这两件事要先分清:

  • 签名 Token:防篡改,但内容通常可读
  • 加密 Token:内容不可读,同时也要保证完整性

JWT 默认常见用法是签名,不是加密。
OWASP JWT Cheat Sheet 也提醒过,JWT 内容是 Base64URL 编码,不能把“看起来像乱码”当成保密。

Token 金库

用 PyJWT 做签名 Token

import jwt


JWT_SECRET = "replace-with-a-strong-secret"
JWT_ALG = "HS256"


def issue_access_token(user_id: str, role: str):
    payload = {
        "sub": user_id,
        "role": role,
        "scope": ["profile:read", "order:read"],
    }
    return jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALG)


def verify_access_token(token: str):
    return jwt.decode(
        token,
        JWT_SECRET,
        algorithms=[JWT_ALG],
        options={"require": ["sub"]},
    )

重点:

  • 解码时显式限制 algorithms
  • 不要从 token header 里读算法后直接信任
  • 不要把密码、身份证、手机号等敏感信息放进 payload
  • token 失效策略要由服务端统一设计

PyJWT 文档里的 decode() 示例也强调传入 algorithms
这能减少算法混淆类问题。

用 Fernet 做加密 Token

如果你的确需要把一小段声明放到客户端,并且不希望客户端读取,可以用 cryptography.fernet 这类认证加密方案。

import json
from cryptography.fernet import Fernet, InvalidToken


FERNET_KEY = Fernet.generate_key()
fernet = Fernet(FERNET_KEY)


def encrypt_claims(claims: dict) -> str:
    data = json.dumps(claims, ensure_ascii=False).encode("utf-8")
    return fernet.encrypt(data).decode("utf-8")


def decrypt_claims(token: str) -> dict | None:
    try:
        raw = fernet.decrypt(token.encode("utf-8"))
        return json.loads(raw)
    except (InvalidToken, json.JSONDecodeError):
        return None

注意几点:

  • 密钥不要写死在代码仓库里
  • 密钥轮换要有方案
  • 加密 Token 也不要无限期有效
  • 客户端保存加密 Token,不代表服务端可以跳过权限校验

如果只是普通登录态,优先考虑服务端 session。
Token 更适合 API 调用、微服务间传递声明、短链路授权等场景。

IP 检测:它是信号,不是身份

IP 地址能提供风险线索,但不能直接当用户身份。
移动网络、公司出口、代理、NAT、云服务都会让 IP 和用户之间的关系变得复杂。

Python 标准库 ipaddress 很适合做基础判断:

import ipaddress


def parse_ip(value: str):
    try:
        return ipaddress.ip_address(value)
    except ValueError:
        return None


def classify_ip(value: str):
    ip = parse_ip(value)
    if ip is None:
        return {"valid": False}

    return {
        "valid": True,
        "version": ip.version,
        "is_private": ip.is_private,
        "is_loopback": ip.is_loopback,
        "is_multicast": ip.is_multicast,
        "is_reserved": ip.is_reserved,
    }

代理后面的真实 IP

如果服务部署在 Nginx、负载均衡或 CDN 后面,客户端 IP 可能来自转发头。
这里最重要的原则是:只信任你控制的代理层写入的头。

from flask import request


TRUSTED_PROXY_HEADERS = ["CF-Connecting-IP", "X-Real-IP"]


def get_client_ip():
    for header in TRUSTED_PROXY_HEADERS:
        value = request.headers.get(header)
        if value:
            ip = parse_ip(value.strip())
            if ip:
                return str(ip)

    return request.remote_addr

不要让公网客户端随便传一个 X-Forwarded-For,服务端就照单全收。
这类头只有在可信代理清洗之后才有意义。

IP 风险评分

可以把 IP 检测写成评分,而不是一刀切:

def score_ip_risk(ip_info: dict, request_count: int):
    score = 0

    if not ip_info.get("valid"):
        score += 40

    if ip_info.get("is_private") or ip_info.get("is_loopback"):
        score += 10

    if request_count > 100:
        score += 30

    if request_count > 300:
        score += 40

    return min(score, 100)

风控里更推荐“分层动作”:

  • 低风险:正常放行
  • 中风险:要求额外验证
  • 高风险:限流、排队、冻结敏感动作
  • 极高风险:拒绝请求并记录审计

指纹检测:可以辅助风控,但要克制

浏览器指纹通常会组合 User-Agent、Accept-Language、屏幕信息、时区、Canvas、字体等信号。
但指纹检测天然涉及隐私,必须明确用途、减少采集、避免过度追踪。

在后端防护里,建议只做轻量指纹:

  • User-Agent
  • Accept-Language
  • Accept-Encoding
  • 是否携带 Cookie
  • 客户端声明的平台类型
  • 与 session 绑定的稳定随机设备标识

不要为了“更准”就无限采集浏览器细节。
风险识别不是越重越好,合规和用户体验也要算进去。

IP 与指纹风险台

生成轻量指纹哈希

import hashlib
from flask import request


def build_light_fingerprint():
    parts = [
        request.headers.get("User-Agent", ""),
        request.headers.get("Accept-Language", ""),
        request.headers.get("Accept-Encoding", ""),
    ]
    raw = "|".join(parts)
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()

这个哈希不能当成强身份,只能当风险信号。
比如同一个账号突然从完全不同的 IP、不同指纹、异常频率访问,就可以提高风险等级。

指纹漂移检测

def compare_fingerprint(old_fp: str | None, new_fp: str):
    if not old_fp:
        return {"changed": False, "risk": 0}

    if old_fp == new_fp:
        return {"changed": False, "risk": 0}

    return {"changed": True, "risk": 25}

建议把指纹变化当成“加分项”,而不是直接封禁依据。
浏览器升级、插件变化、系统设置变化都可能导致指纹改变。

DDoS:Python 应用只能做一层防线

DDoS 防护不能只靠 Flask 或 FastAPI。
真正的 DDoS 防护通常要从更外层开始:

  • DNS / CDN / Anycast
  • WAF / 边缘限流
  • 反向代理限流
  • 应用层限流
  • 缓存和降级
  • 任务排队
  • 监控和告警

Python 应用能做的是应用层防护,比如限制某个 IP、某个账号、某个接口的请求量,降低后端被打穿的概率。
它不是流量清洗中心。

DDoS 防护墙

简单滑动窗口限流

下面是一个内存版示例,适合理解思路。
真实项目建议换 Redis 或网关限流。

from collections import defaultdict, deque
from time import monotonic


class SlidingLimiter:
    def __init__(self, limit: int, span: float):
        self.limit = limit
        self.span = span
        self.records = defaultdict(deque)

    def allow(self, key: str):
        now = monotonic()
        bucket = self.records[key]

        while bucket and now - bucket[0] > self.span:
            bucket.popleft()

        if len(bucket) >= self.limit:
            return False

        bucket.append(now)
        return True

Flask 里使用:

from flask import Flask, jsonify, request


app = Flask(__name__)
limiter = SlidingLimiter(limit=120, span=60.0)


@app.before_request
def rate_limit():
    client_ip = get_client_ip() or "unknown"
    key = f"{client_ip}:{request.path}"

    if not limiter.allow(key):
        return jsonify({"error": "too many requests"}), 429

这段代码只适合单进程演示。
如果部署多进程、多机器,必须把计数状态放到共享存储里,或者交给网关层处理。

Redis 版限流思路

import redis


r = redis.Redis.from_url("redis://localhost:6379/0", decode_responses=True)


def redis_limit(key: str, limit: int, span: int):
    current = r.incr(key)
    if current == 1:
        r.expire(key, span)
    return current <= limit

应用层还可以按业务做更细的保护:

  • 登录接口更严格
  • 发送验证码更严格
  • 搜索接口缓存结果
  • 大文件接口加队列
  • 生成类任务需要账号和配额

把这些能力串成一个 Flask 中间件

下面把 Cookie、Token、IP、指纹和限流串成一个简化版中间件。
它不是完整生产代码,但能展示架构思路。

Python 防护中间件

from flask import Flask, g, jsonify, request


app = Flask(__name__)
limiter = SlidingLimiter(limit=120, span=60.0)


@app.before_request
def security_gate():
    client_ip = get_client_ip() or "unknown"
    ip_info = classify_ip(client_ip)
    fingerprint = build_light_fingerprint()

    rate_key = f"{client_ip}:{request.path}"
    if not limiter.allow(rate_key):
        return jsonify({"error": "rate limited"}), 429

    session_id = request.cookies.get("sid")
    session = get_session(session_id)

    request_count = len(limiter.records[rate_key])
    ip_risk = score_ip_risk(ip_info, request_count)

    fp_risk = 0
    if session:
        fp_risk = compare_fingerprint(
            session.get("fingerprint"),
            fingerprint,
        )["risk"]

    risk_score = min(ip_risk + fp_risk, 100)

    g.security = {
        "client_ip": client_ip,
        "fingerprint": fingerprint,
        "risk_score": risk_score,
        "session": session,
    }

    if risk_score >= 80:
        return jsonify({"error": "high risk request"}), 403

业务接口可以读取 g.security

@app.get("/api/profile")
def profile():
    session = g.security.get("session")
    if not session:
        return jsonify({"error": "unauthorized"}), 401

    return jsonify(
        {
            "user_id": session["user_id"],
            "risk_score": g.security["risk_score"],
        }
    )

这套模型的重点是“分数化”和“分层处理”。
不要因为一个信号异常就直接定性,也不要因为一个信号正常就完全放行。

日志和审计:风控必须能回看

安全防护如果没有日志,很快就会变成黑盒。
建议记录结构化日志:

import json
import logging


logger = logging.getLogger("security")


def log_security_event(event: str, payload: dict):
    logger.info(
        json.dumps(
            {
                "event": event,
                "client_ip": payload.get("client_ip"),
                "path": request.path,
                "risk_score": payload.get("risk_score"),
                "fingerprint": payload.get("fingerprint", "")[:12],
            },
            ensure_ascii=False,
        )
    )

日志里不要放明文 Token、完整 Cookie、完整指纹、密码、身份证、手机号等敏感信息。
能脱敏就脱敏,能哈希就哈希,能少记就少记。