Skip to content

Commit f17f52c

Browse files
committed
issue-97: handle cache reconfiguration at startup
Exchange and Queue modules now make sure old caches are reconfigured at startup. Signed-off-by: Matteo Cafasso <[email protected]>
1 parent 1058a53 commit f17f52c

File tree

2 files changed

+32
-7
lines changed

2 files changed

+32
-7
lines changed

lib/rabbitmq_message_deduplication/rabbit_message_deduplication_exchange.ex

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ defmodule RabbitMQMessageDeduplication.Exchange do
4343
:rabbit_boot_step,
4444
accumulate: true, persist: true)
4545

46+
@exchange_type <<"x-message-deduplication">>
4647
@rabbit_boot_step {__MODULE__,
4748
[{:description, "exchange type x-message-deduplication"},
4849
{:mfa, {__MODULE__, :register, []}},
@@ -63,21 +64,22 @@ defmodule RabbitMQMessageDeduplication.Exchange do
6364
"""
6465
@spec register() :: :ok
6566
def register() do
66-
RabbitRegistry.register(:exchange, <<"x-message-deduplication">>, __MODULE__)
67+
RabbitRegistry.register(:exchange, @exchange_type, __MODULE__)
68+
maybe_reconfigure_caches()
6769
end
6870

6971
@doc """
7072
Unregister the exchange type from the Broker.
7173
"""
7274
@spec unregister() :: :ok
7375
def unregister() do
74-
RabbitRegistry.unregister(:exchange, <<"x-message-deduplication">>)
76+
RabbitRegistry.unregister(:exchange, @exchange_type)
7577
end
7678

7779
@impl :rabbit_exchange_type
7880
def description() do
7981
[
80-
{:name, <<"x-message-deduplication">>},
82+
{:name, @exchange_type},
8183
{:description, <<"Message Deduplication Exchange.">>}
8284
]
8385
end
@@ -165,9 +167,7 @@ defmodule RabbitMQMessageDeduplication.Exchange do
165167
name |> Common.cache_name() |> CacheManager.destroy()
166168
end
167169

168-
def delete(:transaction, exchange, _bs), do: delete(:none, exchange)
169-
170-
def delete(:none, _ex, _bs), do: :ok
170+
def delete(_tx, exchange, _bs), do: delete(:none, exchange)
171171

172172
@impl :rabbit_exchange_type
173173
def policy_changed(_ex, exchange(name: name, arguments: args, policy: :undefined)) do
@@ -249,4 +249,15 @@ defmodule RabbitMQMessageDeduplication.Exchange do
249249
Cache.change_option(cache, key, value)
250250
end
251251
end
252+
253+
# Caches created prior to v0.6.0 need to be reconfigured.
254+
defp maybe_reconfigure_caches() do
255+
RabbitLog.debug("Deduplication Exchanges startup, reconfiguring old caches")
256+
257+
RabbitExchange.list()
258+
|> Enum.filter(fn(exchange(name: type)) -> type == @exchange_type end)
259+
|> Enum.map(fn(exchange) -> create(:none, exchange) end)
260+
261+
:ok
262+
end
252263
end

lib/rabbitmq_message_deduplication/rabbit_message_deduplication_queue.ex

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ defmodule RabbitMQMessageDeduplication.Queue do
2929

3030
alias :amqqueue, as: AMQQueue
3131
alias :rabbit_log, as: RabbitLog
32+
alias :rabbit_amqqueue, as: RabbitQueue
3233
alias RabbitMQMessageDeduplication.Common, as: Common
3334
alias RabbitMQMessageDeduplication.Cache, as: Cache
3435
alias RabbitMQMessageDeduplication.CacheManager, as: CacheManager
@@ -116,6 +117,7 @@ defmodule RabbitMQMessageDeduplication.Queue do
116117
"Deduplication queues enabled, real BQ is ~s~n", [backing_queue])
117118
Application.put_env(__MODULE__, :backing_queue_module, backing_queue)
118119
Application.put_env(:rabbit, :backing_queue_module, __MODULE__)
120+
maybe_reconfigure_caches()
119121
end
120122
end
121123

@@ -474,10 +476,22 @@ defmodule RabbitMQMessageDeduplication.Queue do
474476
disable_dedup_queue?(state) ->
475477
:ok = delete_cache(queue)
476478
dqstate(queue: queue, queue_state: qs, dedup_enabled: false)
477-
true -> dqstate(queue: queue, queue_state: qs, dedup_enabled: false)
479+
true ->
480+
dqstate(queue: queue, queue_state: qs, dedup_enabled: false)
478481
end
479482
end
480483

484+
# Caches created prior to v0.6.0 need to be reconfigured.
485+
defp maybe_reconfigure_caches() do
486+
RabbitLog.debug("Deduplication Queues startup, reconfiguring old caches")
487+
488+
RabbitQueue.list()
489+
|> Enum.filter(&dedup_arg?/1)
490+
|> Enum.map(&init_cache/1)
491+
492+
:ok
493+
end
494+
481495
# Initialize the deduplication cache
482496
defp init_cache(queue) do
483497
cache = queue |> AMQQueue.get_name() |> Common.cache_name()

0 commit comments

Comments
 (0)