本文最后更新于:2024年10月11日 上午
ZMQ(ZeroMQ)是一个开源的库,用于在应用程序中实现消息传递, 本文记录相关内容。
ZMQ
ZeroMQ (也称为 ØMQ,0MQ,或 zmq)看起来像一个可嵌入的网络库,但其作用类似于并发框架。它提供了跨进程、进程间、 TCP 和多播等各种传输方式携带原子消息的套接字。您可以使用诸如扇出、发布-订阅、任务分配和请求-应答等模式将套接字 N 到 N 连接起来。它的速度足以成为集群产品的结构。它的异步 I/O 模型为您提供了可伸缩的多核应用程序,构建为异步消息处理任务。它有许多语言 API,并且运行在大多数操作系统上。
它被设计为类似于socket的API,但其运作方式更像是消息队列(message queue)或企业消息传递系统(enterprise messaging system)。
特点:
- 高级抽象:ZMQ提供高级的抽象,使得消息传递变得简单,无需担心底层网络细节。
- 模式多样:支持多种通信模式,如请求-应答(request-reply)、发布-订阅(publish-subscribe)、推-拉(push-pull)等。
- 跨平台:可以在多种操作系统和编程语言上使用。
- 性能优异:经过优化,具有很高的消息吞吐量。
- 无中心:ZMQ不需要一个中心节点,每个节点既是客户端也是服务器。
通信方式: ZeroMQ的三种通信模式分别是:Request-Reply
, Publisher-subscriber
, Parallel Pipeline
官方网站:https://zeromq.org/
PyZMQ:https://pyzmq.readthedocs.io/en/latest/
Python 安装
PyZMQ 需要在 python 3.7 及以上的版本上使用,当前(2024.09)最新版本为 26.2
bind / connect
在 ZeroMQ(也称为 0MQ 或 zmq)中,bind
和 connect
是用于建立网络连接的两种主要方法,它们用于设置消息传递的端点,但是它们在作用和用途上有所区别。
bind
- 定义:当一个 Socket 在指定地址上使用
bind
方法时,它告诉 ZeroMQ 在该地址上监听进入的连接。
- 使用场景:通常用于服务器端或者说是消息的接收端,它监听来自客户端的连接。
- 地址:使用
bind
的 Socket 需要提供一个具体的、可访问的网络地址(如 tcp://*:5555
,其中 *
表示监听所有可用的接口,5555
是端口号)。
- 关系:一个端口只能被一个 Socket 绑定,即在一个网络接口上的特定端口上只能有一个
bind
调用。
connect
- 定义:当一个 Socket 在指定地址上使用
connect
方法时,它尝试建立一个到该地址的连接。
- 使用场景:通常用于客户端或者说是消息的发送端,它主动发起与服务器端的连接。
- 地址:使用
connect
的 Socket 需要指定它想要连接的服务器地址(如 tcp://localhost:5555
)。
- 关系:多个 Socket 可以使用
connect
方法连接到同一个由另一个 Socket 通过 bind
绑定的地址。
关系和区别
- 关系:
bind
和 connect
是相互补充的:一个 Socket 通过 bind
监听连接,而另一个 Socket 通过 connect
建立到前者的连接。
- 在对等网络(Peer-to-Peer)通信模式中,所有的节点既可以是客户端也可以是服务器端,因此它们可能会同时使用
bind
和 connect
。
- 区别:
- 角色:
bind
用于监听,而 connect
用于发起连接。
- 地址:
bind
通常使用通配符表示监听所有接口,而 connect
指定具体的远程地址。
- 数量限制:一个端口只能被一个 Socket 绑定,但可以有多个 Socket 连接到同一个绑定的端口。
理解这两个概念对于使用 ZeroMQ 构建网络通信模型至关重要。正确地使用 bind
和 connect
可以确保消息在复杂的网络拓扑中有效地传递。
Request-Reply(应答模式)

