2022-02-03
使用docker安装MongoDB,以及一些简单的使用pymongo去连接和操作mongodb的实战代码
PyMongo 是 MongoDB 官方 Python 驱动。用它操作 MongoDB 很像操作 Python 字典:插入 dict,查询 dict,更新也用 dict 表达。
</pre>
# 1. 创建目录
sudo mkdir -p /data/mongo/{db,configdb,config,log}
# Mongo 官方镜像内 mongodb 用户常见 UID/GID 为 999
sudo chown -R 999:999 /data/mongo/db /data/mongo/configdb /data/mongo/log
sudo chmod -R 755 /data/mongo
# 2. 写入 MongoDB 配置
sudo tee /data/mongo/config/mongod.conf > /dev/null <<'EOF'
storage:
dbPath: /data/db
systemLog:
destination: file
path: /var/log/mongodb/mongod.log
logAppend: true
logRotate: reopen
net:
bindIp: 0.0.0.0
port: 27017
security:
authorization: enabled
EOF
# 3. 拉取固定大版本镜像,不建议用 latest
sudo docker pull mongo:8.0
# 4. 如果已存在同名容器,先删除容器,不删除数据
sudo docker rm -f mongodb 2>/dev/null || true
# 5. 启动 MongoDB
sudo docker run -d \
--name mongodb \
--restart unless-stopped \
-p 19941:27017 \
-e MONGO_INITDB_ROOT_USERNAME=root \
-e MONGO_INITDB_ROOT_PASSWORD=321 \
-v /data/mongo/db:/data/db \
-v /data/mongo/configdb:/data/configdb \
-v /data/mongo/config/mongod.conf:/etc/mongod.conf:ro \
-v /data/mongo/log:/var/log/mongodb \
--log-driver=json-file \
--log-opt max-size=10m \
--log-opt max-file=3 \
--ulimit nofile=64000:64000 \
--health-cmd='mongosh --quiet --username root --password 321 --authenticationDatabase admin --eval "db.adminCommand({ ping: 1 }).ok" | grep 1' \
--health-interval=30s \
--health-timeout=10s \
--health-retries=3 \
mongo:8.0 \
--config /etc/mongod.conf
# Ubuntu / Debian
sudo apt-get update
sudo apt-get install -y logrotate
# CentOS / Alibaba Cloud Linux / Rocky / AlmaLinux
sudo yum install -y logrotate || sudo dnf install -y logrotate
sudo tee /etc/logrotate.d/mongodb-docker > /dev/null <<'EOF'
/data/mongo/log/mongod.log {
daily
size 100M
rotate 14
missingok
notifempty
compress
delaycompress
create 0644 999 999
sharedscripts
postrotate
docker exec mongodb mongosh --quiet --username root --password 321 --authenticationDatabase admin --eval 'db.adminCommand({ logRotate: 1 })' >/dev/null 2>&1 || true
endscript
}
EOF
## 文档模型:灵活不是随便  一个订单文档可以长这样:它很适合表达“一个业务对象带一堆自然归属的子结构”。 但不要把“灵活”理解成“字段随便塞”。字段命名、嵌套深度、数组规模、索引设计,都会直接影响查询和维护成本。 ## 嵌入还是引用 MongoDB 建模里最常见的选择是:把子对象嵌进来,还是只保存引用 id。 适合嵌入: - 子对象总是跟主对象一起读取 - 子对象生命周期依附主对象 - 子对象规模可控 - 更新频率不高 适合引用: - 子对象会被多个文档复用 - 子对象规模很大 - 子对象需要单独查询 - 更新频率高,不想频繁改大文档 比如订单里的商品快照适合嵌入,因为订单生成后,商品名和成交价应该保持当时的样子。用户信息则更适合用 `buyer_id` 引用,不要把整份用户资料复制进每笔订单。 ## 连接 MongoDB 先安装:{ "_id": "order_10001", "buyer_id": "user_88", "status": "paid", "items": [ { "sku": "keyboard", "count": 1, "price": 199.0 } ], "address": { "province": "Guangdong", "city": "Guangzhou" }, "tags": ["app", "vip"] }连接本地 MongoDB:pip install pymongo连接 Atlas 或带账号密码的集群时,通常用 URI:from pymongo import MongoClient client = MongoClient("mongodb://127.0.0.1:27017") db = client["shop"] orders = db["orders"] client.admin.command("ping") print("connected")生产代码里不要把账号密码硬编码在仓库里。用环境变量、密钥系统或配置中心。 ## 插入文档 插入一条:from pymongo import MongoClient from pymongo.server_api import ServerApi uri = "mongodb+srv://user:[email protected]/?retryWrites=true&w=majority" client = MongoClient( uri, server_api=ServerApi("1"), appname="shop-data-service", ) client.admin.command("ping")批量插入:from pymongo import MongoClient client = MongoClient("mongodb://127.0.0.1:27017") orders = client["shop"]["orders"] result = orders.insert_one({ "_id": "order_10001", "buyer_id": "user_88", "status": "created", "amount": 199.0, "items": [ {"sku": "keyboard", "count": 1, "price": 199.0} ], }) print(result.inserted_id)`ordered=False` 表示遇到单条失败时,不一定停止后续写入。批量导入时很实用,但要把异常处理写清楚。  ## 查询:filter、projection、sort、limit 查一条:docs = [ {"_id": "order_10002", "buyer_id": "user_90", "status": "paid", "amount": 99.0}, {"_id": "order_10003", "buyer_id": "user_91", "status": "paid", "amount": 299.0}, ] result = orders.insert_many(docs, ordered=False) print(result.inserted_ids)按条件查多条:order = orders.find_one({"_id": "order_10001"}) print(order)第二个参数是 projection,用来控制返回字段。大文档查询时,projection 很重要,别每次把整份文档拖回来。 常见查询操作符:cursor = orders.find( {"status": "paid", "amount": {"$gte": 100}}, {"buyer_id": 1, "amount": 1, "status": 1}, ).sort("amount", -1).limit(20) for item in cursor: print(item)嵌套字段用点号访问,数组字段可以直接匹配元素。 ## 更新:用操作符,不要整份替换 更新单条:filters = { "amount": {"$gte": 100, "$lt": 500}, "status": {"$in": ["paid", "shipped"]}, "tags": "vip", "address.city": "Guangzhou", }更新多条:result = orders.update_one( {"_id": "order_10001"}, { "$set": {"status": "paid"}, "$inc": {"amount": 10}, "$addToSet": {"tags": "checked"}, }, ) print(result.matched_count, result.modified_count)常见更新操作符:result = orders.update_many( {"status": "created"}, {"$set": {"status": "pending"}}, ) print(result.modified_count)不要为了改一个字段就把整份文档读出来、改完再 replace。并发写入下,这样更容易覆盖别人的改动。 ## upsert:没有就插入 很多同步任务会用 upsert。$set:设置字段 $unset:删除字段 $inc:数值增加 $push:数组追加 $addToSet:数组去重追加 $pull:从数组移除如果匹配到文档,就更新;匹配不到,就插入。 注意:upsert 的 filter 会参与生成新文档。filter 里别放奇怪的条件,最好用业务唯一键。 ## 删除:先确认范围 删除一条:result = orders.update_one( {"_id": "order_10004"}, { "$set": { "buyer_id": "user_92", "status": "paid", "amount": 188.0, } }, upsert=True, ) print(result.upserted_id)删除多条:result = orders.delete_one({"_id": "order_10004"}) print(result.deleted_count)生产环境里删除要非常克制。更常见的做法是软删除:result = orders.delete_many({"status": "canceled"}) print(result.deleted_count)真正物理删除前,建议先跑同条件 `count_documents()` 看看影响范围。 ## 索引:查询能不能快,很多时候看这里 MongoDB 没有合适索引时,查询就可能扫描大量文档。 给状态和金额建复合索引:orders.update_one( {"_id": "order_10001"}, {"$set": {"deleted": True}}, )唯一索引:from pymongo import ASCENDING, DESCENDING orders.create_index([ ("status", ASCENDING), ("amount", DESCENDING), ])嵌套字段索引:orders.create_index("order_no", unique=True)数组字段也可以建索引,但要注意数组索引可能放大索引项数量。 索引设计的小规则:orders.create_index("address.city")索引不是越多越好。每个索引都会增加写入成本和存储成本。 ## 看执行计划 PyMongo 可以用 `explain` 看查询是否走索引:等值过滤字段放前面 排序字段尽量接在过滤字段后面 低选择性字段不要单独滥建索引 写多读少的集合要控制索引数量如果看到集合扫描,就要检查过滤条件和索引是否匹配。 查询优化不要靠感觉,先看计划,再改索引。 ## 聚合管道:MongoDB 里的数据流水线 Aggregation pipeline 很适合做分组统计、字段加工、数组展开、跨集合关联。  按状态统计订单金额:plan = orders.find( {"status": "paid"}, {"_id": 1, "amount": 1}, ).explain() print(plan["queryPlanner"]["winningPlan"])展开数组:pipeline = [ {"$match": {"status": {"$in": ["paid", "shipped"]}}}, { "$group": { "_id": "$status", "total_amount": {"$sum": "$amount"}, "order_count": {"$sum": 1}, "avg_amount": {"$avg": "$amount"}, } }, {"$sort": {"total_amount": -1}}, ] for item in orders.aggregate(pipeline): print(item)管道顺序很重要。能过滤就先 `$match`,能裁剪字段就先 `$project`,别把一堆无关字段带到后面处理。 ## `$lookup`:能用,但别滥用 MongoDB 支持 `$lookup` 做集合关联:pipeline = [ {"$unwind": "$items"}, { "$group": { "_id": "$items.sku", "sold_count": {"$sum": "$items.count"}, "sales": { "$sum": {"$multiply": ["$items.count", "$items.price"]} }, } }, {"$sort": {"sales": -1}}, ]`$lookup` 很方便,但不要把 MongoDB 当成关系型数据库来写一堆复杂 join。高频查询场景里,适当冗余、嵌入快照、提前聚合,往往更符合 MongoDB 的使用方式。 ## 事务:跨文档一致性 单文档写入天然是原子的。如果一个业务动作要同时改多个文档,可以用事务。pipeline = [ {"$match": {"status": "paid"}}, { "$lookup": { "from": "users", "localField": "buyer_id", "foreignField": "_id", "as": "buyer", } }, {"$unwind": "$buyer"}, { "$project": { "amount": 1, "buyer_name": "$buyer.name", } }, ]事务不是让你忽略建模。能用单文档原子更新解决的,就不要升级成多文档事务。事务会增加复杂度,也会提高系统开销。 ## 批量写入:bulk_write 批量同步时,`bulk_write` 很好用。from pymongo import MongoClient client = MongoClient("mongodb://127.0.0.1:27017") db = client["shop"] def pay_order(session, order_id: str, buyer_id: str, amount: float): db.orders.update_one( {"_id": order_id, "status": "created"}, {"$set": {"status": "paid"}}, session=session, ) db.accounts.update_one( {"_id": buyer_id}, {"$inc": {"balance": -amount}}, session=session, ) with client.start_session() as session: session.with_transaction( lambda s: pay_order(s, "order_10001", "user_88", 199.0) )比起循环里一条条 `update_one`,批量写入更适合导入、同步、补数据。 ## 分页:别只会 skip 小数据量用 `skip` 没问题:from pymongo import UpdateOne operations = [ UpdateOne( {"_id": "order_10001"}, {"$set": {"status": "paid", "amount": 199.0}}, upsert=True, ), UpdateOne( {"_id": "order_10002"}, {"$set": {"status": "shipped", "amount": 99.0}}, upsert=True, ), ] result = orders.bulk_write(operations, ordered=False) print(result.upserted_count, result.modified_count)但偏深分页时,`skip` 成本会上来。更稳的方式是基于游标字段继续翻:cursor = orders.find({"status": "paid"}).sort("_id", 1).skip(100).limit(20)这种方式依赖稳定排序字段,适合列表接口和后台数据浏览。 ## PyMongo 连接管理 `MongoClient` 内部会维护连接池。通常不要在每个请求里新建一个 client。 推荐在应用启动时创建:last_id = "order_10001" cursor = orders.find({ "status": "paid", "_id": {"$gt": last_id}, }).sort("_id", 1).limit(20)Web 服务里可以把 client 放在应用生命周期里统一管理。频繁创建 client 会浪费连接,也更容易把 MongoDB 压得乱七八糟。 ## 一个小型 Repository 封装 业务代码不要到处散落 PyMongo 查询,可以封装一层。from pymongo import MongoClient client = MongoClient("mongodb://127.0.0.1:27017") db = client["shop"] def get_orders_collection(): return db["orders"]这样业务层只关心方法,不用每次都写 filter 和 update operator。from dataclasses import dataclass from typing import Any from pymongo.collection import Collection @dataclass class OrderRepository: collection: Collection def create_order(self, order: dict[str, Any]) -> str: result = self.collection.insert_one(order) return str(result.inserted_id) def get_order(self, order_id: str) -> dict[str, Any] | None: return self.collection.find_one({"_id": order_id}) def mark_paid(self, order_id: str) -> bool: result = self.collection.update_one( {"_id": order_id, "status": "created"}, {"$set": {"status": "paid"}}, ) return result.modified_count == 1 def list_paid_orders(self, limit: int = 20): return list( self.collection.find( {"status": "paid"}, {"buyer_id": 1, "amount": 1, "status": 1}, ).sort("_id", 1).limit(limit) )