<?php /** * 消息生产者(即发送消息) */ // RabbitMQ服务器连接参数 $credentials = array( 'host' => 'localhost', 'port' => '5672', 'vhost' => '/', 'login' => 'admin', 'password' => '88888888', ); $exchange_name = 'manong.exchange'; // 交换机名称(生产者和消费者要使用相同的交换机名称) $routing_key = 'manong.routing_key'; // 路由键(生产者和消费者要使用相同的路由键) try { $connection = new AMQPConnection($credentials); // 连接RabbitMQ服务器 if ($connection->connect() === true) { $channel = new AMQPChannel($connection); // 建立网络信道 $exchange = new AMQPExchange($channel); // 创建交换机(在指定网络信道) $exchange->setName($exchange_name); // 设置交换机名称 $exchange->setType(AMQP_EX_TYPE_DIRECT); // 设置交换机类型 $exchange->setFlags(AMQP_DURABLE); // 开启交换机持久化 $exchange->declareExchange(); // 设置消息内容 $content = array( '平天大圣·牛魔王', '覆海大圣·蛟魔王', '混天大圣·鹏魔王', '移山大圣·狮驼王', '通风大圣·猕猴王', '驱神大圣·禺狨王', '齐天大圣·美猴王', ); $message = array( 'time' => time(), 'content' => $content[array_rand($content)], ); $message = json_encode($message); // 发布消息 $publish = $exchange->publish($message, $routing_key); if ($publish === true) { echo '发布消息成功。' . PHP_EOL; } else { echo '发布消息失败。' . PHP_EOL; } $connection->disconnect(); // 断开连接 } else { echo '连接RabbitMQ服务器失败。' . PHP_EOL; } } catch (Exception $exception) { echo 'Exception message: ' . $exception->getMessage(); }
<?php /** * 消息消费者(即接收消息) * 重要提醒:1、当前PHP脚本必须以CLI模式运行(即在命令行运行)。 * 2、若要建立多个消费者可复制当前脚本,然后各自以CLI模式运行即可。 */ // RabbitMQ服务器连接参数 $credentials = array( 'host' => 'localhost', 'port' => '5672', 'vhost' => '/', 'login' => 'admin', 'password' => '88888888', ); $exchange_name = 'manong.exchange'; // 交换机名称(生产者和消费者要使用相同的交换机名称) $routing_key = 'manong.routing_key'; // 路由键(生产者和消费者要使用相同的路由键) $queue_name = 'manong.queue'; // 队列名称,如有多个消费者则无需设置该变量,也无需调用“$queue->setName($queue_name)” /** * 队列消息被消费后的回调函数 * * @return void */ function queue_consume_callback($envelope, $queue) { $body = json_decode($envelope->getBody(), true); $datetime = date('Y-m-d H:i:s', $body['time']); $content = $body['content']; echo "[消费者1][{$datetime}] {$content}" . PHP_EOL; // 发送ACK应答(通知RabbitMQ服务器消费者已正常消费掉消息,可以将该消息从队列中删除) $queue->ack($envelope->getDeliveryTag()); } try { $connection = new AMQPConnection($credentials); // 连接RabbitMQ服务器 if ($connection->connect() === true) { $channel = new AMQPChannel($connection); // 建立网络信道 $exchange = new AMQPExchange($channel); // 创建交换机(在指定网络信道) $queue = new AMQPQueue($channel); // 创建队列,此时在RabbitMQ网页管理插件的Queues选项卡就可以看到该队列 $exchange->setName($exchange_name); // 设置交换机名称 $exchange->setType(AMQP_EX_TYPE_DIRECT); // 设置交换机类型 $exchange->setFlags(AMQP_DURABLE); // 开启交换机持久化 $exchange->declareExchange(); // $queue->setName($queue_name); // 如有多个消费者请勿调用本方法 $queue->declareQueue(); $queue->bind($exchange_name, $routing_key); // 消费队列消息 echo '消费者1等待队列消息中……' . PHP_EOL; while (true) $queue->consume('queue_consume_callback'); $connection->disconnect(); // 断开连接 } else { echo '连接RabbitMQ服务器失败。' . PHP_EOL; } } catch (Exception $exception) { echo 'Exception message: ' . $exception->getMessage(); }
Copyright © 2023 码农人生. All Rights Reserved