Go使用RabbitMQ
约 2289 字大约 8 分钟
2025-06-03
1.前言
Go 接入 RabbitMQ 的教程(RabbiMQ官网):https://www.rabbitmq.com/tutorials/tutorial-one-go
2.第一个实例
消息生产者:send.go
package main import ( "context" "log" "time" amqp "github.com/rabbitmq/amqp091-go" ) func failOnError(err error, msg string) { if err != nil { log.Panicf("%s: %s", msg, err) } } func main() { // 获取connection、channel conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() // 声明一个队列 _, err = ch.QueueDeclare( "queue.go-test", // 队列名 true, // 是否持久(重启rabbitmq,是否还存在) false, // delete when unused false, // exclusive false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue") ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() body := "Hello World!" err = ch.PublishWithContext(ctx, "direct.go-test.exchange", // 交换机名称 "go-test", // routing key false, // mandatory false, // immediate amqp.Publishing{ ContentType: "text/plain", Body: []byte(body), }) failOnError(err, "Failed to publish a message") log.Printf(" [x] Sent %s\n", body) }
消息消费者:receive.go
package main import ( amqp "github.com/rabbitmq/amqp091-go" "log" ) func FailOnError(err error, msg string) { if err != nil { log.Panicf("%s: %s", msg, err) } } func main() { conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") FailOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() ch, err := conn.Channel() FailOnError(err, "Failed to open a channel") defer ch.Close() q, err := ch.QueueDeclare( "queue.go-test", // name true, // durable false, // delete when unused false, // exclusive false, // no-wait nil, // arguments ) FailOnError(err, "Failed to declare a queue") msgs, err := ch.Consume( q.Name, // queue "", // consumer true, // auto-ack false, // exclusive false, // no-local false, // no-wait nil, // args ) FailOnError(err, "Failed to register a consumer") var forever chan struct{} go func() { for d := range msgs { log.Printf("Received a message: %s", d.Body) } }() log.Printf(" [*] Waiting for messages. To exit press CTRL+C") <-forever }
3.工作队列
3.1 生产消息
// 获取connection、channel、
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
FailOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
FailOnError(err, "Failed to open a channel")
defer ch.Close()
_, err = ch.QueueDeclare(
"queue.go-test", // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
FailOnError(err, "Failed to declare a queue")
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
body := bodyFrom(os.Args)
err = ch.PublishWithContext(ctx,
"direct.go-test.exchange", // exchange
"go-test", // routing key
false, // mandatory
false,
amqp.Publishing{
DeliveryMode: amqp.Persistent,
ContentType: "text/plain",
Body: []byte(body),
})
FailOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)
3.2 多个消费者
3.3 消息确认
问题场景:
3.4 消息持久性
可分为队列持久性、消息持久性
队列持久性:在声明队列的时候,设置 durable 为 true
_, err = ch.QueueDeclare( "queue.go-test", // name:队列名 true, // durable:是否持久(重启rabbitmq,是否还存在) false, // delete when unused false, // exclusive false, // no-wait nil, // arguments )
消息持久性:在发送消息时,指定 DeliveryMode 值为 amqp.Persistent
4.广播(fanout)
4.1 发布
首先,需要声明一个交换机类型为 fanout (广播)的交换机,发送消息如下
err = ch.ExchangeDeclare( "logs.fanout.exchange", // name:交换机名称 "fanout", // type:交换机类型,fanout为广播类型 true, // durable:是否持久 false, // auto-deleted false, // internal false, // no-wait nil, // arguments ) FailOnError(err, "Failed to declare an exchange") ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() body := bodyFrom(os.Args) // 注意,即使 路由键 routing key 为空,也能把消息发送到队列,因为交换机的类型为 fanout ,交换机会把消息发送到和交换机绑定的所有队列中 err = ch.PublishWithContext(ctx, "logs.fanout.exchange", // exchange "", // routing key false, // mandatory false, // immediate amqp.Publishing{ ContentType: "text/plain", Body: []byte(body), }) FailOnError(err, "Failed to publish a message")
完整的发布消息 go:emit_log.go
package main import ( "context" amqp "github.com/rabbitmq/amqp091-go" "log" "os" "strings" "time" ) func FailOnError(err error, msg string) { if err != nil { log.Panicf("%s: %s", msg, err) } } func main() { conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") FailOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() ch, err := conn.Channel() FailOnError(err, "Failed to open a channel") defer ch.Close() err = ch.ExchangeDeclare( "logs.fanout.exchange", // name:交换机名称 "fanout", // type:交换机类型,fanout为广播类型 true, // durable:是否持久 false, // auto-deleted false, // internal false, // no-wait nil, // arguments ) FailOnError(err, "Failed to declare an exchange") ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() body := bodyFrom(os.Args) err = ch.PublishWithContext(ctx, "logs.fanout.exchange", // exchange "log-key", // routing key false, // mandatory false, // immediate amqp.Publishing{ ContentType: "text/plain", Body: []byte(body), }) FailOnError(err, "Failed to publish a message") log.Printf(" [x] Sent %s", body) } func bodyFrom(args []string) string { var s string if (len(args) < 2) || os.Args[1] == "" { s = "hello" } else { s = strings.Join(args[1:], " ") } return s }
4.2 订阅
首先,需要声明无名的队列(程序会帮我们创建)、声明交换机和队列之间的绑定信息
// 不指定名称的队列 q, err := ch.QueueDeclare( "", // name: false, // durable false, // delete when unused true, // exclusive false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue") // 绑定键 err = ch.QueueBind( q.Name, // queue name "log-key", // routing key "logs.fanout.exchange", // exchange false, nil, ) failOnError(err, "Failed to bind a queue")
完整的消费消息go:receive_logs.go
package main import ( "log" amqp "github.com/rabbitmq/amqp091-go" ) func failOnError(err error, msg string) { if err != nil { log.Panicf("%s: %s", msg, err) } } func main() { conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() err = ch.ExchangeDeclare( "logs.fanout.exchange", // name "fanout", // type true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare an exchange") // 不指定名称的队列 q, err := ch.QueueDeclare( "", // name: false, // durable false, // delete when unused true, // exclusive false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue") // 绑定键 err = ch.QueueBind( q.Name, // queue name "log-key", // routing key "logs.fanout.exchange", // exchange false, nil, ) failOnError(err, "Failed to bind a queue") msgs, err := ch.Consume( q.Name, // queue "", // consumer true, // auto-ack false, // exclusive false, // no-local false, // no-wait nil, // args ) failOnError(err, "Failed to register a consumer") var forever chan struct{} go func() { for d := range msgs { log.Printf(" [x] %s", d.Body) } }() log.Printf(" [*] Waiting for logs. To exit press CTRL+C") <-forever }
测试:运行一个 emit_log.go 程序,对应一个生产者;运行两个 receive_logs.go 程序,对应两个消费者
验证:查看现有的队列和交换机绑定关系
sudo rabbitmqctl list_bindings
查看 RabbitMQ 后台:
测试生产一条消息,两个消费者消费
5.直接(direct)
与广播 fanout 不同的点在于,交换机类型为 direct 的交换机会将消息发送到 绑定键和路由键完全匹配的队列中,这里不作代码阐述辽
6.主题(topic)
6.1 消息生产者
package main
import (
"context"
"log"
"os"
"strings"
"time"
amqp "github.com/rabbitmq/amqp091-go"
)
func failOnError(err error, msg string) {
if err != nil {
log.Panicf("%s: %s", msg, err)
}
}
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
err = ch.ExchangeDeclare(
"logs.topic.exchange", // name
"topic", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare an exchange")
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
body := bodyFrom(os.Args)
err = ch.PublishWithContext(ctx,
"logs.topic.exchange", // exchange
severityFrom(os.Args), // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)
}
func bodyFrom(args []string) string {
var s string
if (len(args) < 3) || os.Args[2] == "" {
s = "hello"
} else {
s = strings.Join(args[2:], " ")
}
return s
}
func severityFrom(args []string) string {
var s string
if (len(args) < 2) || os.Args[1] == "" {
s = "anonymous.info"
} else {
s = os.Args[1]
}
return s
}
6.2 消息消费者
package main
import (
"log"
"os"
amqp "github.com/rabbitmq/amqp091-go"
)
func failOnError(err error, msg string) {
if err != nil {
log.Panicf("%s: %s", msg, err)
}
}
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
err = ch.ExchangeDeclare(
"logs.topic.exchange", // name
"topic", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare an exchange")
q, err := ch.QueueDeclare(
"", // name
false, // durable
false, // delete when unused
true, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
if len(os.Args) < 2 {
log.Printf("Usage: %s [binding_key]...", os.Args[0])
os.Exit(0)
}
for _, s := range os.Args[1:] {
log.Printf("Binding queue %s to exchange %s with routing key %s",
q.Name, "logs.topic.exchange", s)
err = ch.QueueBind(
q.Name, // queue name
s, // routing key
"logs.topic.exchange", // exchange
false,
nil)
failOnError(err, "Failed to bind a queue")
}
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto ack
false, // exclusive
false, // no local
false, // no wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
var forever chan struct{}
go func() {
for d := range msgs {
log.Printf(" [x] %s", d.Body)
}
}()
log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
<-forever
}
6.3 测试
创建不同绑定键的消息消费者:
# 接收所有消息 go run receive_log_topic.go "#" # 接收 kern 开头的设备的消息 go run receive_log_topic.go "kern.*" # 接收 critical 结尾的设备的消息 go run receive_log_topic.go "*.critical"
发送测试消息
# 三个消息消费者都收到 go run emit_log_topic.go "kern.critical" "A critical kernel error" # 前两个消费者收到 go run emit_log_topic.go "kern.222" "A critical kernel error"
7.小结
RabbitMQ 常用的 Exchange Type 有 fanout、direct、topic、headers 这四种。
fanout :会把所有发送到该 Exchange 的消息路由到所有与它绑定的 Queue 中,不需要做任何判断操作,fanout 类型常用来广播消息。
direct:将消息路由到那些 路由键 与 绑定键 完全匹配的 Queue 中
topic :将消息路由到 路由键 和 绑定键 相匹配的队列中,但这里的匹配规则有些不同,它约定:
路由键为一个点号“.”分隔的字符串(被点号“.”分隔开的每一段独立的字符串称为一个单词),如 “com.rabbitmq.client”、“java.util.concurrent”、“com.hidden.client”;
BindingKey 和 RoutingKey 一样也是点号“.”分隔的字符串;
BindingKey 中可以存在两种特殊字符串 “星号”和“#”,用于做模糊匹配,其中“*”用于匹配一个单词,“#”用于匹配多个单词(可以是零个)。
- 路由键为 “com.rabbitmq.client” 的消息会同时路由到 Queue1 和 Queue2;
- 路由键为 “com.hidden.client” 的消息只会路由到 Queue2 中;
- 路由键为 “com.hidden.demo” 的消息只会路由到 Queue2 中;
- 路由键为 “java.rabbitmq.demo” 的消息只会路由到 Queue1 中;
- 路由键为 “java.util.concurrent” 的消息将会被丢弃或者返回给生产者(需要设置 mandatory 参数),因为它没有匹配任何路由键。