RabbitMQ消息队列-一对多模式
- 2018-12-10 22:00:00
- 赵勤松 原创
- 2828
一对多模式,用图表示如下
一个生产者向消息队列中发送消息,多个消费者同时从消息队列中读取消息,在这个模式下,我们优先考虑的,是解决各个消费者如何读取消息的机制。
下面我们以ThinkPHP的代码来展示一下处理过程。
创建一个控制器MQSimple2,其对应的类文件MQSimple2.php中的代码如下
<?php
/**
* Created by PhpStorm.
* User: zhaoqinsong
* Date: 2018/12/10
* Time: 17:02 PM
*/
namespace app\msq\controller;
use think\App;
use think\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class MQSimple2 extends Controller
{
var $msq_connect;
var $msq_channel;
public function __construct(App $app)
{
parent::__construct($app);
$this->msq_connect = new AMQPStreamConnection("localhost", 5672, "phpmsq", "123456");
$this->msq_channel = $this->msq_connect->channel();
$this->msq_channel->queue_declare("simple", false, true, false, false); // 生成一个持久化消息队列
}
public function index()
{
echo "rabbitmq of test";
}
public function mq_send() {
// 向消息队列simple发送100个消息
for($i=0; $i<20; $i++)
{
$msg = new AMQPMessage("Hello World No.$i");
$this->msq_channel->basic_publish($msg, "", "simple");
}
}
public function mq_recv1() {
// 处理消息队列simple中的消息(时间较长)
$callback = function($msg) {
$body = $msg->body;
echo "recv:$body\n";
sleep(5);
};
$this->msq_channel->basic_consume("simple", "", false, true, false, false, $callback);
while ($this->msq_channel->callbacks) {
$this->msq_channel->wait();
}
}
public function mq_recv2() {
// 处理消息队列simple中的消息(时间较短)
$callback = function($msg) {
$body = $msg->body;
echo "recv:$body\n";
sleep(2);
};
$this->msq_channel->basic_consume("simple", "", false, true, false, false, $callback);
while ($this->msq_channel->callbacks) {
$this->msq_channel->wait();
}
}
public function __destruct()
{
$this->msq_channel->close();
$this->msq_connect->close();
}
}
指定路由,设定/simple2/send指向此类的mq_send方法,/simple2/recv1指向mq_recv1方法,/simple2/recv2(处理时间5秒))指向mq_recv2方法(处理时间2秒)。
在命令行下,先创建两个服务器连接,分别执行消费者1和消费者2命令(需要先切换到ThinkPHP框架的根目录下)
# php public/index.php /simple2/recv1
# php public/index.php /simple2/recv2然后再创建一个服务器连接,执行生产者命令,产生20个待处理消息(也需要切换么ThinkPHP框架的根目录下)
# php public/index.php /simple2/send这时候,通过查看消费者1和消费者2连接,可以看到两者在不断地处理消息
消费者1
recv:Hello World No.0 recv:Hello World No.2 recv:Hello World No.4 recv:Hello World No.6 recv:Hello World No.8 recv:Hello World No.10 recv:Hello World No.12 recv:Hello World No.14 recv:Hello World No.16 recv:Hello World No.18消费者2
recv:Hello World No.1 recv:Hello World No.3 recv:Hello World No.5 recv:Hello World No.7 recv:Hello World No.9 recv:Hello World No.11 recv:Hello World No.13 recv:Hello World No.15 recv:Hello World No.17 recv:Hello World No.19我们可以看到,虽然消费者1的处理时间远远大于消费者2,但它还是收到了10条消息并进行处理,也就是说,消息队列将收到的消息,平均分配给了两个消费者,而且是异步进行分配的,它没有考虑各个消费者端实际的处理性能,只是按个数进行简单地平均分配(将收到的消息分配给下一个)。
实际应用中,我们必须考虑各个消费者端的处理速度,因此,在这基础上,我们进行一下优化,采取basicQos以及手动应答消息完成,来实现能者多劳,按性能分配消息的机制。
修改mq_recv1和mq_recv2,修改后的相应函数代码如下
public function mq_recv1() {
// 处理消息队列simple中的消息(时间较长)
$callback = function($msg) {
$body = $msg->body;
echo "recv:$body\n";
sleep(5);
$channel = $msg->delivery_info['channel'];
$channel->basic_ack($msg->delivery_info['delivery_tag']);
};
$this->msq_channel->basic_qos(null, 1, null);
$this->msq_channel->basic_consume("simple", "", false, false, false, false, $callback);
while ($this->msq_channel->callbacks) {
$this->msq_channel->wait();
}
}
public function mq_recv2() {
// 处理消息队列simple中的消息(时间较短)
$callback = function($msg) {
$body = $msg->body;
echo "recv:$body\n";
sleep(2);
$channel = $msg->delivery_info['channel'];
$channel->basic_ack($msg->delivery_info['delivery_tag']);
};
$this->msq_channel->basic_qos(null, 1, null);
$this->msq_channel->basic_consume("simple", "", false, false, false, false, $callback);
while ($this->msq_channel->callbacks) {
$this->msq_channel->wait();
}
}
修改后的代码,限制每次只接受1条消息,在消息处理完成后,发送消息处理完毕的应答给消息队列,消息列表收到应答后再次发送信息给该连接,因此消息被有效地分给了各个消费者端,效率也大大增加。
| 联系人: | powereye |
|---|---|
| Email: | zqs@someapp.cn |
| QQ: | 1134846 |
| 微信: | powereye |
