Skip to content

Commit 248c95f

Browse files
committed
queue: add support for RabbitMQ >= 3.7.18
Signed-off-by: Matteo Cafasso <[email protected]>
1 parent de98bbe commit 248c95f

File tree

1 file changed

+21
-12
lines changed

1 file changed

+21
-12
lines changed

lib/rabbit_message_deduplication_queue.ex

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ defmodule RabbitMQ.MessageDeduplicationPlugin.Queue do
2727
require RabbitMQ.MessageDeduplicationPlugin.Cache
2828
require RabbitMQ.MessageDeduplicationPlugin.Common
2929

30+
alias :amqqueue, as: AMQQueue
3031
alias :rabbit_log, as: RabbitLog
3132
alias RabbitMQ.MessageDeduplicationPlugin.Common, as: Common
3233
alias RabbitMQ.MessageDeduplicationPlugin.Cache, as: Cache
@@ -47,9 +48,6 @@ defmodule RabbitMQ.MessageDeduplicationPlugin.Queue do
4748
defrecord :content, extract(
4849
:content, from_lib: "rabbit_common/include/rabbit.hrl")
4950

50-
defrecord :amqqueue, extract(
51-
:amqqueue, from_lib: "rabbit_common/include/rabbit.hrl")
52-
5351
defrecord :basic_message, extract(
5452
:basic_message, from_lib: "rabbit_common/include/rabbit.hrl")
5553

@@ -120,7 +118,10 @@ defmodule RabbitMQ.MessageDeduplicationPlugin.Queue do
120118
passthrough do: stop(vhost)
121119
end
122120

123-
def init(queue = amqqueue(name: name, arguments: args), recovery, callback) do
121+
def init(queue, recovery, callback) do
122+
name = AMQQueue.get_name(queue)
123+
args = AMQQueue.get_arguments(queue)
124+
124125
if duplicate?(queue) do
125126
cache = Common.cache_name(name)
126127
options = [ttl: Common.rabbit_argument(
@@ -148,7 +149,10 @@ defmodule RabbitMQ.MessageDeduplicationPlugin.Queue do
148149
dqstate(queue: queue, queue_state: qs) = state
149150

150151
if duplicate?(queue) do
151-
queue |> amqqueue(:name) |> Common.cache_name() |> CacheManager.destroy()
152+
queue
153+
|> AMQQueue.get_name()
154+
|> Common.cache_name()
155+
|> CacheManager.destroy()
152156
end
153157

154158
passthrough1(state) do
@@ -162,7 +166,7 @@ defmodule RabbitMQ.MessageDeduplicationPlugin.Queue do
162166

163167
def purge(state = dqstate(queue: queue, queue_state: qs)) do
164168
if duplicate?(queue) do
165-
cache = queue |> amqqueue(:name) |> Common.cache_name()
169+
cache = queue |> AMQQueue.get_name() |> Common.cache_name()
166170

167171
:ok = Cache.flush(cache)
168172
end
@@ -347,7 +351,7 @@ defmodule RabbitMQ.MessageDeduplicationPlugin.Queue do
347351
end
348352

349353
def info(:backing_queue_status, dqstate(queue: queue, queue_state: qs)) do
350-
amqqueue(arguments: args) = queue
354+
args = AMQQueue.get_arguments(queue)
351355
queue_info = passthrough do: info(:backing_queue_status, qs)
352356
priority = Common.rabbit_argument(args, "x-max-priority", default: false)
353357

@@ -397,13 +401,17 @@ defmodule RabbitMQ.MessageDeduplicationPlugin.Queue do
397401
# Utility functions
398402

399403
# Returns true if the queue supports message deduplication
400-
defp duplicate?(amqqueue(arguments: args)) do
404+
defp duplicate?(queue) do
405+
args = AMQQueue.get_arguments(queue)
406+
401407
Common.rabbit_argument(args, "x-message-deduplication", default: false)
402408
end
403409

404410
# Returns true if the queue supports message deduplication
405411
# and the message is a duplicate.
406-
defp duplicate?(queue = amqqueue(name: name), message = basic_message()) do
412+
defp duplicate?(queue, message = basic_message()) do
413+
name = AMQQueue.get_name(queue)
414+
407415
case duplicate?(queue) do
408416
true -> Common.duplicate?(name, message, message_expiration(message))
409417
false -> false
@@ -423,22 +431,23 @@ defmodule RabbitMQ.MessageDeduplicationPlugin.Queue do
423431
end
424432

425433
# Removes the message deduplication header from the cache
426-
defp maybe_delete_cache_entry(queue = amqqueue(), msg = basic_message()) do
434+
defp maybe_delete_cache_entry(queue, msg = basic_message()) do
427435
header = Common.message_header(msg, "x-deduplication-header")
428436
maybe_delete_cache_entry(queue, header)
429437
end
430438

431439
defp maybe_delete_cache_entry(queue, header) when not is_nil(header) do
432440
queue
433-
|> amqqueue(:name)
441+
|> AMQQueue.get_name()
434442
|> Common.cache_name()
435443
|> Cache.delete(header)
436444
end
437445

438446
defp maybe_delete_cache_entry(_queue, header) when is_nil(header) do end
439447

440448
# Returns the cache information
441-
defp cache_info(amqqueue(name: name)) do
449+
defp cache_info(queue) do
450+
name = AMQQueue.get_name(queue)
442451
cache = Common.cache_name(name)
443452

444453
try do

0 commit comments

Comments
 (0)