Skip to content

Commit 033605a

Browse files
committed
queue: fix fetch callback when queue is empty
Signed-off-by: Matteo Cafasso <[email protected]>
1 parent 37d5054 commit 033605a

File tree

1 file changed

+17
-21
lines changed

1 file changed

+17
-21
lines changed

lib/rabbit_message_deduplication_queue.ex

Lines changed: 17 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -183,30 +183,26 @@ defmodule RabbitMQ.MessageDeduplicationPlugin.Queue do
183183
passthrough3(state, do: fetchwhile(msg_pred, msg_fun, A, qs))
184184
end
185185

186-
def fetch(true, state = dqstate(queue: queue, queue_state: qs)) do
187-
{{message, delivery, ack_tag}, state} = passthrough2(state) do
188-
fetch(true, qs)
186+
def fetch(need_ack, state = dqstate(queue: queue, queue_state: qs)) do
187+
case passthrough2(state, do: fetch(need_ack, qs)) do
188+
{:empty, state} -> {:empty, state}
189+
{{message, delivery, ack_tag}, state} ->
190+
case duplicate?(queue, message) do
191+
false -> {{message, delivery, dqack(tag: ack_tag)}, state}
192+
true ->
193+
header =
194+
message
195+
|> basic_message(:content)
196+
|> message_headers()
197+
|> rabbitmq_keyfind("x-deduplication-header")
198+
199+
{{message, delivery, dqack(tag: ack_tag, header: header)}, state}
200+
end
189201
end
190-
191-
case duplicate?(queue, message) do
192-
false -> {{message, delivery, dqack(tag: ack_tag)}, state}
193-
true ->
194-
header =
195-
message
196-
|> basic_message(:content)
197-
|> message_headers()
198-
|> rabbitmq_keyfind("x-deduplication-header")
199-
200-
{{message, delivery, dqack(tag: ack_tag, header: header)}, state}
201-
end
202-
end
203-
204-
def fetch(false, state = dqstate(queue_state: qs)) do
205-
passthrough2(state, do: fetch(false, qs))
206202
end
207203

208-
def drop(boolean, state = dqstate(queue_state: qs)) do
209-
passthrough2(state, do: drop(boolean, qs))
204+
def drop(need_ack, state = dqstate(queue_state: qs)) do
205+
passthrough2(state, do: drop(need_ack, qs))
210206
end
211207

212208
def ack(acks, state = dqstate(queue: queue, queue_state: qs)) do

0 commit comments

Comments
 (0)