Skip to content

Commit 7fce536

Browse files
committed
issue #91, exchange: implement full policy logic into exchanges
The logic supports addition and removal of policies. Signed-off-by: Matteo Cafasso <[email protected]>
1 parent 4b6b961 commit 7fce536

File tree

1 file changed

+31
-5
lines changed

1 file changed

+31
-5
lines changed

lib/rabbitmq_message_deduplication/rabbit_message_deduplication_exchange.ex

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ defmodule RabbitMQMessageDeduplication.Exchange do
2929

3030
alias :rabbit_log, as: RabbitLog
3131
alias :rabbit_misc, as: RabbitMisc
32+
alias :rabbit_policy, as: RabbitPolicy
3233
alias :rabbit_router, as: RabbitRouter
3334
alias :rabbit_exchange, as: RabbitExchange
3435
alias :rabbit_registry, as: RabbitRegistry
@@ -175,13 +176,30 @@ defmodule RabbitMQMessageDeduplication.Exchange do
175176
end
176177

177178
@impl :rabbit_exchange_type
178-
def policy_changed(_old, exchange(name: name, policy: policy)) do
179+
def policy_changed(_ex, exchange(name: name, arguments: args, policy: :undefined)) do
179180
cache = Common.cache_name(name)
180181

182+
RabbitLog.debug(
183+
"All policies for exchange ~p were deleted, resetting to defaults ~p ~n",
184+
[name, format_options(args)])
185+
186+
reset_arguments(cache, args)
187+
end
188+
189+
@impl :rabbit_exchange_type
190+
def policy_changed(_ex, exch = exchange(name: name, arguments: args, policy: policy)) do
191+
cache = Common.cache_name(name)
192+
193+
RabbitLog.debug("Applying ~s policy to exchange ~p ~n",
194+
[RabbitPolicy.name(exch), name])
195+
196+
# We need to remove old policy before applying new one
197+
reset_arguments(cache, args)
198+
181199
for policy_definition <- policy[:definition] do
182200
case policy_definition do
183-
{"x-cache-size", value} -> Cache.reconfigure(cache, :limit, value)
184-
{"x-cache-ttl", value} -> Cache.reconfigure(cache, :default_ttl, value)
201+
{"x-cache-ttl", value} -> Cache.reconfigure(cache, :ttl, value)
202+
{"x-cache-size", value} -> Cache.reconfigure(cache, :size, value)
185203
{"x-cache-persistence", value} -> Cache.reconfigure(cache, :persistence, value)
186204
end
187205
end
@@ -198,8 +216,8 @@ defmodule RabbitMQMessageDeduplication.Exchange do
198216
end
199217

200218
@impl :rabbit_exchange_type
201-
def assert_args_equivalence(exchange, args) do
202-
RabbitExchange.assert_args_equivalence(exchange, args)
219+
def assert_args_equivalence(exch, args) do
220+
RabbitExchange.assert_args_equivalence(exch, args)
203221
end
204222

205223
@impl :rabbit_exchange_type
@@ -225,6 +243,7 @@ defmodule RabbitMQMessageDeduplication.Exchange do
225243
not Common.duplicate?(exchange_name, message, ttl)
226244
end
227245

246+
# Format arguments into options
228247
defp format_options(args) do
229248
[size: Common.rabbit_argument(
230249
args, "x-cache-size", type: :number),
@@ -233,4 +252,11 @@ defmodule RabbitMQMessageDeduplication.Exchange do
233252
persistence: Common.rabbit_argument(
234253
args, "x-cache-persistence", type: :atom, default: "memory")]
235254
end
255+
256+
# Reconfigure cache to default arguments
257+
defp reset_arguments(cache, args) do
258+
for {key, value} <- format_options(args) do
259+
Cache.reconfigure(cache, key, value)
260+
end
261+
end
236262
end

0 commit comments

Comments
 (0)