Skip to content

Commit 54cbcb2

Browse files
committed
queue: handle NoAck message consumers
Some code cleanup as well. Signed-off-by: Matteo Cafasso <[email protected]>
1 parent 5121213 commit 54cbcb2

File tree

1 file changed

+27
-21
lines changed

1 file changed

+27
-21
lines changed

lib/rabbit_message_deduplication_queue.ex

Lines changed: 27 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -191,16 +191,18 @@ defmodule RabbitMQ.MessageDeduplicationPlugin.Queue do
191191
case passthrough2(state, do: fetch(need_ack, qs)) do
192192
{:empty, state} -> {:empty, state}
193193
{{message, delivery, ack_tag}, state} ->
194-
case duplicate?(queue, message) do
194+
case duplicate?(queue) do
195195
false -> {{message, delivery, dqack(tag: ack_tag)}, state}
196196
true ->
197-
header =
198-
message
199-
|> basic_message(:content)
200-
|> message_headers()
201-
|> rabbitmq_keyfind("x-deduplication-header")
202-
203-
{{message, delivery, dqack(tag: ack_tag, header: header)}, state}
197+
case need_ack do
198+
true ->
199+
ack = dqack(tag: ack_tag, header: deduplication_header(message))
200+
{{message, delivery, ack}, state}
201+
false ->
202+
maybe_delete_cache_entry(queue, message)
203+
204+
{{message, delivery, dqack(tag: ack_tag)}, state}
205+
end
204206
end
205207
end
206208
end
@@ -217,14 +219,7 @@ defmodule RabbitMQ.MessageDeduplicationPlugin.Queue do
217219
case fetch(need_ack, state) do
218220
{:empty, state} -> {:empty, state}
219221
{{message = basic_message(id: id), _, ack_tag}, state} ->
220-
header = deduplication_header(message)
221-
222-
if not is_nil(header) do
223-
queue
224-
|> amqqueue(:name)
225-
|> cache_name()
226-
|> MessageCache.delete(header)
227-
end
222+
maybe_delete_cache_entry(queue, message)
228223

229224
{{id, ack_tag}, state}
230225
end
@@ -236,12 +231,8 @@ defmodule RabbitMQ.MessageDeduplicationPlugin.Queue do
236231
case duplicate?(queue) do
237232
false -> Enum.map(acks, fn(dqack(tag: ack)) -> ack end)
238233
true ->
239-
cache = queue |> amqqueue(:name) |> cache_name()
240-
241234
Enum.map(acks, fn(dqack(tag: ack, header: header)) ->
242-
if not is_nil(header) do
243-
MessageCache.delete(cache, header)
244-
end
235+
maybe_delete_cache_entry(queue, header)
245236
ack
246237
end)
247238
end
@@ -377,6 +368,21 @@ defmodule RabbitMQ.MessageDeduplicationPlugin.Queue do
377368
String.to_atom("cache_queue_#{resource}_#{queue}")
378369
end
379370

371+
# Remove the message deduplication header from the cache
372+
def maybe_delete_cache_entry(queue, header) when is_bitstring(header) do
373+
queue |> amqqueue(:name) |> cache_name() |> MessageCache.delete(header)
374+
end
375+
376+
def maybe_delete_cache_entry(_queue, header) when is_nil(header) do end
377+
378+
def maybe_delete_cache_entry(queue, message = basic_message()) do
379+
header = deduplication_header(message)
380+
381+
if not is_nil(header) do
382+
queue |> amqqueue(:name) |> cache_name() |> MessageCache.delete(header)
383+
end
384+
end
385+
380386
# Returns the value given a key from a RabbitMQ list [{"key", :type, value}]
381387
defp rabbitmq_keyfind(list, key, default \\ nil) do
382388
case List.keyfind(list, key, 0) do

0 commit comments

Comments
 (0)