Skip to content

Commit 2f5809c

Browse files
committed
more link errors
1 parent 74e1175 commit 2f5809c

File tree

2 files changed

+79
-79
lines changed

2 files changed

+79
-79
lines changed

deps/rabbit/src/rabbit_amqp_session.erl

Lines changed: 63 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -1125,32 +1125,34 @@ handle_frame(#'v1_0.attach'{name = {utf8, NameBin} = Name,
11251125
source = Source,
11261126
target = Target} = Attach,
11271127
State) ->
1128-
ok = validate_attach(Attach),
1129-
try handle_attach(Attach, State)
1130-
catch {link_error, Error} ->
1131-
%% Figure 2.33
1132-
?LOG_WARNING("refusing link '~ts': ~tp", [NameBin, Error]),
1133-
AttachReply = case Role of
1134-
?AMQP_ROLE_SENDER ->
1135-
#'v1_0.attach'{
1136-
name = Name,
1137-
handle = Handle,
1138-
role = ?AMQP_ROLE_RECEIVER,
1139-
source = Source,
1140-
target = null};
1141-
?AMQP_ROLE_RECEIVER ->
1142-
#'v1_0.attach'{
1143-
name = Name,
1144-
handle = Handle,
1145-
role = ?AMQP_ROLE_SENDER,
1146-
source = null,
1147-
target = Target,
1148-
initial_delivery_count = ?UINT(?INITIAL_DELIVERY_COUNT)}
1149-
end,
1150-
Detach = #'v1_0.detach'{handle = Handle,
1151-
closed = true,
1152-
error = Error},
1153-
{ok, [AttachReply, Detach], State}
1128+
try
1129+
ok = validate_attach(Attach),
1130+
handle_attach(Attach, State)
1131+
catch
1132+
{link_error, Error} ->
1133+
%% Figure 2.33
1134+
?LOG_WARNING("refusing link '~ts': ~tp", [NameBin, Error]),
1135+
AttachReply = case Role of
1136+
?AMQP_ROLE_SENDER ->
1137+
#'v1_0.attach'{
1138+
name = Name,
1139+
handle = Handle,
1140+
role = ?AMQP_ROLE_RECEIVER,
1141+
source = Source,
1142+
target = null};
1143+
?AMQP_ROLE_RECEIVER ->
1144+
#'v1_0.attach'{
1145+
name = Name,
1146+
handle = Handle,
1147+
role = ?AMQP_ROLE_SENDER,
1148+
source = null,
1149+
target = Target,
1150+
initial_delivery_count = ?UINT(?INITIAL_DELIVERY_COUNT)}
1151+
end,
1152+
Detach = #'v1_0.detach'{handle = Handle,
1153+
closed = true,
1154+
error = Error},
1155+
{ok, [AttachReply, Detach], State}
11541156
end;
11551157

