Skip to content

Commit 8acdc65

Browse files
Add incoming message interceptors
This commit enables users to provide custom message interceptor modules, i.e. modules to process incoming and outgoing messages. The `rabbit_message_interceptor` behaviour defines a `intercept/4` callback, for those modules to implement. Co-authored-by: Péter Gömöri <[email protected]>
1 parent 6d1689c commit 8acdc65

File tree

70 files changed

+1208
-506
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

70 files changed

+1208
-506
lines changed

.github/workflows/test-make-target.yaml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,8 @@ jobs:
5757
uses: dsaltares/fetch-gh-release-asset@master
5858
if: inputs.mixed_clusters
5959
with:
60-
version: 'tags/v4.0.5'
60+
repo: 'rabbitmq/server-packages'
61+
version: 'tags/alphas.1744021065493'
6162
regex: true
6263
file: "rabbitmq-server-generic-unix-\\d.+\\.tar\\.xz"
6364
target: ./

deps/amqp10_client/src/amqp10_client.erl

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747

4848
-type terminus_durability() :: amqp10_client_session:terminus_durability().
4949

50+
-type terminus_address() :: amqp10_client_session:terminus_address().
5051
-type target_def() :: amqp10_client_session:target_def().
5152
-type source_def() :: amqp10_client_session:source_def().
5253

