Skip to content

Commit 2f0cb21

Browse files
committed
issue #10: fix crash when acking messages on normal queues
The `ack` callback was expecting the ack tags to be always encapsulated within a `dqack` record which was not the case. Several callbacks, in fact, delegate their logic to the passthrough functions which do not handle the returned values but just the queue state. This patch simplifies the `ack` logic dealing with `dqacks` only when the queue is a deduplication queue and acknowledgement is enabled by the channels. For all the other cases, it falls back to the passthrough logic. Signed-off-by: Matteo Cafasso <[email protected]>
1 parent 7096c36 commit 2f0cb21

File tree

1 file changed

+11
-11
lines changed

1 file changed

+11
-11
lines changed

lib/rabbit_message_deduplication_queue.ex

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -236,10 +236,10 @@ defmodule RabbitMQ.MessageDeduplicationPlugin.Queue do
236236
{{message, delivery, dqack(tag: ack_tag, header: head)}, state}
237237
else
238238
maybe_delete_cache_entry(queue, message)
239-
{{message, delivery, dqack(tag: ack_tag)}, state}
239+
{{message, delivery, ack_tag}, state}
240240
end
241241
else
242-
{{message, delivery, dqack(tag: ack_tag)}, state}
242+
{{message, delivery, ack_tag}, state}
243243
end
244244
end
245245
end
@@ -263,16 +263,16 @@ defmodule RabbitMQ.MessageDeduplicationPlugin.Queue do
263263
end
264264
end
265265

266-
def ack(acks, state = dqstate(queue: queue, queue_state: qs)) do
267-
acks = if duplicate?(queue) do
268-
Enum.map(acks, fn(dqack(tag: ack, header: header)) ->
269-
maybe_delete_cache_entry(queue, header)
270-
ack
271-
end)
272-
else
273-
Enum.map(acks, fn(dqack(tag: ack)) -> ack end)
274-
end
266+
def ack(acks = [dqack()], state = dqstate(queue: queue, queue_state: qs)) do
267+
acks = Enum.map(acks, fn(dqack(tag: ack_tag, header: header)) ->
268+
maybe_delete_cache_entry(queue, header)
269+
ack_tag
270+
end)
271+
272+
passthrough2(state, do: ack(acks, qs))
273+
end
275274

275+
def ack(acks, state = dqstate(queue_state: qs)) do
276276
passthrough2(state, do: ack(acks, qs))
277277
end
278278

0 commit comments

Comments
 (0)