文章目录
如何保证消息不丢失Go 实现安装操作库发送端的确认消费端的确认
如何保证消息不丢失
在使用RabbitMQ的时候,我们需要保证消息不能丢失,消息从生产者生产出来一直到消费者消费成功,这条链路是这样的:
消息的可靠投递分为了两大内容:发送端的确认(p->broker和exchange->queue)和消费端的确认(queue->c)。
发送端的确认
Rabbit提供了两种方式来保证发送端的消息可靠性投递:confirm 确认模式
和return 退回模式。
confirm 确认模式:消息从 producer 到达 exchange 则会给 producer 发送一个应答,我们需要开启confirm模式
,才能接收到这条应答。开启方式是将Channel.Confirm(noWait bool)
参数设置为false
,表示同意发送者将当前channel信道设置为confirm模式。
return 退回模式:消息从 exchange–>queue 投递失败,会将消息退回给producer。
消费端的确认
消息从Queue发送到消费端之后,消费端会发送一个确认消息:Consumer Ack
,有两种确认方式:自动确认和手动确认。
在编码中,关于消息的确认方式,我们需要在消费者端调用Consumer
函数时,设置第三个参数:autoAck
是false还是true(false表示手动,true表示自动)。
自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的消息缓存中移除。
但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。如果设置了手动确认方式,则需要在业务处理成功后,调用ch.Ack(false)
,手动签收,如果出现异常,则调用d.Reject(true)
让其自动重新发送消息。
Go 实现
安装操作库
安装API库
Go可以使用streadway/amqp
库来操作rabbit,使用以下命令来安装:
go get github.com/streadway/amqp
封装rabbitmq
接下来我们对streadway/amqp
库的内容进行一个二次封装,封装为一个rabbitmq.go
文件:
package rabbitmqimport ("encoding/json""github.com/streadway/amqp""log")// RabbitMQ RabbitMQ结构type RabbitMQ struct {channel *amqp.ChannelName stringexchange string}// Connect 连接服务器func Connect(s string) *RabbitMQ {//连接rabbitmqconn, e := amqp.Dial(s)failOnError(e, "连接Rabbitmq服务器失败!")ch, e := conn.Channel()failOnError(e, "无法打开频道!")mq := new(RabbitMQ)mq.channel = chreturn mq}// New 初始化消息队列//第一个参数:rabbitmq服务器的链接,第二个参数:队列名字func New(s string, name string) *RabbitMQ {//连接rabbitmqconn, e := amqp.Dial(s)failOnError(e, "连接Rabbitmq服务器失败!")ch, e := conn.Channel()failOnError(e, "无法打开频道!")q, e := ch.QueueDeclare(name, //队列名false, //是否开启持久化true, //不使用时删除false, //排他false, //不等待nil, //参数)failOnError(e, "初始化消息队列失败!")mq := new(RabbitMQ)mq.channel = chmq.Name = q.Namereturn mq}// QueueDeclare 声明queuefunc (q *RabbitMQ) QueueDeclare(queue string) {_, e := q.channel.QueueDeclare(queue, false, true, false, false, nil)failOnError(e, "声明queue失败!")}// QueueDelete 删除queuefunc (q *RabbitMQ) QueueDelete(queue string) {_, e := q.channel.QueueDelete(queue, false, true, false)failOnError(e, "删除queue失败!")}// Qos 配置queue参数func (q *RabbitMQ) Qos() {e := q.channel.Qos(1, 0, false)failOnError(e, "无法设置QoS")}// NewExchange 初始化交换机//第一个参数:rabbitmq服务器的链接,第二个参数:交换机名字,第三个参数:交换机类型func NewExchange(s string, name string, typename string) {//连接rabbitmqconn, e := amqp.Dial(s)failOnError(e, "连接Rabbitmq服务器失败!")ch, e := conn.Channel()failOnError(e, "无法打开频道!")e = ch.ExchangeDeclare(name, // nametypename, // typetrue, // durablefalse, // auto-deletedfalse, // internalfalse, // no-waitnil, // arguments)failOnError(e, "初始化交换机失败!")}// ExchangeDelete 删除交换机func (q *RabbitMQ) ExchangeDelete(exchange string) {e := q.channel.ExchangeDelete(exchange, false, true)failOnError(e, "删除交换机失败!")}// Bind 绑定消息队列到exchangefunc (q *RabbitMQ) Bind(exchange string, key string) {e := q.channel.QueueBind(q.Name,key,exchange,false,nil,)failOnError(e, "绑定队列失败!")q.exchange = exchange}// Send 向消息队列发送消息//Send方法可以往某个消息队列发送消息func (q *RabbitMQ) Send(queue string, body interface{}) {str, e := json.Marshal(body)failOnError(e, "消息序列化失败!")e = q.channel.Publish("", //交换queue, //路由键false, //必填false, //立即amqp.Publishing{ReplyTo: q.Name,Body: []byte(str),})msg := "向队列:" + q.Name + "发送消息失败!"failOnError(e, msg)}// Publish 向exchange发送消息//Publish方法可以往某个exchange发送消息func (q *RabbitMQ) Publish(exchange string, body interface{}, key string) {str, e := json.Marshal(body)failOnError(e, "消息序列化失败!")e = q.channel.Publish(exchange,key,false,false,amqp.Publishing{ReplyTo: q.Name,Body: []byte(str)},)failOnError(e, "向交换机发送消息失败!")}// Consume 接收某个消息队列的消息func (q *RabbitMQ) Consume() <-chan amqp.Delivery {c, e := q.channel.Consume(q.Name, //指定从哪个队列中接收消息"",true,false,false,false,nil,)failOnError(e, "接收消息失败!")return c}// Close 关闭队列连接func (q *RabbitMQ) Close() {q.channel.Close()}//错误处理函数func failOnError(err error, msg string) {if err != nil {log.Fatalf("%s: %s", msg, err)}}
发送端的确认
首先初始化消息队列的时候,我们要开启confirm模式
,才能接收到这条应答。开启方式是将Channel.Confirm(noWait bool)
参数设置为false
,表示同意发送者将当前channel信道设置为confirm模式。
func New(s string, name string) *RabbitMQ {conn, e := amqp.Dial(s)failOnError(e, "连接Rabbitmq服务器失败!")ch, e := conn.Channel()failOnError(e, "无法打开频道!")q, e := ch.QueueDeclare(name, //队列名false, //是否开启持久化true, //不使用时删除false, //排他false, //不等待nil, //参数)failOnError(e, "初始化消息队列失败!")mq := new(RabbitMQ)mq.channel = chmq.Name = q.Name// 设置为confirm模式mq.channel.Confirm(false)return mq}
然后在封装库中创建一个函数handleConfirm()
用于接收来自Borker的回复:
func (q *RabbitMQ) ConfirmFromBroker(ch chan amqp.Confirmation) chan amqp.Confirmation {return q.channel.NotifyPublish(ch)}
生产者
生产者端在向Broker发送消息的时候,我们使用一个无缓冲的通道来接收来自Broker的回复,然后创建一个协程监听这个无缓冲通道。
func main() {producer := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "queue")// 指定为topic类型rabbitmq.NewExchange("amqp://guest:guest@35.76.111.125:5672/", "exchange1", "fanout")confirm := producer.ConfirmFromBroker(make(chan amqp.Confirmation))go handleConfirm(confirm)var i intfor {time.Sleep(time.Second)producer.Publish("exchange1", "fanout message: "+strconv.Itoa(i), "")i++}}func handleConfirm(confirm <-chan amqp.Confirmation) {for {select {case message := <-confirm:fmt.Println("接收到来自Broker的回复:", message)}}}
运行结果:
接收到来自Broker的回复: {1 true}接收到来自Broker的回复: {2 true}接收到来自Broker的回复: {3 true}接收到来自Broker的回复: {4 true}接收到来自Broker的回复: {5 true}
消费端的确认
首先将Consume
函数的第三个参数autoAck
参数标记为false:
// Consume 接收某个消息队列的消息func (q *RabbitMQ) Consume() <-chan amqp.Delivery {c, e := q.channel.Consume(q.Name,"",false, // 不自动确认消息false,false,false,nil,)failOnError(e, "接收消息失败!")return c}
在消费者端我们采用公平派遣模式
,即队列发送消息给消费者的时候,不再采用轮询机制,而是一个消费者消费完消息之后,会调用Ack(false)
函数向队列发送一个回复,队列每次会将消息优先发送给消费完消息的消费者(回复过)。
消费端限流:
实现公平派遣模式
我们需要设置消费者端一次只能消费一条消息,之前我们已经进行了封装,直接在消费者端调用即可:
// Qos 配置queue参数func (q *RabbitMQ) Qos() {e := q.channel.Qos(1, 0, false)failOnError(e, "无法设置QoS")}
生产者
func main() {producer := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "queue")// 指定为direct类型rabbitmq.NewExchange("amqp://guest:guest@35.76.111.125:5672/", "exchange", "direct")i := 0for {time.Sleep(time.Second)producer.Publish("exchange", "routing message: "+strconv.Itoa(i), "key1")i = i + 1}}
消费者1
消费者2在消费第三条消息的时候,假设发生了错误,我们调用d.Reject(true)
函数让队列重新发送消息。
func main() {//第一个参数指定rabbitmq服务器的链接,第二个参数指定创建队列的名字consumer1 := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "queue1")// 指定一次只消费一条消息,直到消费完才重新接收consumer1.Qos()// 队列绑定到exchangeconsumer1.Bind("exchange", "key1")//接收消息msgs := consumer1.Consume()go func() {var i intfor d := range msgs {time.Sleep(time.Second * 1)log.Printf("Consumer1 received a message: %s", d.Body)// 假设消费第三条消息的时候出现了错误,我们就调用d.Reject(true),队列会重新发送消息给消费者if i == 2 {d.Reject(true)} else {// 消息消费成功之后就回复d.Ack(false)}i++}}()select {}}
消费者2
func main() {//第一个参数指定rabbitmq服务器的链接,第二个参数指定创建队列的名字consumer2 := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "queue1")// 指定一次只消费一条消息,直到消费完才重新接收consumer2.Qos()// 队列绑定到exchangeconsumer2.Bind("exchange", "key1")//接收消息msgs := consumer2.Consume()go func() {for d := range msgs {time.Sleep(time.Second * 5)log.Printf("Consumer2 received a message: %s", d.Body)// 消息消费成功之后就回复d.Ack(false)}}()select {}}
运行结果:
# 消费者12022/11/06 19:55:08 Consumer1 received a message: "routing message: 0"2022/11/06 19:55:10 Consumer1 received a message: "routing message: 2"2022/11/06 19:55:11 Consumer1 received a message: "routing message: 3"2022/11/06 19:55:12 Consumer1 received a message: "routing message: 3"2022/11/06 19:55:13 Consumer1 received a message: "routing message: 4"2022/11/06 19:55:14 Consumer1 received a message: "routing message: 6"# 消费者22022/11/06 19:55:13 Consumer2 received a message: "routing message: 1"