Skip to content

Commit 5739411

Browse files
Add link capabilities
1 parent 8c046c7 commit 5739411

File tree

2 files changed

+167
-8
lines changed

2 files changed

+167
-8
lines changed

deps/amqp10_client/src/amqp10_client_session.erl

Lines changed: 35 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -70,11 +70,16 @@
7070
-type input_handle() :: link_handle().
7171

7272
-type terminus_durability() :: none | configuration | unsettled_state.
73+
-type terminus_capabilities() :: undefined | binary() | [binary(),...].
7374

7475
-type target_def() :: #{address => link_address(),
75-
durable => terminus_durability()}.
76+
durable => terminus_durability(),
77+
capabilities => terminus_capabilities()
78+
}.
7679
-type source_def() :: #{address => link_address(),
77-
durable => terminus_durability()}.
80+
durable => terminus_durability(),
81+
capabilities => terminus_capabilities()
82+
}.
7883

7984
-type attach_role() :: {sender, target_def()} | {receiver, source_def(), pid()}.
8085

@@ -706,9 +711,17 @@ make_source(#{role := {sender, _}}) ->
706711
make_source(#{role := {receiver, #{address := Address} = Source, _Pid}, filter := Filter}) ->
707712
Durable = translate_terminus_durability(maps:get(durable, Source, none)),
708713
TranslatedFilter = translate_filters(Filter),
709-
#'v1_0.source'{address = {utf8, Address},
710-
durable = {uint, Durable},
711-
filter = TranslatedFilter}.
714+
try translate_terminus_capabilities(maps:get(capabilities, Source, undefined)) of
715+
Capabilities ->
716+
#'v1_0.source'{address = {utf8, Address},
717+
durable = {uint, Durable},
718+
filter = TranslatedFilter,
719+
capabilities = Capabilities}
720+
catch
721+
throw:Err ->
722+
logger:warning("make_source failed due to ~p", [Err]),
723+
{error, Err}
724+
end.
712725

713726
make_target(#{role := {receiver, _Source, _Pid}}) ->
714727
#'v1_0.target'{};
@@ -718,8 +731,16 @@ make_target(#{role := {sender, #{address := Address} = Target}}) ->
718731
true -> {utf8, Address};
719732
false -> Address
720733
end,
721-
#'v1_0.target'{address = TargetAddr,
722-
durable = {uint, Durable}}.
734+
try translate_terminus_capabilities(maps:get(capabilities, Target, undefined)) of
735+
Capabilities ->
736+
#'v1_0.target'{address = TargetAddr,
737+
durable = {uint, Durable},
738+
capabilities = Capabilities}
739+
catch
740+
throw:Err ->
741+
logger:warning("make_target failed due to ~p", [Err]),
742+
{error, Err}
743+
end.
723744

