Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion pkg/rabbitmqamqp/amqp_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ RabbitMQ supports the following DeliveryState types:
- StateRejected
See: https://www.rabbitmq.com/docs/next/amqp#outcomes for more information.

Note: If the destination address is not defined during the creation, the message must have a TO property set.
If the destination address is not defined during the creation, the message must have a TO property set.
You can use the helper "MessagePropertyToAddress" to create the destination address.
See the examples:
Create a new publisher that sends messages to a specific destination address:
Expand All @@ -84,6 +84,16 @@ Create a new publisher that sends messages based on message destination address:
..:= MessagePropertyToAddress(msg, &QueueAddress{Queue: "myQueueName"})
..:= publisher.Publish(context.Background(), msg)

</code>

The message is persistent by default by setting the Header.Durable to true when Header is nil.
You can set the message to be non-persistent by setting the Header.Durable to false.
Note:
When you use the `Header` is up to you to set the message properties,
You need set the `Header.Durable` to true or false.

<code>

</code>
*/
func (m *Publisher) Publish(ctx context.Context, message *amqp.Message) (*PublishResult, error) {
Expand All @@ -97,6 +107,14 @@ func (m *Publisher) Publish(ctx context.Context, message *amqp.Message) (*Publis
return nil, err
}
}

// set the default persistence to the message
if message.Header == nil {
message.Header = &amqp.MessageHeader{
Durable: true,
}
}

r, err := m.sender.Load().SendWithReceipt(ctx, message, nil)
if err != nil {
return nil, err
Expand Down
58 changes: 58 additions & 0 deletions pkg/rabbitmqamqp/amqp_publisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,4 +203,62 @@ var _ = Describe("AMQP publisher ", func() {

Expect(connection.Close(context.Background())).To(BeNil())
})

It("Message should durable by default", func() {
// https://github.com/rabbitmq/rabbitmq-server/pull/13918

// Here we test the default behavior of the message durability
// The lib should set the Header.Durable to true by default
// when the Header is set by the user
// it is up to the user to set the Header.Durable to true or false
connection, err := Dial(context.Background(), "amqp://", nil)
Expect(err).To(BeNil())
Expect(connection).NotTo(BeNil())
name := generateNameWithDateTime("Message should durable by default")
_, err = connection.Management().DeclareQueue(context.Background(), &QuorumQueueSpecification{
Name: name,
})
Expect(err).To(BeNil())

publisher, err := connection.NewPublisher(context.Background(), &QueueAddress{Queue: name}, nil)
Expect(err).To(BeNil())
Expect(publisher).NotTo(BeNil())

msg := NewMessage([]byte("hello"))
Expect(msg.Header).To(BeNil())
publishResult, err := publisher.Publish(context.Background(), msg)
Expect(err).To(BeNil())
Expect(publishResult).NotTo(BeNil())
Expect(publishResult.Outcome).To(Equal(&StateAccepted{}))
Expect(msg.Header).NotTo(BeNil())
Expect(msg.Header.Durable).To(BeTrue())

consumer, err := connection.NewConsumer(context.Background(), name, nil)
Expect(err).To(BeNil())
Expect(consumer).NotTo(BeNil())
dc, err := consumer.Receive(context.Background())
Expect(err).To(BeNil())
Expect(dc).NotTo(BeNil())
Expect(dc.Message().Header).NotTo(BeNil())
Expect(dc.Message().Header.Durable).To(BeTrue())
Expect(dc.Accept(context.Background())).To(BeNil())

msgNotPersistent := NewMessageWithPersistence([]byte("hello"), false)
publishResult, err = publisher.Publish(context.Background(), msgNotPersistent)
Expect(err).To(BeNil())
Expect(publishResult).NotTo(BeNil())
Expect(publishResult.Outcome).To(Equal(&StateAccepted{}))
Expect(msgNotPersistent.Header).NotTo(BeNil())
Expect(msgNotPersistent.Header.Durable).To(BeFalse())
dc, err = consumer.Receive(context.Background())
Expect(err).To(BeNil())
Expect(dc).NotTo(BeNil())
Expect(dc.Message().Header).NotTo(BeNil())
Expect(dc.Message().Header.Durable).To(BeFalse())
Expect(dc.Accept(context.Background())).To(BeNil())
Expect(publisher.Close(context.Background())).To(BeNil())
Expect(connection.Management().DeleteQueue(context.Background(), name)).To(BeNil())
Expect(connection.Close(context.Background())).To(BeNil())

})
})
7 changes: 6 additions & 1 deletion pkg/rabbitmqamqp/amqp_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,18 @@ func (a *AmqpQueueInfo) Members() []string {
}

func newAmqpQueueInfo(response map[string]any) *AmqpQueueInfo {
leader := ""
if response["leader"] != nil {
leader = response["leader"].(string)
}

return &AmqpQueueInfo{
name: response["name"].(string),
isDurable: response["durable"].(bool),
isAutoDelete: response["auto_delete"].(bool),
isExclusive: response["exclusive"].(bool),
queueType: TQueueType(response["type"].(string)),
leader: response["leader"].(string),
leader: leader,
members: response["replicas"].([]string),
arguments: response["arguments"].(map[string]any),
consumerCount: response["consumer_count"].(uint32),
Expand Down
11 changes: 11 additions & 0 deletions pkg/rabbitmqamqp/messages_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,17 @@ func NewMessage(body []byte) *amqp.Message {
return amqp.NewMessage(body)
}

// NewMessageWithHeader creates a new AMQP 1.0 message with the given payload and sets the persistence to the given value.
// The persistence is set by setting the Header.Durable property to true or false.

func NewMessageWithPersistence(body []byte, persistence bool) *amqp.Message {
m := amqp.NewMessage(body)
m.Header = &amqp.MessageHeader{
Durable: persistence,
}
return m
}

// NewMessageWithAddress creates a new AMQP 1.0 new message with the given payload and sets the To property to the address of the target.
// The target must be a QueueAddress or an ExchangeAddress.
// This function is a helper that combines NewMessage and MessagePropertyToAddress.
Expand Down