Skip to content

Commit 1fe87c7

Browse files
Add capabilities parameter
1 parent 35b3fda commit 1fe87c7

File tree

4 files changed

+65
-8
lines changed

4 files changed

+65
-8
lines changed

deps/amqp10_client/src/amqp10_client.erl

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
attach_sender_link/3,
2121
attach_sender_link/4,
2222
attach_sender_link/5,
23+
attach_sender_link_sync/2,
2324
attach_sender_link_sync/3,
2425
attach_sender_link_sync/4,
2526
attach_sender_link_sync/5,
@@ -197,6 +198,19 @@ attach_sender_link_sync(Session, Name, Target, SettleMode, Durability) ->
197198
after ?TIMEOUT -> link_timeout
198199
end.
199200

201+
-spec attach_sender_link_sync(pid(), attach_args()) -> {ok, link_ref()} | link_timeout.
202+
attach_sender_link_sync(Session, AttachArgs) ->
203+
{ok, Ref} = amqp10_client_session:attach(Session, AttachArgs),
204+
receive
205+
{amqp10_event, {link, Ref, attached}} ->
206+
{ok, Ref};
207+
{amqp10_event, {link, Ref, {attached, #'v1_0.attach'{}}}} ->
208+
{ok, Ref};
209+
{amqp10_event, {link, Ref, {detached, Err}}} ->
210+
{error, Err}
211+
after ?TIMEOUT -> link_timeout
212+
end.
213+
200214
%% @doc Attaches a sender link to a target.
201215
%% This is asynchronous and will notify completion of the attach request to the
202216
%% caller using an amqp10_event of the following format:

deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ connect_source(State = #{name := Name,
7979
on_publish -> unsettled;
8080
on_confirm -> unsettled
8181
end,
82-
AttachFun = fun amqp10_client:attach_receiver_link/5,
82+
AttachFun = fun amqp10_client:attach_link/2,
8383
{Conn, Sess, LinkRef} = connect(Name, SndSettleMode, Uri, "receiver", Addr, Src,
8484
AttachFun),
8585
State#{source => Src#{current => #{conn => Conn,
@@ -97,7 +97,7 @@ connect_dest(State = #{name := Name,
9797
on_publish -> settled;
9898
on_confirm -> unsettled
9999
end,
100-
AttachFun = fun amqp10_client:attach_sender_link_sync/5,
100+
AttachFun = fun amqp10_client:attach_sender_link_sync/2,
101101
{Conn, Sess, LinkRef} = connect(Name, SndSettleMode, Uri, "sender", Addr, Dst,
102102
AttachFun),
103103
%% wait for link credit here as if there are messages waiting we may try
@@ -126,9 +126,31 @@ connect(Name, SndSettleMode, Uri, Postfix, Addr, Map, AttachFun) ->
126126
% needs to be sync, i.e. awaits the 'attach' event as
127127
% else we may try to use the link before it is ready
128128
Durability = maps:get(durability, Map, unsettled_state),
129-
{ok, LinkRef} = AttachFun(Sess, LinkName, Addr,
130-
SndSettleMode,
131-
Durability),
129+
AttachArgs = case Postfix of
130+
"sender" ->
131+
SenderCapabilities = maps:get(sender_capabilities, Map, undefined),
132+
#{
133+
name => LinkName,
134+
role => {sender, #{address => Addr,
135+
durable => Durability,
136+
capabilities => SenderCapabilities}},
137+
snd_settle_mode => SndSettleMode,
138+
rcv_settle_mode => first
139+
};
140+
"receiver" ->
141+
ReceiverCapabilities = maps:get(receiver_capabilities, Map, undefined),
142+
#{
143+
name => LinkName,
144+
role => {receiver, #{address => Addr,
145+
durable => Durability,
146+
capabilities => ReceiverCapabilities}, self()},
147+
snd_settle_mode => SndSettleMode,
148+
rcv_settle_mode => first,
149+
filter => #{},
150+
properties => #{}
151+
}
152+
end,
153+
{ok, LinkRef} = AttachFun(Sess, AttachArgs),
132154
{Conn, Sess, LinkRef}.
133155

134156
-spec init_source(state()) -> state().

deps/rabbitmq_shovel/src/rabbit_shovel_parameters.erl

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,8 @@ amqp10_src_validation(_Def, User) ->
133133
{<<"src-uri">>, validate_uri_fun(User), mandatory},
134134
{<<"src-address">>, fun rabbit_parameter_validation:binary/2, mandatory},
135135
{<<"src-prefetch-count">>, fun rabbit_parameter_validation:number/2, optional},
136-
{<<"src-delete-after">>, fun validate_delete_after/2, optional}
136+
{<<"src-delete-after">>, fun validate_delete_after/2, optional},
137+
{<<"src-capabilities">>, fun validate_link_capabilities/2, optional}
137138
].
138139

