Skip to content

Commit 046a36a

Browse files
committed
issue #22: fix publish_delivered callback with deduplication queue
When the queue is empty, the message is passed straight to the first consumer to save one round trip time. In such cases, if message acknowledgement is enabled, the `publish_delivered` callback is called instead of `fetch`. If de-duplication is enabled, we need to ensure a `dqack` is returned instead of a simple acknowledgement tag. Signed-off-by: Matteo Cafasso <[email protected]>
1 parent 2f98c0f commit 046a36a

File tree

1 file changed

+11
-2
lines changed

1 file changed

+11
-2
lines changed

lib/rabbit_message_deduplication_queue.ex

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -190,12 +190,21 @@ defmodule RabbitMQ.MessageDeduplicationPlugin.Queue do
190190
passthrough1(state, do: batch_publish(batch, pid, flow, qs))
191191
end
192192

193+
# Optimization for cases in which the queue is empty and the message
194+
# is delivered straight to the client. Acknowledgement is enabled.
193195
def publish_delivered(message, message_properties, pid, flow, state) do
194-
dqstate(queue_state: qs) = state
196+
dqstate(queue: queue, queue_state: qs) = state
195197

196-
passthrough2(state) do
198+
{ack_tag, state} = passthrough2(state) do
197199
publish_delivered(message, message_properties, pid, flow, qs)
198200
end
201+
202+
if duplicate?(queue) do
203+
head = Common.message_header(message, "x-deduplication-header")
204+
{dqack(tag: ack_tag, header: head), state}
205+
else
206+
{ack_tag, state}
207+
end
199208
end
200209

201210
def batch_publish_delivered(batch, pid, flow, state) do

0 commit comments

Comments
 (0)