11561158
handle_frame(Detach = #'v1_0.detach'{handle = ?UINT(HandleInt)},
@@ -1257,9 +1259,9 @@ handle_attach(#'v1_0.attach'{
12571259
Pair#management_link_pair{incoming_half = HandleInt},
12581260
Pairs0);
12591261
#{LinkName := Other} ->
1260-
protocol_error(?V_1_0_AMQP_ERROR_PRECONDITION_FAILED,
1261-
"received invalid attach ~p for management link pair ~p",
1262-
[Attach, Other]);
1262+
link_error(?V_1_0_AMQP_ERROR_PRECONDITION_FAILED,
1263+
"received invalid attach ~p for management link pair ~p",
1264+
[Attach, Other]);
12631265
_ ->
12641266
maps:put(LinkName,
12651267
#management_link_pair{client_terminus_address = ClientTerminusAddress,
@@ -1314,9 +1316,9 @@ handle_attach(#'v1_0.attach'{
13141316
Pair#management_link_pair{outgoing_half = HandleInt},
13151317
Pairs0);
13161318
#{LinkName := Other} ->
1317-
protocol_error(?V_1_0_AMQP_ERROR_PRECONDITION_FAILED,
1318-
"received invalid attach ~p for management link pair ~p",
1319-
[Attach, Other]);
1319+
link_error(?V_1_0_AMQP_ERROR_PRECONDITION_FAILED,
1320+
"received invalid attach ~p for management link pair ~p",
1321+
[Attach, Other]);
13201322
_ ->
13211323
maps:put(LinkName,
13221324
#management_link_pair{client_terminus_address = ClientTerminusAddress,
@@ -1546,18 +1548,16 @@ handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER,
15461548
rabbit_global_counters:consumer_created(?PROTOCOL),
15471549
{ok, [A], State1};
15481550
{error, _Type, Reason, Args} ->
1549-
protocol_error(
1550-
?V_1_0_AMQP_ERROR_INTERNAL_ERROR,
1551-
Reason, Args)
1551+
link_error(?V_1_0_AMQP_ERROR_INTERNAL_ERROR,
1552+
Reason, Args)
15521553
end
15531554
end) of
15541555
{ok, Reply, State} ->
15551556
reply_frames(Reply, State);
15561557
{error, Reason} ->
1557-
protocol_error(
1558-
?V_1_0_AMQP_ERROR_INTERNAL_ERROR,
1559-
"Could not operate on ~s: ~tp",
1560-
[rabbit_misc:rs(QName), Reason])
1558+
link_error(?V_1_0_AMQP_ERROR_INTERNAL_ERROR,
1559+
"Could not operate on ~ts: ~tp",
1560+
[rabbit_misc:rs(QName), Reason])
15611561
end
15621562
end.
15631563

@@ -2531,10 +2531,10 @@ incoming_link_transfer(
25312531
multi_transfer_msg = undefined},
25322532
{ok, Reply, Link, State};
25332533
{error, Reason} ->
2534-
protocol_error(?V_1_0_AMQP_ERROR_INTERNAL_ERROR,
2535-
"Failed to deliver message to queues, "
2536-
"delivery_tag=~p, delivery_id=~p, reason=~p",
2537-
[DeliveryTag, DeliveryId, Reason])
2534+
link_error(
2535+
?V_1_0_AMQP_ERROR_INTERNAL_ERROR,
2536+
"failed to deliver message (delivery-tag=~p, delivery-id=~p): ~tp",
2537+
[DeliveryTag, DeliveryId, Reason])
25382538
end;
25392539
{error, {anonymous_terminus, false}, #'v1_0.error'{} = Err} ->
25402540
Disposition = case Settled of
@@ -3364,8 +3364,8 @@ validate_multi_transfer_delivery_id(undefined, _FirstDeliveryId) ->
33643364
validate_multi_transfer_delivery_id(OtherId, FirstDeliveryId) ->
33653365
%% "It is an error if the delivery-id on a continuation transfer
33663366
%% differs from the delivery-id on the first transfer of a delivery."
3367-
protocol_error(
3368-
?V_1_0_CONNECTION_ERROR_FRAMING_ERROR,
3367+
link_error(
3368+
?V_1_0_AMQP_ERROR_INVALID_FIELD,
33693369
"delivery-id of continuation transfer (~p) differs from delivery-id on first transfer (~p)",
33703370
[OtherId, FirstDeliveryId]).
33713371

@@ -3377,8 +3377,8 @@ validate_multi_transfer_settled(undefined, Settled)
33773377
ok;
33783378
validate_multi_transfer_settled(Other, First)
33793379
when is_boolean(First) ->
3380-
protocol_error(
3381-
?V_1_0_CONNECTION_ERROR_FRAMING_ERROR,
3380+
link_error(
3381+
?V_1_0_AMQP_ERROR_INVALID_FIELD,
33823382
"field 'settled' of continuation transfer (~p) differs from "
33833383
"(interpreted) field 'settled' on first transfer (~p)",
33843384
[Other, First]).
@@ -3394,8 +3394,8 @@ validate_transfer_snd_settle_mode(settled, true) ->
33943394
%% then this field MUST be true on at least one transfer frame for a delivery" [2.7.5]
33953395
ok;
33963396
validate_transfer_snd_settle_mode(SndSettleMode, Settled) ->
3397-
protocol_error(
3398-
?V_1_0_CONNECTION_ERROR_FRAMING_ERROR,
3397+
link_error(
3398+
?V_1_0_AMQP_ERROR_INVALID_FIELD,
33993399
"sender settle mode is '~s' but transfer settled flag is interpreted as being '~s'",
34003400
[SndSettleMode, Settled]).
34013401

@@ -3523,14 +3523,13 @@ declare_queue(QNameBin,
35233523
{existing, _Q} ->
35243524
ok;
35253525
{error, queue_limit_exceeded, Reason, ReasonArgs} ->
3526-
link_error(
3527-
?V_1_0_AMQP_ERROR_RESOURCE_LIMIT_EXCEEDED,
3528-
Reason,
3529-
ReasonArgs);
3526+
link_error(?V_1_0_AMQP_ERROR_RESOURCE_LIMIT_EXCEEDED,
3527+
Reason,
3528+
ReasonArgs);
35303529
Other ->
3531-
protocol_error(?V_1_0_AMQP_ERROR_INTERNAL_ERROR,
3532-
"Failed to declare ~s: ~p",
3533-
[rabbit_misc:rs(QName), Other])
3530+
link_error(?V_1_0_AMQP_ERROR_INTERNAL_ERROR,
3531+
"Failed to declare ~s: ~p",
3532+
[rabbit_misc:rs(QName), Other])
35343533
end,
35353534
{ok, PermCache}.
35363535

@@ -3836,7 +3835,7 @@ check_user_id(Mc, User) ->
38363835
ok ->
38373836
ok;
38383837
{refused, Reason, Args} ->
3839-
protocol_error(?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS, Reason, Args)
3838+
link_error(?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS, Reason, Args)
38403839
end.
38413840

38423841
maps_update_with(Key, Fun, Init, Map) ->
@@ -3864,23 +3863,23 @@ check_paired({map, Properties}) ->
38643863
true ->
38653864
ok;
38663865
false ->
3867-
exit_property_paired_not_set()
3866+
error_property_paired_not_set()
38683867
end;
38693868
check_paired(_) ->
3870-
exit_property_paired_not_set().
3869+
error_property_paired_not_set().
38713870

3872-
-spec exit_property_paired_not_set() -> no_return().
3873-
exit_property_paired_not_set() ->
3874-
protocol_error(?V_1_0_AMQP_ERROR_INVALID_FIELD,
3875-
"Link property 'paired' is not set to boolean value 'true'", []).
3871+
-spec error_property_paired_not_set() -> no_return().
3872+
error_property_paired_not_set() ->
3873+
link_error(?V_1_0_AMQP_ERROR_INVALID_FIELD,
3874+
"Link property 'paired' is not set to boolean value 'true'", []).
38763875

38773876
-spec exit_not_implemented(io:format()) -> no_return().
38783877
exit_not_implemented(Format) ->
38793878
exit_not_implemented(Format, []).
38803879

38813880
-spec exit_not_implemented(io:format(), [term()]) -> no_return().
38823881
exit_not_implemented(Format, Args) ->
3883-
protocol_error(?V_1_0_AMQP_ERROR_NOT_IMPLEMENTED, Format, Args).
3882+
link_error(?V_1_0_AMQP_ERROR_NOT_IMPLEMENTED, Format, Args).
38843883

38853884
-spec error_not_found(rabbit_types:r(exchange | queue)) -> #'v1_0.error'{}.
38863885
error_not_found(Resource) ->

deps/rabbit/test/amqp_client_SUITE.erl

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -994,46 +994,46 @@ durable_field(Config, QueueType, QName)
994994
invalid_transfer_settled_flag(Config) ->
995995
OpnConf = connection_config(Config),
996996
{ok, Connection} = amqp10_client:open_connection(OpnConf),
997-
{ok, Session1} = amqp10_client:begin_session(Connection),
998-
{ok, Session2} = amqp10_client:begin_session(Connection),
997+
{ok, Session} = amqp10_client:begin_session(Connection),
999998
TargetAddr = rabbitmq_amqp_address:exchange(<<"amq.fanout">>),
1000999
{ok, SenderSettled} = amqp10_client:attach_sender_link_sync(
1001-
Session1, <<"link 1">>, TargetAddr, settled),
1000+
Session, <<"link 1">>, TargetAddr, settled),
10021001
{ok, SenderUnsettled} = amqp10_client:attach_sender_link_sync(
1003-
Session2, <<"link 2">>, TargetAddr, unsettled),
1002+
Session, <<"link 2">>, TargetAddr, unsettled),
10041003
ok = wait_for_credit(SenderSettled),
10051004
ok = wait_for_credit(SenderUnsettled),
10061005

10071006
ok = amqp10_client:send_msg(SenderSettled, amqp10_msg:new(<<"tag1">>, <<"m1">>, false)),
10081007
receive
10091008
{amqp10_event,
1010-
{session, Session1,
1011-
{ended,
1009+
{link, SenderSettled,
1010+
{detached,
10121011
#'v1_0.error'{
1013-
condition = ?V_1_0_CONNECTION_ERROR_FRAMING_ERROR,
1012+
condition = ?V_1_0_AMQP_ERROR_INVALID_FIELD,
10141013
description = {utf8, Description1}}}}} ->
10151014
?assertEqual(
10161015
<<"sender settle mode is 'settled' but transfer settled flag is interpreted as being 'false'">>,
10171016
Description1)
1018-
after 30000 -> flush(missing_ended),
1017+
after 9000 -> flush(missing_event),
10191018
ct:fail({missing_event, ?LINE})
10201019
end,
10211020

10221021
ok = amqp10_client:send_msg(SenderUnsettled, amqp10_msg:new(<<"tag2">>, <<"m2">>, true)),
10231022
receive
10241023
{amqp10_event,
1025-
{session, Session2,
1026-
{ended,
1024+
{link, SenderUnsettled,
1025+
{detached,
10271026
#'v1_0.error'{
1028-
condition = ?V_1_0_CONNECTION_ERROR_FRAMING_ERROR,
1027+
condition = ?V_1_0_AMQP_ERROR_INVALID_FIELD,
10291028
description = {utf8, Description2}}}}} ->
10301029
?assertEqual(
10311030
<<"sender settle mode is 'unsettled' but transfer settled flag is interpreted as being 'true'">>,
10321031
Description2)
1033-
after 30000 -> flush(missing_ended),
1032+
after 9000 -> flush(missing_event),
10341033
ct:fail({missing_event, ?LINE})
10351034
end,
10361035

1036+
ok = end_session_sync(Session),
10371037
ok = close_connection_sync(Connection).
10381038

10391039
quorum_queue_rejects(Config) ->
@@ -4664,15 +4664,16 @@ user_id(Config) ->
46644664
ok = amqp10_client:send_msg(Sender, Msg2),
46654665
receive
46664666
{amqp10_event,
4667-
{session, Session,
4668-
{ended,
4667+
{link, Sender,
4668+
{detached,
46694669
#'v1_0.error'{
46704670
condition = ?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS,
46714671
description = {utf8, <<"user_id property set to 'fake user' but authenticated user was 'guest'">>}}}}} -> ok
4672-
after 30000 -> flush(missing_ended),
4672+
after 9000 -> flush(missing_detached),
46734673
ct:fail("did not receive expected error")
46744674
end,
46754675

4676+
ok = end_session_sync(Session),
46764677
ok = close_connection_sync(Connection).
46774678

46784679
message_ttl(Config) ->

0 commit comments

Comments
 (0)