Skip to content

Commit a138241

Browse files
committed
WIP Refuse link
1 parent e329881 commit a138241

File tree

6 files changed

+226
-122
lines changed

6 files changed

+226
-122
lines changed

deps/amqp10_client/src/amqp10_client_session.erl

Lines changed: 55 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@
127127
-record(link,
128128
{name :: link_name(),
129129
ref :: link_ref(),
130-
state = detached :: detached | attach_sent | attached | detach_sent,
130+
state = detached :: detached | attach_sent | attached | attach_refused | detach_sent,
131131
notify :: pid(),
132132
output_handle :: output_handle(),
133133
input_handle :: input_handle() | undefined,
@@ -325,9 +325,11 @@ mapped(cast, #'v1_0.end'{} = End, State) ->
325325
ok = notify_session_ended(End, State),
326326
{stop, normal, State};
327327
mapped(cast, #'v1_0.attach'{name = {utf8, Name},
328-
initial_delivery_count = IDC,
329328
handle = {uint, InHandle},
330329
role = PeerRoleBool,
330+
source = Source,
331+
target = Target,
332+
initial_delivery_count = IDC,
331333
max_message_size = MaybeMaxMessageSize} = Attach,
332334
#state{links = Links, link_index = LinkIndex,
333335
link_handle_index = LHI} = State0) ->
@@ -339,20 +341,28 @@ mapped(cast, #'v1_0.attach'{name = {utf8, Name},
339341
#{OutHandle := Link0} = Links,
340342
ok = notify_link_attached(Link0, Attach, State0),
341343

342-
{DeliveryCount, MaxMessageSize} =
344+
{LinkState, DeliveryCount, MaxMessageSize} =
343345
case Link0 of
344346
#link{role = sender = OurRole,
345347
delivery_count = DC} ->
348+
LS = case Target of
349+
#'v1_0.target'{} -> attached;
350+
_ -> attach_refused
351+
end,
346352
MSS = case MaybeMaxMessageSize of
347353
{ulong, S} when S > 0 -> S;
348354
_ -> undefined
349355
end,
350-
{DC, MSS};
356+
{LS, DC, MSS};
351357
#link{role = receiver = OurRole,
352358
max_message_size = MSS} ->
353-
{unpack(IDC), MSS}
359+
LS = case Source of
360+
#'v1_0.source'{} -> attached;
361+
_ -> attach_refused
362+
end,
363+
{LS, unpack(IDC), MSS}
354364
end,
355-
Link = Link0#link{state = attached,
365+
Link = Link0#link{state = LinkState,
356366
input_handle = InHandle,
357367
delivery_count = DeliveryCount,
358368
max_message_size = MaxMessageSize},
@@ -495,50 +505,41 @@ mapped({call, From},
495505
when Window =< 0 ->
496506
{keep_state_and_data, {reply, From, {error, remote_incoming_window_exceeded}}};
497507
mapped({call, From = {Pid, _}},
498-
{transfer, #'v1_0.transfer'{handle = {uint, OutHandle},
499-
delivery_tag = {binary, DeliveryTag},
500-
settled = false} = Transfer0, Sections},
501-
#state{outgoing_delivery_id = DeliveryId, links = Links,
502-
outgoing_unsettled = Unsettled} = State) ->
503-
case Links of
504-
#{OutHandle := #link{input_handle = undefined}} ->
505-
{keep_state_and_data, {reply, From, {error, half_attached}}};
506-
#{OutHandle := #link{link_credit = LC}} when LC =< 0 ->
507-
{keep_state_and_data, {reply, From, {error, insufficient_credit}}};
508-
#{OutHandle := Link = #link{max_message_size = MaxMessageSize,
509-
footer_opt = FooterOpt}} ->
510-
Transfer = Transfer0#'v1_0.transfer'{delivery_id = uint(DeliveryId)},
511-
case send_transfer(Transfer, Sections, FooterOpt, MaxMessageSize, State) of
512-
{ok, NumFrames} ->
513-
State1 = State#state{outgoing_unsettled = Unsettled#{DeliveryId => {DeliveryTag, Pid}}},
514-
{keep_state, book_transfer_send(NumFrames, Link, State1), {reply, From, ok}};
515-
Error ->
516-
{keep_state_and_data, {reply, From, Error}}
517-
end;
518-
_ ->
519-
{keep_state_and_data, {reply, From, {error, link_not_found}}}
520-
521-
end;
522-
mapped({call, From},
523-
{transfer, #'v1_0.transfer'{handle = {uint, OutHandle}} = Transfer0,
524-
Sections}, #state{outgoing_delivery_id = DeliveryId,
525-
links = Links} = State) ->
508+
{transfer,
509+
#'v1_0.transfer'{handle = {uint, OutHandle},
510+
delivery_tag = DeliveryTag,
511+
settled = Settled} = Transfer0,
512+
Sections},
513+
#state{outgoing_delivery_id = DeliveryId,
514+
links = Links,
515+
outgoing_unsettled = Unsettled0} = State0) ->
526516
case Links of
517+
#{OutHandle := #link{state = attach_refused}} ->
518+
{keep_state_and_data, {reply, From, {error, attach_refused}}};
527519
#{OutHandle := #link{input_handle = undefined}} ->
528520
{keep_state_and_data, {reply, From, {error, half_attached}}};
529521
#{OutHandle := #link{link_credit = LC}} when LC =< 0 ->
530522
{keep_state_and_data, {reply, From, {error, insufficient_credit}}};
531523
#{OutHandle := Link = #link{max_message_size = MaxMessageSize,
532524
footer_opt = FooterOpt}} ->
533525
Transfer = Transfer0#'v1_0.transfer'{delivery_id = uint(DeliveryId)},
534-
case send_transfer(Transfer, Sections, FooterOpt, MaxMessageSize, State) of
526+
case send_transfer(Transfer, Sections, FooterOpt, MaxMessageSize, State0) of
535527
{ok, NumFrames} ->
528+
State = case Settled of
529+
true ->
530+
State0;
531+
false ->
532+
{binary, Tag} = DeliveryTag,
533+
Unsettled = Unsettled0#{DeliveryId => {Tag, Pid}},
534+
State0#state{outgoing_unsettled = Unsettled}
535+
end,
536536
{keep_state, book_transfer_send(NumFrames, Link, State), {reply, From, ok}};
537537
Error ->
538538
{keep_state_and_data, {reply, From, Error}}
539539
end;
540540
_ ->
541541
{keep_state_and_data, {reply, From, {error, link_not_found}}}
542+
542543
end;
543544

