RabbitMQ消息队列-一对多模式
- 2018-12-10 22:00:00
- 赵勤松 原创
- 2620
一对多模式,用图表示如下
一个生产者向消息队列中发送消息,多个消费者同时从消息队列中读取消息,在这个模式下,我们优先考虑的,是解决各个消费者如何读取消息的机制。
下面我们以ThinkPHP的代码来展示一下处理过程。
创建一个控制器MQSimple2,其对应的类文件MQSimple2.php中的代码如下
<?php /** * Created by PhpStorm. * User: zhaoqinsong * Date: 2018/12/10 * Time: 17:02 PM */ namespace app\msq\controller; use think\App; use think\Controller; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; class MQSimple2 extends Controller { var $msq_connect; var $msq_channel; public function __construct(App $app) { parent::__construct($app); $this->msq_connect = new AMQPStreamConnection("localhost", 5672, "phpmsq", "123456"); $this->msq_channel = $this->msq_connect->channel(); $this->msq_channel->queue_declare("simple", false, true, false, false); // 生成一个持久化消息队列 } public function index() { echo "rabbitmq of test"; } public function mq_send() { // 向消息队列simple发送100个消息 for($i=0; $i<20; $i++) { $msg = new AMQPMessage("Hello World No.$i"); $this->msq_channel->basic_publish($msg, "", "simple"); } } public function mq_recv1() { // 处理消息队列simple中的消息(时间较长) $callback = function($msg) { $body = $msg->body; echo "recv:$body\n"; sleep(5); }; $this->msq_channel->basic_consume("simple", "", false, true, false, false, $callback); while ($this->msq_channel->callbacks) { $this->msq_channel->wait(); } } public function mq_recv2() { // 处理消息队列simple中的消息(时间较短) $callback = function($msg) { $body = $msg->body; echo "recv:$body\n"; sleep(2); }; $this->msq_channel->basic_consume("simple", "", false, true, false, false, $callback); while ($this->msq_channel->callbacks) { $this->msq_channel->wait(); } } public function __destruct() { $this->msq_channel->close(); $this->msq_connect->close(); } }
指定路由,设定/simple2/send指向此类的mq_send方法,/simple2/recv1指向mq_recv1方法,/simple2/recv2(处理时间5秒))指向mq_recv2方法(处理时间2秒)。
在命令行下,先创建两个服务器连接,分别执行消费者1和消费者2命令(需要先切换到ThinkPHP框架的根目录下)
# php public/index.php /simple2/recv1
# php public/index.php /simple2/recv2然后再创建一个服务器连接,执行生产者命令,产生20个待处理消息(也需要切换么ThinkPHP框架的根目录下)
# php public/index.php /simple2/send这时候,通过查看消费者1和消费者2连接,可以看到两者在不断地处理消息
消费者1
recv:Hello World No.0 recv:Hello World No.2 recv:Hello World No.4 recv:Hello World No.6 recv:Hello World No.8 recv:Hello World No.10 recv:Hello World No.12 recv:Hello World No.14 recv:Hello World No.16 recv:Hello World No.18消费者2
recv:Hello World No.1 recv:Hello World No.3 recv:Hello World No.5 recv:Hello World No.7 recv:Hello World No.9 recv:Hello World No.11 recv:Hello World No.13 recv:Hello World No.15 recv:Hello World No.17 recv:Hello World No.19我们可以看到,虽然消费者1的处理时间远远大于消费者2,但它还是收到了10条消息并进行处理,也就是说,消息队列将收到的消息,平均分配给了两个消费者,而且是异步进行分配的,它没有考虑各个消费者端实际的处理性能,只是按个数进行简单地平均分配(将收到的消息分配给下一个)。
实际应用中,我们必须考虑各个消费者端的处理速度,因此,在这基础上,我们进行一下优化,采取basicQos以及手动应答消息完成,来实现能者多劳,按性能分配消息的机制。
修改mq_recv1和mq_recv2,修改后的相应函数代码如下
public function mq_recv1() { // 处理消息队列simple中的消息(时间较长) $callback = function($msg) { $body = $msg->body; echo "recv:$body\n"; sleep(5); $channel = $msg->delivery_info['channel']; $channel->basic_ack($msg->delivery_info['delivery_tag']); }; $this->msq_channel->basic_qos(null, 1, null); $this->msq_channel->basic_consume("simple", "", false, false, false, false, $callback); while ($this->msq_channel->callbacks) { $this->msq_channel->wait(); } } public function mq_recv2() { // 处理消息队列simple中的消息(时间较短) $callback = function($msg) { $body = $msg->body; echo "recv:$body\n"; sleep(2); $channel = $msg->delivery_info['channel']; $channel->basic_ack($msg->delivery_info['delivery_tag']); }; $this->msq_channel->basic_qos(null, 1, null); $this->msq_channel->basic_consume("simple", "", false, false, false, false, $callback); while ($this->msq_channel->callbacks) { $this->msq_channel->wait(); } }
修改后的代码,限制每次只接受1条消息,在消息处理完成后,发送消息处理完毕的应答给消息队列,消息列表收到应答后再次发送信息给该连接,因此消息被有效地分给了各个消费者端,效率也大大增加。
联系人: | powereye |
---|---|
Email: | zqs@someapp.cn |
QQ: | 1134846 |
微信: | powereye |