Mongo一些神奇的玩法




2022-03-29

blog_main_img

上一篇讲的是 MongoDB 和 PyMongo 的日常主线:文档模型、CRUD、普通索引、聚合、事务、批量写入、分页和连接管理。这篇换一条更偏工程深水区的路线:Change Streams、Schema Validation、GridFS、读写关注、读偏好、命令监听、Raw BSON、Decimal128 和一些线上排查习惯。

这些能力不一定每个项目都用,但一旦你的 MongoDB 开始承接事件同步、文件存储、跨服务读写和高并发访问,就会很有价值。

Change Streams:不用轮询也能拿到变化

Change Streams 可以让应用订阅集合、数据库或整个部署里的数据变化。你不需要写循环查询“有没有新数据”,而是打开一个 stream,等 MongoDB 推送变化事件。

典型用途:

  • 订单状态变化后推送消息
  • 用户资料变更后刷新缓存
  • 数据同步到搜索引擎
  • 审计日志落地
  • 轻量 CDC 链路

MongoDB Change Streams

最小例子:

from pymongo import MongoClient


client = MongoClient("mongodb://127.0.0.1:27017")
orders = client["shop"]["orders"]

with orders.watch() as stream:
    for change in stream:
        print(change["operationType"], change["documentKey"])

operationType 常见值有 insertupdatereplacedeletedocumentKey 里通常会有 _id

Change Streams 需要副本集或分片集群环境。单机开发环境如果没启副本集,可能跑不起来。

只监听关心的变化

watch() 可以接 aggregation pipeline。比如只关心订单支付成功:

pipeline = [
    {
        "$match": {
            "operationType": {"$in": ["insert", "update", "replace"]},
            "fullDocument.status": "paid",
        }
    }
]

with orders.watch(
    pipeline=pipeline,
    full_document="updateLookup",
) as stream:
    for change in stream:
        order = change.get("fullDocument")
        print("paid order:", order["_id"])

full_document="updateLookup" 会在 update 事件中补出更新后的完整文档。这个能力很方便,但也意味着额外读取成本。只需要 delta 时,就别总是要完整文档。

resume token:断点续跑的关键

每个 change event 都有 _id,它就是 resume token。消费者处理完事件后,可以把 token 存起来。

import json
from pathlib import Path


TOKEN_FILE = Path("resume_token.json")


def load_token():
    if not TOKEN_FILE.exists():
        return None
    return json.loads(TOKEN_FILE.read_text(encoding="utf-8"))


def save_token(token) -> None:
    TOKEN_FILE.write_text(
        json.dumps(token),
        encoding="utf-8",
    )


resume_token = load_token()
watch_options = {"full_document": "updateLookup"}

if resume_token is not None:
    watch_options["resume_after"] = resume_token

with orders.watch(**watch_options) as stream:
    for change in stream:
        print(change["operationType"], change["documentKey"])
        save_token(change["_id"])

注意处理顺序:业务处理成功后再保存 token。否则应用重启后可能跳过还没处理完的事件。

Change Streams 的消费建议

消费端最好是幂等的。

比如同步到搜索引擎时,不要假设每条事件只处理一次。可以用文档 _id 做幂等键,重复处理也只覆盖同一份目标数据。

另外,Change Streams 不是消息队列的完全替代品。它很适合订阅 MongoDB 变化,但如果你需要复杂重试、死信、广播、多消费者组,通常还是会接入专门的消息系统。

JSON Schema Validation:让 MongoDB 帮你挡脏写入

MongoDB 的文档模型很灵活,但灵活不等于没边界。服务端 schema validation 可以拦住明显不合法的数据。

MongoDB Schema Validation GridFS

用 PyMongo 创建带校验的集合:

from pymongo import MongoClient


client = MongoClient("mongodb://127.0.0.1:27017")
db = client["shop"]

validator = {
    "$jsonSchema": {
        "bsonType": "object",
        "required": ["_id", "buyer_id", "status", "amount"],
        "properties": {
            "_id": {"bsonType": "string"},
            "buyer_id": {"bsonType": "string"},
            "status": {
                "enum": ["created", "paid", "shipped", "canceled"],
            },
            "amount": {"bsonType": ["double", "decimal"]},
            "items": {
                "bsonType": "array",
                "items": {
                    "bsonType": "object",
                    "required": ["sku", "count"],
                    "properties": {
                        "sku": {"bsonType": "string"},
                        "count": {"bsonType": "int", "minimum": 1},
                    },
                },
            },
        },
    }
}