139140
amqp091_src_validation(_Def, User) ->
@@ -168,7 +169,8 @@ amqp10_dest_validation(_Def, User) ->
168169
{<<"dest-application-properties">>, fun validate_amqp10_map/2, optional},
169170
{<<"dest-message-annotations">>, fun validate_amqp10_map/2, optional},
170171
% TODO: restrict to allowed fields
171-
{<<"dest-properties">>, fun validate_amqp10_map/2, optional}
172+
{<<"dest-properties">>, fun validate_amqp10_map/2, optional},
173+
{<<"dest-capabilities">>, fun validate_link_capabilities/2, optional}
172174
].
173175

174176
amqp091_dest_validation(_Def, User) ->
@@ -226,6 +228,19 @@ validate_params_user(#amqp_params_direct{virtual_host = VHost},
226228
validate_params_user(#amqp_params_network{}, _User) ->
227229
ok.
228230

231+
validate_link_capabilities(Name, ListOfSymbols) when is_list(ListOfSymbols) ->
232+
case lists:all(fun(V) when is_binary(V) -> true;
233+
(_V) -> false
234+
end, ListOfSymbols) of
235+
true -> ok;
236+
false -> {error, "~ts should have list of binaries, actually was ~tp",
237+
[Name, ListOfSymbols]}
238+
end;
239+
validate_link_capabilities(_Name, Symbol) when is_binary(Symbol) ->
240+
ok;
241+
validate_link_capabilities(Name, Symbol) ->
242+
{error, "~ts should be a binary, actually was ~tp", [Name, Symbol]}.
243+
229244
validate_delete_after(_Name, <<"never">>) -> ok;
230245
validate_delete_after(_Name, <<"queue-length">>) -> ok;
231246
validate_delete_after(_Name, N) when is_integer(N), N >= 0 -> ok;
@@ -314,11 +329,13 @@ parse_amqp10_dest({_VHost, _Name}, _ClusterName, Def, SourceHeaders) ->
314329
MessageAnns =
315330
rabbit_data_coercion:to_proplist(
316331
pget(<<"dest-message-annotations">>, Def, [])),
332+
ReceiverCapabilities = pget(<<"dest-capabilities">>, Def),
317333
#{module => rabbit_amqp10_shovel,
318334
uris => Uris,
319335
target_address => Address,
320336
message_annotations => maps:from_list(MessageAnns),
321337
application_properties => maps:from_list(AppProperties ++ SourceHeaders),
338+
receiver_capabilities => ReceiverCapabilities,
322339
properties => maps:from_list(
323340
lists:map(fun({K, V}) ->
324341
{rabbit_data_coercion:to_atom(K), V}
@@ -416,12 +433,14 @@ parse_amqp10_source(Def) ->
416433
Address = pget(<<"src-address">>, Def),
417434
DeleteAfter = pget(<<"src-delete-after">>, Def, <<"never">>),
418435
PrefetchCount = pget(<<"src-prefetch-count">>, Def, 1000),
436+
SenderCapabilities = pget(<<"src-capabilities">>, Def),
419437
Headers = [],
420438
{#{module => rabbit_amqp10_shovel,
421439
uris => Uris,
422440
source_address => Address,
423441
delete_after => opt_b2a(DeleteAfter),
424442
prefetch_count => PrefetchCount,
443+
sender_capabilities => SenderCapabilities,
425444
consumer_args => []}, Headers}.
426445

427446
parse_amqp091_source(Def) ->

deps/rabbitmq_shovel/test/parameters_SUITE.erl

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,7 @@ parse_amqp10(_Config) ->
230230
{<<"src-address">>, <<"a-src-queue">>},
231231
{<<"src-delete-after">>, <<"never">>},
232232
{<<"src-prefetch-count">>, 30},
233+
{<<"src-capabilities">>, <<"capability0">>},
233234

234235
{<<"dest-protocol">>, <<"amqp10">>},
235236
{<<"dest-uri">>, <<"amqp://remotehost:5672">>},
@@ -240,7 +241,8 @@ parse_amqp10(_Config) ->
240241
<<"app-prop-value">>}]},
241242
{<<"dest-message-annotations">>, [{<<"some-message-ann">>,
242243
<<"message-ann-value">>}]},
243-
{<<"dest-properties">>, [{<<"user_id">>, <<"some-user">>}]}
244+
{<<"dest-properties">>, [{<<"user_id">>, <<"some-user">>}]},
245+
{<<"dest-capabilities">>, [<<"capability1">>, <<"capability2">>]}
244246
],
245247
ObfuscatedParams = rabbit_shovel_parameters:obfuscate_uris_in_definition(Params),
246248
?assertMatch(

0 commit comments

Comments
 (0)