Rabbitmq从入门到崩溃




2022-06-03

blog_main_img

`RabbitMQ + Python pika` 这条线,把消息模型、路由方式、可靠消费和常见实战写法串起来。 目标不是只把示例跑通,而是让你真正知道:消息到底是怎么走的,哪里最容易翻车

很多人第一次接触 RabbitMQ,感受往往很直接:

  • 生产者发一条消息
  • 消费者收一条消息
  • 队列像个中转箱,先塞进去再说

先别急着写代码,先把消息路径想清楚

消息不是直接“扔给队列”,更准确地说,是先发布到 exchange,再由 exchange 按绑定关系和路由键,把消息送进一个或多个队列,最后由消费者从队列里取走。

消息主路径

这里几个角色的职责非常明确:

  • Producer:负责发消息
  • Exchange:负责路由,不负责长期保存
  • Queue:真正存消息的地方
  • Consumer:负责处理消息
  • Binding:定义 exchangequeue 之间的关系
  • routing_key:消息路由时带上的关键字

RabbitMQ 官方对 exchange 的描述很直白:发布者把消息发布给它,它再根据交换机类型和绑定规则,把消息路由到队列。
所以你一旦写到多队列、多业务线、多级路由,重点已经不再是“有没有队列”,而是“交换机怎么选、绑定怎么设”。

pika 连接 RabbitMQ,先准备一个稳一点的连接模板

pika 是 Python 里非常常见的 RabbitMQ 客户端。
如果你只是做同步脚本、后台任务、轻量服务,BlockingConnection 已经够用,而且上手成本低。

先装依赖:

python -m pip install pika --upgrade

然后准备一个连接函数:

import pika


def create_connection():
    credentials = pika.PlainCredentials("guest", "guest")
    params = pika.ConnectionParameters(
        host="localhost",
        port=5672,
        virtual_host="/",
        credentials=credentials,
        heartbeat=120,
        blocked_connection_timeout=300,
    )
    return pika.BlockingConnection(params)

这几个参数里,比较值得注意的是:

  • heartbeat:心跳协商参数,别让连接悄悄死掉
  • blocked_connection_timeout:当 broker 因资源压力阻塞连接时,别让调用方一直卡着不动

如果你的发布端或消费端会跑很久,这两个设置比“能连上”更重要。

最小可用版本:一发一收

先看一眼最基础的发送端:

import json
import pika


connection = create_connection()
channel = connection.channel()

channel.queue_declare(queue="hello")

payload = {
    "event": "user.signup",
    "user_id": 1001,
}

channel.basic_publish(
    exchange="",
    routing_key="hello",
    body=json.dumps(payload).encode("utf-8"),
)

print("message sent")
connection.close()

这里的 exchange="" 很特殊,它表示默认交换机。
默认交换机会按照“队列名 = routing key”这件事直接投递,所以 routing_key="hello" 会把消息送到名为 hello 的队列。

消费端也很直接:

import json
import pika


def on_message(channel, method, properties, body):
    data = json.loads(body)
    print("received:", data)
    channel.basic_ack(delivery_tag=method.delivery_tag)


connection = create_connection()
channel = connection.channel()
channel.queue_declare(queue="hello")

channel.basic_consume(queue="hello", on_message_callback=on_message, auto_ack=False)
channel.start_consuming()

这里先埋一个重点:
消费端最好尽量别把 auto_ack=True 当默认值。你把自动确认开了,消息一送到消费者侧,RabbitMQ 就会认为“这条消息已经交付完成”;后面业务代码要是抛异常,消息也回不来了。

真正进入实战后,Work Queue 才是高频模式

最常见的业务场景通常不是“打一枪就走”,而是任务分发:

  • 把图片处理任务塞进队列
  • 把订单异步通知塞进队列
  • 把日志清洗任务塞进队列
  • 把批量邮件、风控检查、报表生成塞进队列

这时更像是在用一个任务队列,而不是一个普通消息箱。

发送端可以这样写:

import json
import pika


connection = create_connection()
channel = connection.channel()

channel.queue_declare(queue="task_queue", durable=True)

task = {
    "task_id": "job-1001",
    "kind": "thumbnail",
    "image": "demo.png",
}

channel.basic_publish(
    exchange="",
    routing_key="task_queue",
    body=json.dumps(task).encode("utf-8"),
    properties=pika.BasicProperties(
        delivery_mode=2,  # persistent
    ),
)

connection.close()

这里有两个很关键的点:

  • durable=True:队列本身要能在 broker 重启后继续存在
  • delivery_mode=2:消息本身按持久化消息处理

但这还不是“终点”。
RabbitMQ 官方文档对这一点强调得很明确:如果你要更强的数据安全保证,还要配合 publisher confirms,不能只靠“持久化队列 + 持久化消息”就盲目乐观。

消费端为什么要手动确认

看消费端:

import json
import time
import pika


def handle_task(task: dict):
    print("processing:", task["task_id"])
    time.sleep(1.5)


