Skip to content

Commit 68c0735

Browse files
committed
Support publishing AMQP 1.0 to Event Exchange
## What? Prior to this commit, the `rabbitmq_event_exchange` internally published always AMQP 0.9.1 messages to the `amq.rabbitmq.event` topic exchange. This commit allows users to configure the plugin to publish AMQP 1.0 messages instead. ## Why? Prior to this commit, when an AMQP 1.0 client consumed events, event properties that are lists were omitted, for example property `client_properties` of event `connection.created` or property `arguments` of event `queue.created` because of the following sequence: 1. The event exchange plugins listens for all kind of internal events. 2. The event exchange plugin re-publishes all events as AMQP 0.9.1 message to the event exchange. 3. Later, when an AMQP 1.0 client consumes this message, the broker must translate the message from AMQP 0.9.1 to AMQP 1.0. 4. This translation follows the rules outlined in https://www.rabbitmq.com/docs/conversions#amqpl-amqp 5. Specifically, in this table the row before the last one describes the rule we're hitting here. It says that if the AMQP 0.9.1 header value is not an `x-` prefixed header and its value is an array or table, then this header is not converted. That's because AMQP 1.0 application-properties must be simple types as mandated in https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-application-properties ## How? The user can configure the plugin as follows to have the plugin internally publish AMQP 1.0 messages: ``` event_exchange.protocol = amqp_1_0 ``` To support complex types such as lists, the plugin sets all event properties as message-annotations. The plugin prefixes all message annotation keys with `x-opt-` to comply with the AMQP 1.0 spec. ## Alternative Design An alternative design would have been to format all event properties e.g. as JSON within the message body. However, this breaks routing on specific event property values via a headers exchange. ## Documentation rabbitmq/rabbitmq-website#2129
1 parent 2795293 commit 68c0735

File tree

10 files changed

+554
-376
lines changed

10 files changed

+554
-376
lines changed

deps/rabbit/src/mc_amqpl.erl

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@
4343
-define(AMQP10_FOOTER, <<"x-amqp-1.0-footer">>).
4444
-define(PROTOMOD, rabbit_framing_amqp_0_9_1).
4545
-define(CLASS_ID, 60).
46-
-define(LONGSTR_UTF8_LIMIT, 4096).
4746

4847
-opaque state() :: #content{}.
4948

@@ -682,19 +681,13 @@ wrap(_Type, undefined) ->
682681
wrap(Type, Val) ->
683682
{Type, Val}.
684683

685-
from_091(longstr, V)
686-
when is_binary(V) andalso
687-
byte_size(V) =< ?LONGSTR_UTF8_LIMIT ->
688-
%% if a longstr is longer than 4096 bytes we just assume it is binary
689-
%% it _may_ still be valid utf8 but checking this for every longstr header
690-
%% value is going to be excessively slow
691-
case mc_util:is_utf8_no_null(V) of
684+
from_091(longstr, V) ->
685+
case mc_util:is_utf8_no_null_limited(V) of
692686
true ->
693687
{utf8, V};
694688
false ->
695689
{binary, V}
696690
end;
697-
from_091(longstr, V) -> {binary, V};
698691
from_091(long, V) -> {long, V};
699692
from_091(unsignedbyte, V) -> {ubyte, V};
700693
from_091(short, V) -> {short, V};

deps/rabbit/src/mc_util.erl

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
-include("mc.hrl").
44

55
-export([is_valid_shortstr/1,
6+
is_utf8_no_null_limited/1,
67
is_utf8_no_null/1,
78
uuid_to_urn_string/1,
89
urn_string_to_uuid/1,
@@ -12,12 +13,24 @@
1213
is_x_header/1
1314
]).
1415

16+
-define(UTF8_SCAN_LIMIT, 4096).
17+
1518
-spec is_valid_shortstr(term()) -> boolean().
1619
is_valid_shortstr(Bin) when ?IS_SHORTSTR_LEN(Bin) ->
1720
is_utf8_no_null(Bin);
1821
is_valid_shortstr(_) ->
1922
false.
2023

24+
-spec is_utf8_no_null_limited(term()) -> boolean().
25+
is_utf8_no_null_limited(Bin)
26+
when byte_size(Bin) =< ?UTF8_SCAN_LIMIT ->
27+
is_utf8_no_null(Bin);
28+
is_utf8_no_null_limited(_Term) ->
29+
%% If longer than 4096 bytes, just assume it's not UTF-8.
30+
%% It _may_ still be valid UTF-8 but checking this
31+
%% on the hot path is going to be excessively slow.
32+
false.
33+
2134
-spec is_utf8_no_null(term()) -> boolean().
2235
is_utf8_no_null(Term) ->
2336
utf8_scan(Term, fun (C) -> C > 0 end).

deps/rabbit/test/mc_unit_SUITE.erl