@@ -64,6 +65,7 @@
6465
snd_settle_mode/0,
6566
rcv_settle_mode/0,
6667
terminus_durability/0,
68+
terminus_address/0,
6769
target_def/0,
6870
source_def/0,
6971
attach_role/0,
@@ -170,7 +172,7 @@ attach_sender_link_sync(Session, Name, Target) ->
170172
%% @doc Synchronously attach a link on 'Session'.
171173
%% This is a convenience function that awaits attached event
172174
%% for the link before returning.
173-
-spec attach_sender_link_sync(pid(), binary(), binary(),
175+
-spec attach_sender_link_sync(pid(), binary(), terminus_address(),
174176
snd_settle_mode()) ->
175177
{ok, link_ref()} | link_timeout.
176178
attach_sender_link_sync(Session, Name, Target, SettleMode) ->
@@ -179,7 +181,7 @@ attach_sender_link_sync(Session, Name, Target, SettleMode) ->
179181
%% @doc Synchronously attach a link on 'Session'.
180182
%% This is a convenience function that awaits attached event
181183
%% for the link before returning.
182-
-spec attach_sender_link_sync(pid(), binary(), binary(),
184+
-spec attach_sender_link_sync(pid(), binary(), terminus_address(),
183185
snd_settle_mode(), terminus_durability()) ->
184186
{ok, link_ref()} | link_timeout.
185187
attach_sender_link_sync(Session, Name, Target, SettleMode, Durability) ->
@@ -199,7 +201,7 @@ attach_sender_link_sync(Session, Name, Target, SettleMode, Durability) ->
199201
%% This is asynchronous and will notify completion of the attach request to the
200202
%% caller using an amqp10_event of the following format:
201203
%% {amqp10_event, {link, LinkRef, attached | {detached, Why}}}
202-
-spec attach_sender_link(pid(), binary(), binary()) -> {ok, link_ref()}.
204+
-spec attach_sender_link(pid(), binary(), terminus_address()) -> {ok, link_ref()}.
203205
attach_sender_link(Session, Name, Target) ->
204206
% mixed should work with any type of msg
205207
attach_sender_link(Session, Name, Target, mixed).
@@ -208,7 +210,7 @@ attach_sender_link(Session, Name, Target) ->
208210
%% This is asynchronous and will notify completion of the attach request to the
209211
%% caller using an amqp10_event of the following format:
210212
%% {amqp10_event, {link, LinkRef, attached | {detached, Why}}}
211-
-spec attach_sender_link(pid(), binary(), binary(),
213+
-spec attach_sender_link(pid(), binary(), terminus_address(),
212214
snd_settle_mode()) ->
213215
{ok, link_ref()}.
214216
attach_sender_link(Session, Name, Target, SettleMode) ->
@@ -218,7 +220,7 @@ attach_sender_link(Session, Name, Target, SettleMode) ->
218220
%% This is asynchronous and will notify completion of the attach request to the
219221
%% caller using an amqp10_event of the following format:
220222
%% {amqp10_event, {link, LinkRef, attached | {detached, Why}}}
221-
-spec attach_sender_link(pid(), binary(), binary(),
223+
-spec attach_sender_link(pid(), binary(), terminus_address(),
222224
snd_settle_mode(), terminus_durability()) ->
223225
{ok, link_ref()}.
224226
attach_sender_link(Session, Name, Target, SettleMode, Durability) ->

deps/amqp10_client/src/amqp10_client_session.erl

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -65,19 +65,22 @@
6565
-define(INITIAL_DELIVERY_COUNT, ?UINT_MAX - 2).
6666

6767
-type link_name() :: binary().
68-
-type link_address() :: binary().
68+
%% https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-address-string
69+
%% or
70+
%% https://docs.oasis-open.org/amqp/anonterm/v1.0/anonterm-v1.0.html
71+
-type terminus_address() :: binary() | null.
6972
-type link_role() :: sender | receiver.
70-
-type link_target() :: {pid, pid()} | binary() | undefined.
73+
-type link_target() :: {pid, pid()} | terminus_address() | undefined.
7174
%% "The locally chosen handle is referred to as the output handle." [2.6.2]
7275
-type output_handle() :: link_handle().
7376
%% "The remotely chosen handle is referred to as the input handle." [2.6.2]
7477
-type input_handle() :: link_handle().
7578

7679
-type terminus_durability() :: none | configuration | unsettled_state.
7780

78-
-type target_def() :: #{address => link_address(),
81+
-type target_def() :: #{address => terminus_address(),
7982
durable => terminus_durability()}.
80-
-type source_def() :: #{address => link_address(),
83+
-type source_def() :: #{address => terminus_address(),
8184
durable => terminus_durability()}.
8285

8386
-type attach_role() :: {sender, target_def()} | {receiver, source_def(), pid()}.
@@ -112,6 +115,7 @@
112115
terminus_durability/0,
113116
attach_args/0,
114117
attach_role/0,
118+
terminus_address/0,
115119
target_def/0,
116120
source_def/0,
117121
filter/0,

deps/rabbit/priv/schema/rabbit.schema

Lines changed: 94 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2658,23 +2658,102 @@ end}.
26582658
{mapping, "message_interceptors.incoming.$interceptor.overwrite", "rabbit.incoming_message_interceptors", [
26592659
{datatype, {enum, [true, false]}}]}.
26602660

2661+
% Pseudo-key to include the interceptor in the list of interceptors.
2662+
% - If any other configuration is provided for the interceptor this
2663+
% configuration is not required.
2664+
% - If no other configuration is provided, this one is required so that the
2665+
% interceptor gets invoked.
2666+
{mapping, "message_interceptors.incoming.$interceptor.enabled", "rabbit.incoming_message_interceptors", [
2667+
{datatype, {enum, [true]}}]}.
2668+
2669+
{mapping, "message_interceptors.outgoing.$interceptor.enabled", "rabbit.outgoing_message_interceptors", [
2670+
{datatype, {enum, [true]}}]}.
2671+
2672+
{mapping,
2673+
"message_interceptors.incoming.set_header_timestamp.overwrite",
2674+
"rabbit.incoming_message_interceptors",
2675+
[{datatype, {enum, [true, false]}}]}.
2676+
{mapping,
2677+
"message_interceptors.incoming.rabbit_message_interceptor_routing_node.overwrite",
2678+
"rabbit.incoming_message_interceptors",
2679+
[{datatype, {enum, [true, false]}}]}.
2680+
2681+
{mapping,
2682+
"message_interceptors.incoming.set_header_routing_node.overwrite",
2683+
"rabbit.incoming_message_interceptors",
2684+
[{datatype, {enum, [true, false]}}]}.
2685+
{mapping,
2686+
"message_interceptors.incoming.rabbit_message_interceptor_timestamp.overwrite",
2687+
"rabbit.incoming_message_interceptors",
2688+
[{datatype, {enum, [true, false]}}]}.
2689+
26612690
{translation, "rabbit.incoming_message_interceptors",
26622691
fun(Conf) ->
2663-
case cuttlefish_variable:filter_by_prefix("message_interceptors", Conf) of
2664-
[] ->
2665-
cuttlefish:unset();
2666-
L ->
2667-
[begin
2668-
Interceptor = list_to_atom(Interceptor0),
2669-
case lists:member(Interceptor, [set_header_timestamp,
2670-
set_header_routing_node]) of
2671-
true ->
2672-
{Interceptor, Overwrite};
2673-
false ->
2674-
cuttlefish:invalid(io_lib:format("~p is invalid", [Interceptor]))
2675-
end
2676-
end || {["message_interceptors", "incoming", Interceptor0, "overwrite"], Overwrite} <- L]
2677-
end
2692+
case cuttlefish_variable:filter_by_prefix("message_interceptors.incoming", Conf) of
2693+
[] ->
2694+
cuttlefish:unset();
2695+
L ->
2696+
InterceptorsConfig = [
2697+
{Module0, Config, Value}
2698+
|| {["message_interceptors", "incoming", Module0, Config], Value} <- L
2699+
],
2700+
{Result, Order0} = lists:foldl(
2701+
fun({Interceptor0, Key0, Value}, {Acc, Order}) ->
2702+
Interceptor = list_to_atom(Interceptor0),
2703+
Key = list_to_atom(Key0),
2704+
MapPutFun = fun(Old) -> maps:put(Key, Value, Old) end,
2705+
% This Interceptor -> Module alias exists for
2706+
% compatibility reasons
2707+
Module = case Interceptor of
2708+
set_header_timestamp ->
2709+
rabbit_message_interceptor_timestamp;
2710+
set_header_routing_node ->
2711+
rabbit_message_interceptor_routing_node;
2712+
_ ->
2713+
Interceptor
2714+
end,
2715+
NewAcc = maps:update_with(Module,
2716+
MapPutFun,
2717+
#{Key => Value},
2718+
Acc),
2719+
{NewAcc, [Module| Order]}
2720+
end,
2721+
{#{}, []},
2722+
InterceptorsConfig
2723+
),
2724+
Order = lists:uniq(Order0),
2725+
[{O, maps:without([enabled], maps:get(O, Result))} || O <- Order]
2726+
end
2727+
end
2728+
}.
2729+
2730+
{translation, "rabbit.outgoing_message_interceptors",
2731+
fun(Conf) ->
2732+
case cuttlefish_variable:filter_by_prefix("message_interceptors.outgoing", Conf) of
2733+
[] ->
2734+
cuttlefish:unset();
2735+
L ->
2736+
InterceptorsConfig = [
2737+
{Module0, Config, Value}
2738+
|| {["message_interceptors", "outgoing", Module0, Config], Value} <- L
2739+
],
2740+
{Result, Order0} = lists:foldl(
2741+
fun({Interceptor0, Key0, Value}, {Acc, Order}) ->
2742+
Module = list_to_atom(Interceptor0),
2743+
Key = list_to_atom(Key0),
2744+
MapPutFun = fun(Old) -> maps:put(Key, Value, Old) end,
2745+
NewAcc = maps:update_with(Module,
2746+
MapPutFun,
2747+
#{Key => Value},
2748+
Acc),
2749+
{NewAcc, [Module| Order]}
2750+
end,
2751+
{#{}, []},
2752+
InterceptorsConfig
2753+
),
2754+
Order = lists:uniq(Order0),
2755+
[{O, maps:without([enabled], maps:get(O, Result))} || O <- Order]
2756+
end
26782757
end
26792758
}.
26802759

deps/rabbit/src/rabbit.erl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1656,7 +1656,8 @@ persist_static_configuration() ->
16561656
[classic_queue_index_v2_segment_entry_count,
16571657
classic_queue_store_v2_max_cache_size,
16581658
classic_queue_store_v2_check_crc32,
1659-
incoming_message_interceptors
1659+
incoming_message_interceptors,
1660+
outgoing_message_interceptors
16601661
]),
16611662

16621663
%% Disallow the following two cases:

deps/rabbit/src/rabbit_amqp_management.erl

Lines changed: 23 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -444,55 +444,40 @@ encode_queue(Q, NumMsgs, NumConsumers) ->
444444
ShortName ->
445445
ShortName
446446
end}},
447-
{{utf8, <<"arguments">>}, QArgs}
447+
{{utf8, <<"arguments">>}, QArgs},
448+
{{utf8, <<"replicas">>},
449+
{array, utf8, [{utf8, atom_to_binary(R)} || R <- Replicas]}
450+
}
448451
],
449-
KVList1 = if is_list(Replicas) ->
450-
[{{utf8, <<"replicas">>},
451-
{array, utf8, [{utf8, atom_to_binary(R)} || R <- Replicas]}
452-
} | KVList0];
453-
Replicas =:= undefined ->
454-
KVList0
455-
end,
456452
KVList = case Leader of
457-
undefined ->
458-
KVList1;
453+
none ->
454+
KVList0;
459455
_ ->
460456
[{{utf8, <<"leader">>},
461457
{utf8, atom_to_binary(Leader)}
462-
} | KVList1]
458+
} | KVList0]
463459
end,
464460
{map, KVList}.
465461

