Rabbitmq 是一个功能很强大消息队列系统,使用起来可能不像某些 push 、pop 类型的队列简单(比如 redis 的list),Rabbitmq 支持消息的订阅发布模式,方便大型系统各个服务组件之间解耦和通信。我们首先要了解一些基本概念。
12345 | Connection:即连接, 与你日常理解的连接没有什么不同,比如 redis的连接,mysql的连接 Channel:即通道, 可以理解为一个连接中的子通道,想象一条高速公路,可能是 4车道的,也可能是 8 车道的,这些车道就是 Channel。 Exchange : 交换机,如果你知道交换机设备的话,它的原理跟交换机是基本一样的。 routingkey : 用来绑定交换机和队列的一个字符串。发送消息时需要指定 routingKey, 绑定了此 routingKey的队列将接收到此消息。 queue队列: 消息经过交换机,最终发送到队列中。 |
发布消息:
新建文件send.php
1234567891011121314151617181920212223242526272829303132333435 | <?php $conn = [ // Rabbitmq 服务地址 'host' => '127.0.0.1' , // Rabbitmq 服务端口 'port' => '5672' , // Rabbitmq 帐号 'login' => 'guest' , // Rabbitmq 密码 'password' => 'guest' , ]; // 创建连接和channel $conn = new AMQPConnection($conn); if (!$conn->connect()) { die( "Cannot connect to the broker!\n" ); } $channel = new AMQPChannel($conn); // 用来绑定交换机和队列 $routingKey = 'key_1' ; $ex = new AMQPExchange($channel); // 交换机名称 $exchangeName = 'ex1' ; $ex->setName($exchangeName); // 设置交换机类型 $ex->setType(AMQP_EX_TYPE_DIRECT); // 设置交换机是否持久化消息 $ex->setFlags(AMQP_DURABLE); for ($i=0; $i<5; ++$i){ echo "Send Message:" .$ex->publish( date ( 'H:i:s' ). "用户" .$i. "注册" , $routingKey ). "\n" ; } |
消费消息:
新建文件receive.php
12345678910111213141516171819202122232425262728293031323334353637383940414243444546 | <?php $conn = [ // Rabbitmq 服务地址 'host' => '127.0.0.1' , // Rabbitmq 服务端口 'port' => '5672' , // Rabbitmq 帐号 'login' => 'guest' , // Rabbitmq 密码 'password' => 'guest' , ]; // 创建连接和channel $conn = new AMQPConnection($conn); if (!$conn->connect()) { die( "Cannot connect to the broker!\n" ); } $channel = new AMQPChannel($conn); $exchangeName = 'ex1' ; // 创建交换机 $ex = new AMQPExchange($channel); $ex->setName($exchangeName); $ex->setType(AMQP_EX_TYPE_DIRECT); //direct 类型 $ex->setFlags(AMQP_DURABLE); // 持久化 // 创建队列 $queueName = 'queue1' ; $q = new AMQPQueue($channel); $q->setName($queueName); $q->setFlags(AMQP_DURABLE); $q->declareQueue(); // 用于绑定队列和交换机,跟 send.php 中的一致。 $routingKey = 'key_1' ; $q->bind($exchangeName, $routingKey); // 接收消息 $q->consume( function ($envelope, $queue) { $msg = $envelope->getBody(); echo $msg. "\n" ; // 处理消息 }, AMQP_AUTOACK); $conn->disconnect(); |
运行代码:
启动rabbitmq服务。
一开始队列是不存在的,我们需要先启动 consume.php 来初始化队列,打开命令行页面,运行
1 | php receive.php |
启动后 , php receive.php 将阻塞监听队列消息。
然后打开新的命令行窗口运行:
1 | php send.php |
然后在新 receive.php 所在的终端窗口将看到接收到的消息。