Skip to content

Commit 1e90b9c

Browse files
committed
Code cleanup
Signed-off-by: Matteo Cafasso <[email protected]>
1 parent ac3516f commit 1e90b9c

File tree

3 files changed

+69
-53
lines changed

3 files changed

+69
-53
lines changed

lib/cache.ex

Lines changed: 36 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,6 @@ defmodule RabbitMQ.Cache do
1313

1414
use GenServer
1515

16-
require Record
17-
1816
alias :os, as: Os
1917
alias :timer, as: Timer
2018
alias :mnesia, as: Mnesia
@@ -49,60 +47,49 @@ defmodule RabbitMQ.Cache do
4947
GenServer.call(cache, {:drop, cache})
5048
end
5149

52-
@doc """
53-
Return the PID of the given cache, nil if non existing.
54-
"""
55-
def process(cache) do
56-
GenServer.whereis(cache)
57-
end
58-
5950
## Server Callbacks
6051

52+
# Creates the Mnesia table and starts the janitor process.
6153
def init({cache, options}) do
6254
Mnesia.start()
6355

6456
:ok = cache_create(cache, options)
6557

66-
Process.send_after(cache, {:cache, cache}, 3000)
58+
Process.send_after(cache, {:cache, cache}, Timer.seconds(3))
6759

6860
{:ok, %{}}
6961
end
7062

63+
# The janitor process deletes expired cache entries.
7164
def handle_info({:cache, cache}, state) do
7265
{_, result} = cache_delete_expired(cache)
7366
if (result == :ok) do
74-
Process.send_after(cache, {:cache, cache}, 3000)
67+
Process.send_after(cache, {:cache, cache}, Timer.seconds(3))
7568
end
7669

7770
{:noreply, state}
7871
end
7972

73+
# Puts a new entry in the cache.
74+
# If the cache is full, remove an element to make space.
8075
def handle_call({:put, cache, value, ttl}, _from, state) do
81-
{:default_ttl, default_ttl} = cache_default_ttl(cache)
82-
expiration = cond do
83-
ttl != nil -> Os.system_time(:seconds) + ttl
84-
default_ttl != nil -> Os.system_time(:seconds) + default_ttl
85-
true -> nil
86-
end
87-
88-
# Remove first element if cache is full
89-
size = Mnesia.table_info(cache, :size)
90-
{:limit, limit} = cache_limit(cache)
91-
if size >= limit do
76+
if cache_full?(cache) do
9277
cache_delete_first(cache)
9378
end
9479

9580
Mnesia.transaction(fn ->
96-
Mnesia.write({cache, value, expiration})
81+
Mnesia.write({cache, value, entry_expiration(cache, ttl)})
9782
end)
9883

9984
{:reply, :ok, state}
10085
end
10186

87+
# True if the value is in the cache.
10288
def handle_call({:member?, cache, value}, _from, state) do
10389
{:reply, cache_member?(cache, value), state}
10490
end
10591

92+
# Drop the Mnesia cache table.
10693
def handle_call({:drop, cache}, _from, state) do
10794
case Mnesia.delete_table(cache) do
10895
{:atomic, :ok} -> {:reply, :ok, state}
@@ -112,6 +99,7 @@ defmodule RabbitMQ.Cache do
11299

113100
## Utility functions
114101

102+
# Mnesia cache table creation.
115103
defp cache_create(cache, options) do
116104
persistence = case Keyword.get(options, :persistence) do
117105
:disk -> :disc_copies
@@ -128,6 +116,7 @@ defmodule RabbitMQ.Cache do
128116
Mnesia.wait_for_tables([cache], Timer.seconds(30))
129117
end
130118

119+
# Mnesia cache lookup. The entry is not returned if expired.
131120
defp cache_member?(cache, value) do
132121
{:atomic, entries} = Mnesia.transaction(fn -> Mnesia.read(cache, value) end)
133122

@@ -137,6 +126,7 @@ defmodule RabbitMQ.Cache do
137126
end
138127
end
139128