应答模式特点:
-
客户端提出请求,服务端必须回答请求,每个请求只回答一次
-
客户端没有收到答复前,不能再次进行请求
-
可以有多个客户端提出请求,服务端能保证各个客户端只接收到自己的答复
-
如果服务端断掉或者客户端断掉会产生怎样的影响?
如果是客户端断掉,对服务端没有任何影响,如果客户端随后又重新启动,那么两方继续一问一答,但是如果是服务端断掉了,就可能会产生一些问题,这要看服务端是在什么情况下断掉的,如果服务端收是在回答完问题后断掉的,那么没影响,重启服务端后,双发继续一问一答,但如果服务端是在收到问题后断掉了,还没来得及回答问题,这就有问题了,那个提问的客户端迟迟得不到答案,就会一直等待答案,因此不会再发送新的提问,服务端重启后,客户端迟迟不发问题,所以也就一直等待提问。
python 实现 1
zmq_server.py
1 2 3 4 5 6 7 8 9 10 11 12
| import zmq
context = zmq.Context() socket = context.socket(zmq.REP) socket.bind("tcp://*:5555")
while True: message = socket.recv() print(type(message)) print("收到消息:{}".format(message)) socket.send(b"new message")
|
zmq_client.py
1 2 3 4 5 6 7 8 9 10 11
|
import zmq
context = zmq.Context() socket = context.socket(zmq.REQ) socket.connect("tcp://localhost:5555")
socket.send(b"A message") response = socket.recv() print(response)
|
常用数据发送API如下:
1 2 3 4 5 6 7 8 9 10 11 12
| socket.send_json(data) socket.send_string(data, encoding="utf-8") socket.send_pyobj(obj) socket.send_multipart(msg_parts)
socket.recv_json() socket.recv_string() socket.recv_pyobj() socket.recv_multipart()
|
python 实现 2
sever.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| import zmq import sys context = zmq.Context() socket = context.socket(zmq.REP) socket.bind("tcp://*:5555") while True: try: print("wait for client ...") message = socket.recv() print("message from client:", message.decode('utf-8')) socket.send(message) except Exception as e: print('异常:',e) sys.exit()
|
client.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| import zmq import sys context = zmq.Context() print("Connecting to server...") socket = context.socket(zmq.REQ) socket.connect("tcp://localhost:5555") while True:
input1 = input("请输入内容:").strip() if input1 == 'b': sys.exit() socket.send(input1.encode('utf-8'))
message = socket.recv() print("Received reply: ", message.decode('utf-8'))
|
Publisher-Subscriber (发布-订阅模式)
publiser广播消息到所有客户端,客户端根据订阅主题过滤消息。
广播所有client,没有队列缓存,断开连接数据将永远丢失。
PUB发送,send。SUB接收,recv。和PUSH-PULL模式不同,PUB将消息同时发给和他建立的链接,类似于广播。另外发布订阅模式也可以使用订阅过滤来实现只接收特定的消息。订阅过滤是在服务器上进行过滤的,如果一个订阅者设定了过滤,那么发布者将只发布满足他订阅条件的消息。
这个就是广播和收听的关系。PUB-SUB模式虽然没有使用网络的广播功能,但是它内部是异步的。也就是一次发送没有结束立刻开始下一次发送。
广播所有client,没有队列缓存,断开连接数据将永远丢失。client可以进行数据过滤。

