Skip to content

Commit ec69ce8

Browse files
authored
Merge pull request #4 from noxdafox/queue
Queue-level de duplication support
2 parents b36bebe + 54cbcb2 commit ec69ce8

File tree

7 files changed

+574
-90
lines changed

7 files changed

+574
-90
lines changed

README.md

Lines changed: 38 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,10 @@
1-
RabbitMQ Message Deduplication Plugin
2-
=====================================
1+
# RabbitMQ Message Deduplication Plugin
32

43
A plugin for filtering duplicate messages.
54

6-
Exchange Type: `x-message-deduplication`
5+
Messages can be deduplicated when published into an exchange or enqueued to a queue.
76

8-
Installing
9-
----------
7+
## Installing
108

119
Supported RabbitMQ versions:
1210

@@ -20,8 +18,7 @@ Enable the plugin:
2018
[sudo] rabbitmq-plugins enable rabbitmq_message_deduplication_exchange
2119
```
2220

23-
Building from Source
24-
--------------------
21+
## Building from Source
2522

2623
Please see RabbitMQ Plugin Development guide.
2724

@@ -39,35 +36,55 @@ Then copy all the *.ez files inside the plugins folder to the [RabbitMQ plugins
3936
[sudo] rabbitmq-plugins enable rabbitmq_message_deduplication_exchange
4037
```
4138

42-
Declare an exchange
43-
-------------------
39+
## Exchange level deduplication
40+
41+
The exchange type `x-message-deduplication` allows to filter message duplicates before any routing rule is applied.
42+
43+
Each message containing the `x-deduplication-header` header will not be routed if its value has been submitted previously. The amount of time a given message will be guaranteed to be unique can be controlled via the `x-cache-ttl` exchange argument or message header.
44+
45+
### Declare an exchange
4446

4547
To create a message deduplication exchange, just declare it providing the type `x-message-deduplication`.
4648

4749
Extra arguments:
4850

49-
* `x-cache-size`: maximum size for the deduplication cache.
51+
* `x-cache-size`: maximum size for the deduplication cache. If the deduplication cache fills up, older entries will be removed to give space to new ones.
5052
This parameter is mandatory.
51-
* `x-cache-ttl`: amount of time in seconds messages are kept in cache.
53+
* `x-cache-ttl`: amount of time in milliseconds duplicate headers are kept in cache.
5254
This parameter is optional.
53-
* `x-cache-persistence`: whether the cache will persist on disk or in memory.
54-
This parameter is optional. Default persistence is memory.
55+
* `x-cache-persistence`: whether the duplicates cache will persist on disk or in memory.
56+
This parameter is optional. Default persistence type is `memory`.
57+
58+
### Message headers
59+
60+
* `x-deduplication-header`: messages will be deduplicated based on the content of this header. If the header is not provided, the message will not be checked against duplicates.
61+
* `x-cache-ttl`: this header is optional and will override the default value provided during the exchange declaration. This header controls for how many milliseconds to deduplicate the message. After the TTL expires, a new message with the same header will be routed again.
62+
63+
## Queue level deduplication
64+
65+
A queue declared with the `x-message-deduplication` parameter enabled will filter message duplicates before they are published within.
66+
67+
Each message containing the `x-deduplication-header` header will not be enqueued if another message with the same header is already present within the queue.
68+
69+
### Declare a queue
70+
71+
When declaring a queue, it is possible to enable message deduplication via the `x-message-deduplication` boolean argument.
72+
73+
Extra arguments:
5574

56-
Message deduplication
57-
---------------------
75+
* `x-cache-persistence`: whether the duplicates cache will persist on disk or in memory.
76+
This parameter is optional. Default persistence type is `memory`.
5877

59-
Each message containing the `x-deduplication-header` header will not be routed if its value has been already submitted previously and has not expired.
78+
### Message headers
6079

61-
The optional header `x-cache-ttl` will override the default one if provided during the exchange declaration. This parameter controls for how many seconds to deduplicate the message. After the TTL expires, a new message with the same `x-deduplication-header` header will be routed again.
80+
* `x-deduplication-header`: messages will be deduplicated based on the content of this header. If the header is not provided, the message will not be checked against duplicates.
6281

63-
Running the tests
64-
-----------------
82+
## Running the tests
6583

6684
```bash
6785
make tests
6886
```
6987

70-
License
71-
-------
88+
## License
7289

7390
See the LICENSE file.

lib/cache.ex

Lines changed: 47 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
# All rights reserved.
77

88

