@@ -125,9 +125,10 @@ defmodule RabbitMQ.MessageDeduplicationPlugin.Queue do
125125 def init ( queue = amqqueue ( name: name , arguments: args ) , recovery , callback ) do
126126 if duplicate? ( queue ) do
127127 cache = Common . cache_name ( name )
128- options = [ ttl: Common . cache_argument ( args , "x-message-ttl" , :number ) ,
129- persistence: Common . cache_argument (
130- args , "x-cache-persistence" , :atom , "memory" ) ]
128+ options = [ ttl: Common . rabbit_argument (
129+ args , "x-message-ttl" , type: :number ) ,
130+ persistence: Common . rabbit_argument (
131+ args , "x-cache-persistence" , type: :atom , default: "memory" ) ]
131132
132133 RabbitLog . debug (
133134 "Starting queue deduplication cache ~s with options ~p~n" ,
@@ -145,8 +146,9 @@ defmodule RabbitMQ.MessageDeduplicationPlugin.Queue do
145146 passthrough1 ( state , do: terminate ( any , qs ) )
146147 end
147148
148- def delete_and_terminate (
149- any , state = dqstate ( queue: queue , queue_state: qs ) ) do
149+ def delete_and_terminate ( any , state ) do
150+ dqstate ( queue: queue , queue_state: qs ) = state
151+
150152 if duplicate? ( queue ) do
151153 cache = queue |> amqqueue ( :name ) |> Common . cache_name ( )
152154
@@ -188,15 +190,17 @@ defmodule RabbitMQ.MessageDeduplicationPlugin.Queue do
188190 passthrough1 ( state , do: batch_publish ( [ publish ] , pid , flow , qs ) )
189191 end
190192
191- def publish_delivered ( basic_message , message_properties , pid , flow ,
192- state = dqstate ( queue_state: qs ) ) do
193+ def publish_delivered ( message , message_properties , pid , flow , state ) do
194+ dqstate ( queue_state: qs ) = state
195+
193196 passthrough2 ( state ) do
194- publish_delivered ( basic_message , message_properties , pid , flow , qs )
197+ publish_delivered ( message , message_properties , pid , flow , qs )
195198 end
196199 end
197200
198- def batch_publish_delivered ( [ delivered_publish ] , pid , flow ,
199- state = dqstate ( queue_state: qs ) ) do
201+ def batch_publish_delivered ( [ delivered_publish ] , pid , flow , state ) do
202+ dqstate ( queue_state: qs ) = state
203+
200204 passthrough2 ( state ) do
201205 batch_publish_delivered ( [ delivered_publish ] , pid , flow , qs )
202206 end
@@ -222,54 +226,52 @@ defmodule RabbitMQ.MessageDeduplicationPlugin.Queue do
222226 passthrough3 ( state , do: fetchwhile ( msg_pred , msg_fun , A , qs ) )
223227 end
224228
225- def fetch ( need_ack , state = dqstate ( queue: queue , queue_state: qs ) ) do
226- case passthrough2 ( state , do: fetch ( need_ack , qs ) ) do
229+ def fetch ( needs_ack , state = dqstate ( queue: queue , queue_state: qs ) ) do
230+ case passthrough2 ( state , do: fetch ( needs_ack , qs ) ) do
227231 { :empty , state } -> { :empty , state }
228232 { { message , delivery , ack_tag } , state } ->
229- case duplicate? ( queue ) do
230- false -> { { message , delivery , dqack ( tag: ack_tag ) } , state }
231- true ->
232- case need_ack do
233- true ->
234- head = Common . message_header ( message , "x-deduplication-header" )
235- ack = dqack ( tag: ack_tag , header: head )
236- { { message , delivery , ack } , state }
237- false ->
238- maybe_delete_cache_entry ( queue , message )
239-
240- { { message , delivery , dqack ( tag: ack_tag ) } , state }
241- end
233+ if duplicate? ( queue ) do
234+ if needs_ack do
235+ head = Common . message_header ( message , "x-deduplication-header" )
236+ { { message , delivery , dqack ( tag: ack_tag , header: head ) } , state }
237+ else
238+ maybe_delete_cache_entry ( queue , message )
239+ { { message , delivery , dqack ( tag: ack_tag ) } , state }
240+ end
241+ else
242+ { { message , delivery , dqack ( tag: ack_tag ) } , state }
242243 end
243244 end
244245 end
245246
246247 # TODO: this is a bit of a hack.
247248 # As the drop callback returns only the message id, we can't retrieve
248- # the message deduplication header. As a workaround fetch is used.
249+ # the message deduplication header. As a workaround ` fetch` is used.
249250 # This assumes the backing queue drop and fetch behaviours are the same.
250251 # A better solution would be to store the message IDs in a dedicated index.
251252 def drop ( need_ack , state = dqstate ( queue: queue , queue_state: qs ) ) do
252- case duplicate? ( queue ) do
253- false -> passthrough2 ( state , do: drop ( need_ack , qs ) )
254- true ->
255- case fetch ( need_ack , state ) do
256- { :empty , state } -> { :empty , state }
257- { { message = basic_message ( id: id ) , _ , ack_tag } , state } ->
258- maybe_delete_cache_entry ( queue , message )
259-
260- { { id , ack_tag } , state }
261- end
253+ if duplicate? ( queue ) do
254+ case fetch ( need_ack , state ) do
255+ { :empty , state } -> { :empty , state }
256+ { { message = basic_message ( id: id ) , _ , ack_tag } , state } ->
257+ maybe_delete_cache_entry ( queue , message )
258+
259+ { { id , ack_tag } , state }
260+ end
261+ else
262+ passthrough2 ( state , do: drop ( need_ack , qs ) )
262263 end
263264 end
264265
265266 def ack ( acks , state = dqstate ( queue: queue , queue_state: qs ) ) do
266- acks = case duplicate? ( queue ) do
267- false -> Enum . map ( acks , fn ( dqack ( tag: ack ) ) -> ack end )
268- true -> Enum . map ( acks , fn ( dqack ( tag: ack , header: header ) ) ->
269- maybe_delete_cache_entry ( queue , header )
270- ack
271- end )
272- end
267+ acks = if duplicate? ( queue ) do
268+ Enum . map ( acks , fn ( dqack ( tag: ack , header: header ) ) ->
269+ maybe_delete_cache_entry ( queue , header )
270+ ack
271+ end )
272+ else
273+ Enum . map ( acks , fn ( dqack ( tag: ack ) ) -> ack end )
274+ end
273275
274276 passthrough2 ( state , do: ack ( acks , qs ) )
275277 end
@@ -329,9 +331,10 @@ defmodule RabbitMQ.MessageDeduplicationPlugin.Queue do
329331 def info ( atom , dqstate ( queue: queue , queue_state: qs ) ) do
330332 case passthrough do: info ( atom , qs ) do
331333 queue_info when is_list ( queue_info ) ->
332- case duplicate? ( queue ) do
333- true -> [ cache_info: cache_info ( queue ) ] ++ queue_info
334- false -> queue_info
334+ if duplicate? ( queue ) do
335+ [ cache_info: cache_info ( queue ) ] ++ queue_info
336+ else
337+ queue_info
335338 end
336339 queue_info -> queue_info
337340 end
@@ -354,8 +357,9 @@ defmodule RabbitMQ.MessageDeduplicationPlugin.Queue do
354357 passthrough1 ( state , do: set_queue_mode ( queue_mode , qs ) )
355358 end
356359
357- def zip_msgs_and_acks ( delivered_publish , [ ack ] ,
358- A , dqstate ( queue_state: qs ) ) do
360+ def zip_msgs_and_acks ( delivered_publish , [ ack ] , A , state ) do
361+ dqstate ( queue_state: qs ) = state
362+
359363 passthrough do: info ( delivered_publish , [ ack ] , A , qs )
360364 end
361365
@@ -367,7 +371,7 @@ defmodule RabbitMQ.MessageDeduplicationPlugin.Queue do
367371
368372 # Returns true if the queue supports message deduplication
369373 defp duplicate? ( amqqueue ( arguments: args ) ) do
370- Common . cache_argument ( args , "x-message-deduplication" , nil , false )
374+ Common . rabbit_argument ( args , "x-message-deduplication" , default: false )
371375 end
372376
373377 # Returns true if the queue supports message deduplication
@@ -380,8 +384,9 @@ defmodule RabbitMQ.MessageDeduplicationPlugin.Queue do
380384 end
381385
382386 # Returns the expiration property of the given message
383- defp message_expiration ( basic_message ( content:
384- content ( properties: properties ) ) ) do
387+ defp message_expiration ( message ) do
388+ basic_message ( content: content ( properties: properties ) ) = message
389+
385390 case properties do
386391 basic_properties ( expiration: ttl ) when is_bitstring ( ttl ) ->
387392 String . to_integer ( ttl )
0 commit comments