Python 实现
python实现代码如下, 其中publisher发布两条消息,第一条消息的topic为client1, 被第一个subscriber接收到;第二条消息的topic为client2, 被第二个subscriber接收到。
注意的是 subscriber 在匹配时,并不是完全匹配的,消息的topic为client1开头的字符串都会被匹配到,如果topic为"client1cient2", 也会被第一个subscriber接收到
server.py
1 2 3 4 5 6 7 8 9 10 11 12 13
| import zmq import time import sys context = zmq.Context() socket = context.socket(zmq.PUB) socket.bind("tcp://*:5555")
while True: msg = input("请输入要发布的信息:").strip() if msg == 'b': sys.exit() socket.send(msg.encode('utf-8')) time.sleep(1)
|
client1.py
1 2 3 4 5 6 7 8 9 10
| import zmq
context = zmq.Context() socket = context.socket(zmq.SUB) socket.connect("tcp://localhost:5555") socket.setsockopt(zmq.SUBSCRIBE,''.encode('utf-8')) while True: response = socket.recv().decode('utf-8'); print("response: %s" % response)
|
client2.py
1 2 3 4 5 6 7 8
| import zmq context = zmq.Context() socket = context.socket(zmq.SUB) socket.connect("tcp://localhost:5555") socket.setsockopt(zmq.SUBSCRIBE,'123'.encode('utf-8')) while True: response = socket.recv().decode('utf-8'); print("response: %s" % response)
|
C++ 实现
server:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| #include <zmq.h> #include <stdio.h> #include <stdlib.h> #include "zmq_helper.h"
int main(void) { void * context = zmq_ctx_new(); void * socket = zmq_socket(context, ZMQ_PUB); zmq_bind(socket, "tcp://*:5556");
srandom((unsigned)time(NULL));
while(1) { int zipcode = randof(100000); int temp = randof(84) - 42; int relhumidity = randof(50) + 10;
char msg[20]; snprintf(msg, sizeof(msg), "%5d %d %d", zipcode, temp, relhumidity); s_send(socket, msg); }
zmq_close(socket); zmq_ctx_destroy(context);
return 0; }
|
client:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| #include <zmq.h> #include <stdio.h> #include "zmq_helper.h"
int main(void) { void * context = zmq_ctx_new(); void * socket = zmq_socket(context, ZMQ_SUB); zmq_connect(socket, "tcp://localhost:5556");
char * zipcode = "10001"; zmq_setsockopt(socket, ZMQ_SUBSCRIBE, zipcode, strlen(zipcode));
for(int i = 0; i < 50; ++i) { char * string = s_recv(socket); printf("[Subscriber] Received weather report msg: %s\n", string); free(string); }
zmq_close(socket); zmq_ctx_destroy(context); return 0; }
|
ZMQ_PUB
类型的socket, 如果没有任何client与其相连, 其所有消息都将被简单就地抛弃
ZMQ_SUB
类型的socket, 即是client, 可以与多个ZMQ_PUB
类型的socket相连, 即村民可以同时收听多个msg 但必须为每个msg都设置过滤器. 否则默认情况下, zmq认为client不关心msg里的所有内容.
- 当一个cline收听多个时, 接收消息采用公平队列策略
- 如果存在至少一个clint在收听, 那么这个消息就不会被随意抛弃: 这句话的意思是, 当消息过多, 而client的消化能力比较低的话, 未发送的消息会缓存在msg里.
- 在ZMQ大版本号在3以上的版本里, 当msg与client的速度不匹配时. 若使用的传输层协议是
tcp
或ipc
这种面向连接的协议, 则堆积的消息缓存在里, 当使用epgm
这种协议时, 堆积的消息缓存了client里. 在ZMQ 大版本号为2的版本中, 所有情况下, 消息都将堆积在clinet里
xsub / xpub
在 ZeroMQ 中,xsub
和 xpub
是高级消息传递模式,它们分别是 sub
(订阅者)和 pub
(发布者)模式的扩展。
- xsub (Extended Subscriber):
xsub
是一个扩展的订阅者套接字,它支持传统的订阅模式,同时还支持发送和接收来自其他订阅者的订阅信息。这意味着 xsub
套接字不仅可以接收发布者发布的消息,还可以接收其他订阅者订阅的主题。
- xpub (Extended Publisher):
xpub
是一个扩展的发布者套接字,它允许发送者不仅发送消息,还可以发送订阅信息。这允许 xpub
套接字在接收到订阅信息时将其广播给所有连接的订阅者。
xsub/xpub 与 sub/pub 的区别
- 消息流控制:
sub
和 pub
套接字只支持单向的消息流。sub
套接字接收来自 pub
的消息,而 pub
套接字只发送消息,不关心订阅者的存在。
xsub
和 xpub
套接字支持双向消息流。xsub
可以接收来自 xpub
的消息和订阅信息,而 xpub
可以发送消息和广播订阅信息。
- 订阅信息的传递:
- 在
sub
/pub
模式下,订阅者通过发送订阅信息来订阅特定主题,但订阅信息不会传递给其他订阅者。
- 在
xsub
/xpub
模式下,订阅信息会被广播给所有连接到 xpub
的订阅者,这样每个订阅者都能知道其他订阅者订阅了哪些主题。
- 消息过滤:
sub
套接字通过发送订阅信息来指定它想要接收哪些消息。
xsub
套接字不仅可以接收消息,还可以接收其他订阅者的订阅信息,这使得它可以在内部构建订阅列表,并可能对消息进行更复杂的过滤。
- 使用场景:
sub
和 pub
套接字适用于简单的发布-订阅模式,其中不需要关心订阅者的具体信息。
xsub
和 xpub
套接字适用于更复杂的场景,比如需要了解订阅者状态或者需要构建消息代理的场景。
示例场景
一个常见的使用 xsub
和 xpub
的场景是构建一个 ZeroMQ 消息代理。在这种情况下,xpub
套接字可以连接到多个 sub
或 xsub
套接字,并广播消息和订阅信息。同样,xsub
套接字可以连接到多个 pub
或 xpub
套接字,接收消息和订阅信息,并将它们转发给下游的订阅者。
在有 XSUB 服务器时,客户端 pub 仅需 connect 即可进行通信。
Parallel Pipeline(管道模型)
由三部分组成,push进行数据推送,work进行数据缓存,pull进行数据竞争获取处理。区别于Publish-Subscribe存在一个数据缓存和处理负载。
当连接被断开,数据不会丢失,重连后数据继续发送到对端。

