mongo——python上手,天下我有




2022-02-03

blog_main_img

使用docker安装MongoDB,以及一些简单的使用pymongo去连接和操作mongodb的实战代码

PyMongo 是 MongoDB 官方 Python 驱动。用它操作 MongoDB 很像操作 Python 字典:插入 dict,查询 dict,更新也用 dict 表达。

先上docker的安装办法

</pre>

# 1. 创建目录

sudo mkdir -p /data/mongo/{db,configdb,config,log}



# Mongo 官方镜像内 mongodb 用户常见 UID/GID 为 999

sudo chown -R 999:999 /data/mongo/db /data/mongo/configdb /data/mongo/log

sudo chmod -R 755 /data/mongo



# 2. 写入 MongoDB 配置

sudo tee /data/mongo/config/mongod.conf > /dev/null <<'EOF'

storage:

  dbPath: /data/db



systemLog:

  destination: file

  path: /var/log/mongodb/mongod.log

  logAppend: true

  logRotate: reopen



net:

  bindIp: 0.0.0.0

  port: 27017



security:

  authorization: enabled

EOF



# 3. 拉取固定大版本镜像,不建议用 latest

sudo docker pull mongo:8.0



# 4. 如果已存在同名容器,先删除容器,不删除数据

sudo docker rm -f mongodb 2>/dev/null || true



# 5. 启动 MongoDB

sudo docker run -d \

  --name mongodb \

  --restart unless-stopped \

  -p 19941:27017 \

  -e MONGO_INITDB_ROOT_USERNAME=root \

  -e MONGO_INITDB_ROOT_PASSWORD=321 \

  -v /data/mongo/db:/data/db \

  -v /data/mongo/configdb:/data/configdb \

  -v /data/mongo/config/mongod.conf:/etc/mongod.conf:ro \

  -v /data/mongo/log:/var/log/mongodb \

  --log-driver=json-file \

  --log-opt max-size=10m \

  --log-opt max-file=3 \

  --ulimit nofile=64000:64000 \

  --health-cmd='mongosh --quiet --username root --password 321 --authenticationDatabase admin --eval "db.adminCommand({ ping: 1 }).ok" | grep 1' \

  --health-interval=30s \

  --health-timeout=10s \

  --health-retries=3 \

  mongo:8.0 \

  --config /etc/mongod.conf

 

# Ubuntu / Debian

sudo apt-get update

sudo apt-get install -y logrotate



# CentOS / Alibaba Cloud Linux / Rocky / AlmaLinux

sudo yum install -y logrotate || sudo dnf install -y logrotate

 

sudo tee /etc/logrotate.d/mongodb-docker > /dev/null <<'EOF'

/data/mongo/log/mongod.log {

    daily

    size 100M

    rotate 14

    missingok

    notifempty

    compress

    delaycompress

    create 0644 999 999

    sharedscripts

    postrotate

        docker exec mongodb mongosh --quiet --username root --password 321 --authenticationDatabase admin --eval 'db.adminCommand({ logRotate: 1 })' >/dev/null 2>&1 || true

    endscript

}

EOF



## 文档模型:灵活不是随便


