RabbitMQ消息队列-一对多模式

2018-12-10 22:00:00
赵勤松
原创
2620
摘要:一对多模式,顾名思义,就是指一个消息队列,拥有一个生产者,二个或二个以上的消费者,如何有效分配消费者之间的消息,是我们关注的重点。

一对多模式,用图表示如下

一个生产者向消息队列中发送消息,多个消费者同时从消息队列中读取消息,在这个模式下,我们优先考虑的,是解决各个消费者如何读取消息的机制。


下面我们以ThinkPHP的代码来展示一下处理过程。


创建一个控制器MQSimple2,其对应的类文件MQSimple2.php中的代码如下


<?php
/**
 * Created by PhpStorm.
 * User: zhaoqinsong
 * Date: 2018/12/10
 * Time: 17:02 PM
 */
namespace app\msq\controller;
use think\App;
use think\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class MQSimple2 extends Controller
{
    var $msq_connect;
    var $msq_channel;
    public function __construct(App $app)
    {
        parent::__construct($app);
        $this->msq_connect = new AMQPStreamConnection("localhost", 5672, "phpmsq", "123456");
        $this->msq_channel = $this->msq_connect->channel();
        $this->msq_channel->queue_declare("simple", false, true, false, false);    //  生成一个持久化消息队列
    }
    public function index()
    {
        echo "rabbitmq of test";
    }
    public function mq_send() {
        //  向消息队列simple发送100个消息
        for($i=0; $i<20; $i++)
        {
            $msg = new AMQPMessage("Hello World No.$i");
            $this->msq_channel->basic_publish($msg, "", "simple");
        }
    }
    public function mq_recv1() {
        //  处理消息队列simple中的消息(时间较长)
        $callback = function($msg) {
            $body = $msg->body;
            echo "recv:$body\n";
            sleep(5);
        };
        $this->msq_channel->basic_consume("simple", "", false, true, false, false, $callback);
        while ($this->msq_channel->callbacks) {
            $this->msq_channel->wait();
        }
    }
    public function mq_recv2() {
        //  处理消息队列simple中的消息(时间较短)
        $callback = function($msg) {
            $body = $msg->body;
            echo "recv:$body\n";
            sleep(2);
        };
        $this->msq_channel->basic_consume("simple", "", false, true, false, false, $callback);
        while ($this->msq_channel->callbacks) {
            $this->msq_channel->wait();
        }
    }
    public function __destruct()
    {
        $this->msq_channel->close();
        $this->msq_connect->close();
    }
}

指定路由,设定/simple2/send指向此类的mq_send方法,/simple2/recv1指向mq_recv1方法,/simple2/recv2(处理时间5秒))指向mq_recv2方法(处理时间2秒)。


在命令行下,先创建两个服务器连接,分别执行消费者1和消费者2命令(需要先切换到ThinkPHP框架的根目录下)


# php public/index.php /simple2/recv1
# php public/index.php /simple2/recv2
然后再创建一个服务器连接,执行生产者命令,产生20个待处理消息(也需要切换么ThinkPHP框架的根目录下)


# php public/index.php /simple2/send
这时候,通过查看消费者1和消费者2连接,可以看到两者在不断地处理消息


消费者1


recv:Hello World No.0
recv:Hello World No.2
recv:Hello World No.4
recv:Hello World No.6
recv:Hello World No.8
recv:Hello World No.10
recv:Hello World No.12
recv:Hello World No.14
recv:Hello World No.16
recv:Hello World No.18
消费者2
recv:Hello World No.1
recv:Hello World No.3
recv:Hello World No.5
recv:Hello World No.7
recv:Hello World No.9
recv:Hello World No.11
recv:Hello World No.13
recv:Hello World No.15
recv:Hello World No.17
recv:Hello World No.19
我们可以看到,虽然消费者1的处理时间远远大于消费者2,但它还是收到了10条消息并进行处理,也就是说,消息队列将收到的消息,平均分配给了两个消费者,而且是异步进行分配的,它没有考虑各个消费者端实际的处理性能,只是按个数进行简单地平均分配(将收到的消息分配给下一个)。


实际应用中,我们必须考虑各个消费者端的处理速度,因此,在这基础上,我们进行一下优化,采取basicQos以及手动应答消息完成,来实现能者多劳,按性能分配消息的机制。

修改mq_recv1和mq_recv2,修改后的相应函数代码如下


    public function mq_recv1() {
        //  处理消息队列simple中的消息(时间较长)
        $callback = function($msg) {
            $body = $msg->body;
            echo "recv:$body\n";
            sleep(5);
            $channel = $msg->delivery_info['channel'];
            $channel->basic_ack($msg->delivery_info['delivery_tag']);
        };
        $this->msq_channel->basic_qos(null, 1, null);
        $this->msq_channel->basic_consume("simple", "", false, false, false, false, $callback);
        while ($this->msq_channel->callbacks) {
            $this->msq_channel->wait();
        }
    }
    public function mq_recv2() {
        //  处理消息队列simple中的消息(时间较短)
        $callback = function($msg) {
            $body = $msg->body;
            echo "recv:$body\n";
            sleep(2);
            $channel = $msg->delivery_info['channel'];
            $channel->basic_ack($msg->delivery_info['delivery_tag']);
        };
        $this->msq_channel->basic_qos(null, 1, null);
        $this->msq_channel->basic_consume("simple", "", false, false, false, false, $callback);
        while ($this->msq_channel->callbacks) {
            $this->msq_channel->wait();
        }
    }


修改后的代码,限制每次只接受1条消息,在消息处理完成后,发送消息处理完毕的应答给消息队列,消息列表收到应答后再次发送信息给该连接,因此消息被有效地分给了各个消费者端,效率也大大增加。


文章分类
联系我们
联系人: powereye
Email: zqs@someapp.cn
QQ: 1134846
微信: powereye