分治套路里有三个角色:
- Ventilator. 包工头, 向手下各个工程队分派任务. (一个)
- Worker. 工程队, 从包工头里接收任务, 干活. (多个)
- Sink. 甲方监理, 工程队干完活后, 向甲方监理报告. 所以工程队的活干完之后, 监理统一收集所有工程队的成果. (一个)
Python 实现
server.py
1 2 3 4 5 6 7 8 9 10 11 12
| import zmq import time
context = zmq.Context() socket = context.socket(zmq.PUSH) socket.bind("tcp://*:5557")
while True: msg = input("请输入要发布的信息:").strip() socket.send(msg.encode('utf-8')) print("已发送") time.sleep(1)
|
worker.py
1 2 3 4 5 6 7 8 9 10 11
| import zmq context = zmq.Context() receive = context.socket(zmq.PULL) receive.connect('tcp://127.0.0.1:5557') sender = context.socket(zmq.PUSH) sender.connect('tcp://127.0.0.1:5558')
while True: data = receive.recv() print("正在转发...") sender.send(data)
|
client.py
1 2 3 4 5 6 7 8
| import zmq context = zmq.Context() socket = context.socket(zmq.PULL) socket.bind("tcp://*:5558")
while True: response = socket.recv().decode('utf-8') print("response: %s" % response)
|
C++ 实现
包工头代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| #include <zmq.h> #include <stdio.h> #include <time.h> #include "zmq_helper.h"
int main(void) { void * context = zmq_ctx_new(); void * socket_to_sink = zmq_socket(context, ZMQ_PUSH); void * socket_to_worker = zmq_socket(context, ZMQ_PUSH); zmq_connect(socket_to_sink, "tcp://localhost:5558"); zmq_bind(socket_to_worker, "tcp://*:5557");
printf("Press Enter when all workers get ready:"); getchar(); printf("Sending tasks to workers...\n");
s_send(socket_to_sink, "Get ur ass up");
srandom((unsigned)time(NULL));
int total_ms = 0; for(int i = 0; i < 100; ++i) { int workload = randof(100) + 1; total_ms += workload; char string[10]; snprintf(string, sizeof(string), "%d", workload); s_send(socket_to_worker, string); }
printf("Total expected cost: %d ms\n", total_ms);
zmq_close(socket_to_sink); zmq_close(socket_to_worker); zmq_ctx_destroy(context);
return 0; }
|
工程队代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| #include <zmq.h> #include <stdio.h> #include "zmq_helper.h"
int main(void) { void * context = zmq_ctx_new(); void * socket_to_ventilator = zmq_socket(context, ZMQ_PULL); void * socket_to_sink = zmq_socket(context, ZMQ_PUSH); zmq_connect(socket_to_ventilator, "tcp://localhost:5557"); zmq_connect(socket_to_sink, "tcp://localhost:5558");
while(1) { char * msg = s_recv(socket_to_ventilator); printf("Received msg: %s\n", msg); fflush(stdout); s_sleep(atoi(msg)); free(msg); s_send(socket_to_sink, "DONE"); }
zmq_close(socket_to_ventilator); zmq_close(socket_to_sink); zmq_ctx_destroy(context);
return 0; }
|
监理代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| #include <zmq.h> #include <stdio.h> #include "zmq_helper.h"
int main(void) { void * context = zmq_ctx_new(); void * socket_to_worker_and_ventilator = zmq_socket(context, ZMQ_PULL); zmq_bind(socket_to_worker_and_ventilator, "tcp://*:5558");
char * msg = s_recv(socket_to_worker_and_ventilator); printf("Received msg: %s", msg); free(msg);
int64_t start_time = s_clock();
for(int i = 0; i < 100; ++i) { char * msg = s_recv(socket_to_worker_and_ventilator); free(msg);
if(i / 10 * 10 == i) printf(":"); else printf("."); fflush(stdout); }
printf("Total elapsed time: %d ms]\n", (int)(s_clock() - start_time));
zmq_close(socket_to_worker_and_ventilator); zmq_ctx_destroy(context);
return 0; }
|
这个示例程序的逻辑流程是这样的:
- 包工头向两个角色发送消息: 向工程队发送共计100个任务, 向监理发送消息, 通知监理开始干活
- 工程队接收来自包工头的消息, 并按消息里的数值, 睡眠指定毫秒. 每个任务结束后都通知监理.
- 监理先是接收来自包工头的消息, 开始计时. 然后统计来自工程队的消息, 当收集到100个任务完成的消息后, 计算实际耗时.
包工头里输出的预计耗时是100个任务的共计耗时, 在监理那里统计的实际耗时则是由多个工程队并行处理100个任务实际的耗时.
这里个例子中需要注意的点有:
- 这个例子中使用了
ZMQ_PULL
与ZMQ_PUSH
两种socket. 分别供消息分发方与消息接收方使用. 看起来略微有点类似于发布-订阅套路, 具体之间的区别后续章节会讲到.
- 工程队上接包工头, 下接监理. 在任务执行过程中, 你可以随意的增加工程队的数量.
- 我们通过让包工头通知监理, 以及手动输入enter来启动任务分发的方式, 手动同步了工程队/包工头/监理.
PUSH/PULL
模式虽然和PUB/SUB
不一样, 不会丢失消息. 但如果不手动同步的话, 最先建立连接的工程队将几乎把所有任务都接收到手, 导致后续完成连接的工程队拿不到任务, 任务分配不平衡.
- 包工头分派任务使用的是轮流/平均分配的方式.这是一种简单的负载均衡
- 监理接收多个工程队的消息, 使用的是公平队列策略.
参考资料
文章链接:
https://www.zywvvd.com/notes/tools/zmq/zmq/