Skip to content

Commit eca6b86

Browse files
committed
issue #28, #30: use new cache APIs in exchange and queue behaviours
Signed-off-by: Matteo Cafasso <[email protected]>
1 parent 385e6ff commit eca6b86

File tree

2 files changed

+15
-25
lines changed

2 files changed

+15
-25
lines changed

lib/rabbit_message_deduplication_exchange.ex

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -26,15 +26,14 @@ defmodule RabbitMQ.MessageDeduplicationPlugin.Exchange do
2626

2727
require RabbitMQ.MessageDeduplicationPlugin.Cache
2828
require RabbitMQ.MessageDeduplicationPlugin.Common
29-
require RabbitMQ.MessageDeduplicationPlugin.Supervisor
3029

3130
alias :rabbit_log, as: RabbitLog
3231
alias :rabbit_misc, as: RabbitMisc
3332
alias :rabbit_router, as: RabbitRouter
3433
alias :rabbit_exchange, as: RabbitExchange
35-
alias RabbitMQ.MessageDeduplicationPlugin.Cache, as: MessageCache
3634
alias RabbitMQ.MessageDeduplicationPlugin.Common, as: Common
37-
alias RabbitMQ.MessageDeduplicationPlugin.Supervisor, as: CacheSupervisor
35+
alias RabbitMQ.MessageDeduplicationPlugin.Cache, as: Cache
36+
alias RabbitMQ.MessageDeduplicationPlugin.CacheManager, as: CacheManager
3837

3938
@behaviour :rabbit_exchange_type
4039

@@ -141,19 +140,15 @@ defmodule RabbitMQ.MessageDeduplicationPlugin.Exchange do
141140
"Starting exchange deduplication cache ~s with options ~p~n",
142141
[cache, options])
143142

144-
CacheSupervisor.start_cache(cache, options)
143+
CacheManager.create(cache, options)
145144
end
146145

147-
def create(:none, _ex) do
146+
def create(_tx, _ex) do
148147
:ok
149148
end
150149

151150
def delete(:transaction, exchange(name: name), _bs) do
152-
cache = Common.cache_name(name)
153-
154-
:ok = MessageCache.drop(cache)
155-
156-
CacheSupervisor.stop_cache(cache)
151+
name |> Common.cache_name() |> CacheManager.destroy()
157152
end
158153

159154
def delete(:none, _ex, _bs) do
@@ -181,7 +176,7 @@ defmodule RabbitMQ.MessageDeduplicationPlugin.Exchange do
181176
end
182177

183178
def info(exchange(name: name), [:cache_info]) do
184-
[cache_info: name |> Common.cache_name() |> MessageCache.info()]
179+
[cache_info: name |> Common.cache_name() |> Cache.info()]
185180
end
186181

187182
def info(_ex, _it) do

lib/rabbit_message_deduplication_queue.ex

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
# License, v. 2.0. If a copy of the MPL was not distributed with this
33
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
44
#
5-
# Copyright (c) 2017-2018, Matteo Cafasso.
5+
# Copyright (c) 2017-2019, Matteo Cafasso.
66
# All rights reserved.
77

88
defmodule RabbitMQ.MessageDeduplicationPlugin.Queue do
@@ -26,12 +26,11 @@ defmodule RabbitMQ.MessageDeduplicationPlugin.Queue do
2626

2727
require RabbitMQ.MessageDeduplicationPlugin.Cache
2828
require RabbitMQ.MessageDeduplicationPlugin.Common
29-
require RabbitMQ.MessageDeduplicationPlugin.Supervisor
3029

3130
alias :rabbit_log, as: RabbitLog
32-
alias RabbitMQ.MessageDeduplicationPlugin.Cache, as: MessageCache
3331
alias RabbitMQ.MessageDeduplicationPlugin.Common, as: Common
34-
alias RabbitMQ.MessageDeduplicationPlugin.Supervisor, as: CacheSupervisor
32+
alias RabbitMQ.MessageDeduplicationPlugin.Cache, as: Cache
33+
alias RabbitMQ.MessageDeduplicationPlugin.CacheManager, as: CacheManager
3534

3635
@behaviour :rabbit_backing_queue
3736

@@ -40,8 +39,7 @@ defmodule RabbitMQ.MessageDeduplicationPlugin.Queue do
4039
accumulate: true, persist: true
4140

4241
@rabbit_boot_step {__MODULE__,
43-
[{:description,
44-
"message deduplication queue: supervisor"},
42+
[{:description, "message deduplication queue"},
4543
{:mfa, {__MODULE__, :enable_deduplication_queues, []}},
4644
{:requires, :database},
4745
{:enables, :external_infrastructure}]}
@@ -134,7 +132,7 @@ defmodule RabbitMQ.MessageDeduplicationPlugin.Queue do
134132
"Starting queue deduplication cache ~s with options ~p~n",
135133
[cache, options])
136134

137-
CacheSupervisor.start_cache(cache, options)
135+
CacheManager.create(cache, options)
138136
end
139137

140138
passthrough1(dqstate(queue: queue)) do
@@ -150,10 +148,7 @@ defmodule RabbitMQ.MessageDeduplicationPlugin.Queue do
150148
dqstate(queue: queue, queue_state: qs) = state
151149

152150
if duplicate?(queue) do
153-
cache = queue |> amqqueue(:name) |> Common.cache_name()
154-
155-
:ok = MessageCache.drop(cache)
156-
:ok = CacheSupervisor.stop_cache(cache)
151+
queue |> amqqueue(:name) |> Common.cache_name() |> CacheManager.destroy()
157152
end
158153

159154
passthrough1(state) do
@@ -169,7 +164,7 @@ defmodule RabbitMQ.MessageDeduplicationPlugin.Queue do
169164
if duplicate?(queue) do
170165
cache = queue |> amqqueue(:name) |> Common.cache_name()
171166

172-
:ok = MessageCache.flush(cache)
167+
:ok = Cache.flush(cache)
173168
end
174169

175170
passthrough2(state, do: purge(qs))
@@ -437,7 +432,7 @@ defmodule RabbitMQ.MessageDeduplicationPlugin.Queue do
437432
queue
438433
|> amqqueue(:name)
439434
|> Common.cache_name()
440-
|> MessageCache.delete(header)
435+
|> Cache.delete(header)
441436
end
442437

443438
defp maybe_delete_cache_entry(_queue, header) when is_nil(header) do end
@@ -447,7 +442,7 @@ defmodule RabbitMQ.MessageDeduplicationPlugin.Queue do
447442
cache = Common.cache_name(name)
448443

449444
try do
450-
MessageCache.info(cache)
445+
Cache.info(cache)
451446
catch
452447
:exit, {:noproc, {GenServer, :call, [^cache | _]}} -> []
453448
end

0 commit comments

Comments
 (0)