为什么用Rabbitmq instead of python queue ?
是因为python queue 不能跨进程
队列的作用:
1. 存储消息、数据
2. 保证消息顺序
3. 保证数据的交付
1 | # 斐波那契数列 |
基本使用实现
发送端
1 | import pika |
接收端
1 | import pika |
工作队列
消息不丢失
生产者
1 | for i in range(5): |
消费者
no_ack=False
消费者退出不消息不丢失
1 | # 修改回调函数 |
消息持久化
- 消息持久化存储,
虽然消息反馈机制,但是如果rabbitmq自身挂掉的话,那么任务还是会丢失。所以需要将任务持久化存储起来。声明持久化存储:
1 | # 原队列 |
但是这个程序会执行错误,因为hello这个队列已经存在,并且是非持久化的,rabbitmq不允许使用不同的参数来重新定义存在的队列。重新定义一个队列
1 | # 重新定义一个队列 |
在发送任务的时候,用delivery_mode=2来标记任务为持久化存储:
1 | channel.basic_publish(exchange='', |
公平调度
prefetch_count = 1
虽然每个工作者是依次分配到任务,但是每个任务不一定一样。可能有的任务比较重,执行时间比较久;有的任务比较轻,执行时间比较短。如果能公平调度就最好了,使用basic_qos设置prefetch_count=1,使得rabbitmq不会在同一时间给工作者分配多个任务,即只有工作者完成任务之后,才会再次接收到任务。
1 | channel.basic_qos(prefetch_count=1) |
- new_task.py完整代码
发送者/生产者
1 | import pika |
- worker.py完整代码
接受者/消费者
1 | import pika |
广播
广播交换机的工作原理:消息发送端先将消息发送给交换机,交换机再将消息发送到绑定的消息队列,而后每个接收端都能从各自的消息队列里接收到信息。
消费者/接收端receive.py代码分析
和最早的receive.py相比,主要是做了两个改动:
- 定义交换机
- 不使用hello队列了,随机生成一个临时队列,并绑定到交换机上
1 | import pika |
执行rabbitmqctl list_queues
1 | task_queue 0 |
生产者/发送端send.py代码分析
和最早的send.py相比,也只做了两个改动:
- 定义交换机
- 不是将消息发送到hello队列,而是发送到交换机
1 | import pika |
exchange如果为空,表示是使用匿名的交换机,在上面交换机信息的图片中可以看到有amq.*这样的交换机,就是系统默认的交换机了。routing_key在使用匿名交换机的时候才需要指定,表示发送到哪个队列的意思。第一篇的例子演示了这个功能。
打开另外一个终端,执行send.py,可以观察到receive.py接收到了消息。如果有多个终端执行receive.py,那么每个receive.py都会接收到消息。
组播/路由
生产者/send.py代码分析
和广播相比,改动点主要在两个方面:
- 设定交换机的类型(type)为direct。上一篇是设置为fanout,表示广播的意思,会将消息发送到所有接收端,这里设置为direct表示要根据设定的路由键来发送消息。
- 发送信息时设置发送的路由键。
1 | import pika |
消费者/receive.py代码分析
和广播相比,改动点主要在三个方面:
- 设定交换机的类型(type)为direct。
- 增加命令行获取参数功能,参数即为路由键。
- 将队列绑定到交换机上时,设定路由键。
1 | import pika, sys |
打开两个终端,一个运行代码python receive.py info warning,表示只接收info和warning的消息。另外一个终端运行send.py,可以观察到接收终端只接收到了info和warning的消息。如果打开多个终端运行receive.py,并传入不同的路由键参数,可以看到更明显的效果。
当接收端正在运行时,可以使用rabbitmqctl list_bindings来查看绑定情况。
按规则发送/正则
上面路由键/组播的功能,通过设置路由键,可以将消息发送到相应的队列,这里的路由键是要完全匹配,比如info消息的只能发到路由键为info的消息队列。
路由键模糊匹配,就是可以使用正则表达式,和常用的正则表示式不同,这里的话“#”表示所有、全部的意思;“*”只匹配到一个词。看完示例就能明白了。
send.py代码分析
因为要进行路由键模糊匹配,所以交换机的类型要设置为topic,设置为topic,就可以使用#,*的匹配符号了。
1 | import pika |
receive.py代码分析
类型要设定为topic就可以了。从命令行接收参数的功能稍微调整了一下,没有参数时报错退出。
1 | import pika, sys |
实验运行
打开多个终端,分别传入不同的规则,观察结果
如:
1 | python3 receive_topic.py "#" |
难点
1、发送信息时,如果不设置路由键,那么路由键设置为”*”的接收端是否能接收到消息?
发送信息时,如果不设置路由键,默认是表示广播出去,理论上所有接收端都可以收到消息,但是笔者试了下,路由键设置为”*”的接收端收不到任何消息。
只有发送消息时,设置路由键为一个词,路由键设置为”*”的接收端才能收到消息。在这里,每个词使用”.”符号分开的。
2、发送消息时,如果路由键设置为”..”,那么路由键设置为”#.*”的接收端是否能接收到消息?如果发送消息时,路由键设置为一个词呢?
两种情况,笔者都测试过了,可以的。
3、”a.*.#” 和”a.#”的区别
- “a.#”只要字符串开头的一个词是a就可以了,比如a、a.haha、a.haha.haha。而这样的词是不行的,如abs、abc、abc.haha。
- “a..#”必须要满足a.的字符串才可以,比如a.、a.haha、a.haha.haha。而这样的词是不行的,如a。
远程结果返回RPC
Remote Producre Call
处理方法描述:
发送端在发送信息前,产生一个接收消息的临时队列,该队列用来接收返回的结果。其实在这里接收端、发送端的概念已经比较模糊了,因为发送端也同样要接收消息,接收端同样也要发送消息,所以这里笔者使用另外的示例来演示这一过程。
compute.py代码分析
1 | import pika |
center.py代码分析
1 | import pika |
上面代码定义了接收返回数据的队列和处理方法,并且在发送请求的时候将该队列赋值给reply_to
,在计算节点代码中就是通过这个参数来获取返回队列的。
相互关联编号correlation id
correlation id
运行原理:
控制中心发送计算请求时设置correlation id,而后计算节点将计算结果,连同接收到的correlation id一起返回,这样控制中心就能通过correlation id来标识请求。其实correlation id也可以理解为请求的唯一标识码。
示例内容:
控制中心开启多个线程,每个线程都发起一次计算请求,通过correlation id,每个线程都能准确收到相应的计算结果。
compute.py代码分析
和上面相比,只需修改一个地方:
将计算结果发送回控制中心时,增加参数correlation_id的设定,该参数的值其实是从控制中心发送过来的,这里只是再次发送回去。代码如下:
1 | import pika |
center.py代码分析
控制中心代码稍微复杂些,其中比较关键的有三个地方:
- 使用python的uuid来产生唯一的correlation_id。
- 发送计算请求时,设定参数correlation_id。
- 定义一个字典来保存返回的数据,并且键值为相应线程产生的correlation_id。
1 | import pika, threading, uuid |