db.create_collection(
    "orders_checked",
    validator=validator,
    validationAction="error",
)

如果集合已经存在,可以用 collMod 修改:

db.command({
    "collMod": "orders_checked",
    "validator": validator,
    "validationAction": "error",
})

validationAction="error" 会拒绝不合法写入。也可以用警告模式做灰度,但核心链路里更常用强拦截。

应用层校验和数据库校验都要有

服务端 schema validation 适合兜底,但不要把所有业务校验都丢给 MongoDB。

推荐分层:

API 层:校验请求格式和业务规则
应用层:构造清晰的数据模型
MongoDB:兜底字段类型和关键约束

比如“订单金额必须等于商品明细合计”,这种业务规则更适合在应用层做;“status 必须在枚举范围内”,很适合数据库层兜底。

GridFS:存大文件,但别把它当文件服务器万能解

GridFS 会把大文件拆成 chunks 存入 MongoDB,并在 files 集合里保存文件元信息。PyMongo 提供 GridFSBucket 操作它。

上传文件:

from pathlib import Path

from gridfs import GridFSBucket
from pymongo import MongoClient


client = MongoClient("mongodb://127.0.0.1:27017")
db = client["shop"]
bucket = GridFSBucket(db, bucket_name="media")

source = Path("invoice.pdf")

with source.open("rb") as file:
    file_id = bucket.upload_from_stream(
        source.name,
        file,
        metadata={
            "owner": "finance",
            "content_type": "application/pdf",
        },
    )

print(file_id)

下载文件:

target = Path("invoice_downloaded.pdf")

with target.open("wb") as file:
    bucket.download_to_stream_by_name("invoice.pdf", file)

GridFS 适合需要和业务数据放在同一个 MongoDB 体系里管理的文件。静态资源高并发分发、图片 CDN、视频点播这类场景,通常还是对象存储更合适。

读写关注:把一致性边界写出来

PyMongo 可以在 client、database、collection 层设置 read concern、write concern 和 read preference。

写入要求多数节点确认:

from pymongo import MongoClient
from pymongo.write_concern import WriteConcern


client = MongoClient("mongodb://127.0.0.1:27017")
db = client.get_database(
    "shop",
    write_concern=WriteConcern("majority", j=True),
)
orders = db["orders"]

orders.insert_one({"_id": "order_majority_1", "status": "paid"})

读取从副本节点优先:

from pymongo import MongoClient, ReadPreference
from pymongo.read_concern import ReadConcern


client = MongoClient("mongodb://127.0.0.1:27017")
analytics_db = client.get_database(
    "shop",
    read_preference=ReadPreference.SECONDARY_PREFERRED,
    read_concern=ReadConcern("majority"),
)

docs = list(analytics_db["orders"].find({"status": "paid"}).limit(10))
print(len(docs))

这类配置不要盲目套。核心写链路更看重确认强度,分析查询可能更看重隔离主节点压力。读写边界要和业务目标对应。

读偏好不是负载均衡万能药

SECONDARY_PREFERRED 可以把读流量更多引向副本节点,但它不保证你读到刚写入的数据。

常见策略:

核心读写:primary
后台分析:secondaryPreferred
可容忍旧数据的报表:secondary
跨地域读取:nearest + tag sets

如果刚写完马上读,而且必须读到自己的写入,就不要随手把读偏好切到 secondary。

命令监听:让 PyMongo 查询自己暴露出来

PyMongo 的 monitoring 模块可以监听命令执行。它很适合做慢查询日志、命令计数、异常统计。

PyMongo observability consistency

import logging

from pymongo import MongoClient, monitoring


logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("mongo.commands")


class CommandLogger(monitoring.CommandListener):
    def started(self, event):
        logger.info("mongo started %s %s", event.command_name, event.request_id)

    def succeeded(self, event):
        logger.info(
            "mongo ok %s request=%s duration_us=%s",
            event.command_name,
            event.request_id,
            event.duration_micros,
        )

    def failed(self, event):
        logger.warning(
            "mongo failed %s request=%s failure=%s",
            event.command_name,
            event.request_id,
            event.failure,
        )


