Skip to content

Commit 328d6a1

Browse files
committed
issue #76: revert to broker boot steps to initialize the plugin
As per commit rabbitmq/rabbitmq-server@a0cd2e5, the original issue within the broker was fixed. Hence, the plugin is reverted to the original boot sequence as it is more lightweight compared to the previous one. Signed-off-by: Matteo Cafasso <[email protected]>
1 parent 9d9642c commit 328d6a1

File tree

4 files changed

+41
-71
lines changed

4 files changed

+41
-71
lines changed

lib/rabbitmq_message_deduplication.ex

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -9,25 +9,21 @@ defmodule RabbitMQMessageDeduplication do
99

1010
use Application
1111

12+
# Start a dummy supervisor to enable the Application behaviour.
13+
# http://erlang.org/pipermail/erlang-questions/2010-April/050508.html
14+
@impl true
1215
def start(_, _) do
13-
children = [
14-
%{
15-
id: RabbitMQMessageDeduplication.CacheManager,
16-
start: {RabbitMQMessageDeduplication.CacheManager, :start_link, []}
17-
}
18-
]
19-
20-
supervisor = Supervisor.start_link(children, [strategy: :one_for_one])
21-
22-
RabbitMQMessageDeduplication.Exchange.register()
23-
RabbitMQMessageDeduplication.Queue.enable()
24-
RabbitMQMessageDeduplication.Queue.restart_queues()
25-
26-
supervisor
16+
Supervisor.start_link(__MODULE__, [], [])
2717
end
2818

19+
@impl true
2920
def stop(_) do
3021
RabbitMQMessageDeduplication.Exchange.unregister()
3122
RabbitMQMessageDeduplication.Queue.disable()
3223
end
24+
25+
@impl true
26+
def init([]) do
27+
Supervisor.init([], strategy: :one_for_one)
28+
end
3329
end

lib/rabbitmq_message_deduplication/cache_manager.ex

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,17 @@ defmodule RabbitMQMessageDeduplication.CacheManager do
1818
alias :mnesia, as: Mnesia
1919
alias RabbitMQMessageDeduplication.Cache, as: Cache
2020

21+
Module.register_attribute(__MODULE__,
22+
:rabbit_boot_step,
23+
accumulate: true, persist: true)
24+
25+
@rabbit_boot_step {
26+
__MODULE__,
27+
[description: "message deduplication plugin cache maintenance process",
28+
mfa: {:rabbit_sup, :start_child, [__MODULE__]},
29+
requires: :database,
30+
enables: :external_infrastructure]}
31+
2132
@caches :message_deduplication_caches
2233
@cleanup_period Timer.seconds(3)
2334
@table_wait_time Timer.seconds(30)

lib/rabbitmq_message_deduplication/rabbit_message_deduplication_exchange.ex

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,16 @@ defmodule RabbitMQMessageDeduplication.Exchange do
3838

3939
@behaviour :rabbit_exchange_type
4040

41+
Module.register_attribute(__MODULE__,
42+
:rabbit_boot_step,
43+
accumulate: true, persist: true)
44+
45+
@rabbit_boot_step {__MODULE__,
46+
[{:description, "exchange type x-message-deduplication"},
47+
{:mfa, {__MODULE__, :register, []}},
48+
{:requires, :rabbit_registry},
49+
{:enables, :kernel_ready}]}
50+
4151
defrecord :exchange, extract(
4252
:exchange, from_lib: "rabbit_common/include/rabbit.hrl")
4353

lib/rabbitmq_message_deduplication/rabbit_message_deduplication_queue.ex

Lines changed: 10 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -29,15 +29,22 @@ defmodule RabbitMQMessageDeduplication.Queue do
2929

3030
alias :amqqueue, as: AMQQueue
3131
alias :rabbit_log, as: RabbitLog
32-
alias :supervisor2, as: Supervisor2
33-
alias :rabbit_amqqueue, as: RabbitAMQQueue
34-
alias :rabbit_amqqueue_sup_sup, as: RabbitAMQQueueSupervisor
3532
alias RabbitMQMessageDeduplication.Common, as: Common
3633
alias RabbitMQMessageDeduplication.Cache, as: Cache
3734
alias RabbitMQMessageDeduplication.CacheManager, as: CacheManager
3835

3936
@behaviour :rabbit_backing_queue
4037

38+
Module.register_attribute(__MODULE__,
39+
:rabbit_boot_step,
40+
accumulate: true, persist: true)
41+
42+
@rabbit_boot_step {__MODULE__,
43+
[{:description, "message deduplication queue"},
44+
{:mfa, {__MODULE__, :enable, []}},
45+
{:requires, :database},
46+
{:enables, :external_infrastructure}]}
47+
4148
defrecord :content, extract(
4249
:content, from_lib: "rabbit_common/include/rabbit.hrl")
4350

@@ -128,27 +135,6 @@ defmodule RabbitMQMessageDeduplication.Queue do
128135
end
129136
end
130137

131-
@doc """
132-
Restart deduplication queues.
133-
134-
When the broker starts, it initializes all queues before the Plugins.
135-
Therefore, we need to restart all queues marked as deduplication
136-
to ensure the correct implementation module is loaded.
137-
138-
The names of the restarted queues is returned.
139-
140-
"""
141-
@spec restart_queues() :: list
142-
def restart_queues() do
143-
RabbitLog.info(
144-
"Restarting deduplication queues to load the correct module. " <>
145-
"Ignore subsequent error messages.")
146-
147-
RabbitAMQQueue.list()
148-
|> Enum.filter(fn(q) -> duplicate?(q) and local?(q) end)
149-
|> Enum.map(fn(q) -> restart_queue_process(q) end)
150-
end
151-
152138
@impl :rabbit_backing_queue
153139
def start(vhost, queues) do
154140
passthrough do: start(vhost, queues)
@@ -506,39 +492,6 @@ defmodule RabbitMQMessageDeduplication.Queue do
506492
end
507493
end
508494

509-
# Returns true if the queue is local to this node
510-
defp local?(queue) do
511-
node(AMQQueue.get_pid(queue)) == node()
512-
end
513-
514-
# Restarts the AMQQueue associated process
515-
defp restart_queue_process(queue) do
516-
qpid = AMQQueue.get_pid(queue)
517-
{:resource, vhost, :queue, name} = AMQQueue.get_name(queue)
518-
supervisor = find_queue_supervisor(vhost, qpid)
519-
520-
RabbitLog.debug("Restarting deduplication queue ~s.~n", [name])
521-
522-
:ok = Supervisor2.terminate_child(supervisor, :rabbit_amqqueue)
523-
{:ok, _} = Supervisor2.restart_child(supervisor, :rabbit_amqqueue)
524-
525-
name
526-
end
527-
528-
defp find_queue_supervisor(vhost, qpid) do
529-
{:ok, parent} = RabbitAMQQueueSupervisor.find_for_vhost(vhost)
530-
531-
Supervisor2.which_children(parent)
532-
|> Enum.map(&child_pid/1)
533-
|> Enum.map(fn(s) ->
534-
{s, Supervisor2.which_children(s) |> Enum.map(&child_pid/1)}
535-
end)
536-
|> Enum.find_value(fn({s, c}) -> if Enum.member?(c, qpid), do: s end)
537-
end
538-
539-
defp child_pid({:rabbit_amqqueue, pid, _, _}), do: pid
540-
defp child_pid({_, pid, _, [:rabbit_amqqueue_sup]}), do: pid
541-
542495
# Returns the expiration property of the given message
543496
defp message_expiration(message) do
544497
basic_message(content: content(properties: properties)) = message

0 commit comments

Comments
 (0)