RabbitMQ消息队列-交换机Topic模式
- 2018-12-11 21:30:00
- 赵勤松 原创
- 1798
在Topic类型的路由控制下,交换机将每一个消息,会发给每一个符合路由键匹配规则的消息队列,下面我们以ThinkPHP的代码来详细说明这一过程。
创建一个控制器类MQExchangeTopic,其对应的类文件MQExchangeTopic.php中的代码如下
<?php /** * Created by PhpStorm. * User: zhaoqinsong * Date: 2018/12/11 * Time: 9:44 AM */ namespace app\msq\controller; use think\App; use think\Controller; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; class MQExchangeTopic extends Controller { var $msq_connect; var $msq_channel; public function __construct(App $app) { parent::__construct($app); $this->msq_connect = new AMQPStreamConnection("localhost", 5672, "phpmsq", "pxzline@2018"); $this->msq_channel = $this->msq_connect->channel(); $this->msq_channel->exchange_declare("exchange2", "topic"); } public function index() { echo "rabbitmq of test"; } public function mq_send() { $msg = new AMQPMessage("Hello World of number.one"); $this->msq_channel->basic_publish($msg, "exchange2", "number.one"); $msg = new AMQPMessage("Hello World of number.two"); $this->msq_channel->basic_publish($msg, "exchange2", "number.two"); } public function mq_recv1() { $this->msq_channel->queue_declare("simple1", false, true, false, false); // 生成一个持久化消息队列 $this->msq_channel->queue_bind("simple1", "exchange2", "number.#"); $this->msq_channel->basic_qos(null,1, null); // 处理消息队列simple中的消息 $callback = function($msg) { $body = $msg->body; echo "recv:$body\n"; }; $this->msq_channel->basic_consume("simple1", "", false, true, false, false, $callback); while ($this->msq_channel->callbacks) { $this->msq_channel->wait(); } } public function mq_recv2() { $this->msq_channel->queue_declare("simple2", false, true, false, false); // 生成一个持久化消息队列 $this->msq_channel->queue_bind("simple2", "exchange2", "#.one"); $this->msq_channel->basic_qos(null,1, null); // 处理消息队列simple中的消息 $callback = function($msg) { $body = $msg->body; echo "recv:$body\n"; }; $this->msq_channel->basic_consume("simple2", "", false, true, false, false, $callback); while ($this->msq_channel->callbacks) { $this->msq_channel->wait(); } } public function mq_recv3() { $this->msq_channel->queue_declare("simple2", false, true, false, false); // 生成一个持久化消息队列 $this->msq_channel->queue_bind("simple2", "exchange2", "#.two"); $this->msq_channel->basic_qos(null,1, null); // 处理消息队列simple中的消息 $callback = function($msg) { $body = $msg->body; echo "recv:$body\n"; }; $this->msq_channel->basic_consume("simple2", "", false, true, false, false, $callback); while ($this->msq_channel->callbacks) { $this->msq_channel->wait(); } } public function __destruct() { $this->msq_channel->close(); $this->msq_connect->close(); } }
指定路由,设定/topic/send指向此类的mq_send方法,/topic/recv1指向mq_recv1方法,/topic/recv2指向mq_recv2方法,/topic/recv3指向mq_recv3。
在命令行下,先创建三个服务器连接,用于执行接收消息的程序
# php public/index.php /topic/recv1(接收路由键为number.#)
# php public/index.php /topic/recv2(接收路由键为#.one)
# php public/index.php /topic/recv3(接收路由键为#.two)然后再创建一个服务器连接,用于执行发送消息的程序
# php public/index.php /topic/send这时,可以在三个消息接收端,分别看到以下信息
# php public/index.php /topic/recv1 recv:Hello World of number.one recv:Hello World of number.two
# php public/index.php /topic/recv2 recv:Hello World of number.one
# php public/index.php /topic/recv3 recv:Hello World of number.two可以看到,消息队列1接收了路由键为number.one和number.two的消息,消息队列2接收了路由键为number.one的消息,消息队列3接收了路由键为number.two的消息,通过匹配不同的接收规则,消息可以被指定到相应消息队列进行处理。
文章分类
联系我们
联系人: | powereye |
---|---|
Email: | zqs@someapp.cn |
QQ: | 1134846 |
微信: | powereye |