monitoring.register(CommandLogger())

client = MongoClient("mongodb://127.0.0.1:27017")
client["shop"]["orders"].find_one({"status": "paid"})

生产环境不要把完整 command 全打出来,里面可能包含敏感字段。记录命令名、集合、耗时、错误就够排查很多问题。

连接池参数:别只用默认值硬扛

MongoClient 内部维护连接池。高并发服务里,连接池参数会影响排队和抖动。

from pymongo import MongoClient


client = MongoClient(
    "mongodb://127.0.0.1:27017",
    maxPoolSize=200,
    minPoolSize=10,
    retryWrites=True,
    appname="shop-api",
)

几个参数的直观含义:

maxPoolSize:单个服务实例最多保留多少连接
minPoolSize:预热保留的连接数量
retryWrites:允许驱动重试部分写入
appname:在服务端日志和监控里标识应用

连接池不是越大越好。应用实例很多时,连接总量会被放大。调大之前,先估算服务实例数、MongoDB 节点承载能力和查询耗时。

Raw BSON:少反序列化一点

如果你做的是转发、归档、同步,不一定需要把 BSON 全部转成 Python dict。PyMongo 支持 RawBSONDocument

from bson.raw_bson import RawBSONDocument
from pymongo import MongoClient
from pymongo.codec_options import CodecOptions


client = MongoClient("mongodb://127.0.0.1:27017")
db = client.get_database(
    "shop",
    codec_options=CodecOptions(document_class=RawBSONDocument),
)

raw_doc = db["orders"].find_one({"status": "paid"})

if raw_doc is not None:
    print(type(raw_doc))
    print(len(raw_doc.raw))

raw_doc.raw 是原始 BSON bytes。这个能力适合低层同步组件,不适合普通业务代码。业务代码还是 dict 更清晰。

Decimal128:金额别随手用 float

金额、精确小数,建议用 Decimal128。

from decimal import Decimal

from bson.decimal128 import Decimal128
from pymongo import MongoClient


client = MongoClient("mongodb://127.0.0.1:27017")
orders = client["shop"]["orders_money"]

orders.insert_one({
    "_id": "money_1",
    "amount": Decimal128(Decimal("199.90")),
})

doc = orders.find_one({"_id": "money_1"})
amount = doc["amount"].to_decimal()

print(amount, type(amount))

浮点数适合测量值,不适合精确金额。金额字段用 Decimal128,应用层用 Decimal,会少很多精度问题。

Retryable Writes:能重试不代表业务幂等

MongoDB 驱动支持 retryable writes,但这只解决一部分网络或节点切换导致的写入不确定问题。

业务层仍然要考虑幂等。

比如创建订单,最好用业务唯一键作为 _id

orders.insert_one({
    "_id": "order_from_request_abc",
    "buyer_id": "user_88",
    "status": "created",
})

这样请求重试时,如果订单已经创建,重复写入会触发重复键错误,业务可以把它转换成“已经处理过”。

不要把“驱动能重试”理解成“业务不会重复执行”。这两个层次不是一回事。

Sharding 场景:查询必须尊重 shard key

如果集合被分片,查询条件里是否带 shard key 会影响路由效率。

比如按租户分片:

query = {
    "tenant_id": "tenant_001",
    "order_id": "order_10001",
}

doc = orders.find_one(query)

比不带租户条件的查询更容易精准路由。

建模时要提前想:

高频查询是否带 shard key
写入是否会集中到少数分片
范围查询是否会打散太多分片
热点租户是否需要额外设计

分片不是让查询自动变快,而是让数据和压力能横向拆开。查询不带关键路由条件,照样可能很重。

一套高阶落地姿势

如果是一个订单系统,可以这样分层使用:

orders:核心订单文档,带 JSON Schema 兜底
order_events:写入事件,Change Streams 推给下游
order_files:GridFS 存附件和凭证
analytics read:副本节点承担可延迟查询
CommandListener:记录慢命令和异常
Decimal128:金额字段保持精确
业务唯一键:配合 retryable writes 做幂等