544545
mapped({call, From},
@@ -688,21 +689,28 @@ send_flow_link(OutHandle,
688689
never -> never;
689690
_ -> {RenewWhenBelow, Credit}
690691
end,
691-
#{OutHandle := #link{output_handle = H,
692+
#{OutHandle := #link{state = LinkState,
693+
output_handle = H,
692694
role = receiver,
693695
delivery_count = DeliveryCount,
694696
available = Available} = Link} = Links,
695-
Flow1 = Flow0#'v1_0.flow'{
696-
handle = uint(H),
697-
%% "In the event that the receiving link endpoint has not yet seen the
698-
%% initial attach frame from the sender this field MUST NOT be set." [2.7.4]
699-
delivery_count = maybe_uint(DeliveryCount),
700-
available = uint(Available)},
701-
Flow = set_flow_session_fields(Flow1, State),
702-
ok = send(Flow, State),
703-
State#state{links = Links#{OutHandle =>
704-
Link#link{link_credit = Credit,
705-
auto_flow = AutoFlow}}}.
697+
case LinkState of
698+
attach_refused ->
699+
%% We will receive the DETACH frame shortly.
700+
State;
701+
_ ->
702+
Flow1 = Flow0#'v1_0.flow'{
703+
handle = uint(H),
704+
%% "In the event that the receiving link endpoint has not yet seen the
705+
%% initial attach frame from the sender this field MUST NOT be set." [2.7.4]
706+
delivery_count = maybe_uint(DeliveryCount),
707+
available = uint(Available)},
708+
Flow = set_flow_session_fields(Flow1, State),
709+
ok = send(Flow, State),
710+
State#state{links = Links#{OutHandle =>
711+
Link#link{link_credit = Credit,
712+
auto_flow = AutoFlow}}}
713+
end.
706714

