Skip to content

Commit 7784274

Browse files
committed
queue: handle message TTL
Cache TTL mechanism is used in place of `dropwhile` and `fetchwhile` callbacks. Signed-off-by: Matteo Cafasso <[email protected]>
1 parent 36eb6bf commit 7784274

File tree

1 file changed

+37
-25
lines changed

1 file changed

+37
-25
lines changed

lib/rabbit_message_deduplication_queue.ex

Lines changed: 37 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -90,15 +90,15 @@ defmodule RabbitMQ.MessageDeduplicationPlugin.Queue do
9090
passthrough do: stop(vhost)
9191
end
9292

93-
def init(queue, recovery, callback) do
93+
def init(queue = amqqueue(name: name, arguments: args), recovery, callback) do
9494
if duplicate?(queue) do
95-
cache = queue |> amqqueue(:name) |> cache_name()
95+
cache = cache_name(name)
96+
ttl = rabbitmq_keyfind(args, "x-message-ttl")
9697
persistence =
97-
queue
98-
|> amqqueue(:arguments)
98+
args
9999
|> rabbitmq_keyfind("x-cache-persistence", "memory")
100100
|> String.to_atom()
101-
options = [persistence: persistence]
101+
options = [ttl: ttl, persistence: persistence]
102102

103103
RabbitLog.debug("Starting queue deduplication cache ~s~n", [cache])
104104

@@ -175,10 +175,14 @@ defmodule RabbitMQ.MessageDeduplicationPlugin.Queue do
175175
passthrough2(state, do: drain_confirmed(qs))
176176
end
177177

178+
# The dropwhile callback handles message TTL expiration.
179+
# The duplicates cache TTL mechanism is used instead.
178180
def dropwhile(msg_pred, state = dqstate(queue_state: qs)) do
179181
passthrough2(state, do: dropwhile(msg_pred, qs))
180182
end
181183

184+
# The fetchwhile callback handles message TTL dead lettering.
185+
# The duplicates cache TTL mechanism is used instead.
182186
def fetchwhile(msg_pred, msg_fun, A, state = dqstate(queue_state: qs)) do
183187
passthrough3(state, do: fetchwhile(msg_pred, msg_fun, A, qs))
184188
end
@@ -228,19 +232,19 @@ defmodule RabbitMQ.MessageDeduplicationPlugin.Queue do
228232
end
229233

230234
def ack(acks, state = dqstate(queue: queue, queue_state: qs)) do
231-
acks = case duplicate?(queue) do
232-
false -> Enum.map(acks, fn(dqack(tag: ack)) -> ack end)
233-
true ->
234-
cache = queue |> amqqueue(:name) |> cache_name()
235+
acks =
236+
case duplicate?(queue) do
237+
false -> Enum.map(acks, fn(dqack(tag: ack)) -> ack end)
238+
true ->
239+
cache = queue |> amqqueue(:name) |> cache_name()
235240

236-
Enum.map(acks, fn(dqack(tag: ack, header: header)) ->
237-
if not is_nil(header) do
238-
MessageCache.delete(cache, header)
239-
end
240-
241-
ack
242-
end)
243-
end
241+
Enum.map(acks, fn(dqack(tag: ack, header: header)) ->
242+
if not is_nil(header) do
243+
MessageCache.delete(cache, header)
244+
end
245+
ack
246+
end)
247+
end
244248

245249
passthrough2(state, do: ack(acks, qs))
246250
end
@@ -340,25 +344,25 @@ defmodule RabbitMQ.MessageDeduplicationPlugin.Queue do
340344
defp duplicate?(amqqueue(name: name, arguments: arguments), message) do
341345
with true <- rabbitmq_keyfind(arguments, "x-message-deduplication", false),
342346
header when not is_nil(header) <- deduplication_header(message) do
343-
name |> cache_name() |> cached?(header)
347+
name |> cache_name() |> cached?(header, message_ttl(message))
344348
else
345349
_ -> false
346350
end
347351
end
348352

349-
# Returns true if the key is is already present in the deduplication cache.
353+
# Returns true if the key is already present in the deduplication cache.
350354
# Otherwise, it adds it to the cache and returns false.
351-
defp cached?(cache, key) do
355+
defp cached?(cache, key, ttl) do
352356
case MessageCache.member?(cache, key) do
353357
true -> true
354-
false -> MessageCache.put(cache, key)
358+
false -> MessageCache.put(cache, key, ttl)
355359
false
356360
end
357361
end
358362

359363
# Return the deduplication header of the given message, nil if none.
360-
defp deduplication_header(basic_message(content: content)) do
361-
case message_headers(content) do
364+
defp deduplication_header(message) do
365+
case message_headers(message) do
362366
headers when is_list(headers) ->
363367
rabbitmq_keyfind(headers, "x-deduplication-header")
364368
_ -> nil
@@ -382,8 +386,16 @@ defmodule RabbitMQ.MessageDeduplicationPlugin.Queue do
382386
end
383387

384388
# Unpacks the message headers
385-
defp message_headers(message) do
386-
message |> elem(2) |> elem(3)
389+
defp message_headers(basic_message(content: content)) do
390+
content |> elem(2) |> elem(3)
391+
end
392+
393+
# Unpacks the message TTL in seconds
394+
defp message_ttl(basic_message(content: content)) do
395+
case content |> elem(2) |> elem(8) do
396+
:undefined -> nil
397+
ttl -> ttl |> String.to_integer()
398+
end
387399
end
388400

389401
defp sanitize_string(string) do

0 commit comments

Comments
 (0)