Skip to content

Commit c9bdb96

Browse files
committed
cache: add insert function
The insert function atomically test if a key is already in the cache and inserts it if not. This function should be faster and more concise than using `member?` and `put` as the operation uses a single transaction instead of two or more. Signed-off-by: Matteo Cafasso <[email protected]>
1 parent 302d713 commit c9bdb96

File tree

1 file changed

+43
-0
lines changed

1 file changed

+43
-0
lines changed

lib/cache.ex

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,16 @@ defmodule RabbitMQ.MessageDeduplicationPlugin.Cache do
4545
GenServer.call(cache, {:put, cache, entry, ttl})
4646
end
4747

48+
@doc """
49+
Insert the given entry into the cache if it doesn't exist.
50+
The TTL controls the lifetime in milliseconds of the entry.
51+
"""
52+
@spec insert(atom, any, integer | nil) ::
53+
:ok | { :error, :already_exists | any }
54+
def insert(cache, entry, ttl \\ nil) do
55+
GenServer.call(cache, {:insert, cache, entry, ttl})
56+
end
57+
4858
@doc """
4959
Delete the given entry from the cache.
5060
"""
@@ -122,6 +132,39 @@ defmodule RabbitMQ.MessageDeduplicationPlugin.Cache do
122132
{:reply, :ok, state}
123133
end
124134

135+
# Inserts the entry if it doesn't exist.
136+
# If the cache is full, remove an element to make space.
137+
def handle_call({:insert, cache, entry, ttl}, _from, state) do
138+
function = fn ->
139+
entries = Mnesia.read(cache, entry)
140+
member? = case List.keyfind(entries, entry, 1) do
141+
{_, _, expiration} ->
142+
if expiration <= Os.system_time(:millisecond) do
143+
Mnesia.delete({cache, entry})
144+
false
145+
else
146+
true
147+
end
148+
nil -> false
149+
end
150+
151+
if member? do
152+
{:error, :already_exists}
153+
else
154+
if cache_full?(cache) do
155+
Mnesia.delete({cache, Mnesia.first(cache)})
156+
end
157+
158+
Mnesia.write({cache, entry, entry_expiration(cache, ttl)})
159+
end
160+
end
161+
162+
case Mnesia.transaction(function) do
163+
{:atomic, retval} -> {:reply, retval, state}
164+
{:aborted, reason} -> {:reply, {:error, reason}, state}
165+
end
166+
end
167+
125168
# Removes the given entry from the cache.
126169
def handle_call({:delete, cache, entry}, _from, state) do
127170
Mnesia.transaction(fn ->

0 commit comments

Comments
 (0)