466462
%% The returned Replicas contain both online and offline replicas.
467463
-spec queue_topology(amqqueue:amqqueue()) ->
468-
{Leader :: undefined | node(), Replicas :: undefined | [node(),...]}.
464+
{Leader :: node() | none, Replicas :: [node(),...]}.
469465
queue_topology(Q) ->
470-
case amqqueue:get_type(Q) of
471-
rabbit_quorum_queue ->
472-
[{leader, Leader0},
473-
{members, Members}] = rabbit_queue_type:info(Q, [leader, members]),
474-
Leader = case Leader0 of
475-
'' -> undefined;
476-
_ -> Leader0
477-
end,
478-
{Leader, Members};
479-
rabbit_stream_queue ->
480-
#{name := StreamId} = amqqueue:get_type_state(Q),
481-
case rabbit_stream_coordinator:members(StreamId) of
482-
{ok, Members} ->
483-
maps:fold(fun(Node, {_Pid, writer}, {_, Replicas}) ->
484-
{Node, [Node | Replicas]};
485-
(Node, {_Pid, replica}, {Writer, Replicas}) ->
486-
{Writer, [Node | Replicas]}
487-
end, {undefined, []}, Members);
488-
{error, _} ->
489-
{undefined, undefined}
490-
end;
491-
_ ->
492-
Pid = amqqueue:get_pid(Q),
493-
Node = node(Pid),
494-
{Node, [Node]}
495-
end.
466+
Leader = case amqqueue:get_pid(Q) of
467+
{_RaName, Node} ->
468+
Node;
469+
none ->
470+
none;
471+
Pid ->
472+
node(Pid)
473+
end,
474+
Replicas = case amqqueue:get_type_state(Q) of
475+
#{nodes := Nodes} ->
476+
Nodes;
477+
_ ->
478+
[Leader]
479+
end,
480+
{Leader, Replicas}.
496481

