Consumer Priorities
컨슈머에는 우선순위가 있는데, 우선순위가 높은 컨슈머가 메시지를 전부 소비한다.
우선순위가 낮은 컨슈머는 우선순위가 높은 컨슈머가 차단될 때에만 메시지를 소비할 수 있다.
우선순위가 높은 컨슈머가 여러개 있다면 라운드 로빈 방식으로 메시지를 소비한다.
컨슈머의 기본 우선순위 값은 0을 가진다.
Consumer Priorities | RabbitMQ
<!--
www.rabbitmq.com
Set the x-priority argument in the basic.consume method to an integer value. Consumers which do not specify a value have priority 0. Larger numbers indicate higher priority, and both positive and negative numbers can be used.
Channel의 Consume메서드를 사용할 때 args 테이블에
"x-priority" : 숫자
를 넣어주어 설정할 수 있다.
func (ch *Channel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table) (<-chan Delivery, error) {
// When we return from ch.call, there may be a delivery already for the
// consumer that hasn't been added to the consumer hash yet. Because of
// this, we never rely on the server picking a consumer tag for us.
if err := args.Validate(); err != nil {
return nil, err
}
if consumer == "" {
consumer = uniqueConsumerTag()
}
req := &basicConsume{
Queue: queue,
ConsumerTag: consumer,
NoLocal: noLocal,
NoAck: autoAck,
Exclusive: exclusive,
NoWait: noWait,
Arguments: args,
}
res := &basicConsumeOk{}
deliveries := make(chan Delivery)
ch.consumers.add(consumer, deliveries)
if err := ch.call(req, res); err != nil {
ch.consumers.cancel(consumer)
return nil, err
}
return deliveries, nil
}
channel.Consume("test_queue", "test_consumer", false, false, false, false, amqp.Table{
"x-priority": 3,
})
예시 코드
package main
import (
"context"
"flag"
"log"
"time"
"github.com/munhwas1140/eventdrivenrabbit/internal"
amqp "github.com/rabbitmq/amqp091-go"
"golang.org/x/sync/errgroup"
)
var (
consumer = flag.String("consumer", "consumer", "consumer")
priority = flag.Int("priority", 1, "priority")
)
func init() {
flag.Parse()
}
func main() {
conn, err := internal.ConnectRabbitMQ("username", "password", "localhost:5672", "vhost")
if err != nil {
log.Fatal(err)
}
defer conn.Close()
client, err := internal.NewRabbitMQClient(conn)
if err != nil {
log.Fatal(err)
}
defer client.Close()
var blocking chan struct{}
Consuming(client, "test_queue", *consumer, amqp.Table{
"x-priority": *priority,
})
log.Println("Consuming, use CTRL+C to exit")
<-blocking
}
func Consuming(client internal.RabbitClient, queue, consumer string, args amqp.Table) error {
messageBus, err := client.Consume(queue, consumer, args)
if err != nil {
log.Fatal(err)
}
g, _ := errgroup.WithContext(context.Background())
g.SetLimit(10)
go func() {
for message := range messageBus {
msg := message
g.Go(func() error {
log.Printf("New message: %s, consumer: %s", string(msg.Body), consumer)
time.Sleep(5 * time.Second)
if err := msg.Ack(false); err != nil {
log.Println("Ack message failed")
return err
}
log.Printf("Acknowledged message %s\n", message.MessageId)
return nil
})
}
}()
return nil
}
메시지를 보내보면 priority가 높은 큐에서만 처리되는 것을 볼 수 있었다.
우선순위가 높은 컨슈머를 중지하면
우선순위가 낮은 컨슈머에게도 메시지가 전달된다