9-
defmodule RabbitMQ.Cache do
9+
defmodule RabbitMQ.MessageDeduplicationPlugin.Cache do
1010
@moduledoc """
1111
Simple cache implemented on top of Mnesia.
1212
@@ -35,12 +35,19 @@ defmodule RabbitMQ.Cache do
3535
end
3636

3737
@doc """
38-
Put the given value into the cache.
38+
Put the given entry into the cache.
3939
"""
4040
def put(cache, value, ttl \\ nil) do
4141
GenServer.call(cache, {:put, cache, value, ttl})
4242
end
4343

44+
@doc """
45+
Delete the given value from the cache.
46+
"""
47+
def delete(cache, value) do
48+
GenServer.call(cache, {:delete, cache, value})
49+
end
50+
4451
@doc """
4552
True if the value is contained within the cache.
4653
"""
@@ -49,10 +56,10 @@ defmodule RabbitMQ.Cache do
4956
end
5057

5158
@doc """
52-
Return information related to the given cache.
59+
Flush the cache content.
5360
"""
54-
def info(cache) do
55-
GenServer.call(cache, {:info, cache})
61+
def flush(cache) do
62+
GenServer.call(cache, {:flush, cache})
5663
end
5764

5865
@doc """
@@ -62,6 +69,13 @@ defmodule RabbitMQ.Cache do
6269
GenServer.call(cache, {:drop, cache})
6370
end
6471

72+
@doc """
73+
Return information related to the given cache.
74+
"""
75+
def info(cache) do
76+
GenServer.call(cache, {:info, cache})
77+
end
78+
6579
## Server Callbacks
6680

6781
# Creates the Mnesia table and starts the janitor process.
@@ -99,17 +113,26 @@ defmodule RabbitMQ.Cache do
99113
{:reply, :ok, state}
100114
end
101115

116+
# Removes the given entry from the cache.
117+
def handle_call({:delete, cache, value}, _from, state) do
118+
Mnesia.transaction(fn ->
119+
Mnesia.delete({cache, value})
120+
end)
121+
122+
{:reply, :ok, state}
123+
end
124+
102125
# True if the value is in the cache.
103126
def handle_call({:member?, cache, value}, _from, state) do
104127
{:reply, cache_member?(cache, value), state}
105128
end
106129

107-
# Return cache information: number of elements and max size
108-
def handle_call({:info, cache}, _from, state) do
109-
info = [size: cache_property(cache, :limit),
110-
entries: Mnesia.table_info(cache, :size)]
111-
112-
{:reply, info, state}
130+
# Flush the Mnesia cache table.
131+
def handle_call({:flush, cache}, _from, state) do
132+
case Mnesia.clear_table(cache) do
133+
{:atomic, :ok} -> {:reply, :ok, state}
134+
_ -> {:reply, :error, state}
135+
end
113136
end
114137

115138
# Drop the Mnesia cache table.
@@ -120,6 +143,14 @@ defmodule RabbitMQ.Cache do
120143
end
121144
end
122145

146+
# Return cache information: number of elements and max size
147+
def handle_call({:info, cache}, _from, state) do
148+
info = [size: cache_property(cache, :limit),
149+
entries: Mnesia.table_info(cache, :size)]
150+
151+
{:reply, info, state}
152+
end
153+
123154
## Utility functions
124155

125156
# Mnesia cache table creation.
@@ -144,16 +175,16 @@ defmodule RabbitMQ.Cache do
144175
{:atomic, entries} = Mnesia.transaction(fn -> Mnesia.read(cache, value) end)
145176

146177
case List.keyfind(entries, value, 1) do
147-
{_, _, expiration} -> expiration > Os.system_time(:seconds)
178+
{_, _, expiration} -> expiration > Os.system_time(:millisecond)
148179
nil -> false
149180
end
150181
end
151182