def on_message(channel, method, properties, body):
    task = json.loads(body)

    try:
        handle_task(task)
        channel.basic_ack(delivery_tag=method.delivery_tag)
        print("done:", task["task_id"])
    except Exception as exc:
        print("failed:", exc)
        channel.basic_nack(delivery_tag=method.delivery_tag, requeue=True)


connection = create_connection()
channel = connection.channel()
channel.queue_declare(queue="task_queue", durable=True)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(
    queue="task_queue",
    on_message_callback=on_message,
    auto_ack=False,
)

print("waiting for tasks")
channel.start_consuming()

这段代码里,真正值钱的是下面三行:

channel.basic_ack(...)
channel.basic_nack(..., requeue=True)
channel.basic_qos(prefetch_count=1)

分别解决三类问题:

  • basic_ack:任务真处理完了,再确认
  • basic_nack:失败后告诉 broker,这条消息别当它没事发生
  • basic_qos(prefetch_count=1):一个消费者手里别先攒太多未确认消息

prefetch_count=1 到底在帮你什么

如果没有 prefetch 限制,RabbitMQ 可能会持续把消息推给某个消费者。
一旦这个消费者处理慢、消息又没确认,队列分发就会开始失衡。

prefetch_count=1 的效果可以理解成:

  • 当前消费者手里已有 1 条未确认消息
  • 那就先别再给它塞新的
  • 让其他空闲消费者也分担任务

这不是绝对公平,但在 Work Queue 模式里,通常比默认“看起来平均、实际可能失衡”的状态更稳。

路由这块,才是 RabbitMQ 和“普通队列”拉开差距的地方

如果你的业务永远只有一条队列,那默认交换机已经够用。
但只要你开始做这些事情:

  • 日志分级投递
  • 不同业务线分流
  • 事件广播
  • 多条件订阅

你迟早会碰到交换机类型。

RabbitMQ 常见的几个交换机类型里,最常用的是:

  • fanout:广播,忽略路由键
  • direct:精确匹配路由键
  • topic:按模式匹配路由键

路由方式

direct:适合明确分流

比如日志系统里,按 info / warning / error 分流:

connection = create_connection()
channel = connection.channel()

channel.exchange_declare(exchange="logs_direct", exchange_type="direct", durable=True)

channel.queue_declare(queue="error_queue", durable=True)
channel.queue_bind(
    exchange="logs_direct",
    queue="error_queue",
    routing_key="error",
)

channel.basic_publish(
    exchange="logs_direct",
    routing_key="error",
    body=b"database timeout",
)

connection.close()

这时只有绑定了 error 的队列会收到消息。

fanout:适合广播

如果你的目标是“一条消息,多方都要收到”,fanout 就很顺手。
比如:

  • 一份业务事件,同时投给日志系统、审计系统、缓存刷新系统
  • 一个通知,同时被多个消费组监听

fanout 不看 routing_key,只看谁绑在这个交换机上。

topic:适合有层次的路由规则

如果你的事件键长得像这样:

  • order.created
  • order.paid
  • user.profile.updated

topic 往往会更灵活。

channel.exchange_declare(exchange="events", exchange_type="topic", durable=True)

channel.queue_declare(queue="order_worker", durable=True)
channel.queue_bind(exchange="events", queue="order_worker", routing_key="order.*")

channel.queue_declare(queue="all_updates", durable=True)
channel.queue_bind(exchange="events", queue="all_updates", routing_key="#")

常见规则:

  • *:匹配一个单词
  • #:匹配零个或多个单词

这类写法很适合事件总线风格的系统。
业务拆分时,你不需要把所有队列名和消费端硬写死,可以先用 routing pattern 把路由层搭出来。

只会“发出去”还不够,发布端也要考虑可靠性

一个很常见的误区是:

“消费者有 ack,所以整体就可靠了。”

其实不够。
消费者确认的是“我处理完了”;发布端还需要确认“broker 真的收到了,而且按预期路由了”。

RabbitMQ 官方把这部分叫 publisher confirms

pika 里,BlockingConnection 可以这样开:

import json
import pika
from pika.exceptions import NackError, UnroutableError


connection = create_connection()
channel = connection.channel()
channel.confirm_delivery()

channel.exchange_declare(exchange="events", exchange_type="direct", durable=True)
channel.queue_declare(queue="billing_queue", durable=True)
channel.queue_bind(exchange="events", queue="billing_queue", routing_key="billing.created")

message = {"id": 101, "event": "billing.created"}

try:
    channel.basic_publish(
        exchange="events",
        routing_key="billing.created",
        body=json.dumps(message).encode("utf-8"),
        properties=pika.BasicProperties(delivery_mode=2),
        mandatory=True,
    )
    print("publish confirmed")
except UnroutableError:
    print("message cannot be routed to any queue")
except NackError:
    print("broker rejected the message")
finally:
    connection.close()

