本文最后更新于:2025年4月14日 晚上

本文详细记录 ZeroMQ 的代理 XPUB/XHUB 模式。

ZeroMQ 的 XPUB/XSUB 模式是构建消息代理系统的核心模式,常用于构建发布-订阅架构的中介代理。

核心概念说明

  1. XPUB (Extended PUB)

    • 特殊类型的发布者套接字
    • 可以接收订阅者的订阅/取消订阅请求
    • 会以二进制消息形式通知上游发布者订阅变化
  2. XSUB (Extended SUB)

    • 特殊类型的订阅者套接字
    • 可以显式发送订阅/取消订阅请求
    • 支持动态修改订阅过滤条件
  3. 典型拓扑结构

Python 代码示例

1. 代理服务器(Broker)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import zmq

context = zmq.Context()

# 前端接收发布者消息(XSUB)
frontend = context.socket(zmq.XSUB)
frontend.bind("tcp://*:5555") # 发布者连接到此端口

# 后端发送给订阅者(XPUB)
backend = context.socket(zmq.XPUB)
backend.bind("tcp://*:5556") # 订阅者连接到此端口

# 使用代理进行消息转发
zmq.proxy(frontend, backend)

2. 发布者(Publisher)

1
2
3
4
5
6
7
8
9
10
11
import zmq
import time

context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.connect("tcp://localhost:5555") # 连接到代理的XSUB端口

while True:
socket.send_string("A/当前时间 %s" % time.ctime())
socket.send_string("B/当前时间 %s" % time.ctime())
time.sleep(1)

3. 订阅者(Subscriber)

1
2
3
4
5
6
7
8
9
10
11
12
import zmq

context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://localhost:5556") # 连接到代理的XPUB端口

# 订阅以"A/"开头的消息
socket.setsockopt_string(zmq.SUBSCRIBE, "A/")

while True:
message = socket.recv_string()
print("收到消息:", message)

tcp://*:5555

  • 含义
    * 表示绑定到本机的所有可用网络接口(即所有 IP 地址)。例如:
    • 如果你的机器有 IP 192.168.1.10010.0.0.2,绑定到 * 后,外部可以通过这两个 IP 访问服务。
    • 如果替换为具体 IP(如 tcp://192.168.1.100:5555),则只允许通过该 IP 访问。
  • 典型场景
    当需要让其他机器连接到你的服务时,必须使用 *。如果仅在本地测试,可以用 tcp://127.0.0.1:5555(仅允许本机连接)。

关键特性说明

  1. 订阅通知机制

    • XPUB 会收到二进制格式的订阅通知
    • 首字节 \x01 表示订阅,\x00 表示取消订阅
    • 例如:b'\x01A/' 表示订阅以 “A/” 开头的消息
  2. 动态订阅管理

    1
    2
    3
    # 订阅者可以动态修改订阅条件
    socket.setsockopt_string(zmq.UNSUBSCRIBE, "A/")
    socket.setsockopt_string(zmq.SUBSCRIBE, "B/")
  3. 多对多通信

    • 允许多个发布者和多个订阅者同时连接
    • 自动处理消息路由
  4. bindconnect 的区别

    操作 角色 行为 典型用途
    bind 服务端/监听方 创建一个监听端口,等待他人连接 代理(Broker)、服务提供者
    connect 客户端/连接方 主动连接到一个已存在的地址 发布者(Publisher)、订阅者(Sub)

    关键差异:

    1. 顺序要求
      • 必须先有 bind(服务端),然后才能 connect(客户端)。
      • 如果客户端尝试连接未绑定的地址,会失败(Connection refused)。
    2. 灵活性
      • bind 方是稳定的服务节点(如消息代理)。
      • connect 方是动态的客户端节点(可以随时加入或离开)。
    3. 地址所有权
      • bind 的地址是独占的(同一端口只能被一个进程绑定)。
      • connect 可以多对一(多个客户端连接同一个服务端)。实际应用场景
  5. 消息总线(Message Bus)

  6. 服务解耦中间层

  7. 分布式日志系统

  8. 实时数据广播系统

常见问题处理

  1. 慢订阅者问题

    • 使用 zmq.CONFLATE=1 选项保留最新消息
    • 设置 HWM(高水位线)控制队列长度
  2. 调试技巧

    1
    2
    # 监控XPUB的订阅请求
    backend.setsockopt(zmq.XPUB_VERBOSE, 1)
  3. 性能优化

    • 使用 inproc 传输进行进程内通信
    • 启用多线程代理


文章链接:
https://www.zywvvd.com/notes/tools/zmq/zmq-proxy/zmq-proxy/


“觉得不错的话,给点打赏吧 ୧(๑•̀⌄•́๑)૭”

微信二维码

微信支付

支付宝二维码

支付宝支付

ZMQ 代理 XPUB/XSUB 模式
https://www.zywvvd.com/notes/tools/zmq/zmq-proxy/zmq-proxy/
作者
Yiwei Zhang
发布于
2025年3月5日
许可协议