![MongoDB 文档模型](https://zoyblogs.oss-cn-guangzhou.aliyuncs.com/mongodb_document_model_hvnt.svg)

一个订单文档可以长这样:


{
  "_id": "order_10001",
  "buyer_id": "user_88",
  "status": "paid",
  "items": [
    {
      "sku": "keyboard",
      "count": 1,
      "price": 199.0
    }
  ],
  "address": {
    "province": "Guangdong",
    "city": "Guangzhou"
  },
  "tags": ["app", "vip"]
}
它很适合表达“一个业务对象带一堆自然归属的子结构”。 但不要把“灵活”理解成“字段随便塞”。字段命名、嵌套深度、数组规模、索引设计,都会直接影响查询和维护成本。 ## 嵌入还是引用 MongoDB 建模里最常见的选择是:把子对象嵌进来,还是只保存引用 id。 适合嵌入: - 子对象总是跟主对象一起读取 - 子对象生命周期依附主对象 - 子对象规模可控 - 更新频率不高 适合引用: - 子对象会被多个文档复用 - 子对象规模很大 - 子对象需要单独查询 - 更新频率高,不想频繁改大文档 比如订单里的商品快照适合嵌入,因为订单生成后,商品名和成交价应该保持当时的样子。用户信息则更适合用 `buyer_id` 引用,不要把整份用户资料复制进每笔订单。 ## 连接 MongoDB 先安装:
pip install pymongo
连接本地 MongoDB:
from pymongo import MongoClient


client = MongoClient("mongodb://127.0.0.1:27017")
db = client["shop"]
orders = db["orders"]

client.admin.command("ping")
print("connected")
连接 Atlas 或带账号密码的集群时,通常用 URI:
from pymongo import MongoClient
from pymongo.server_api import ServerApi


uri = "mongodb+srv://user:[email protected]/?retryWrites=true&w=majority"

client = MongoClient(
    uri,
    server_api=ServerApi("1"),
    appname="shop-data-service",
)

client.admin.command("ping")
生产代码里不要把账号密码硬编码在仓库里。用环境变量、密钥系统或配置中心。 ## 插入文档 插入一条:
from pymongo import MongoClient


client = MongoClient("mongodb://127.0.0.1:27017")
orders = client["shop"]["orders"]

result = orders.insert_one({
    "_id": "order_10001",
    "buyer_id": "user_88",
    "status": "created",
    "amount": 199.0,
    "items": [
        {"sku": "keyboard", "count": 1, "price": 199.0}
    ],
})

print(result.inserted_id)
批量插入:
docs = [
    {"_id": "order_10002", "buyer_id": "user_90", "status": "paid", "amount": 99.0},
    {"_id": "order_10003", "buyer_id": "user_91", "status": "paid", "amount": 299.0},
]

result = orders.insert_many(docs, ordered=False)
print(result.inserted_ids)
`ordered=False` 表示遇到单条失败时,不一定停止后续写入。批量导入时很实用,但要把异常处理写清楚。 ![PyMongo CRUD 和索引](https://zoyblogs.oss-cn-guangzhou.aliyuncs.com/pymongo_crud_index_zxqe.svg) ## 查询:filter、projection、sort、limit 查一条:
order = orders.find_one({"_id": "order_10001"})
print(order)
按条件查多条:
cursor = orders.find(
    {"status": "paid", "amount": {"$gte": 100}},
    {"buyer_id": 1, "amount": 1, "status": 1},
).sort("amount", -1).limit(20)

for item in cursor:
    print(item)
第二个参数是 projection,用来控制返回字段。大文档查询时,projection 很重要,别每次把整份文档拖回来。 常见查询操作符:
filters = {
    "amount": {"$gte": 100, "$lt": 500},
    "status": {"$in": ["paid", "shipped"]},
    "tags": "vip",
    "address.city": "Guangzhou",
}
嵌套字段用点号访问,数组字段可以直接匹配元素。 ## 更新:用操作符,不要整份替换 更新单条:
result = orders.update_one(
    {"_id": "order_10001"},
    {
        "$set": {"status": "paid"},
        "$inc": {"amount": 10},
        "$addToSet": {"tags": "checked"},
    },
)

print(result.matched_count, result.modified_count)
更新多条:
result = orders.update_many(
    {"status": "created"},
    {"$set": {"status": "pending"}},
)

print(result.modified_count)
常见更新操作符:
$set:设置字段
$unset:删除字段
$inc:数值增加
$push:数组追加
$addToSet:数组去重追加
$pull:从数组移除
不要为了改一个字段就把整份文档读出来、改完再 replace。并发写入下,这样更容易覆盖别人的改动。 ## upsert:没有就插入 很多同步任务会用 upsert。
result = orders.update_one(
    {"_id": "order_10004"},
    {
        "$set": {
            "buyer_id": "user_92",
            "status": "paid",
            "amount": 188.0,
        }
    },
    upsert=True,
)

print(result.upserted_id)
如果匹配到文档,就更新;匹配不到,就插入。 注意:upsert 的 filter 会参与生成新文档。filter 里别放奇怪的条件,最好用业务唯一键。 ## 删除:先确认范围 删除一条:
result = orders.delete_one({"_id": "order_10004"})
print(result.deleted_count)
删除多条:
result = orders.delete_many({"status": "canceled"})
print(result.deleted_count)
生产环境里删除要非常克制。更常见的做法是软删除:
orders.update_one(
    {"_id": "order_10001"},
    {"$set": {"deleted": True}},
)
真正物理删除前,建议先跑同条件 `count_documents()` 看看影响范围。 ## 索引:查询能不能快,很多时候看这里 MongoDB 没有合适索引时,查询就可能扫描大量文档。 给状态和金额建复合索引:
from pymongo import ASCENDING, DESCENDING


orders.create_index([
    ("status", ASCENDING),
    ("amount", DESCENDING),
])
唯一索引:
orders.create_index("order_no", unique=True)
嵌套字段索引:
orders.create_index("address.city")
数组字段也可以建索引,但要注意数组索引可能放大索引项数量。 索引设计的小规则:
等值过滤字段放前面
排序字段尽量接在过滤字段后面
低选择性字段不要单独滥建索引
写多读少的集合要控制索引数量
索引不是越多越好。每个索引都会增加写入成本和存储成本。 ## 看执行计划 PyMongo 可以用 `explain` 看查询是否走索引:
plan = orders.find(
    {"status": "paid"},
    {"_id": 1, "amount": 1},
).explain()

print(plan["queryPlanner"]["winningPlan"])
如果看到集合扫描,就要检查过滤条件和索引是否匹配。 查询优化不要靠感觉,先看计划,再改索引。 ## 聚合管道:MongoDB 里的数据流水线 Aggregation pipeline 很适合做分组统计、字段加工、数组展开、跨集合关联。 ![MongoDB 聚合和事务](https://zoyblogs.oss-cn-guangzhou.aliyuncs.com/mongo_aggregate_txn_pvls.svg) 按状态统计订单金额:
pipeline = [
    {"$match": {"status": {"$in": ["paid", "shipped"]}}},
    {
        "$group": {
            "_id": "$status",
            "total_amount": {"$sum": "$amount"},
            "order_count": {"$sum": 1},
            "avg_amount": {"$avg": "$amount"},
        }
    },
    {"$sort": {"total_amount": -1}},
]

for item in orders.aggregate(pipeline):
    print(item)
展开数组:
pipeline = [
    {"$unwind": "$items"},
    {
        "$group": {
            "_id": "$items.sku",
            "sold_count": {"$sum": "$items.count"},
            "sales": {
                "$sum": {"$multiply": ["$items.count", "$items.price"]}
            },
        }
    },
    {"$sort": {"sales": -1}},
]
管道顺序很重要。能过滤就先 `$match`,能裁剪字段就先 `$project`,别把一堆无关字段带到后面处理。 ## `$lookup`:能用,但别滥用 MongoDB 支持 `$lookup` 做集合关联:
pipeline = [
    {"$match": {"status": "paid"}},
    {
        "$lookup": {
            "from": "users",
            "localField": "buyer_id",
            "foreignField": "_id",
            "as": "buyer",
        }
    },
    {"$unwind": "$buyer"},
    {
        "$project": {
            "amount": 1,
            "buyer_name": "$buyer.name",
        }
    },
]
`$lookup` 很方便,但不要把 MongoDB 当成关系型数据库来写一堆复杂 join。高频查询场景里,适当冗余、嵌入快照、提前聚合,往往更符合 MongoDB 的使用方式。 ## 事务:跨文档一致性 单文档写入天然是原子的。如果一个业务动作要同时改多个文档,可以用事务。
from pymongo import MongoClient


client = MongoClient("mongodb://127.0.0.1:27017")
db = client["shop"]


def pay_order(session, order_id: str, buyer_id: str, amount: float):
    db.orders.update_one(
        {"_id": order_id, "status": "created"},
        {"$set": {"status": "paid"}},
        session=session,
    )
    db.accounts.update_one(
        {"_id": buyer_id},
        {"$inc": {"balance": -amount}},
        session=session,
    )


with client.start_session() as session:
    session.with_transaction(
        lambda s: pay_order(s, "order_10001", "user_88", 199.0)
    )
事务不是让你忽略建模。能用单文档原子更新解决的,就不要升级成多文档事务。事务会增加复杂度,也会提高系统开销。 ## 批量写入:bulk_write 批量同步时,`bulk_write` 很好用。
from pymongo import UpdateOne


operations = [
    UpdateOne(
        {"_id": "order_10001"},
        {"$set": {"status": "paid", "amount": 199.0}},
        upsert=True,
    ),
    UpdateOne(
        {"_id": "order_10002"},
        {"$set": {"status": "shipped", "amount": 99.0}},
        upsert=True,
    ),
]

result = orders.bulk_write(operations, ordered=False)

print(result.upserted_count, result.modified_count)
比起循环里一条条 `update_one`,批量写入更适合导入、同步、补数据。 ## 分页:别只会 skip 小数据量用 `skip` 没问题:
cursor = orders.find({"status": "paid"}).sort("_id", 1).skip(100).limit(20)
但偏深分页时,`skip` 成本会上来。更稳的方式是基于游标字段继续翻:
last_id = "order_10001"

cursor = orders.find({
    "status": "paid",
    "_id": {"$gt": last_id},
}).sort("_id", 1).limit(20)
这种方式依赖稳定排序字段,适合列表接口和后台数据浏览。 ## PyMongo 连接管理 `MongoClient` 内部会维护连接池。通常不要在每个请求里新建一个 client。 推荐在应用启动时创建:
from pymongo import MongoClient


client = MongoClient("mongodb://127.0.0.1:27017")
db = client["shop"]


def get_orders_collection():
    return db["orders"]
Web 服务里可以把 client 放在应用生命周期里统一管理。频繁创建 client 会浪费连接,也更容易把 MongoDB 压得乱七八糟。 ## 一个小型 Repository 封装 业务代码不要到处散落 PyMongo 查询,可以封装一层。
from dataclasses import dataclass
from typing import Any

from pymongo.collection import Collection


@dataclass
class OrderRepository:
    collection: Collection

    def create_order(self, order: dict[str, Any]) -> str:
        result = self.collection.insert_one(order)
        return str(result.inserted_id)

    def get_order(self, order_id: str) -> dict[str, Any] | None:
        return self.collection.find_one({"_id": order_id})

    def mark_paid(self, order_id: str) -> bool:
        result = self.collection.update_one(
            {"_id": order_id, "status": "created"},
            {"$set": {"status": "paid"}},
        )
        return result.modified_count == 1

    def list_paid_orders(self, limit: int = 20):
        return list(
            self.collection.find(
                {"status": "paid"},
                {"buyer_id": 1, "amount": 1, "status": 1},
            ).sort("_id", 1).limit(limit)
        )
这样业务层只关心方法,不用每次都写 filter 和 update operator。