RabbitMQ发布延迟消息

延迟消息就是生产者发布消息后不立即消费,而是过一段时间再消费。延迟消息的使用场景很多,例如:
订单失效:用户下单后没有马上付款,超过一定时间后订单自动失效。
会员过期:包括临期发短信提醒,以及到期取消会员。


 
RabbitMQ可以使用插件实现延迟消息,传送门:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
把“rabbitmq_delayed_message_exchange-*.*.*.ez”文件下载到/rabbitmq/plugins目录下然后将其启用即可,相关命令如下:
[root@localhost ~]# rabbitmq-plugins enable rabbitmq_delayed_message_exchange
[root@localhost ~]# rabbitmq-plugins list
[E*] rabbitmq_delayed_message_exchange # 插件左边方括号里是“E*”则说明该插件已启用
[root@localhost ~]#
打开“http://192.168.*.*:15672/#/exchanges”页面,可以看到新增交换机表单里的Type下拉选择框多了一项“x-delayed-message”。



发布延迟消息和发布即时消息没有什么太大差别,绝大部分代码都是一样的,只是配置交换机有点不同,下面直接给出消费者和生产者的代码。



消息消费者代码(rabbitmq.consumer.php)
 
<?php
// +--------------------------------------------------------------+ //
// | 消息消费者 - rabbitmq.consumer.php                           | //
// +--------------------------------------------------------------+ //
declare(strict_types=1);
PHP_SAPI !== 'cli' && exit('脚本只能在命令行执行');
ini_set('display_errors', 'On');
error_reporting(-1);
set_time_limit(0);
ini_set('memory_limit', '-1');

const CONSUMER_ID = '消费者1'; // 如果需要启动多个消费者,可复制本脚本,然后修改消费者ID即可

// RabbitMQ服务器连接参数
$credentials = [
    'host' => 'localhost',
    'port' => '5672',
    'vhost' => '/',
    'login' => 'RabbitMQ管理员账号',
    'password' => 'RabbitMQ管理员密码',
];

$exchangeName = 'manong.exchange'; // 交换机名称(生产者和消费者要使用相同的交换机名称)
$routingKey = 'manong.routing_key'; // 路由键(生产者和消费者要使用相同的路由键)
$queueName = 'manong.queue'; // 队列名称(若只启动一个消费者则该变量可以不设置)

/**
 * 在控制台输出一段DEBUG信息并终止程序
 *
 * @param string $text 参数说明
 * @return void echo&exit
 */
function shutdown(string $text): void
{
    debug($text);
    exit;
}

/**
 * 在控制台输出一段信息
 *
 * @param string $text 参数说明
 * @return void echo
 */
function debug(string $text): void
{
    echo '[' . CONSUMER_ID . "][DEBUG] $text" . PHP_EOL;
}

/**
 * 队列消息被消费后的回调函数
 *
 * @param AMQPEnvelope $envelope AMQPEnvelope对象
 * @param AMQPQueue $queue AMQPQueue对象
 * @return bool 消息消费结果
 */
function queue_consume_callback(AMQPEnvelope $envelope, AMQPQueue $queue): bool
{
    $ok = false;

    // 获取消息并JSON解码
    try {
        $body = json_decode($envelope->getBody(), true, 512, JSON_THROW_ON_ERROR);
    } catch (JsonException $e) {
        unset($e);
    }

    if (isset($body['time'], $body['content'])) {
        $datetime = date('Y-m-d H:i:s', $body['time']);
        $content = $body['content'];
        debug("[$datetime] $content");

        // 发送ACK应答(通知RabbitMQ服务器消费者已正常消费掉消息,可以将该消息从队列中删除)
        $deliveryTag = $envelope->getDeliveryTag();
        try {
            $ok = $queue->ack($deliveryTag);
        } catch (AMQPChannelException|AMQPConnectionException $e) {
            debug('发送ACK应答异常:' . $e->getMessage());
        }
    }

    return $ok;
}

// 与RabbitMQ服务器建立连接
$connection = new AMQPConnection($credentials); // 创建AMQP连接对象,此时尚未与RabbitMQ服务器建立连接
try {
    $connection->connect(); // 与RabbitMQ服务器建立连接
} catch (AMQPConnectionException $e) {
    shutdown('与RabbitMQ服务器建立连接异常:' . $e->getMessage());
}
$connection->isConnected() || shutdown('与RabbitMQ服务器建立连接失败');

// 建立网络信道
try {
    $channel = new AMQPChannel($connection);
} catch (AMQPConnectionException $e) {
    shutdown('建立网络信道异常:' . $e->getMessage());
}

// 创建交换机(在指定网络信道)
try {
    $exchange = new AMQPExchange($channel);
} catch (AMQPConnectionException|AMQPExchangeException $e) {
    shutdown('创建交换机异常:' . $e->getMessage());
}

// 创建队列,此时在RabbitMQ网页管理插件的Queues选项卡就可以看到该队列
try {
    $queue = new AMQPQueue($channel);
} catch (AMQPConnectionException|AMQPQueueException $e) {
    shutdown('创建队列异常:' . $e->getMessage());
}