129+
# Remove all expired entries from the Mnesia cache.
140130
defp cache_delete_expired(cache) do
141131
select = fn ->
142132
Mnesia.select(cache, [{{cache, :"$1", :_, :"$3"},
@@ -154,20 +144,33 @@ defmodule RabbitMQ.Cache do
154144
end
155145
end
156146

147+
# Delete the first element from the cache.
148+
# As the Mnesia Set is not ordered, the first element is random.
157149
defp cache_delete_first(cache) do
158-
Mnesia.transaction(
159-
fn ->
160-
Mnesia.delete({cache, Mnesia.first(cache)})
161-
end)
150+
Mnesia.transaction(fn -> Mnesia.delete({cache, Mnesia.first(cache)}) end)
151+
end
152+
153+
defp cache_full?(cache) do
154+
Mnesia.table_info(cache, :size) >= cache_property(cache, :limit)
162155
end
163156

164-
defp cache_limit(cache) do
165-
Enum.find(Mnesia.table_info(cache, :user_properties),
166-
fn(element) -> match?({:limit, _}, element) end)
157+
# Calculate the expiration given a TTL or the cache default TTL
158+
defp entry_expiration(cache, ttl) do
159+
default = cache_property(cache, :default_ttl)
160+
161+
cond do
162+
ttl != nil -> Os.system_time(:seconds) + ttl
163+
default != nil -> Os.system_time(:seconds) + default
164+
true -> nil
165+
end
167166
end
168167

169-
defp cache_default_ttl(cache) do
170-
Enum.find(Mnesia.table_info(cache, :user_properties),
171-
fn(element) -> match?({:default_ttl, _}, element) end)
168+
defp cache_property(cache, property) do
169+
{^property, value} =
170+
cache
171+
|> Mnesia.table_info(:user_properties)
172+
|> Enum.find(fn(element) -> match?({^property, _}, element) end)
173+
174+
value
172175
end
173176
end

lib/rabbit_message_deduplication_exchange.ex

Lines changed: 16 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ defmodule RabbitMQ.ExchangeTypeMessageDeduplication do
160160
# Whether to route the message or not.
161161
defp route?(cache, message) do
162162
case message_headers(message) do
163-
headers when is_list(headers) -> not cached_message?(cache, headers)
163+
headers when is_list(headers) -> not cached?(cache, headers)
164164
:undefined -> true
165165
end
166166
end
@@ -171,7 +171,7 @@ defmodule RabbitMQ.ExchangeTypeMessageDeduplication do
171171
# false otherwise.
172172
#
173173
# If `x-deduplication-header` value is not present in the cache, it is added.
174-
defp cached_message?(cache, headers) do
174+
defp cached?(cache, headers) do
175175
case rabbitmq_keyfind(headers, "x-deduplication-header") do
176176
nil -> false
177177
key ->
@@ -189,23 +189,10 @@ defmodule RabbitMQ.ExchangeTypeMessageDeduplication do
189189
end
190190
end
191191

192-
# Unpacks the message headers
193-
defp message_headers(message) do
194-
message |> elem(2) |> elem(3)
195-
end
196-
197192
# Returns a sanitized atom composed by the resource and exchange name
198193
defp cache_name({:resource, resource, :exchange, exchange}) do
199-
resource =
200-
resource
201-
|> String.replace(~r/[-\. ]/, "_")
202-
|> String.replace("/", "")
203-
|> String.downcase()
204-
exchange =
205-
exchange
206-
|> String.replace(~r/[-\. ]/, "_")
207-
|> String.replace("/", "")
208-
|> String.downcase()
194+
resource = sanitize_string(resource)
195+
exchange = sanitize_string(exchange)
209196

210197
String.to_atom("cache_#{resource}_#{exchange}")
211198
end
@@ -217,4 +204,16 @@ defmodule RabbitMQ.ExchangeTypeMessageDeduplication do
217204
_ -> default
218205
end
219206
end
207+
208+
# Unpacks the message headers
209+
defp message_headers(message) do
210+
message |> elem(2) |> elem(3)
211+
end
212+
213+
defp sanitize_string(string) do
214+
string
215+
|> String.replace(~r/[-\. ]/, "_")
216+
|> String.replace("/", "")
217+
|> String.downcase()
218+
end
220219
end

lib/supervisor.ex

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,23 @@
11
defmodule RabbitMQ.CacheSupervisor do
2-
use DynamicSupervisor
2+
@moduledoc """
3+
The Cache Supervisor supervisions the Cache GenServer Processes.
4+
"""
35

4-
require RabbitMQ.Cache
6+
use DynamicSupervisor
57

8+
@doc """
9+
Start the Supervisor process.
10+
"""
611
def start_link() do
712
case DynamicSupervisor.start_link(__MODULE__, [], name: __MODULE__) do
813
{:ok, _pid} -> :ok
914
_ -> :error
1015
end
1116
end
1217

18+
@doc """
19+
Starts a new cache with the given name and options.
20+
"""
1321
def start_cache(cache, options) do
1422
specifications = %{id: cache,
1523
start: {RabbitMQ.Cache, :start_link, [cache, options]}}
@@ -21,10 +29,16 @@ defmodule RabbitMQ.CacheSupervisor do
2129
end
2230
end
2331

32+
@doc """
33+
Stops the given cache.
34+
"""
2435
def stop_cache(cache) do
25-
DynamicSupervisor.terminate_child(__MODULE__, RabbitMQ.Cache.process(cache))
36+
DynamicSupervisor.terminate_child(__MODULE__, Process.whereis(cache))
2637
end
2738

39+
@doc """
40+
Supervisor initialization callback.
41+
"""
2842
def init(_args) do
2943
DynamicSupervisor.init(strategy: :one_for_one)
3044
end

0 commit comments

Comments
 (0)