使用AMQP协议操作RabbitMQ

RabbitMQ连接参数配置文件(amqp.yaml)如下:
 
########## AMQP连接参数(RabbitMQ) ##########
amqp :
  host     : "localhost"     # RabbitMQ服务器主机
  port     : 5672            # RabbitMQ服务端口号
  vhost    : "/"             # 虚拟主机
  login    : "ZhangSan"      # 账号
  password : "*************" # 密码
  exchange : "demo.exchange" # 交换机名称
  routing  : "demo.routing"  # 路由键
  queue    : "demo.queue"    # 队列名称
 
 
 
使用Go实现消息生产和消息消费

package main

import (
   "encoding/json"
   "fmt"
   "github.com/streadway/amqp"
   "gopkg.in/yaml.v3"
   "math/rand"
   "os"
   "path/filepath"
   "time"
)

// GetDateTime
// @description 获取日期时间字符串
// @return string 日期时间字符串
func GetDateTime() string {
   location, _ := time.LoadLocation("Asia/Shanghai")
   return time.Unix(time.Now().In(location).Unix(), 0).In(location).Format(time.DateTime)
}

// RandSlice
// @description 随机获取切片里的一个元素
// @param slice []any 切片
// @return any 切片元素
func RandSlice(slice []any) any {
   return slice[rand.New(rand.NewSource(time.Now().UnixNano())).Intn(len(slice))]
}

// GetAMQPConf
// @description 获取AMQP配置
// @return map[string]string AMQP配置
func GetAMQPConf() map[string]string {
   // 从配置yaml文件获取RabbitMQ的连接参数
   path, _ := filepath.Abs(".")
   file := path + "/amqp.yaml"

   // 读取yaml配置文件
   contents, _ := os.ReadFile(file)

   // 把读取的yaml配置文件内容放入map
   var conf map[string]map[string]string
   _ = yaml.Unmarshal(contents, &conf)

   return conf["amqp"]
}

// GetConnectionAndChannel
// @description 获取RabbitMQ的连接对象和信道对象(生产端和消费端都需要用到这两个对象,故写成公共函数)
// @return connection *amqp.Connection 连接对象
// @return channel *amqp.Channel 信道对象
// @return err error 错误信息,若无错误则为nil
func GetConnectionAndChannel() (connection *amqp.Connection, channel *amqp.Channel, err error) {
   conf := GetAMQPConf()
   url := fmt.Sprintf(
      "amqp://%s:%s@%s:%s/",
      conf["login"],
      conf["password"],
      conf["host"],
      conf["port"],
   )

   // 连接RabbitMQ服务器
   connection, err = amqp.Dial(url)
   if err != nil {
      return
   }
   // 重要提醒:这里不能关闭连接,因为Connection对象是要返回给调用方使用的,关闭连接由调用方去执行。

   // 创建信道
   channel, err = connection.Channel()
   if err != nil {
      return
   }
   // 重要提醒:这里不能关闭信道,因为Channel对象是要返回给调用方使用的,关闭信道由调用方去执行。

   return
}

// Producer
// @description 生产消息
// @param message map[string]any 消息数据
// @return err error 是否发布成功
func Producer(message map[string]any) (err error) {
   var connection *amqp.Connection
   var channel *amqp.Channel
   connection, channel, err = GetConnectionAndChannel()
   if err != nil {
      return
   }

   // 关闭和RabbitMQ服务器的连接
   defer func(connection *amqp.Connection) {
      _ = connection.Close()
   }(connection)

   // 关闭信道
   defer func(channel *amqp.Channel) {
      _ = channel.Close()
   }(channel)

   conf := GetAMQPConf()

   // 声明交换机
   err = channel.ExchangeDeclare(conf["exchange"], amqp.ExchangeDirect, true, false, false, false, nil)
   if err != nil {
      return
   }
   defer func(channel *amqp.Channel) {
      _ = channel.Close()
   }(channel)

   // 声明队列
   _, err = channel.QueueDeclare(conf["queue"], true, false, false, false, nil)
   if err != nil {
      return
   }

   // 交换机绑定队列
   err = channel.QueueBind(conf["queue"], conf["routing"], conf["exchange"], false, nil)
   if err != nil {
      return
   }

   // 发布消息
   body, _ := json.Marshal(message)
   msg := amqp.Publishing{Headers: amqp.Table{}, ContentType: "text/plain", ContentEncoding: "", Body: body}
   err = channel.Publish(conf["exchange"], conf["routing"], false, false, msg)
   return
}

