Skip to content

Commit d974bd9

Browse files
committed
queue: implement drop callback
Signed-off-by: Matteo Cafasso <[email protected]>
1 parent f875c8d commit d974bd9

File tree

1 file changed

+24
-2
lines changed

1 file changed

+24
-2
lines changed

lib/rabbit_message_deduplication_queue.ex

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -201,8 +201,30 @@ defmodule RabbitMQ.MessageDeduplicationPlugin.Queue do
201201
end
202202
end
203203

204-
def drop(need_ack, state = dqstate(queue_state: qs)) do
205-
passthrough2(state, do: drop(need_ack, qs))
204+
# TODO: this is a bit of a hack.
205+
# As the drop callback returns only the message id, we can't retrieve
206+
# the message deduplication header. As a workaround fetch is used.
207+
# This assumes the backing queue drop and fetch behaviours are the same.
208+
# A better solution would be to store the message IDs in a dedicated index.
209+
def drop(need_ack, state = dqstate(queue: queue, queue_state: qs)) do
210+
case duplicate?(queue) do
211+
false -> passthrough2(state, do: drop(need_ack, qs))
212+
true ->
213+
case fetch(need_ack, state) do
214+
{:empty, state} -> {:empty, state}
215+
{{message = basic_message(id: id), _, ack_tag}, state} ->
216+
header = deduplication_header(message)
217+
218+
if not is_nil(header) do
219+
queue
220+
|> amqqueue(:name)
221+
|> cache_name()
222+
|> MessageCache.delete(header)
223+
end
224+
225+
{{id, ack_tag}, state}
226+
end
227+
end
206228
end
207229

208230
def ack(acks, state = dqstate(queue: queue, queue_state: qs)) do

0 commit comments

Comments
 (0)