Lines changed: 34 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -313,34 +313,37 @@ amqpl_amqp_bin_amqpl(_Config) ->
313313
%% incoming amqpl converted to amqp, serialized / deserialized then converted
314314
%% back to amqpl.
315315
%% simulates a legacy message published then consumed to a stream
316-
Props = #'P_basic'{content_type = <<"text/plain">>,
317-
content_encoding = <<"gzip">>,
318-
headers = [{<<"a-stream-offset">>, long, 99},
319-
{<<"a-string">>, longstr, <<"a string">>},
320-
{<<"a-bool">>, bool, false},
321-
{<<"a-unsignedbyte">>, unsignedbyte, 1},
322-
{<<"a-unsignedshort">>, unsignedshort, 1},
323-
{<<"a-unsignedint">>, unsignedint, 1},
324-
{<<"a-signedint">>, signedint, 1},
325-
{<<"a-timestamp">>, timestamp, 1},
326-
{<<"a-double">>, double, 1.0},
327-
{<<"a-float">>, float, 1.0},
328-
{<<"a-void">>, void, undefined},
329-
{<<"a-binary">>, binary, <<"data">>},
330-
{<<"a-array">>, array, [{long, 1}, {long, 2}]},
331-
{<<"x-stream-filter">>, longstr, <<"apple">>}
332-
],
333-
delivery_mode = 2,
334-
priority = 98,
335-
correlation_id = <<"corr">> ,
336-
reply_to = <<"reply-to">>,
337-
expiration = <<"1">>,
338-
message_id = <<"msg-id">>,
339-
timestamp = 99,
340-
type = <<"45">>,
341-
user_id = <<"banana">>,
342-
app_id = <<"rmq">>
343-
},
316+
String5k = binary:copy(<<"x">>, 5000),
317+
Props = #'P_basic'{
318+
content_type = <<"text/plain">>,
319+
content_encoding = <<"gzip">>,
320+
headers = [{<<"a-stream-offset">>, long, 99},
321+
{<<"a-string">>, longstr, <<"a string">>},
322+
{<<"a-very-long-string">>, longstr, String5k},
323+
{<<"a-bool">>, bool, false},
324+
{<<"a-unsignedbyte">>, unsignedbyte, 1},
325+
{<<"a-unsignedshort">>, unsignedshort, 1},
326+
{<<"a-unsignedint">>, unsignedint, 1},
327+
{<<"a-signedint">>, signedint, 1},
328+
{<<"a-timestamp">>, timestamp, 1},
329+
{<<"a-double">>, double, 1.0},
330+
{<<"a-float">>, float, 1.0},
331+
{<<"a-void">>, void, undefined},
332+
{<<"a-binary">>, binary, <<"data">>},
333+
{<<"a-array">>, array, [{long, 1}, {long, 2}]},
334+
{<<"x-stream-filter">>, longstr, <<"apple">>}
335+
],
336+
delivery_mode = 2,
337+
priority = 98,
338+
correlation_id = <<"corr">> ,
339+
reply_to = <<"reply-to">>,
340+
expiration = <<"1">>,
341+
message_id = <<"msg-id">>,
342+
timestamp = 99,
343+
type = <<"45">>,
344+
user_id = <<"banana">>,
345+
app_id = <<"rmq">>
346+
},
344347
Content = #content{properties = Props,
345348
payload_fragments_rev = [<<"data">>]},
346349
Msg = mc:init(mc_amqpl, Content, annotations()),
@@ -404,6 +407,9 @@ amqpl_amqp_bin_amqpl(_Config) ->
404407

405408
?assertEqual({long, 99}, Get(<<"a-stream-offset">>, AP10)),
406409
?assertEqual({utf8, <<"a string">>}, Get(<<"a-string">>, AP10)),
410+
%% We expect that a very long string is not scanned for valid UTF-8
411+
%% and instead directly turned into a binary.
412+
?assertEqual({binary, String5k}, Get(<<"a-very-long-string">>, AP10)),
407413
?assertEqual(false, Get(<<"a-bool">>, AP10)),
408414
?assertEqual({ubyte, 1}, Get(<<"a-unsignedbyte">>, AP10)),
409415
?assertEqual({ushort, 1}, Get(<<"a-unsignedshort">>, AP10)),

deps/rabbitmq_event_exchange/Makefile

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,12 @@
11
PROJECT = rabbitmq_event_exchange
22
PROJECT_DESCRIPTION = Event Exchange Type
33

4+
define PROJECT_ENV
5+
[
6+
{protocol, amqp_0_9_1}
7+
]
8+
endef
9+
410
define PROJECT_APP_EXTRA_KEYS
511
{broker_version_requirements, []}
612
endef
Lines changed: 2 additions & 149 deletions
Original file line numberDiff line numberDiff line change
@@ -1,154 +1,7 @@
11
# RabbitMQ Event Exchange
22