// Consumer
// @description 消息消费
// @return error 是否启动成功
func Consumer() error {
   errChan := make(chan error)

   go func() {
      var connection *amqp.Connection
      var channel *amqp.Channel
      var err error
      connection, channel, err = GetConnectionAndChannel()
      if err != nil {
         errChan <- err
         return
      }

      // 关闭和RabbitMQ服务器的连接
      defer func(connection *amqp.Connection) {
         _ = connection.Close()
      }(connection)

      // 关闭信道
      defer func(channel *amqp.Channel) {
         _ = channel.Close()
      }(channel)

      conf := GetAMQPConf()
      queue, err := channel.QueueDeclare(conf["queue"], true, false, false, false, nil)
      if err != nil {
         errChan <- err
         return
      }

      // 创建消费者,强烈建议关闭自动应答(参数autoAck设为false即可关闭自动应答),采用手动应答的方式
      deliveryChan, err := channel.Consume(queue.Name, "", false, false, false, false, nil)
      if err != nil {
         errChan <- err
         return
      }

      errChan <- nil

      for message := range deliveryChan {
         // 使用协程处理消息,防止读取消息被阻塞
         go func(message amqp.Delivery) {
            // 获取消息体的数据
            body := map[string]any{}
            _ = json.Unmarshal(message.Body, &body)
            id := body["id"]
            name := body["name"]

            // 处理消息(这里休眠100毫秒模拟处理消息的耗时操作)
            time.Sleep(time.Millisecond * 100)

            err = message.Ack(false) // 由于上面设置了关闭自动应答,所以这里必须手动应答
            if err != nil {
               fmt.Printf("[%s] 应答消息(%s)失败:%s \n", GetDateTime(), id, err.Error())
            } else {
               fmt.Printf("[%s] 应答消息(%s)成功:name = %s \n", GetDateTime(), id, name)
            }
         }(message)
      }
   }()

   return <-errChan // 阻塞函数执行完,直到消费端初始化完成(无论成功还是失败)
}

func main() {
   err := Consumer()
   if err != nil {
      panic("启动消费端失败:" + err.Error())
   }
   fmt.Printf("[%s] ========== 启动消费端成功 ========== \n", GetDateTime())

   var name []any
   name = append(name, "平天大圣·牛魔王·Go")
   name = append(name, "覆海大圣·蛟魔王·Go")
   name = append(name, "混天大圣·鹏魔王·Go")
   name = append(name, "移山大圣·狮驼王·Go")
   name = append(name, "通风大圣·猕猴王·Go")
   name = append(name, "驱神大圣·禺狨王·Go")
   name = append(name, "齐天大圣·美猴王·Go")

   id := 0
   message := map[string]any{}

   // 使用Ticker定时向RabbitMQ发消息
   ticker := time.NewTicker(time.Second * 1)
   for range ticker.C {
      id++
      message["id"] = fmt.Sprintf("G%05d", id) // 消息ID
      message["name"] = RandSlice(name)

      err = Producer(message)
      if err != nil {
         // fmt.Printf("[%s] 发布消息(%s)失败:%s \n", GetDateTime(), message["id"], err.Error())
      } else {
         // fmt.Printf("[%s] 发布消息(%s)成功:name = %s \n", GetDateTime(), message["id"], message["name"])
      }
   }
}



使用PHP实现消息生产
 
<?php
// 重要提醒:当前PHP脚本文件需在命令行运行。

// 从配置yaml文件获取RabbitMQ的连接参数
$file = __DIR__ . '/amqp.yaml';
$conf = yaml_parse_file($file)['amqp'];

// RabbitMQ服务器连接参数
$credentials = [
    'host' => $conf['host'],
    'port' => $conf['port'],
    'vhost' => $conf['vhost'],
    'login' => $conf['login'],
    'password' => $conf['password'],
];

$connection = new AMQPConnection($credentials);

// 连接RabbitMQ服务器
try {
    if ($connection->connect() !== true) {
        exit('连接RabbitMQ服务器失败');
    }
} catch (AMQPConnectionException $e) {
    exit('连接RabbitMQ服务器出现异常:' . $e->getMessage());
}

// 建立网络信道
try {
    $channel = new AMQPChannel($connection);
} catch (AMQPConnectionException $e) {
    exit('建立网络信道出现异常:' . $e->getMessage());
}