这段代码里:

  • confirm_delivery():让 channel 进入 confirm 模式
  • mandatory=True:如果消息压根路由不到任何队列,别默默吞掉

这就比“直接 publish,然后默认它一定成功”靠谱得多。

RabbitMQ 里的“可靠”到底是怎么拼出来的

如果把消息可靠性拆开看,它不是某一个按钮负责的,而是一组配置和代码动作共同完成的:

  • 队列可持久化:durable=True
  • 消息可持久化:delivery_mode=2
  • 消费端手动确认:basic_ack
  • 失败时明确拒绝或重回队列:basic_nack / basic_reject
  • 发布端确认:confirm_delivery()
  • 合理的预取:basic_qos(prefetch_count=...)

可靠消费与发布

可以把它理解成两段链路:

  1. 发布者 -> RabbitMQ:靠持久化 + confirms
  2. RabbitMQ -> 消费者:靠手动 ack + 合理重试

少了其中一段,整体就可能看起来“差不多”,但并不稳。

一段更像生产代码的 Python 封装

如果你不想在项目里到处散落 channel.basic_publish(...),可以先封一层:

import json
import pika
from pika.exceptions import AMQPError, NackError, UnroutableError


class RabbitPublisher:
    def __init__(self):
        self.connection = create_connection()
        self.channel = self.connection.channel()
        self.channel.confirm_delivery()

    def declare(self):
        self.channel.exchange_declare(
            exchange="events",
            exchange_type="topic",
            durable=True,
        )

    def publish_json(self, routing_key: str, payload: dict):
        body = json.dumps(payload, ensure_ascii=False).encode("utf-8")
        try:
            self.channel.basic_publish(
                exchange="events",
                routing_key=routing_key,
                body=body,
                properties=pika.BasicProperties(
                    content_type="application/json",
                    delivery_mode=2,
                ),
                mandatory=True,
            )
            return True
        except (NackError, UnroutableError, AMQPError) as exc:
            print("publish failed:", exc)
            return False

    def close(self):
        self.connection.close()

使用起来会更清楚:

publisher = RabbitPublisher()
publisher.declare()

publisher.publish_json(
    "order.created",
    {
        "order_id": "A10001",
        "amount": 199.0,
    },
)

publisher.close()

这种封装很适合:

  • 业务服务统一发事件
  • 定时任务批量投递消息
  • 把 RabbitMQ 接入层和业务层拆开

再往前一步,消费端也该有自己的“边界感”

消费端最容易写成“大杂烩”:

  • 收消息
  • 解析数据
  • 查数据库
  • 调外部接口
  • 改缓存
  • 记录审计
  • 出错再试着补救

结果就是:一段回调函数既负责协议动作,又负责业务编排,最后难调、难测、难重试。

更推荐把结构拆开:

import json
import pika


def process_order(payload: dict):
    order_id = payload["order_id"]
    print(f"process order: {order_id}")


def on_order_message(channel, method, properties, body):
    try:
        payload = json.loads(body)
        process_order(payload)
        channel.basic_ack(delivery_tag=method.delivery_tag)
    except Exception as exc:
        print("consume failed:", exc)
        channel.basic_nack(delivery_tag=method.delivery_tag, requeue=False)


connection = create_connection()
channel = connection.channel()
channel.exchange_declare(exchange="events", exchange_type="topic", durable=True)
channel.queue_declare(queue="order_service", durable=True)
channel.queue_bind(exchange="events", queue="order_service", routing_key="order.*")
channel.basic_qos(prefetch_count=8)
channel.basic_consume(
    queue="order_service",
    on_message_callback=on_order_message,
    auto_ack=False,
)
channel.start_consuming()

这样做的好处是:

  • 协议层动作更清楚
  • 业务处理可单测
  • 是否重回队列、是否丢进死信队列,策略更好扩展

绕坑指南!!!

1. 把 auto_ack=True 当省事按钮

短期看代码更短,长期看排障更痛苦。
只要业务处理不是“收到就算完成”,就优先考虑手动确认。

2. 只设了 durable=True,却没开发布确认

队列持久化很重要,但它不等于“发布端一定知道消息已经安全落地”。
要追求更强保证,发布确认不能省。

3. 一个消费者回调里做太重的工作

BlockingConnection 的使用方式很顺手,但它是单线程语义。
如果回调里做超长耗时任务,连接心跳、阻塞处理、整体吞吐都会受影响。
任务特别重时,可以考虑把消息处理继续拆分,或者切换到更合适的连接适配器和并发模型。

4. requeue=True 用得太随意

如果失败消息没有区分“临时失败”和“永久失败”,一股脑回队列,很容易形成反复重试。
更稳的做法通常是:

  • 临时故障:有限重试
  • 脏数据或不可恢复错误:丢死信或单独记录

5. 过早追求复杂拓扑

不是每个项目都一上来就需要 topic、死信、延迟、优先级、RPC 全家桶。
先把主链路跑稳:
queue + manual ack + qos + confirms
很多系统已经能用得很舒服。