3-
## Overview
4-
5-
This plugin exposes the internal RabbitMQ event mechanism as messages that clients
6-
can consume. It's useful
7-
if you want to keep track of certain events, e.g. when queues, exchanges, bindings, users,
8-
connections, channels are created and deleted. This plugin filters out stats
9-
events, so you are almost certainly going to get better results using
10-
the management plugin for stats.
11-
12-
## How it Works
13-
14-
It declares a topic exchange called `amq.rabbitmq.event` **in the default
15-
virtual host**. All events are published to this exchange with routing
16-
keys like 'exchange.created', 'binding.deleted' etc, so you can
17-
subscribe to only the events you're interested in.
18-
19-
The exchange behaves similarly to 'amq.rabbitmq.log': everything gets
20-
published there; if you don't trust a user with the information that
21-
gets published, don't allow them access.
22-
23-
24-
## Installation
25-
26-
This plugin ships with RabbitMQ. Like with all other plugins, it must be
27-
enabled before it can be used:
28-
29-
```bash
30-
[sudo] rabbitmq-plugins enable rabbitmq_event_exchange
31-
```
32-
33-
## Event format
34-
35-
Each event has various properties associated with it. These are
36-
translated into AMQP 0-9-1 data encoding and inserted in the message headers. The
37-
**message body is always blank**.
38-
39-
## Events
40-
41-
So far RabbitMQ and related plugins emit events with the following routing keys:
42-
43-
### RabbitMQ Broker
44-
45-
Queue, Exchange and Binding events:
46-
47-
* `queue.deleted`
48-
* `queue.created`
49-
* `exchange.created`
50-
* `exchange.deleted`
51-
* `binding.created`
52-
* `binding.deleted`
53-
54-
Connection and Channel events:
55-
56-
* `connection.created`
57-
* `connection.closed`
58-
* `channel.created`
59-
* `channel.closed`
60-
61-
Consumer events:
62-
63-
* `consumer.created`
64-
* `consumer.deleted`
65-
66-
Policy and Parameter events:
67-
68-
* `policy.set`
69-
* `policy.cleared`
70-
* `parameter.set`
71-
* `parameter.cleared`
72-
73-
Virtual host events:
74-
75-
* `vhost.created`
76-
* `vhost.deleted`
77-
* `vhost.limits.set`
78-
* `vhost.limits.cleared`
79-
80-
User related events:
81-
82-
* `user.authentication.success`
83-
* `user.authentication.failure`
84-
* `user.created`
85-
* `user.deleted`
86-
* `user.password.changed`
87-
* `user.password.cleared`
88-
* `user.tags.set`
89-
90-
Permission events:
91-
92-
* `permission.created`
93-
* `permission.deleted`
94-
* `topic.permission.created`
95-
* `topic.permission.deleted`
96-
97-
Alarm events:
98-
99-
* `alarm.set`
100-
* `alarm.cleared`
101-
102-
### Shovel Plugin
103-
104-
Worker events:
105-
106-
* `shovel.worker.status`
107-
* `shovel.worker.removed`
108-
109-
### Federation Plugin
110-
111-
Link events:
112-
113-
* `federation.link.status`
114-
* `federation.link.removed`
115-
116-
## Example
117-
118-
There is a usage example using the Java client in `examples/java`.
119-
120-
121-
## Configuration
122-
123-
* `rabbitmq_event_exchange.vhost`: what vhost should the `amq.rabbitmq.event` exchange be declared in. Default: `rabbit.default_vhost` (`<<"/">>`).
124-
125-
126-
## Uninstalling
127-
128-
If you want to remove the exchange which this plugin creates, first
129-
disable the plugin and restart the broker. Then you can delete the exchange,
130-
e.g. with :
131-
132-
rabbitmqctl eval 'rabbit_exchange:delete(rabbit_misc:r(<<"/">>, exchange, <<"amq.rabbitmq.event">>), false, <<"username">>).'
133-
134-
135-
## Building from Source
136-
137-
Building is no different from [building other RabbitMQ plugins](https://www.rabbitmq.com/plugin-development.html).
138-
139-
TL;DR:
140-
141-
git clone https://github.com.com/rabbitmq/rabbitmq-public-umbrella.git umbrella
142-
cd umbrella
143-
make co
144-
make up BRANCH=stable
145-
cd deps
146-
git clone https://github.com/rabbitmq/rabbitmq-event-exchange.git rabbitmq_event_exchange
147-
cd rabbitmq_event_exchange
148-
make dist
149-
3+
See the [website](https://www.rabbitmq.com/docs/event-exchange) for documentation.
1504

1515
## License
1526

153-
Released under the Mozilla Public License 2.0,
154-
the same as RabbitMQ.
7+
Released under the Mozilla Public License 2.0, the same as RabbitMQ.

deps/rabbitmq_event_exchange/priv/schema/rabbitmq_event_exchange.schema

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,3 +5,7 @@
55
fun(Conf) ->
66
list_to_binary(cuttlefish:conf_get("event_exchange.vhost", Conf))
77
end}.
8+
9+
{mapping, "event_exchange.protocol", "rabbitmq_event_exchange.protocol", [
10+
{datatype, {enum, [amqp_0_9_1, amqp_1_0]}}
11+
]}.

0 commit comments

Comments
 (0)