Skip to content

Commit f78dcce

Browse files
committed
issue #91, queue: full policy support
Allow to enable and disable deduplication via policy as well as clearing existing ones. Signed-off-by: Matteo Cafasso <[email protected]>
1 parent 7fce536 commit f78dcce

File tree

2 files changed

+75
-41
lines changed

2 files changed

+75
-41
lines changed

lib/rabbitmq_message_deduplication/rabbit_message_deduplication_policy_event.ex

Lines changed: 26 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ defmodule RabbitMQMessageDeduplication.PolicyEvent do
2525

2626
alias :amqqueue, as: AMQQueue
2727
alias :gen_event, as: GenEvent
28+
alias :rabbit_log, as: RabbitLog
2829
alias :rabbit_policy, as: RabbitPolicy
2930
alias :rabbit_amqqueue, as: RabbitQueue
3031
alias RabbitMQMessageDeduplication.Queue, as: DedupQueue
@@ -55,29 +56,39 @@ defmodule RabbitMQMessageDeduplication.PolicyEvent do
5556
def init(_), do: {:ok, []}
5657

5758
@impl :gen_event
58-
def handle_event({:event, :policy_set, policy, _, _}, state) do
59-
case List.keyfind(policy[:definition], "x-message-deduplication", 0) do
60-
{"x-message-deduplication", _} -> apply_to_queues()
61-
nil -> 0
59+
def handle_event({:event, :queue_policy_updated, policy, _, _}, state) do
60+
status = case List.keyfind(policy[:definition], "x-message-deduplication", 0) do
61+
{"x-message-deduplication", _} -> apply_to_queue(policy)
62+
nil -> :ok
6263
end
6364

64-
{:ok, state}
65+
{status, state}
6566
end
67+
68+
@impl :gen_event
69+
def handle_event({:event, :queue_policy_cleared, policy, _, _}, state) do
70+
{apply_to_queue(policy), state}
71+
end
72+
73+
@impl :gen_event
6674
def handle_event(_, state), do: {:ok, state}
6775

6876
@impl :gen_event
6977
def handle_call(_Request, state), do: {:ok, :not_understood, state}
7078

7179
# Apply new policies to matching queues
72-
defp apply_to_queues() do
73-
for queue <- RabbitQueue.list() |> Enum.map(&RabbitPolicy.set/1) do
74-
AMQQueue.get_pid(queue)
75-
|> RabbitQueue.run_backing_queue(DedupQueue,
76-
fn(_, state) ->
77-
state
78-
|> DedupQueue.dqstate(queue: queue)
79-
|> DedupQueue.maybe_enable_dedup_queue()
80-
end)
81-
end
80+
defp apply_to_queue(policy) do
81+
{:ok, queue} = RabbitQueue.lookup(policy[:name])
82+
queue = RabbitPolicy.set(queue)
83+
84+
RabbitLog.debug("Policy change for queue ~p ~n", [policy[:name]])
85+
86+
AMQQueue.get_pid(queue)
87+
|> RabbitQueue.run_backing_queue(DedupQueue,
88+
fn(_, state) ->
89+
state
90+
|> DedupQueue.dqstate(queue: queue)
91+
|> DedupQueue.maybe_toggle_dedup_queue()
92+
end)
8293
end
8394
end

lib/rabbitmq_message_deduplication/rabbit_message_deduplication_queue.ex

