使用amqp扩展操作RabbitMQ

操作系统:CentOS 7
PHP版本:8.1
amqp扩展版本:1.11.0
 
PHP操作RabbitMQ必须使用AMQP协议(Advanced Message Queuing Protocol,高级消息队列协议),而PHP要使用AMQP协议必须安装amqp扩展。
 
安装amqp扩展
[root@localhost src]# tar -xf amqp-1.11.0.tgz
[root@localhost src]# cd amqp-1.11.0
[root@localhost amqp-1.11.0]# /program/php/bin/phpize
[root@localhost amqp-1.11.0]# ./configure --with-php-config=/program/php/bin/php-config --with-amqp
[root@localhost amqp-1.11.0]# make
[root@localhost amqp-1.11.0]# make install
[root@localhost amqp-1.11.0]# vim /program/php/php.ini
extension=amqp.so
[root@localhost amqp-1.11.0]# service php-fpm restart
Gracefully shutting down php-fpm . done
Starting php-fpm  done
[root@localhost amqp-1.11.0]#

消息生产者demo代码(rabbitmq.producer.php)
<?php
/**
 * 消息生产者(即发送消息)
 */

// RabbitMQ服务器连接参数
$credentials = array(
    'host' => 'localhost',
    'port' => '5672',
    'vhost' => '/',
    'login' => 'admin',
    'password' => '88888888',
);

$exchange_name = 'manong.exchange'; // 交换机名称(生产者和消费者要使用相同的交换机名称)
$routing_key = 'manong.routing_key'; // 路由键(生产者和消费者要使用相同的路由键)

try {
    $connection = new AMQPConnection($credentials); // 连接RabbitMQ服务器
    if ($connection->connect() === true) {
        $channel = new AMQPChannel($connection); // 建立网络信道
        $exchange = new AMQPExchange($channel); // 创建交换机(在指定网络信道)

        $exchange->setName($exchange_name); // 设置交换机名称
        $exchange->setType(AMQP_EX_TYPE_DIRECT); // 设置交换机类型
        $exchange->setFlags(AMQP_DURABLE); // 开启交换机持久化
        $exchange->declareExchange();

        // 设置消息内容
        $content = array(
            '平天大圣·牛魔王',
            '覆海大圣·蛟魔王',
            '混天大圣·鹏魔王',
            '移山大圣·狮驼王',
            '通风大圣·猕猴王',
            '驱神大圣·禺狨王',
            '齐天大圣·美猴王',
        );
        $message = array(
            'time' => time(),
            'content' => $content[array_rand($content)],
        );
        $message = json_encode($message);

        // 发布消息
        $publish = $exchange->publish($message, $routing_key);
        if ($publish === true) {
            echo '发布消息成功。' . PHP_EOL;
        } else {
            echo '发布消息失败。' . PHP_EOL;
        }

        $connection->disconnect(); // 断开连接
    } else {
        echo '连接RabbitMQ服务器失败。' . PHP_EOL;
    }
} catch (Exception $exception) {
    echo 'Exception message: ' . $exception->getMessage();
}

消息消费者demo代码(rabbitmq.consumer.php)
<?php
/**
 * 消息消费者(即接收消息)
 * 重要提醒:1、当前PHP脚本必须以CLI模式运行(即在命令行运行)。
 *           2、若要建立多个消费者可复制当前脚本,然后各自以CLI模式运行即可。
 */

// RabbitMQ服务器连接参数
$credentials = array(
    'host' => 'localhost',
    'port' => '5672',
    'vhost' => '/',
    'login' => 'admin',
    'password' => '88888888',
);

$exchange_name = 'manong.exchange'; // 交换机名称(生产者和消费者要使用相同的交换机名称)
$routing_key = 'manong.routing_key'; // 路由键(生产者和消费者要使用相同的路由键)
$queue_name = 'manong.queue'; // 队列名称,如有多个消费者则无需设置该变量,也无需调用“$queue->setName($queue_name)”

/**
 * 队列消息被消费后的回调函数
 *
 * @return void
 */
function queue_consume_callback($envelope, $queue)
{
    $body = json_decode($envelope->getBody(), true);
    $datetime = date('Y-m-d H:i:s', $body['time']);
    $content = $body['content'];
    echo "[消费者1][{$datetime}] {$content}" . PHP_EOL;

    // 发送ACK应答(通知RabbitMQ服务器消费者已正常消费掉消息,可以将该消息从队列中删除)
    $queue->ack($envelope->getDeliveryTag());
}

try {
    $connection = new AMQPConnection($credentials); // 连接RabbitMQ服务器
    if ($connection->connect() === true) {
        $channel = new AMQPChannel($connection); // 建立网络信道
        $exchange = new AMQPExchange($channel); // 创建交换机(在指定网络信道)
        $queue = new AMQPQueue($channel); // 创建队列,此时在RabbitMQ网页管理插件的Queues选项卡就可以看到该队列

        $exchange->setName($exchange_name); // 设置交换机名称
        $exchange->setType(AMQP_EX_TYPE_DIRECT); // 设置交换机类型
        $exchange->setFlags(AMQP_DURABLE); // 开启交换机持久化
        $exchange->declareExchange();

        // $queue->setName($queue_name); // 如有多个消费者请勿调用本方法
        $queue->declareQueue();
        $queue->bind($exchange_name, $routing_key);

        // 消费队列消息
        echo '消费者1等待队列消息中……' . PHP_EOL;
        while (true) $queue->consume('queue_consume_callback');

        $connection->disconnect(); // 断开连接
    } else {
        echo '连接RabbitMQ服务器失败。' . PHP_EOL;
    }
} catch (Exception $exception) {
    echo 'Exception message: ' . $exception->getMessage();
}

以rabbitmq.consumer.php为蓝本,建立两个消费者(文件名分别为rabbitmq.consumer1.php和rabbitmq.consumer2.php)并且在命令行运行脚本,这时每次刷新rabbitmq.producer.php页面在终端都可以看到两个脚本的回调函数被执行
 
[root@localhost wwwroot]# /program/php/bin/php rabbitmq.consumer1.php
消费者1等待队列消息中……
[消费者1][yyyy-mm-dd hh:ii:ss] 移山大圣·狮驼王
[消费者1][yyyy-mm-dd hh:ii:ss] 通风大圣·猕猴王
[消费者1][yyyy-mm-dd hh:ii:ss] 平天大圣·牛魔王
[消费者1][yyyy-mm-dd hh:ii:ss] 齐天大圣·美猴王
[消费者1][yyyy-mm-dd hh:ii:ss] 覆海大圣·蛟魔王
^C
[root@localhost wwwroot]#

[root@localhost wwwroot]# /program/php/bin/php rabbitmq.consumer1.php
消费者2等待队列消息中……
[消费者2][yyyy-mm-dd hh:ii:ss] 移山大圣·狮驼王
[消费者2][yyyy-mm-dd hh:ii:ss] 通风大圣·猕猴王
[消费者2][yyyy-mm-dd hh:ii:ss] 平天大圣·牛魔王
[消费者2][yyyy-mm-dd hh:ii:ss] 齐天大圣·美猴王
[消费者2][yyyy-mm-dd hh:ii:ss] 覆海大圣·蛟魔王
^C
[root@localhost wwwroot]#

Copyright © 2023 码农人生. All Rights Reserved