Skip to content

Commit f875c8d

Browse files
committed
queue: code cleanup
Signed-off-by: Matteo Cafasso <[email protected]>
1 parent 5790ee4 commit f875c8d

File tree

1 file changed

+17
-23
lines changed

1 file changed

+17
-23
lines changed

lib/rabbit_message_deduplication_queue.ex

Lines changed: 17 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -314,38 +314,32 @@ defmodule RabbitMQ.MessageDeduplicationPlugin.Queue do
314314

315315
# Returns true if the queue supports message deduplication
316316
# and the message is a duplicate.
317-
defp duplicate?(amqqueue(name: name, arguments: arguments),
318-
basic_message(content: content)) do
317+
# The message is added to the deduplication cache if missing.
318+
defp duplicate?(amqqueue(name: name, arguments: arguments), message) do
319319
with true <- rabbitmq_keyfind(arguments, "x-message-deduplication", false),
320-
headers when is_list(headers) <- message_headers(content) do
321-
name |> cache_name() |> cached?(headers)
320+
header when not is_nil(header) <- deduplication_header(message) do
321+
name |> cache_name() |> cached?(header)
322322
else
323323
_ -> false
324324
end
325325
end
326326

327-
# Returns true if the message includes the header `x-deduplication-header`
328-
# and its value is already present in the deduplication cache.
329-
#
330-
# false otherwise.
331-
#
332-
# If `x-deduplication-header` value is not present in the cache, it is added.
333-
defp cached?(cache, headers) do
334-
case rabbitmq_keyfind(headers, "x-deduplication-header") do
335-
nil -> false
336-
key -> case MessageCache.member?(cache, key) do
337-
true -> true
338-
false -> cache_put(cache, key, headers)
339-
false
340-
end
327+
# Returns true if the key is is already present in the deduplication cache.
328+
# Otherwise, it adds it to the cache and returns false.
329+
defp cached?(cache, key) do
330+
case MessageCache.member?(cache, key) do
331+
true -> true
332+
false -> MessageCache.put(cache, key)
333+
false
341334
end
342335
end
343336

344-
# Puts the key and related headers in the cache
345-
defp cache_put(cache, key, headers) do
346-
case rabbitmq_keyfind(headers, "x-cache-ttl") do
347-
nil -> MessageCache.put(cache, key)
348-
ttl -> MessageCache.put(cache, key, ttl)
337+
# Return the deduplication header of the given message, nil if none.
338+
defp deduplication_header(basic_message(content: content)) do
339+
case message_headers(content) do
340+
headers when is_list(headers) ->
341+
rabbitmq_keyfind(headers, "x-deduplication-header")
342+
_ -> nil
349343
end
350344
end
351345

0 commit comments

Comments
 (0)