$exchange->setName($exchangeName); // 设置交换机名称
$exchange->setType('x-delayed-message'); // 设置交换机类型
$exchange->setArgument('x-delayed-type', 'direct'); // 重要提醒:这是延迟消息必须设置的参数
$exchange->setFlags(AMQP_DURABLE); // 开启交换机持久化
try {
    $exchange->declareExchange();
} catch (AMQPChannelException|AMQPConnectionException|AMQPExchangeException $e) {
    shutdown('配置交换机异常:' . $e->getMessage());
}

$queue->setName($queueName); // 重要提醒:若只启动一个消费者则可以不调用AMQPQueue::setName()方法
try {
    $queue->declareQueue();
} catch (AMQPChannelException|AMQPConnectionException $e) {
    shutdown('配置队列异常:' . $e->getMessage());
}

try {
    $queue->bind($exchangeName, $routingKey);
} catch (AMQPChannelException|AMQPConnectionException $e) {
    shutdown('队列绑定到交换机异常:' . $e->getMessage());
}

// 启动消费者
try {
    $queue->consume('queue_consume_callback'); // 重要提醒:这里会一直阻塞
} catch (AMQPChannelException|AMQPConnectionException|AMQPEnvelopeException $e) {
    $connection->disconnect(); // 断开连接
    shutdown('启动消费者异常:' . $e->getMessage());
}

// 重要提醒:这里是执行不到的,因为$queue->consume()会一直阻塞,直到收到消息,并且处理完消息后又会进入阻塞状态
debug('启动消费者成功');


 
消息生产者代码(rabbitmq.producer.php)
 
<?php
// +--------------------------------------------------------------+ //
// | 消息生产者 - rabbitmq.producer.php                           | //
// +--------------------------------------------------------------+ //
declare(strict_types=1);
ini_set('display_errors', 'On');
error_reporting(-1);

// RabbitMQ服务器连接参数
$credentials = [
    'host' => 'localhost',
    'port' => '5672',
    'vhost' => '/',
    'login' => 'RabbitMQ管理员账号',
    'password' => 'RabbitMQ管理员密码',
];

$exchangeName = 'manong.exchange'; // 交换机名称(生产者和消费者要使用相同的交换机名称)
$routingKey = 'manong.routing_key'; // 路由键(生产者和消费者要使用相同的路由键)

// 与RabbitMQ服务器建立连接
$connection = new AMQPConnection($credentials); // 创建AMQP连接对象,此时尚未与RabbitMQ服务器建立连接
try {
    $connection->connect(); // 与RabbitMQ服务器建立连接
} catch (AMQPConnectionException $e) {
    exit('与RabbitMQ服务器建立连接异常:' . $e->getMessage());
}
$connection->isConnected() || exit('与RabbitMQ服务器建立连接失败');

// 建立网络信道
try {
    $channel = new AMQPChannel($connection);
} catch (AMQPConnectionException $e) {
    exit('建立网络信道异常:' . $e->getMessage());
}

// 创建交换机(在指定网络信道)
try {
    $exchange = new AMQPExchange($channel);
} catch (AMQPConnectionException|AMQPExchangeException $e) {
    exit('创建交换机异常:' . $e->getMessage());
}

$exchange->setName($exchangeName); // 设置交换机名称
$exchange->setType('x-delayed-message'); // 设置交换机类型
$exchange->setArgument('x-delayed-type', 'direct'); // 重要提醒:这是延迟消息必须设置的参数
$exchange->setFlags(AMQP_DURABLE); // 开启交换机持久化
try {
    $exchange->declareExchange();
} catch (AMQPChannelException|AMQPConnectionException|AMQPExchangeException $e) {
    exit('配置交换机异常:' . $e->getMessage());
}

// 设置消息内容
$content = [
    '平天大圣·牛魔王',
    '覆海大圣·蛟魔王',
    '混天大圣·鹏魔王',
    '移山大圣·狮驼王',
    '通风大圣·猕猴王',
    '驱神大圣·禺狨王',
    '齐天大圣·美猴王',
];
$message = ['time' => time(), 'content' => $content[mt_rand(0, count($content) - 1)]];
try {
    $message = json_encode($message, JSON_THROW_ON_ERROR);
} catch (JsonException $e) {
    exit('JSON编码异常:' . $e->getMessage());
}

// 发布消息
$delay = 1000 * 10; // 延迟时间(单位为毫秒),若不需要延迟可设为0
try {
    $ok = $exchange->publish($message, $routingKey, AMQP_NOPARAM, ['headers' => ['x-delay' => $delay]]);
} catch (AMQPChannelException|AMQPConnectionException|AMQPExchangeException) {
    $ok = false;
}
$ok = $ok ?? false;
echo '发布消息' . ($ok ? '成功' : '失败') . PHP_EOL;

$connection->disconnect(); // 断开连接

Copyright © 2024 码农人生. All Rights Reserved