2022-03-29
上一篇讲的是 MongoDB 和 PyMongo 的日常主线:文档模型、CRUD、普通索引、聚合、事务、批量写入、分页和连接管理。这篇换一条更偏工程深水区的路线:Change Streams、Schema Validation、GridFS、读写关注、读偏好、命令监听、Raw BSON、Decimal128 和一些线上排查习惯。
这些能力不一定每个项目都用,但一旦你的 MongoDB 开始承接事件同步、文件存储、跨服务读写和高并发访问,就会很有价值。
Change Streams 可以让应用订阅集合、数据库或整个部署里的数据变化。你不需要写循环查询“有没有新数据”,而是打开一个 stream,等 MongoDB 推送变化事件。
典型用途:
最小例子:
from pymongo import MongoClient
client = MongoClient("mongodb://127.0.0.1:27017")
orders = client["shop"]["orders"]
with orders.watch() as stream:
for change in stream:
print(change["operationType"], change["documentKey"])
operationType 常见值有 insert、update、replace、delete。documentKey 里通常会有 _id。
Change Streams 需要副本集或分片集群环境。单机开发环境如果没启副本集,可能跑不起来。
watch() 可以接 aggregation pipeline。比如只关心订单支付成功:
pipeline = [
{
"$match": {
"operationType": {"$in": ["insert", "update", "replace"]},
"fullDocument.status": "paid",
}
}
]
with orders.watch(
pipeline=pipeline,
full_document="updateLookup",
) as stream:
for change in stream:
order = change.get("fullDocument")
print("paid order:", order["_id"])
full_document="updateLookup" 会在 update 事件中补出更新后的完整文档。这个能力很方便,但也意味着额外读取成本。只需要 delta 时,就别总是要完整文档。
每个 change event 都有 _id,它就是 resume token。消费者处理完事件后,可以把 token 存起来。
import json
from pathlib import Path
TOKEN_FILE = Path("resume_token.json")
def load_token():
if not TOKEN_FILE.exists():
return None
return json.loads(TOKEN_FILE.read_text(encoding="utf-8"))
def save_token(token) -> None:
TOKEN_FILE.write_text(
json.dumps(token),
encoding="utf-8",
)
resume_token = load_token()
watch_options = {"full_document": "updateLookup"}
if resume_token is not None:
watch_options["resume_after"] = resume_token
with orders.watch(**watch_options) as stream:
for change in stream:
print(change["operationType"], change["documentKey"])
save_token(change["_id"])
注意处理顺序:业务处理成功后再保存 token。否则应用重启后可能跳过还没处理完的事件。
消费端最好是幂等的。
比如同步到搜索引擎时,不要假设每条事件只处理一次。可以用文档 _id 做幂等键,重复处理也只覆盖同一份目标数据。
另外,Change Streams 不是消息队列的完全替代品。它很适合订阅 MongoDB 变化,但如果你需要复杂重试、死信、广播、多消费者组,通常还是会接入专门的消息系统。
MongoDB 的文档模型很灵活,但灵活不等于没边界。服务端 schema validation 可以拦住明显不合法的数据。
用 PyMongo 创建带校验的集合:
from pymongo import MongoClient
client = MongoClient("mongodb://127.0.0.1:27017")
db = client["shop"]
validator = {
"$jsonSchema": {
"bsonType": "object",
"required": ["_id", "buyer_id", "status", "amount"],
"properties": {
"_id": {"bsonType": "string"},
"buyer_id": {"bsonType": "string"},
"status": {
"enum": ["created", "paid", "shipped", "canceled"],
},
"amount": {"bsonType": ["double", "decimal"]},
"items": {
"bsonType": "array",
"items": {
"bsonType": "object",
"required": ["sku", "count"],
"properties": {
"sku": {"bsonType": "string"},
"count": {"bsonType": "int", "minimum": 1},
},
},
},
},
}
}
db.create_collection(
"orders_checked",
validator=validator,
validationAction="error",
)
如果集合已经存在,可以用 collMod 修改:
db.command({
"collMod": "orders_checked",
"validator": validator,
"validationAction": "error",
})
validationAction="error" 会拒绝不合法写入。也可以用警告模式做灰度,但核心链路里更常用强拦截。
服务端 schema validation 适合兜底,但不要把所有业务校验都丢给 MongoDB。
推荐分层:
API 层:校验请求格式和业务规则
应用层:构造清晰的数据模型
MongoDB:兜底字段类型和关键约束
比如“订单金额必须等于商品明细合计”,这种业务规则更适合在应用层做;“status 必须在枚举范围内”,很适合数据库层兜底。
GridFS 会把大文件拆成 chunks 存入 MongoDB,并在 files 集合里保存文件元信息。PyMongo 提供 GridFSBucket 操作它。
上传文件:
from pathlib import Path
from gridfs import GridFSBucket
from pymongo import MongoClient
client = MongoClient("mongodb://127.0.0.1:27017")
db = client["shop"]
bucket = GridFSBucket(db, bucket_name="media")
source = Path("invoice.pdf")
with source.open("rb") as file:
file_id = bucket.upload_from_stream(
source.name,
file,
metadata={
"owner": "finance",
"content_type": "application/pdf",
},
)
print(file_id)
下载文件:
target = Path("invoice_downloaded.pdf")
with target.open("wb") as file:
bucket.download_to_stream_by_name("invoice.pdf", file)
GridFS 适合需要和业务数据放在同一个 MongoDB 体系里管理的文件。静态资源高并发分发、图片 CDN、视频点播这类场景,通常还是对象存储更合适。
PyMongo 可以在 client、database、collection 层设置 read concern、write concern 和 read preference。
写入要求多数节点确认:
from pymongo import MongoClient
from pymongo.write_concern import WriteConcern
client = MongoClient("mongodb://127.0.0.1:27017")
db = client.get_database(
"shop",
write_concern=WriteConcern("majority", j=True),
)
orders = db["orders"]
orders.insert_one({"_id": "order_majority_1", "status": "paid"})
读取从副本节点优先:
from pymongo import MongoClient, ReadPreference
from pymongo.read_concern import ReadConcern
client = MongoClient("mongodb://127.0.0.1:27017")
analytics_db = client.get_database(
"shop",
read_preference=ReadPreference.SECONDARY_PREFERRED,
read_concern=ReadConcern("majority"),
)
docs = list(analytics_db["orders"].find({"status": "paid"}).limit(10))
print(len(docs))
这类配置不要盲目套。核心写链路更看重确认强度,分析查询可能更看重隔离主节点压力。读写边界要和业务目标对应。
SECONDARY_PREFERRED 可以把读流量更多引向副本节点,但它不保证你读到刚写入的数据。
常见策略:
核心读写:primary
后台分析:secondaryPreferred
可容忍旧数据的报表:secondary
跨地域读取:nearest + tag sets
如果刚写完马上读,而且必须读到自己的写入,就不要随手把读偏好切到 secondary。
PyMongo 的 monitoring 模块可以监听命令执行。它很适合做慢查询日志、命令计数、异常统计。
import logging
from pymongo import MongoClient, monitoring
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("mongo.commands")
class CommandLogger(monitoring.CommandListener):
def started(self, event):
logger.info("mongo started %s %s", event.command_name, event.request_id)
def succeeded(self, event):
logger.info(
"mongo ok %s request=%s duration_us=%s",
event.command_name,
event.request_id,
event.duration_micros,
)
def failed(self, event):
logger.warning(
"mongo failed %s request=%s failure=%s",
event.command_name,
event.request_id,
event.failure,
)
monitoring.register(CommandLogger())
client = MongoClient("mongodb://127.0.0.1:27017")
client["shop"]["orders"].find_one({"status": "paid"})
生产环境不要把完整 command 全打出来,里面可能包含敏感字段。记录命令名、集合、耗时、错误就够排查很多问题。
MongoClient 内部维护连接池。高并发服务里,连接池参数会影响排队和抖动。
from pymongo import MongoClient
client = MongoClient(
"mongodb://127.0.0.1:27017",
maxPoolSize=200,
minPoolSize=10,
retryWrites=True,
appname="shop-api",
)
几个参数的直观含义:
maxPoolSize:单个服务实例最多保留多少连接
minPoolSize:预热保留的连接数量
retryWrites:允许驱动重试部分写入
appname:在服务端日志和监控里标识应用
连接池不是越大越好。应用实例很多时,连接总量会被放大。调大之前,先估算服务实例数、MongoDB 节点承载能力和查询耗时。
如果你做的是转发、归档、同步,不一定需要把 BSON 全部转成 Python dict。PyMongo 支持 RawBSONDocument。
from bson.raw_bson import RawBSONDocument
from pymongo import MongoClient
from pymongo.codec_options import CodecOptions
client = MongoClient("mongodb://127.0.0.1:27017")
db = client.get_database(
"shop",
codec_options=CodecOptions(document_class=RawBSONDocument),
)
raw_doc = db["orders"].find_one({"status": "paid"})
if raw_doc is not None:
print(type(raw_doc))
print(len(raw_doc.raw))
raw_doc.raw 是原始 BSON bytes。这个能力适合低层同步组件,不适合普通业务代码。业务代码还是 dict 更清晰。
金额、精确小数,建议用 Decimal128。
from decimal import Decimal
from bson.decimal128 import Decimal128
from pymongo import MongoClient
client = MongoClient("mongodb://127.0.0.1:27017")
orders = client["shop"]["orders_money"]
orders.insert_one({
"_id": "money_1",
"amount": Decimal128(Decimal("199.90")),
})
doc = orders.find_one({"_id": "money_1"})
amount = doc["amount"].to_decimal()
print(amount, type(amount))
浮点数适合测量值,不适合精确金额。金额字段用 Decimal128,应用层用 Decimal,会少很多精度问题。
MongoDB 驱动支持 retryable writes,但这只解决一部分网络或节点切换导致的写入不确定问题。
业务层仍然要考虑幂等。
比如创建订单,最好用业务唯一键作为 _id:
orders.insert_one({
"_id": "order_from_request_abc",
"buyer_id": "user_88",
"status": "created",
})
这样请求重试时,如果订单已经创建,重复写入会触发重复键错误,业务可以把它转换成“已经处理过”。
不要把“驱动能重试”理解成“业务不会重复执行”。这两个层次不是一回事。
如果集合被分片,查询条件里是否带 shard key 会影响路由效率。
比如按租户分片:
query = {
"tenant_id": "tenant_001",
"order_id": "order_10001",
}
doc = orders.find_one(query)
比不带租户条件的查询更容易精准路由。
建模时要提前想:
高频查询是否带 shard key
写入是否会集中到少数分片
范围查询是否会打散太多分片
热点租户是否需要额外设计
分片不是让查询自动变快,而是让数据和压力能横向拆开。查询不带关键路由条件,照样可能很重。
如果是一个订单系统,可以这样分层使用:
orders:核心订单文档,带 JSON Schema 兜底
order_events:写入事件,Change Streams 推给下游
order_files:GridFS 存附件和凭证
analytics read:副本节点承担可延迟查询
CommandListener:记录慢命令和异常
Decimal128:金额字段保持精确
业务唯一键:配合 retryable writes 做幂等