66# All rights reserved.
77
88defmodule RabbitMQ.MessageDeduplicationPlugin.Queue do
9+ @ moduledoc """
10+ This module adds support for deduplication queues.
11+
12+ Messages carrying the `x-deduplication-header` header will be deduplicated
13+ if a message with the same header value is already present within the queue.
14+
15+ When a message is published within the queue, it's checked against duplicates.
16+ If no duplicate is found, the message is inserted and its deduplication header
17+ cached. Once the message is acknowledged or dropped, the header is removed
18+ from the cache.
19+
20+ This module implements the `rabbit_backing_queue` behaviour delegating
21+ all the queue related operation to the underlying backing queue.
22+
23+ """
24+
925 import Record , only: [ defrecord: 2 , defrecord: 3 , defrecordp: 2 , extract: 2 ]
1026
1127 require RabbitMQ.MessageDeduplicationPlugin.Cache
@@ -45,13 +61,19 @@ defmodule RabbitMQ.MessageDeduplicationPlugin.Queue do
4561 defrecordp :dqack , [ :tag , :header ]
4662 defrecordp :dqstate , [ :queue , :queue_state ]
4763
64+ # The passthrough macros call the underlying backing queue functions
65+ # The suffixes indicate the arity of the return values
66+ # of the backing queue functions they are wrapping.
67+
68+ # Backing queue functions returning one value, does not change the queue state
4869 defmacrop passthrough ( do: function ) do
4970 quote do
5071 backing_queue = Application . get_env ( __MODULE__ , :backing_queue_module )
5172 backing_queue . unquote ( function )
5273 end
5374 end
5475
76+ # Backing queue functions returning the state
5577 defmacrop passthrough1 ( state , do: function ) do
5678 quote do
5779 queue = dqstate ( unquote ( state ) , :queue )
@@ -61,6 +83,7 @@ defmodule RabbitMQ.MessageDeduplicationPlugin.Queue do
6183 end
6284 end
6385
86+ # Backing queue functions returning a tuple {result, state}
6487 defmacrop passthrough2 ( state , do: function ) do
6588 quote do
6689 queue = dqstate ( unquote ( state ) , :queue )
@@ -70,6 +93,7 @@ defmodule RabbitMQ.MessageDeduplicationPlugin.Queue do
7093 end
7194 end
7295
96+ # Backing queue functions returning a tuple {result1, result2, state}
7397 defmacrop passthrough3 ( state , do: function ) do
7498 quote do
7599 queue = dqstate ( unquote ( state ) , :queue )
@@ -239,15 +263,13 @@ defmodule RabbitMQ.MessageDeduplicationPlugin.Queue do
239263 end
240264
241265 def ack ( acks , state = dqstate ( queue: queue , queue_state: qs ) ) do
242- acks =
243- case duplicate? ( queue ) do
244- false -> Enum . map ( acks , fn ( dqack ( tag: ack ) ) -> ack end )
245- true ->
246- Enum . map ( acks , fn ( dqack ( tag: ack , header: header ) ) ->
247- maybe_delete_cache_entry ( queue , header )
248- ack
249- end )
250- end
266+ acks = case duplicate? ( queue ) do
267+ false -> Enum . map ( acks , fn ( dqack ( tag: ack ) ) -> ack end )
268+ true -> Enum . map ( acks , fn ( dqack ( tag: ack , header: header ) ) ->
269+ maybe_delete_cache_entry ( queue , header )
270+ ack
271+ end )
272+ end
251273
252274 passthrough2 ( state , do: ack ( acks , qs ) )
253275 end
@@ -305,18 +327,13 @@ defmodule RabbitMQ.MessageDeduplicationPlugin.Queue do
305327 end
306328
307329 def info ( atom , dqstate ( queue: queue , queue_state: qs ) ) do
308- case duplicate? ( queue ) do
309- true -> case passthrough do: info ( atom , qs ) do
310- queue_info when is_list ( queue_info ) ->
311- cache_info = queue
312- |> amqqueue ( :name )
313- |> Common . cache_name ( )
314- |> MessageCache . info ( )
315-
316- [ cache_info: cache_info ] ++ queue_info
317- queue_info -> queue_info
318- end
319- false -> passthrough do: info ( atom , qs )
330+ case passthrough do: info ( atom , qs ) do
331+ queue_info when is_list ( queue_info ) ->
332+ case duplicate? ( queue ) do
333+ true -> [ cache_info: cache_info ( queue ) ] ++ queue_info
334+ false -> queue_info
335+ end
336+ queue_info -> queue_info
320337 end
321338 end
322339
@@ -338,7 +355,7 @@ defmodule RabbitMQ.MessageDeduplicationPlugin.Queue do
338355 end
339356
340357 def zip_msgs_and_acks ( delivered_publish , [ ack ] ,
341- A , dqstate ( queue_state: qs ) ) do
358+ A , dqstate ( queue_state: qs ) ) do
342359 passthrough do: info ( delivered_publish , [ ack ] , A , qs )
343360 end
344361
@@ -355,34 +372,41 @@ defmodule RabbitMQ.MessageDeduplicationPlugin.Queue do
355372
356373 # Returns true if the queue supports message deduplication
357374 # and the message is a duplicate.
358- defp duplicate? ( queue = amqqueue ( name: name ) ,
359- message = basic_message ( content: content ( properties: properties ) ) ) do
360- ttl = case properties do
361- basic_properties ( expiration: ttl ) when is_bitstring ( ttl ) ->
362- String . to_integer ( ttl )
363- basic_properties ( expiration: :undefined ) -> nil
364- :undefined -> nil
365- end
366-
375+ defp duplicate? ( queue = amqqueue ( name: name ) , message = basic_message ( ) ) do
367376 case duplicate? ( queue ) do
368- true -> Common . duplicate? ( name , message , ttl )
377+ true -> Common . duplicate? ( name , message , message_expiration ( message ) )
369378 false -> false
370379 end
371380 end
372381
373- # Remove the message deduplication header from the cache
374- def maybe_delete_cache_entry ( queue = amqqueue ( ) , message = basic_message ( ) ) do
375- header = Common . message_header ( message , "x-deduplication-header" )
382+ # Returns the expiration property of the given message
383+ defp message_expiration ( basic_message ( content:
384+ content ( properties: properties ) ) ) do
385+ case properties do
386+ basic_properties ( expiration: ttl ) when is_bitstring ( ttl ) ->
387+ String . to_integer ( ttl )
388+ basic_properties ( expiration: :undefined ) -> nil
389+ :undefined -> nil
390+ end
391+ end
376392
393+ # Removes the message deduplication header from the cache
394+ defp maybe_delete_cache_entry ( queue = amqqueue ( ) , msg = basic_message ( ) ) do
395+ header = Common . message_header ( msg , "x-deduplication-header" )
377396 maybe_delete_cache_entry ( queue , header )
378397 end
379398
380- def maybe_delete_cache_entry ( queue , header ) when is_bitstring ( header ) do
399+ defp maybe_delete_cache_entry ( queue , header ) when is_bitstring ( header ) do
381400 queue
382401 |> amqqueue ( :name )
383402 |> Common . cache_name ( )
384403 |> MessageCache . delete ( header )
385404 end
386405
387- def maybe_delete_cache_entry ( _queue , header ) when is_nil ( header ) do end
406+ defp maybe_delete_cache_entry ( _queue , header ) when is_nil ( header ) do end
407+
408+ # Returns the cache information
409+ defp cache_info ( amqqueue ( name: name ) ) do
410+ name |> Common . cache_name ( ) |> MessageCache . info ( )
411+ end
388412end
0 commit comments