Lines changed: 49 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ defmodule RabbitMQMessageDeduplication.Queue do
2222
2323
"""
2424

25-
import Record, only: [defrecord: 2, defrecord: 3, defrecordp: 2, extract: 2]
25+
import Record, only: [defrecord: 2, defrecord: 3, extract: 2]
2626

2727
require RabbitMQMessageDeduplication.Cache
2828
require RabbitMQMessageDeduplication.Common
@@ -55,7 +55,7 @@ defmodule RabbitMQMessageDeduplication.Queue do
5555
:P_basic, from_lib: "rabbit_common/include/rabbit_framing.hrl")
5656

5757
defrecord :dqack, [:tag, :header]
58-
defrecord :dqstate, [:queue, :queue_state, :dedup_enabled]
58+
defrecord :dqstate, [:queue, :queue_state, dedup_enabled: false]
5959

6060
# The passthrough macros call the underlying backing queue functions
6161
# The suffixes indicate the arity of the return values
@@ -149,7 +149,7 @@ defmodule RabbitMQMessageDeduplication.Queue do
149149

150150
@impl :rabbit_backing_queue
151151
def init(queue, recovery, callback) do
152-
state = maybe_enable_dedup_queue(dqstate(queue: queue))
152+
state = maybe_toggle_dedup_queue(dqstate(queue: queue))
153153

154154
passthrough1(state) do
155155
init(queue, recovery, callback)
@@ -164,10 +164,7 @@ defmodule RabbitMQMessageDeduplication.Queue do
164164
@impl :rabbit_backing_queue
165165
def delete_and_terminate(any, state = dqstate(queue: queue, queue_state: qs)) do
166166
if dedup_queue?(state) do
167-
queue
168-
|> AMQQueue.get_name()
169-
|> Common.cache_name()
170-
|> CacheManager.destroy()
167+
:ok = delete_cache(queue)
171168
end
172169

173170
passthrough1(state) do
@@ -468,28 +465,19 @@ defmodule RabbitMQMessageDeduplication.Queue do
468465

469466
# Utility functions
470467

471-
# Set the state according to whether the queue is a deduplication one or not
472-
def maybe_enable_dedup_queue(state = dqstate(queue: queue, queue_state: qs)) do
473-
if dedup_queue?(state) do
474-
:ok = init_cache(queue)
475-
dqstate(queue: queue, queue_state: qs, dedup_enabled: true)
476-
else
477-
dqstate(queue: queue, queue_state: qs, dedup_enabled: :undefined)
468+
# Enable/disable queue-level deduplication
469+
def maybe_toggle_dedup_queue(state = dqstate(queue: queue, queue_state: qs)) do
470+
cond do
471+
enable_dedup_queue?(state) ->
472+
:ok = init_cache(queue)
473+
dqstate(queue: queue, queue_state: qs, dedup_enabled: true)
474+
disable_dedup_queue?(state) ->
475+
:ok = delete_cache(queue)
476+
dqstate(queue: queue, queue_state: qs, dedup_enabled: false)
477+
true -> dqstate(queue: queue, queue_state: qs, dedup_enabled: false)
478478
end
479479
end
480480

481-
# Check if it's a deduplication enabled queue
482-
defp dedup_queue?(dqstate(dedup_enabled: val)) when is_boolean(val), do: val
483-
defp dedup_queue?(dqstate(queue: queue)) do
484-
args = AMQQueue.get_arguments(queue)
485-
pols = case AMQQueue.get_policy(queue) do
486-
:undefined -> []
487-
policy -> policy[:definition]
488-
end
489-
490-
Common.rabbit_argument(args ++ pols, "x-message-deduplication", default: false)
491-
end
492-
493481
# Initialize the deduplication cache
494482
defp init_cache(queue) do
495483
cache = queue |> AMQQueue.get_name() |> Common.cache_name()
@@ -509,6 +497,15 @@ defmodule RabbitMQMessageDeduplication.Queue do
509497
end
510498
end
511499

500+
# Remove the cache and all its content
501+
defp delete_cache(queue) do
502+
cache = queue |> AMQQueue.get_name() |> Common.cache_name()
503+
504+
RabbitLog.debug("Deleting queue deduplication cache ~s~n", [cache])
505+
506+
CacheManager.destroy(cache)
507+
end
508+
512509
# Returns true if the message is a duplicate.
513510
defp duplicate?(queue, message = basic_message()) do
514511
name = AMQQueue.get_name(queue)
@@ -559,4 +556,30 @@ defmodule RabbitMQMessageDeduplication.Queue do
559556
:exit, {:noproc, {GenServer, :call, [^cache | _]}} -> []
560557
end
561558
end
559+
560+
# True if `x-message-deduplication` is present within queue arguments or policy
561+
defp dedup_arg?(queue) do
562+
queue_policy(queue)
563+
++ AMQQueue.get_arguments(queue)
564+
|> Common.rabbit_argument("x-message-deduplication", default: false)
565+
end
566+
567+
# Return the list of policy arguments assigned to the queue
568+
defp queue_policy(queue) do
569+
case AMQQueue.get_policy(queue) do
570+
:undefined -> []
571+
policy -> policy[:definition]
572+
end
573+
end
574+
575+
# True if it's an active deduplication queue
576+
defp dedup_queue?(dqstate(dedup_enabled: val)), do: val
577+
578+
# True if deduplication should be enabled for the queue
579+
defp enable_dedup_queue?(dqstate(dedup_enabled: true)), do: false
580+
defp enable_dedup_queue?(dqstate(queue: q, dedup_enabled: false)), do: dedup_arg?(q)
581+
582+
# True if deduplication should be disabled for the queue
583+
defp disable_dedup_queue?(dqstate(dedup_enabled: false)), do: false
584+
defp disable_dedup_queue?(dqstate(queue: q, dedup_enabled: true)), do: not dedup_arg?(q)
562585
end

0 commit comments

Comments
 (0)