@@ -36,15 +36,6 @@ defmodule RabbitMQ.MessageDeduplicationPlugin.Cache do
3636 GenServer . start_link ( __MODULE__ , { cache , options } , name: cache )
3737 end
3838
39- @ doc """
40- Put the given entry into the cache.
41- The TTL controls the lifetime in milliseconds of the entry.
42- """
43- @ spec put ( atom , any , integer | nil ) :: :ok | { :error , any }
44- def put ( cache , entry , ttl \\ nil ) do
45- GenServer . call ( cache , { :put , cache , entry , ttl } )
46- end
47-
4839 @ doc """
4940 Insert the given entry into the cache if it doesn't exist.
5041 The TTL controls the lifetime in milliseconds of the entry.
@@ -63,14 +54,6 @@ defmodule RabbitMQ.MessageDeduplicationPlugin.Cache do
6354 GenServer . call ( cache , { :delete , cache , entry } )
6455 end
6556
66- @ doc """
67- True if the entry is contained within the cache.
68- """
69- @ spec delete ( atom , any ) :: boolean
70- def member? ( cache , entry ) do
71- GenServer . call ( cache , { :member? , cache , entry } )
72- end
73-
7457 @ doc """
7558 Flush the cache content.
7659 """
@@ -118,41 +101,15 @@ defmodule RabbitMQ.MessageDeduplicationPlugin.Cache do
118101 { :noreply , state }
119102 end
120103
121- # Puts a new entry in the cache.
122- # If the cache is full, remove an element to make space.
123- def handle_call ( { :put , cache , entry , ttl } , _from , state ) do
124- if cache_full? ( cache ) do
125- cache_delete_first ( cache )
126- end
127-
128- Mnesia . transaction ( fn ->
129- Mnesia . write ( { cache , entry , entry_expiration ( cache , ttl ) } )
130- end )
131-
132- { :reply , :ok , state }
133- end
134-
135104 # Inserts the entry if it doesn't exist.
136105 # If the cache is full, remove an element to make space.
137106 def handle_call ( { :insert , cache , entry , ttl } , _from , state ) do
138107 function = fn ->
139- entries = Mnesia . read ( cache , entry )
140- member? = case List . keyfind ( entries , entry , 1 ) do
141- { _ , _ , expiration } ->
142- if expiration <= Os . system_time ( :millisecond ) do
143- Mnesia . delete ( { cache , entry } )
144- false
145- else
146- true
147- end
148- nil -> false
149- end
150-
151- if member? do
108+ if cache_member? ( cache , entry ) do
152109 { :error , :already_exists }
153110 else
154111 if cache_full? ( cache ) do
155- Mnesia . delete ( { cache , Mnesia . first ( cache ) } )
112+ cache_delete_first ( cache )
156113 end
157114
158115 Mnesia . write ( { cache , entry , entry_expiration ( cache , ttl ) } )
@@ -174,11 +131,6 @@ defmodule RabbitMQ.MessageDeduplicationPlugin.Cache do
174131 { :reply , :ok , state }
175132 end
176133
177- # True if the entry is in the cache.
178- def handle_call ( { :member? , cache , entry } , _from , state ) do
179- { :reply , cache_member? ( cache , entry ) , state }
180- end
181-
182134 # Flush the Mnesia cache table.
183135 def handle_call ( { :flush , cache } , _from , state ) do
184136 case Mnesia . clear_table ( cache ) do
@@ -228,12 +180,16 @@ defmodule RabbitMQ.MessageDeduplicationPlugin.Cache do
228180 Mnesia . wait_for_tables ( [ cache ] , Timer . seconds ( 30 ) )
229181 end
230182
231- # Mnesia cache lookup. The entry is not returned if expired.
183+ # Lookup the entry within the cache, deletes the entry if expired
184+ # Must be included within transaction.
232185 defp cache_member? ( cache , entry ) do
233- { :atomic , entries } = Mnesia . transaction ( fn -> Mnesia . read ( cache , entry ) end )
234-
235- case List . keyfind ( entries , entry , 1 ) do
236- { _ , _ , expiration } -> expiration > Os . system_time ( :millisecond )
186+ case cache |> Mnesia . read ( entry ) |> List . keyfind ( entry , 1 ) do
187+ { _ , _ , expiration } -> if expiration <= Os . system_time ( :millisecond ) do
188+ Mnesia . delete ( { cache , entry } )
189+ false
190+ else
191+ true
192+ end
237193 nil -> false
238194 end
239195 end
@@ -258,10 +214,12 @@ defmodule RabbitMQ.MessageDeduplicationPlugin.Cache do
258214
259215 # Delete the first element from the cache.
260216 # As the Mnesia Set is not ordered, the first element is random.
217+ # Must be included within transaction.
261218 defp cache_delete_first ( cache ) do
262- Mnesia . transaction ( fn -> Mnesia . delete ( { cache , Mnesia . first ( cache ) } ) end )
219+ Mnesia . delete ( { cache , Mnesia . first ( cache ) } )
263220 end
264221
222+ # True if the cache is full, false otherwise.
265223 defp cache_full? ( cache ) do
266224 Mnesia . table_info ( cache , :size ) >= cache_property ( cache , :limit )
267225 end
0 commit comments