497482
decode_exchange({map, KVList}) ->
498483
M = lists:foldl(

deps/rabbit/src/rabbit_amqp_session.erl

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -283,7 +283,8 @@
283283
max_handle :: link_handle(),
284284
max_incoming_window :: pos_integer(),
285285
max_link_credit :: pos_integer(),
286-
max_queue_credit :: pos_integer()
286+
max_queue_credit :: pos_integer(),
287+
msg_interceptor_ctx :: map()
287288
}).
288289

289290
-record(state, {
@@ -474,7 +475,11 @@ init({ReaderPid, WriterPid, ChannelNum, MaxFrameSize, User, Vhost, ContainerId,
474475
max_handle = EffectiveHandleMax,
475476
max_incoming_window = MaxIncomingWindow,
476477
max_link_credit = MaxLinkCredit,
477-
max_queue_credit = MaxQueueCredit
478+
max_queue_credit = MaxQueueCredit,
479+
msg_interceptor_ctx = #{protocol => ?PROTOCOL,
480+
username => User#user.username,
481+
vhost => Vhost,
482+
conn_name => ConnName}
478483
}}}.
479484

480485
terminate(_Reason, #state{incoming_links = IncomingLinks,
@@ -2411,7 +2416,8 @@ incoming_link_transfer(
24112416
trace_state = Trace,
24122417
conn_name = ConnName,
24132418
channel_num = ChannelNum,
2414-
max_link_credit = MaxLinkCredit}}) ->
2419+
max_link_credit = MaxLinkCredit,
2420+
msg_interceptor_ctx = MsgInterceptorCtx}}) ->
24152421

24162422
{PayloadBin, DeliveryId, Settled} =
24172423
case MultiTransfer of
@@ -2436,7 +2442,9 @@ incoming_link_transfer(
24362442
Mc0 = mc:init(mc_amqp, PayloadBin, #{}),
24372443
case lookup_target(LinkExchange, LinkRKey, Mc0, Vhost, User, PermCache0) of
24382444
{ok, X, RoutingKeys, Mc1, PermCache} ->
2439-
Mc2 = rabbit_message_interceptor:intercept(Mc1),
2445+
Mc2 = rabbit_message_interceptor:intercept(Mc1,
2446+
MsgInterceptorCtx,
2447+
incoming_message_interceptors),
24402448
check_user_id(Mc2, User),
24412449
TopicPermCache = check_write_permitted_on_topics(
24422450
X, User, RoutingKeys, TopicPermCache0),

0 commit comments

Comments
 (0)