RabbitMQ消息队列-交换机Topic模式

2018-12-11 21:30:00
赵勤松
原创
1798
摘要:对于复杂的消息发送机制,我们一般使用RabbitMQ内转的交换机模式进行消息的分发,相对于一对一,一对多模式下对单个消息队列的处理模式,通常交换机同时管理着多个消息队列,下面我们将详细剖析交换机方式中的Topic类型

在Topic类型的路由控制下,交换机将每一个消息,会发给每一个符合路由键匹配规则的消息队列,下面我们以ThinkPHP的代码来详细说明这一过程。


创建一个控制器类MQExchangeTopic,其对应的类文件MQExchangeTopic.php中的代码如下


<?php
/**
 * Created by PhpStorm.
 * User: zhaoqinsong
 * Date: 2018/12/11
 * Time: 9:44 AM
 */
namespace app\msq\controller;
use think\App;
use think\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class MQExchangeTopic extends Controller
{
    var $msq_connect;
    var $msq_channel;
    public function __construct(App $app)
    {
        parent::__construct($app);
        $this->msq_connect = new AMQPStreamConnection("localhost", 5672, "phpmsq", "pxzline@2018");
        $this->msq_channel = $this->msq_connect->channel();
        $this->msq_channel->exchange_declare("exchange2", "topic");
    }
    public function index()
    {
        echo "rabbitmq of test";
    }
    public function mq_send()
    {
        $msg = new AMQPMessage("Hello World of number.one");
        $this->msq_channel->basic_publish($msg, "exchange2", "number.one");
        $msg = new AMQPMessage("Hello World of number.two");
        $this->msq_channel->basic_publish($msg, "exchange2", "number.two");
    }
    public function mq_recv1()
    {
        $this->msq_channel->queue_declare("simple1", false, true, false, false);    //  生成一个持久化消息队列
        $this->msq_channel->queue_bind("simple1", "exchange2", "number.#");
        $this->msq_channel->basic_qos(null,1, null);
        //  处理消息队列simple中的消息
        $callback = function($msg) {
            $body = $msg->body;
            echo "recv:$body\n";
        };
        $this->msq_channel->basic_consume("simple1", "", false, true, false, false, $callback);
        while ($this->msq_channel->callbacks) {
            $this->msq_channel->wait();
        }
    }
    public function mq_recv2()
    {
        $this->msq_channel->queue_declare("simple2", false, true, false, false);    //  生成一个持久化消息队列
        $this->msq_channel->queue_bind("simple2", "exchange2", "#.one");
        $this->msq_channel->basic_qos(null,1, null);
        //  处理消息队列simple中的消息
        $callback = function($msg) {
            $body = $msg->body;
            echo "recv:$body\n";
        };
        $this->msq_channel->basic_consume("simple2", "", false, true, false, false, $callback);
        while ($this->msq_channel->callbacks) {
            $this->msq_channel->wait();
        }
    }
    public function mq_recv3()
    {
        $this->msq_channel->queue_declare("simple2", false, true, false, false);    //  生成一个持久化消息队列
        $this->msq_channel->queue_bind("simple2", "exchange2", "#.two");
        $this->msq_channel->basic_qos(null,1, null);
        //  处理消息队列simple中的消息
        $callback = function($msg) {
            $body = $msg->body;
            echo "recv:$body\n";
        };
        $this->msq_channel->basic_consume("simple2", "", false, true, false, false, $callback);
        while ($this->msq_channel->callbacks) {
            $this->msq_channel->wait();
        }
    }
    public function __destruct()
    {
        $this->msq_channel->close();
        $this->msq_connect->close();
    }
}


指定路由,设定/topic/send指向此类的mq_send方法,/topic/recv1指向mq_recv1方法,/topic/recv2指向mq_recv2方法,/topic/recv3指向mq_recv3。

在命令行下,先创建三个服务器连接,用于执行接收消息的程序


# php public/index.php /topic/recv1(接收路由键为number.#)
# php public/index.php /topic/recv2(接收路由键为#.one)
# php public/index.php /topic/recv3(接收路由键为#.two)
然后再创建一个服务器连接,用于执行发送消息的程序


# php public/index.php /topic/send
这时,可以在三个消息接收端,分别看到以下信息


# php public/index.php /topic/recv1
recv:Hello World of number.one
recv:Hello World of number.two
# php public/index.php /topic/recv2
recv:Hello World of number.one
# php public/index.php /topic/recv3
recv:Hello World of number.two
可以看到,消息队列1接收了路由键为number.one和number.two的消息,消息队列2接收了路由键为number.one的消息,消息队列3接收了路由键为number.two的消息,通过匹配不同的接收规则,消息可以被指定到相应消息队列进行处理。


文章分类
联系我们
联系人: powereye
Email: zqs@someapp.cn
QQ: 1134846
微信: powereye