707715
send_flow_session(State) ->
708716
Flow = set_flow_session_fields(#'v1_0.flow'{}, State),

deps/amqp10_client/test/system_SUITE.erl

Lines changed: 52 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ groups() ->
5252
]},
5353
{mock, [], [
5454
insufficient_credit,
55+
attach_refused,
5556
incoming_heartbeat,
5657
multi_transfer_without_delivery_id
5758
]}
@@ -772,11 +773,13 @@ insufficient_credit(Config) ->
772773
outgoing_window = {uint, 1000}}
773774
]}
774775
end,
775-
AttachStep = fun({0 = Ch, #'v1_0.attach'{role = false,
776-
name = Name}, <<>>}) ->
776+
AttachStep = fun({0 = Ch, #'v1_0.attach'{name = Name,
777+
role = false,
778+
target = Target}, <<>>}) ->
777779
{Ch, [#'v1_0.attach'{name = Name,
778780
handle = {uint, 99},
779-
role = true}]}
781+
role = true,
782+
target = Target}]}
780783
end,
781784
Steps = [fun mock_server:recv_amqp_header_step/1,
782785
fun mock_server:send_amqp_header_step/1,
@@ -799,6 +802,52 @@ insufficient_credit(Config) ->
799802
ok = amqp10_client:close_connection(Connection),
800803
ok.
801804

805+
attach_refused(Config) ->
806+
Hostname = ?config(mock_host, Config),
807+
Port = ?config(mock_port, Config),
808+
OpenStep = fun({0 = Ch, #'v1_0.open'{}, _Pay}) ->
809+
{Ch, [#'v1_0.open'{container_id = {utf8, <<"mock">>}}]}
810+
end,
811+
BeginStep = fun({0 = Ch, #'v1_0.begin'{}, _Pay}) ->
812+
{Ch, [#'v1_0.begin'{remote_channel = {ushort, Ch},
813+
next_outgoing_id = {uint, 1},
814+
incoming_window = {uint, 1000},
815+
outgoing_window = {uint, 1000}}
816+
]}
817+
end,
818+
AttachStep = fun({0 = Ch, #'v1_0.attach'{name = Name,
819+
role = false}, <<>>}) ->
820+
%% We test only the 1st stage of link refusal:
821+
%% Server replies with its local terminus set to null.
822+
%% We omit the 2nd stage (the detach frame).
823+
{Ch, [#'v1_0.attach'{name = Name,
824+
handle = {uint, 99},
825+
role = true,
826+
target = undefined}]}
827+
end,
828+
Steps = [fun mock_server:recv_amqp_header_step/1,
829+
fun mock_server:send_amqp_header_step/1,
830+
mock_server:amqp_step(OpenStep),
831+
mock_server:amqp_step(BeginStep),
832+
mock_server:amqp_step(AttachStep)],
833+
834+
ok = mock_server:set_steps(?config(mock_server, Config), Steps),
835+
836+
Cfg = #{address => Hostname, port => Port, sasl => none, notify => self()},
837+
{ok, Connection} = amqp10_client:open_connection(Cfg),
838+
{ok, Session} = amqp10_client:begin_session_sync(Connection),
839+
{ok, Sender} = amqp10_client:attach_sender_link(Session, <<"mock1-sender">>,
840+
<<"test">>),
841+
await_link(Sender, attached, attached_timeout),
842+
Msg = amqp10_msg:new(<<"mock-tag">>, <<"banana">>, true),
843+
%% We expect that the lib prevents the app from sending messages
844+
%% in this intermediate link refusal state.
845+
?assertEqual({error, attach_refused},
846+
amqp10_client:send_msg(Sender, Msg)),
847+
848+
ok = amqp10_client:end_session(Session),
849+
ok = amqp10_client:close_connection(Connection).
850+
802851
multi_transfer_without_delivery_id(Config) ->
803852
Hostname = ?config(mock_host, Config),
804853
Port = ?config(mock_port, Config),

deps/rabbit/src/rabbit_amqp_session.erl

Lines changed: 59 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -967,14 +967,17 @@ handle_frame({Performative = #'v1_0.transfer'{handle = ?UINT(Handle)}, Paylaod},
967967
{Reply, State} =
968968
case IncomingLinks of
969969
#{Handle := Link0} ->
970-
case incoming_link_transfer(Performative, Paylaod, Link0, State1) of
970+
try incoming_link_transfer(Performative, Paylaod, Link0, State1) of
971971
{ok, Reply0, Link, State2} ->
972972
{Reply0, State2#state{incoming_links = IncomingLinks#{Handle := Link}}};
973973
{error, Reply0} ->
974974
%% "When an error occurs at a link endpoint, the endpoint MUST be detached
975975
%% with appropriate error information supplied in the error field of the
976976
%% detach frame. The link endpoint MUST then be destroyed." [2.6.5]
977977
{Reply0, State1#state{incoming_links = maps:remove(Handle, IncomingLinks)}}
978+
catch {link_error, Error} ->
979+
Detach = detach(Handle, Link0, Error),
980+
{[Detach], State1#state{incoming_links = maps:remove(Handle, IncomingLinks)}}
978981
end;
979982
_ ->
980983
incoming_mgmt_link_transfer(Performative, Paylaod, State1)
@@ -1110,16 +1113,44 @@ handle_frame(#'v1_0.disposition'{role = ?AMQP_ROLE_RECEIVER,
11101113
reply_frames(Reply, State)
11111114
end;
11121115

1113-
handle_frame(#'v1_0.attach'{handle = ?UINT(Handle)} = Attach,
1114-
#state{cfg = #cfg{max_handle = MaxHandle}} = State) ->
1116+
handle_frame(#'v1_0.attach'{handle = ?UINT(HandleInt)},
1117+
#state{cfg = #cfg{max_handle = MaxHandle}})
1118+
when HandleInt > MaxHandle ->
1119+
protocol_error(?V_1_0_CONNECTION_ERROR_FRAMING_ERROR,
1120+
"link handle value (~b) exceeds maximum link handle value (~b)",
1121+
[HandleInt, MaxHandle]);
1122+
handle_frame(#'v1_0.attach'{name = {utf8, NameBin} = Name,
1123+
handle = Handle,
1124+
role = Role,
1125+
source = Source,
1126+
target = Target} = Attach,
1127+
State) ->
11151128
ok = validate_attach(Attach),
1116-
case Handle > MaxHandle of
1117-
true ->
1118-
protocol_error(?V_1_0_CONNECTION_ERROR_FRAMING_ERROR,
1119-
"link handle value (~b) exceeds maximum link handle value (~b)",
1120-
[Handle, MaxHandle]);
1121-
false ->
1122-
handle_attach(Attach, State)
1129+
try handle_attach(Attach, State)
1130+
catch {link_error, Error} ->
1131+
%% Figure 2.33
1132+
?LOG_WARNING("refusing link '~ts': ~tp", [NameBin, Error]),
1133+
AttachReply = case Role of
1134+
?AMQP_ROLE_SENDER ->
1135+
#'v1_0.attach'{
1136+
name = Name,
1137+
handle = Handle,
1138+
role = ?AMQP_ROLE_RECEIVER,
1139+
source = Source,
1140+
target = null};
1141+
?AMQP_ROLE_RECEIVER ->
1142+
#'v1_0.attach'{
1143+
name = Name,
1144+
handle = Handle,
1145+
role = ?AMQP_ROLE_SENDER,
1146+
source = null,
1147+
target = Target,
1148+
initial_delivery_count = ?UINT(?INITIAL_DELIVERY_COUNT)}
1149+
end,
1150+
Detach = #'v1_0.detach'{handle = Handle,
1151+
closed = true,
1152+
error = Error},
1153+
{ok, [AttachReply, Detach], State}
11231154
end;
11241155

11251156
handle_frame(Detach = #'v1_0.detach'{handle = ?UINT(HandleInt)},
@@ -1530,6 +1561,11 @@ handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER,
15301561
end
15311562
end.
15321563

1564+
link_error(Condition, Msg, Args) ->
1565+
Description = unicode:characters_to_binary(lists:flatten(io_lib:format(Msg, Args))),
1566+
throw({link_error, #'v1_0.error'{condition = Condition,
1567+
description = {utf8, Description}}}).
1568+
15331569
send_pending(#state{remote_incoming_window = RemoteIncomingWindow,
15341570
outgoing_pending = Buf0
15351571
} = State) ->
@@ -2871,7 +2907,8 @@ check_exchange(XNameBin, User, Vhost, PermCache0) ->
28712907
end,
28722908
{ok, Exchange, PermCache};
28732909
{error, not_found} ->
2874-
exit_not_found(XName)
2910+
link_error(?V_1_0_AMQP_ERROR_NOT_FOUND,
2911+
"no ~ts", [rabbit_misc:rs(XName)])
28752912
end.
28762913

28772914
address_v1_permitted() ->
@@ -3429,14 +3466,17 @@ exit_if_absent(Kind, Vhost, Name) when is_list(Name) ->
34293466
exit_if_absent(Kind, Vhost, Name) when is_binary(Name) ->
34303467
exit_if_absent(rabbit_misc:r(Vhost, Kind, Name)).
34313468

3432-
exit_if_absent(ResourceName = #resource{kind = Kind}) ->
3469+
exit_if_absent(Resource = #resource{kind = Kind}) ->
34333470
Mod = case Kind of
34343471
exchange -> rabbit_exchange;
34353472
queue -> rabbit_amqqueue
34363473
end,
3437-
case Mod:exists(ResourceName) of
3438-
true -> ok;
3439-
false -> exit_not_found(ResourceName)
3474+
case Mod:exists(Resource) of
3475+
true ->
3476+
ok;
3477+
false ->
3478+
link_error(?V_1_0_AMQP_ERROR_NOT_FOUND,
3479+
"no ~ts", [rabbit_misc:rs(Resource)])
34403480
end.
34413481

34423482
generate_queue_name_v1() ->
@@ -3482,7 +3522,7 @@ declare_queue(QNameBin,
34823522
{existing, _Q} ->
34833523
ok;
34843524
{error, queue_limit_exceeded, Reason, ReasonArgs} ->
3485-
protocol_error(
3525+
link_error(
34863526
?V_1_0_AMQP_ERROR_RESOURCE_LIMIT_EXCEEDED,
34873527
Reason,
34883528
ReasonArgs);
@@ -3685,9 +3725,9 @@ maybe_detach_mgmt_link(
36853725

36863726
check_internal_exchange(#exchange{internal = true,
36873727
name = XName}) ->
3688-
protocol_error(?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS,
3689-
"forbidden to publish to internal ~ts",
3690-
[rabbit_misc:rs(XName)]);
3728+
link_error(?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS,
3729+
"forbidden to publish to internal ~ts",
3730+
[rabbit_misc:rs(XName)]);
36913731
check_internal_exchange(_) ->
36923732
ok.
36933733

@@ -3841,12 +3881,6 @@ exit_not_implemented(Format) ->
38413881
exit_not_implemented(Format, Args) ->
38423882
protocol_error(?V_1_0_AMQP_ERROR_NOT_IMPLEMENTED, Format, Args).
38433883

3844-
-spec exit_not_found(rabbit_types:r(exchange | queue)) -> no_return().
3845-
exit_not_found(Resource) ->
3846-
protocol_error(?V_1_0_AMQP_ERROR_NOT_FOUND,
3847-
"no ~ts",
3848-
[rabbit_misc:rs(Resource)]).
3849-
38503884
-spec error_not_found(rabbit_types:r(exchange | queue)) -> #'v1_0.error'{}.
38513885
error_not_found(Resource) ->
38523886
Description = unicode:characters_to_binary("no " ++ rabbit_misc:rs(Resource)),

0 commit comments

Comments
 (0)