// 创建交换机
try {
    $exchange = new AMQPExchange($channel); // 在指定网络信道创建交换机
} catch (Exception $e) {
    exit('创建交换机出现异常:' . $e->getMessage());
}

$exchange->setName($conf['exchange']); // 设置交换机名称
$exchange->setType(AMQP_EX_TYPE_DIRECT); // 设置交换机类型
$exchange->setFlags(AMQP_DURABLE); // 开启交换机持久化

try {
    $exchange->declareExchange();
} catch (Exception $e) {
    exit('声明交换机出现异常:' . $e->getMessage());
}

$name = [
    '平天大圣·牛魔王·PHP',
    '覆海大圣·蛟魔王·PHP',
    '混天大圣·鹏魔王·PHP',
    '移山大圣·狮驼王·PHP',
    '通风大圣·猕猴王·PHP',
    '驱神大圣·禺狨王·PHP',
    '齐天大圣·美猴王·PHP',
];

for ($i = 1; $i <= 999999; $i++) {
    $id = sprintf('P%05d', $i);
    $nm = $name[(int)array_rand($name)];
    $message = json_encode(['id' => $id, 'name' => $nm]);

    // 发布消息
    try {
        $publish = $exchange->publish($message, $conf['routing']);
    } catch (Exception $e) {
        $publish = false;
        unset($e);
    }

    $datetime = date('Y-m-d H:i:s');

    if ($publish === true) {
        echo "[{$datetime}] 发布消息({$id})成功:name = {$nm}" . PHP_EOL;
    } else {
        echo "[{$datetime}] 发布消息({$id})失败" . PHP_EOL;
    }

    sleep(1);
}
 
 
 
输出结果:
 
[yyyy-mm-dd hh:ii:ss] ========== 启动消费端成功 ========== 
[yyyy-mm-dd hh:ii:ss] 应答消息(G00001)成功:name = 通风大圣·猕猴王·Go
[yyyy-mm-dd hh:ii:ss] 应答消息(P00001)成功:name = 驱神大圣·禺狨王·PHP
[yyyy-mm-dd hh:ii:ss] 应答消息(G00002)成功:name = 覆海大圣·蛟魔王·Go
[yyyy-mm-dd hh:ii:ss] 应答消息(P00002)成功:name = 移山大圣·狮驼王·PHP
[yyyy-mm-dd hh:ii:ss] 应答消息(G00003)成功:name = 平天大圣·牛魔王·Go
[yyyy-mm-dd hh:ii:ss] 应答消息(P00003)成功:name = 平天大圣·牛魔王·PHP
[yyyy-mm-dd hh:ii:ss] 应答消息(G00004)成功:name = 混天大圣·鹏魔王·Go
[yyyy-mm-dd hh:ii:ss] 应答消息(P00004)成功:name = 齐天大圣·美猴王·PHP
[yyyy-mm-dd hh:ii:ss] 应答消息(G00005)成功:name = 通风大圣·猕猴王·Go
[yyyy-mm-dd hh:ii:ss] 应答消息(P00005)成功:name = 通风大圣·猕猴王·PHP
[yyyy-mm-dd hh:ii:ss] 应答消息(G00006)成功:name = 移山大圣·狮驼王·Go
[yyyy-mm-dd hh:ii:ss] 应答消息(P00006)成功:name = 移山大圣·狮驼王·PHP
[yyyy-mm-dd hh:ii:ss] 应答消息(G00007)成功:name = 通风大圣·猕猴王·Go
[yyyy-mm-dd hh:ii:ss] 应答消息(P00007)成功:name = 覆海大圣·蛟魔王·PHP
[yyyy-mm-dd hh:ii:ss] 应答消息(G00008)成功:name = 移山大圣·狮驼王·Go
[yyyy-mm-dd hh:ii:ss] 应答消息(P00008)成功:name = 齐天大圣·美猴王·PHP
[yyyy-mm-dd hh:ii:ss] 应答消息(G00009)成功:name = 覆海大圣·蛟魔王·Go
[yyyy-mm-dd hh:ii:ss] 应答消息(P00009)成功:name = 驱神大圣·禺狨王·PHP
[yyyy-mm-dd hh:ii:ss] 应答消息(G00010)成功:name = 平天大圣·牛魔王·Go
[yyyy-mm-dd hh:ii:ss] 应答消息(P00010)成功:name = 驱神大圣·禺狨王·PHP
 
可以看到无论是Go还是PHP,它们生产的消息都被消费了。

Copyright © 2024 码农人生. All Rights Reserved