152183
# Remove all expired entries from the Mnesia cache.
153184
defp cache_delete_expired(cache) do
154185
select = fn ->
155-
Mnesia.select(cache, [{{cache, :"$1", :_, :"$3"},
156-
[{:>, Os.system_time(:seconds), :"$3"}],
186+
Mnesia.select(cache, [{{cache, :"$1", :"$2"},
187+
[{:>, Os.system_time(:millisecond), :"$2"}],
157188
[:"$1"]}])
158189
end
159190

@@ -182,8 +213,8 @@ defmodule RabbitMQ.Cache do
182213
default = cache_property(cache, :default_ttl)
183214

184215
cond do
185-
ttl != nil -> Os.system_time(:seconds) + ttl
186-
default != nil -> Os.system_time(:seconds) + default
216+
ttl != nil -> Os.system_time(:millisecond) + ttl
217+
default != nil -> Os.system_time(:millisecond) + default
187218
true -> nil
188219
end
189220
end

lib/rabbit_message_deduplication_exchange.ex

Lines changed: 16 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,18 @@
66
# All rights reserved.
77

88

9-
defmodule RabbitMQ.MessageDeduplicationExchangeType do
9+
defmodule RabbitMQ.MessageDeduplicationPlugin.Exchange do
1010
import Record, only: [defrecord: 2, extract: 2]
1111

12-
require RabbitMQ.Cache
13-
require RabbitMQ.CacheSupervisor
12+
require RabbitMQ.MessageDeduplicationPlugin.Cache
13+
require RabbitMQ.MessageDeduplicationPlugin.Supervisor
1414

15+
alias :rabbit_log, as: RabbitLog
1516
alias :rabbit_misc, as: RabbitMisc
1617
alias :rabbit_router, as: RabbitRouter
1718
alias :rabbit_exchange, as: RabbitExchange
19+
alias RabbitMQ.MessageDeduplicationPlugin.Cache, as: MessageCache
20+
alias RabbitMQ.MessageDeduplicationPlugin.Supervisor, as: CacheSupervisor
1821

1922
@behaviour :rabbit_exchange_type
2023

@@ -30,13 +33,6 @@ defmodule RabbitMQ.MessageDeduplicationExchangeType do
3033
{:requires, :rabbit_registry},
3134
{:enables, :kernel_ready}]}
3235

33-
@rabbit_boot_step {:rabbit_exchange_type_caches_supervisor,
34-
[{:description,
35-
"message deduplication exchange type: supervisor"},
36-
{:mfa, {__MODULE__, :start_caches_supervisor, []}},
37-
{:requires, :database},
38-
{:enables, :external_infrastructure}]}
39-
4036
defrecord :exchange, extract(
4137
:exchange, from_lib: "rabbit_common/include/rabbit.hrl")
4238

@@ -49,10 +45,6 @@ defmodule RabbitMQ.MessageDeduplicationExchangeType do
4945
defrecord :basic_message, extract(
5046
:basic_message, from_lib: "rabbit_common/include/rabbit.hrl")
5147

52-
def start_caches_supervisor() do
53-
RabbitMQ.CacheSupervisor.start_link()
54-
end
55-
5648
def description() do
5749
[
5850
{:name, <<"x-message-deduplication">>},
@@ -119,7 +111,9 @@ defmodule RabbitMQ.MessageDeduplicationExchangeType do
119111
|> String.to_atom()
120112
options = [size: size, ttl: ttl, persistence: persistence]
121113

122-
RabbitMQ.CacheSupervisor.start_cache(cache, options)
114+
RabbitLog.debug("Starting exchange deduplication cache ~s~n", [cache])
115+
116+
CacheSupervisor.start_cache(cache, options)
123117
end
124118

125119
def create(:none, _ex) do
@@ -129,9 +123,9 @@ defmodule RabbitMQ.MessageDeduplicationExchangeType do
129123
def delete(:transaction, exchange(name: name), _bs) do
130124
cache = cache_name(name)
131125

132-
:ok = RabbitMQ.Cache.drop(cache)
126+
:ok = MessageCache.drop(cache)
133127

134-
RabbitMQ.CacheSupervisor.stop_cache(cache)
128+
CacheSupervisor.stop_cache(cache)
135129
end
136130

137131
def delete(:none, _ex, _bs) do
@@ -159,7 +153,7 @@ defmodule RabbitMQ.MessageDeduplicationExchangeType do
159153
end
160154

161155
def info(exchange(name: name), [:cache_info]) do
162-
[cache_info: name |> cache_name() |> RabbitMQ.Cache.info()]
156+
[cache_info: name |> cache_name() |> MessageCache.info()]
163157
end
164158

165159
def info(_ex, _it) do
@@ -185,7 +179,7 @@ defmodule RabbitMQ.MessageDeduplicationExchangeType do
185179
defp cached?(cache, headers) do
186180
case rabbitmq_keyfind(headers, "x-deduplication-header") do
187181
nil -> false
188-
key -> case RabbitMQ.Cache.member?(cache, key) do
182+
key -> case MessageCache.member?(cache, key) do
189183
true -> true
190184
false -> cache_put(cache, key, headers)
191185
false
@@ -196,8 +190,8 @@ defmodule RabbitMQ.MessageDeduplicationExchangeType do
196190
# Puts the key and related headers in the cache
197191
defp cache_put(cache, key, headers) do
198192
case rabbitmq_keyfind(headers, "x-cache-ttl") do
199-
nil -> RabbitMQ.Cache.put(cache, key)
200-
ttl -> RabbitMQ.Cache.put(cache, key, ttl)
193+
nil -> MessageCache.put(cache, key)
194+
ttl -> MessageCache.put(cache, key, ttl)
201195
end
202196
end
203197

@@ -206,7 +200,7 @@ defmodule RabbitMQ.MessageDeduplicationExchangeType do
206200
resource = sanitize_string(resource)
207201
exchange = sanitize_string(exchange)
208202

209-
String.to_atom("cache_#{resource}_#{exchange}")
203+
String.to_atom("cache_exchange_#{resource}_#{exchange}")
210204
end
211205

212206
# Returns the value given a key from a RabbitMQ list [{"key", :type, value}]

0 commit comments

Comments
 (0)