724745
max_message_size(#{max_message_size := Size})
725746
when is_integer(Size) andalso
@@ -974,6 +995,13 @@ translate_delivery_state({modified,
974995
translate_delivery_state(released) -> #'v1_0.released'{};
975996
translate_delivery_state(received) -> #'v1_0.received'{}.
976997

998+
translate_terminus_capabilities(Capabilities) when is_binary(Capabilities) ->
999+
{symbol, Capabilities};
1000+
translate_terminus_capabilities(CapabilitiesList) when is_list(CapabilitiesList) ->
1001+
{array, symbol, [{symbol, V} || V <- CapabilitiesList, is_binary(V)]};
1002+
translate_terminus_capabilities(_) ->
1003+
undefined.
1004+
9771005
translate_role(receiver) -> true.
9781006

9791007
maybe_notify_link_credit(#link{role = sender,

deps/amqp10_client/test/system_SUITE.erl

Lines changed: 132 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,9 @@ groups() ->
5353
{mock, [], [
5454
insufficient_credit,
5555
incoming_heartbeat,
56-
multi_transfer_without_delivery_id
56+
multi_transfer_without_delivery_id,
57+
set_sender_capabilities,
58+
set_receiver_capabilities
5759
]}
5860
].
5961

@@ -914,6 +916,135 @@ incoming_heartbeat(Config) ->
914916
end,
915917
demonitor(MockRef).
916918

919+
set_sender_capabilities(Config) ->
920+
Hostname = ?config(mock_host, Config),
921+
Port = ?config(mock_port, Config),
922+
923+
OpenStep = fun({0 = Ch, #'v1_0.open'{}, _Pay}) ->
924+
{Ch, [#'v1_0.open'{
925+
container_id = {utf8, <<"mock">>},
926+
%% The server doesn't expect any heartbeats from us (client).
927+
idle_time_out = {uint, 0}}]}
928+
end,
929+
930+
BeginStep = fun({0 = Ch, #'v1_0.begin'{}, _Pay}) ->
931+
{Ch, [#'v1_0.begin'{remote_channel = {ushort, Ch},
932+
next_outgoing_id = {uint, 1},
933+
incoming_window = {uint, 1000},
934+
outgoing_window = {uint, 1000}}
935+
]}
936+
end,
937+
AttachStep = fun({0 = Ch, #'v1_0.attach'{role = false,
938+
name = Name,
939+
source = #'v1_0.source'{
940+
941+
},
942+
target = #'v1_0.target'{
943+
capabilities = {symbol, <<"capability-1">>}}}, <<>>}) ->
944+
{Ch, [#'v1_0.attach'{name = Name,
945+
handle = {uint, 99},
946+
role = true}]}
947+
end,
948+
Steps = [fun mock_server:recv_amqp_header_step/1,
949+
fun mock_server:send_amqp_header_step/1,
950+
mock_server:amqp_step(OpenStep),
951+
mock_server:amqp_step(BeginStep),
952+
mock_server:amqp_step(AttachStep)],
953+
954+
ok = mock_server:set_steps(?config(mock_server, Config), Steps),
955+
956+
Cfg = #{address => Hostname, port => Port, sasl => none, notify => self()},
957+
{ok, Connection} = amqp10_client:open_connection(Cfg),
958+
{ok, Session} = amqp10_client:begin_session_sync(Connection),
959+
AttachArgs = #{name => <<"mock1-sender">>,
960+
role => {sender, #{address => <<"test">>,
961+
durable => none,
962+
capabilities => <<"capability-1">>}},
963+
snd_settle_mode => mixed,
964+
rcv_settle_mode => first},
965+
{ok, Sender} = amqp10_client:attach_link(Session, AttachArgs),
966+
await_link(Sender, attached, attached_timeout),
967+
Msg = amqp10_msg:new(<<"mock-tag">>, <<"banana">>, true),
968+
{error, insufficient_credit} = amqp10_client:send_msg(Sender, Msg),
969+
970+
ok = amqp10_client:end_session(Session),
971+
ok = amqp10_client:close_connection(Connection),
972+
ok.
973+
974+
set_receiver_capabilities(Config) ->
975+
Hostname = ?config(mock_host, Config),
976+
Port = ?config(mock_port, Config),
977+
978+
OpenStep = fun({0 = Ch, #'v1_0.open'{}, _Pay}) ->
979+
{Ch, [#'v1_0.open'{container_id = {utf8, <<"mock">>}}]}
980+
end,
981+
BeginStep = fun({0 = Ch, #'v1_0.begin'{}, _Pay}) ->
982+
{Ch, [#'v1_0.begin'{remote_channel = {ushort, Ch},
983+
next_outgoing_id = {uint, 1},
984+
incoming_window = {uint, 1000},
985+
outgoing_window = {uint, 1000}}
986+
]}
987+
end,
988+
AttachStep = fun({0 = Ch, #'v1_0.attach'{role = true,
989+
name = Name,
990+
source = #'v1_0.source'{
991+
capabilities = {symbol, <<"capability-1">>}}
992+
}, <<>>}) ->
993+
{Ch, [#'v1_0.attach'{name = Name,
994+
handle = {uint, 99},
995+
initial_delivery_count = {uint, 1},
996+
role = false}
997+
]}
998+
end,
999+
1000+
LinkCreditStep = fun({0 = Ch, #'v1_0.flow'{}, <<>>}) ->
1001+
{Ch, {multi, [[#'v1_0.transfer'{handle = {uint, 99},
1002+
delivery_id = {uint, 12},
1003+
more = true},
1004+
#'v1_0.data'{content = <<"hello ">>}],
1005+
[#'v1_0.transfer'{handle = {uint, 99},
1006+
% delivery_id can be omitted
1007+
% for continuation frames
1008+
delivery_id = undefined,
1009+
settled = undefined,
1010+
more = false},
1011+
#'v1_0.data'{content = <<"world">>}]
1012+
]}}
1013+
end,
1014+
Steps = [fun mock_server:recv_amqp_header_step/1,
1015+
fun mock_server:send_amqp_header_step/1,
1016+
mock_server:amqp_step(OpenStep),
1017+
mock_server:amqp_step(BeginStep),
1018+
mock_server:amqp_step(AttachStep),
1019+
mock_server:amqp_step(LinkCreditStep)
1020+
],
1021+
1022+
ok = mock_server:set_steps(?config(mock_server, Config), Steps),
1023+
1024+
Cfg = #{address => Hostname, port => Port, sasl => none, notify => self()},
1025+
{ok, Connection} = amqp10_client:open_connection(Cfg),
1026+
{ok, Session} = amqp10_client:begin_session_sync(Connection),
1027+
AttachArgs = #{name => <<"mock1-received">>,
1028+
role => {receiver, #{address => <<"test">>,
1029+
durable => none,
1030+
capabilities => <<"capability-1">>}, self()},
1031+
snd_settle_mode => setlled,
1032+
rcv_settle_mode => first,
1033+
filter => #{},
1034+
properties => #{}},
1035+
{ok, Receiver} = amqp10_client:attach_link(Session, AttachArgs),
1036+
amqp10_client:flow_link_credit(Receiver, 100, 50),
1037+
receive
1038+
{amqp10_msg, Receiver, _InMsg} ->
1039+
ok
1040+
after 2000 ->
1041+
exit(delivery_timeout)
1042+
end,
1043+
1044+
ok = amqp10_client:end_session(Session),
1045+
ok = amqp10_client:close_connection(Connection),
1046+
ok.
1047+
9171048
%%% HELPERS
9181049
%%%
9191050

0 commit comments

Comments
 (0)