Skip to content

Commit 2bd6921

Browse files
Merge pull request #11339 from rabbitmq/v3.13.x-amqpl-x-death
Interpret x-death header from AMQP 0.9.1 client
2 parents 1f8b21d + ea94a77 commit 2bd6921

File tree

13 files changed

+266
-120
lines changed

13 files changed

+266
-120
lines changed

deps/rabbit/BUILD.bazel

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -815,6 +815,9 @@ rabbitmq_suite(
815815
rabbitmq_suite(
816816
name = "mc_unit_SUITE",
817817
size = "small",
818+
runtime_deps = [
819+
"@meck//:erlang_app",
820+
],
818821
deps = [
819822
"//deps/amqp10_common:erlang_app",
820823
"//deps/rabbit_common:erlang_app",

deps/rabbit/app.bzl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1721,6 +1721,7 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
17211721
testonly = True,
17221722
srcs = ["test/unit_classic_mirrored_queue_sync_throttling_SUITE.erl"],
17231723
outs = ["test/unit_classic_mirrored_queue_sync_throttling_SUITE.beam"],
1724+
hdrs = ["include/mc.hrl"],
17241725
app_name = "rabbit",
17251726
erlc_opts = "//:test_erlc_opts",
17261727
deps = ["//deps/rabbit_common:erlang_app"],

deps/rabbit/include/mc.hrl

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,11 @@
1414
-define(ANN_PRIORITY, p).
1515

1616
-define(FF_MC_DEATHS_V2, message_containers_deaths_v2).
17+
-define(MC_ENV,
18+
case rabbit_feature_flags:is_enabled(?FF_MC_DEATHS_V2) of
19+
true -> #{};
20+
false -> #{?FF_MC_DEATHS_V2 => false}
21+
end).
1722

1823
-type death_key() :: {SourceQueue :: rabbit_misc:resource_name(), rabbit_dead_letter:reason()}.
1924
-type death_anns() :: #{%% timestamp of the first time this message

deps/rabbit/src/mc.erl

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@
9393
%% protocol specific init function
9494
%% returns a map of additional annotations to merge into the
9595
%% protocol generic annotations map, e.g. ttl, priority and durable
96-
-callback init(term()) ->
96+
-callback init(term(), environment()) ->
9797
{proto_state(), annotations()}.
9898

9999
%% the size of the payload and other meta data respectively
@@ -147,7 +147,7 @@ init(Proto, Data, Anns0, Env)
147147
when is_atom(Proto)
148148
andalso is_map(Anns0)
149149
andalso is_map(Env) ->
150-
{ProtoData, ProtoAnns} = Proto:init(Data),
150+
{ProtoData, ProtoAnns} = Proto:init(Data, Env),
151151
Anns = case maps:size(Env) == 0 of
152152
true ->
153153
Anns0;
@@ -389,9 +389,9 @@ record_death(Reason, SourceQueue,
389389
[{Key, NewDeath} | Deaths0]
390390
end
391391
end,
392-
Anns0#{<<"x-last-death-reason">> := atom_to_binary(Reason),
393-
<<"x-last-death-queue">> := SourceQueue,
394-
<<"x-last-death-exchange">> := Exchange,
392+
Anns0#{<<"x-last-death-reason">> => atom_to_binary(Reason),
393+
<<"x-last-death-queue">> => SourceQueue,
394+
<<"x-last-death-exchange">> => Exchange,
395395
deaths := Deaths};
396396
_ ->
397397
Deaths = case Env of

deps/rabbit/src/mc_amqp.erl

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
-include("mc.hrl").
66

77
-export([
8-
init/1,
8+
init/2,
99
size/1,
1010
x_header/2,
1111
property/2,
@@ -59,18 +59,18 @@
5959
]).
6060

6161
%% mc implementation
62-
init(Sections) when is_list(Sections) ->
62+
init(Sections, Env) when is_list(Sections) ->
6363
Msg = decode(Sections, #msg{}),
64-
init(Msg);
65-
init(#msg{} = Msg) ->
64+
init(Msg, Env);
65+
init(#msg{} = Msg, _Env) ->
6666
%% TODO: as the essential annotations, durable, priority, ttl and delivery_count
6767
%% is all we are interested in it isn't necessary to keep hold of the
6868
%% incoming AMQP header inside the state
6969
Anns = essential_properties(Msg),
7070
{Msg, Anns}.
7171

72-
convert_from(?MODULE, Sections, _Env) ->
73-
element(1, init(Sections));
72+
convert_from(?MODULE, Sections, Env) ->
73+
element(1, init(Sections, Env));
7474
convert_from(_SourceProto, _, _Env) ->
7575
not_implemented.
7676

deps/rabbit/src/mc_amqpl.erl

Lines changed: 128 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88

99
%% mc
1010
-export([
11-
init/1,
11+
init/2,
1212
size/1,
1313
x_header/2,
1414
routing_headers/2,
@@ -48,10 +48,10 @@
4848
]).
4949

5050
%% mc implementation
51-
init(#content{} = Content0) ->
51+
init(#content{} = Content0, Env) ->
5252
Content1 = rabbit_binary_parser:ensure_content_decoded(Content0),
5353
%% project essential properties into annotations
54-
Anns = essential_properties(Content1),
54+
Anns = essential_properties(Content1, Env),
5555
Content = strip_header(Content1, ?DELETED_HEADER),
5656
{Content, Anns}.
5757

@@ -483,7 +483,8 @@ message(#resource{name = ExchangeNameBin}, RoutingKey,
483483
{ok, mc:init(?MODULE,
484484
Content,
485485
Anns#{?ANN_ROUTING_KEYS => [RoutingKey | HeaderRoutes],
486-
?ANN_EXCHANGE => ExchangeNameBin})}
486+
?ANN_EXCHANGE => ExchangeNameBin},
487+
?MC_ENV)}
487488
end;
488489
message(#resource{} = XName, RoutingKey,
489490
#content{} = Content, Anns, false) ->
@@ -514,75 +515,6 @@ from_basic_message(#basic_message{content = Content,
514515

515516
%% Internal
516517

517-
deaths_to_headers(Deaths, Headers0) ->
518-
Infos = case Deaths of
519-
#deaths{records = Records} ->
520-
%% sort records by the last timestamp
521-
List = lists:sort(
522-
fun({_, #death{anns = #{last_time := L1}}},
523-
{_, #death{anns = #{last_time := L2}}}) ->
524-
L1 =< L2
525-
end, maps:to_list(Records)),
526-
lists:foldl(fun(Record, Acc) ->
527-
Table = death_table(Record),
528-
[Table | Acc]
529-
end, [], List);
530-
_ ->
531-
lists:map(fun death_table/1, Deaths)
532-
end,
533-
rabbit_misc:set_table_value(Headers0, <<"x-death">>, array, Infos).
534-
535-
convert_from_amqp_deaths({array, map, Maps}) ->
536-
L = lists:map(
537-
fun({map, KvList}) ->
538-
{Ttl, KvList1} = case KvList of
539-
[{{symbol, <<"ttl">>}, {uint, Ttl0}} | Tail] ->
540-
{Ttl0, Tail};
541-
_ ->
542-
{undefined, KvList}
543-
end,
544-
[
545-
{{symbol, <<"queue">>}, {utf8, Queue}},
546-
{{symbol, <<"reason">>}, {symbol, Reason}},
547-
{{symbol, <<"count">>}, {ulong, Count}},
548-
{{symbol, <<"first-time">>}, {timestamp, FirstTime}},
549-
{{symbol, <<"last-time">>}, {timestamp, _LastTime}},
550-
{{symbol, <<"exchange">>}, {utf8, Exchange}},
551-
{{symbol, <<"routing-keys">>}, {array, utf8, RKeys0}}
552-
] = KvList1,
553-
RKeys = [Key || {utf8, Key} <- RKeys0],
554-
death_table(Queue, Reason, Exchange, RKeys, Count, FirstTime, Ttl)
555-
end, Maps),
556-
{true, {<<"x-death">>, array, L}};
557-
convert_from_amqp_deaths(_IgnoreUnknownValue) ->
558-
false.
559-
560-
death_table({{QName, Reason},
561-
#death{exchange = Exchange,
562-
routing_keys = RoutingKeys,
563-
count = Count,
564-
anns = DeathAnns = #{first_time := FirstTime}}}) ->
565-
death_table(QName, Reason, Exchange, RoutingKeys, Count, FirstTime,
566-
maps:get(ttl, DeathAnns, undefined)).
567-
568-
death_table(QName, Reason, Exchange, RoutingKeys, Count, FirstTime, Ttl) ->
569-
L0 = [
570-
{<<"count">>, long, Count},
571-
{<<"reason">>, longstr, rabbit_data_coercion:to_binary(Reason)},
572-
{<<"queue">>, longstr, QName},
573-
{<<"time">>, timestamp, FirstTime div 1000},
574-
{<<"exchange">>, longstr, Exchange},
575-
{<<"routing-keys">>, array, [{longstr, Key} || Key <- RoutingKeys]}
576-
],
577-
L = case Ttl of
578-
undefined ->
579-
L0;
580-
_ ->
581-
Expiration = integer_to_binary(Ttl),
582-
[{<<"original-expiration">>, longstr, Expiration} | L0]
583-
end,
584-
{table, L}.
585-
586518
strip_header(#content{properties = #'P_basic'{headers = undefined}}
587519
= DecodedContent, _Key) ->
588520
DecodedContent;
@@ -732,11 +664,11 @@ message_id({utf8, S}, HKey, H0) ->
732664
message_id(undefined, _HKey, H) ->
733665
{H, undefined}.
734666

735-
essential_properties(#content{} = C) ->
667+
essential_properties(#content{properties = Props}, Env) ->
736668
#'P_basic'{delivery_mode = Mode,
737669
priority = Priority,
738670
timestamp = TimestampRaw,
739-
headers = Headers} = Props = C#content.properties,
671+
headers = Headers} = Props,
740672
{ok, MsgTTL} = rabbit_basic:parse_expiration(Props),
741673
Timestamp = case TimestampRaw of
742674
undefined ->
@@ -752,6 +684,8 @@ essential_properties(#content{} = C) ->
752684
_ ->
753685
undefined
754686
end,
687+
Deaths = headers_to_deaths(Headers, Env),
688+
755689
maps_put_truthy(
756690
?ANN_PRIORITY, Priority,
757691
maps_put_truthy(
@@ -762,7 +696,125 @@ essential_properties(#content{} = C) ->
762696
?ANN_DURABLE, Durable,
763697
maps_put_truthy(
764698
bcc, BccKeys,
765-
#{}))))).
699+
maps_put_truthy(
700+
deaths, Deaths,
701+
#{})))))).
702+
703+
headers_to_deaths(_, #{?FF_MC_DEATHS_V2 := false}) ->
704+
undefined;
705+
headers_to_deaths(undefined, _) ->
706+
undefined;
707+
headers_to_deaths(Headers, _) ->
708+
case lists:keymember(<<"x-death">>, 1, Headers) of
709+
true ->
710+
case rabbit_misc:amqp_table(Headers) of
711+
#{<<"x-death">> := XDeathList}
712+
when is_list(XDeathList) ->
713+
recover_deaths(XDeathList, []);
714+
_ ->
715+
undefined
716+
end;
717+
false ->
718+
undefined
719+
end.
720+
721+
recover_deaths([], Acc) ->
722+
lists:reverse(Acc);
723+
recover_deaths([Map = #{<<"exchange">> := Exchange,
724+
<<"queue">> := Queue,
725+
<<"routing-keys">> := RKeys,
726+
<<"reason">> := ReasonBin,
727+
<<"count">> := Count,
728+
<<"time">> := Ts} | Rem], Acc0) ->
729+
Reason = binary_to_existing_atom(ReasonBin),
730+
DeathAnns0 = #{first_time => Ts,
731+
%% Given that this timestamp is absent in the AMQP 0.9.1
732+
%% x-death header, the last_time we set here is incorrect
733+
%% if the message was dead lettered more than one time.
734+
last_time => Ts},
735+
DeathAnns = case Map of
736+
#{<<"original-expiration">> := Exp} ->
737+
DeathAnns0#{ttl => binary_to_integer(Exp)};
738+
_ ->
739+
DeathAnns0
740+
end,
741+
Acc = [{{Queue, Reason},
742+
#death{anns = DeathAnns,
743+
exchange = Exchange,
744+
count = Count,
745+
routing_keys = RKeys}} | Acc0],
746+
recover_deaths(Rem, Acc);
747+
recover_deaths([_IgnoreInvalid | Rem], Acc) ->
748+
recover_deaths(Rem, Acc).
749+
750+
deaths_to_headers(Deaths, Headers0) ->
751+
Infos = case Deaths of
752+
#deaths{records = Records} ->
753+
%% sort records by the last timestamp
754+
List = lists:sort(
755+
fun({_, #death{anns = #{last_time := L1}}},
756+
{_, #death{anns = #{last_time := L2}}}) ->
757+
L1 =< L2
758+
end, maps:to_list(Records)),
759+
lists:foldl(fun(Record, Acc) ->
760+
Table = death_table(Record),
761+
[Table | Acc]
762+
end, [], List);
763+
_ ->
764+
lists:map(fun death_table/1, Deaths)
765+
end,
766+
rabbit_misc:set_table_value(Headers0, <<"x-death">>, array, Infos).
767+
768+
convert_from_amqp_deaths({array, map, Maps}) ->
769+
L = lists:map(
770+
fun({map, KvList}) ->
771+
{Ttl, KvList1} = case KvList of
772+
[{{symbol, <<"ttl">>}, {uint, Ttl0}} | Tail] ->
773+
{Ttl0, Tail};
774+
_ ->
775+
{undefined, KvList}
776+
end,
777+
[
778+
{{symbol, <<"queue">>}, {utf8, Queue}},
779+
{{symbol, <<"reason">>}, {symbol, Reason}},
780+
{{symbol, <<"count">>}, {ulong, Count}},
781+
{{symbol, <<"first-time">>}, {timestamp, FirstTime}},
782+
{{symbol, <<"last-time">>}, {timestamp, _LastTime}},
783+
{{symbol, <<"exchange">>}, {utf8, Exchange}},
784+
{{symbol, <<"routing-keys">>}, {array, utf8, RKeys0}}
785+
] = KvList1,
786+
RKeys = [Key || {utf8, Key} <- RKeys0],
787+
death_table(Queue, Reason, Exchange, RKeys, Count, FirstTime, Ttl)
788+
end, Maps),
789+
{true, {<<"x-death">>, array, L}};
790+
convert_from_amqp_deaths(_IgnoreUnknownValue) ->
791+
false.
792+
793+
death_table({{QName, Reason},
794+
#death{exchange = Exchange,
795+
routing_keys = RoutingKeys,
796+
count = Count,
797+
anns = DeathAnns = #{first_time := FirstTime}}}) ->
798+
death_table(QName, Reason, Exchange, RoutingKeys, Count, FirstTime,
799+
maps:get(ttl, DeathAnns, undefined)).
800+
801+
death_table(QName, Reason, Exchange, RoutingKeys, Count, FirstTime, Ttl) ->
802+
L0 = [
803+
{<<"count">>, long, Count},
804+
{<<"reason">>, longstr, rabbit_data_coercion:to_binary(Reason)},
805+
{<<"queue">>, longstr, QName},
806+
{<<"time">>, timestamp, FirstTime div 1000},
807+
{<<"exchange">>, longstr, Exchange},
808+
{<<"routing-keys">>, array, [{longstr, Key} || Key <- RoutingKeys]}
809+
],
810+
L = case Ttl of
811+
undefined ->
812+
L0;
813+
_ ->
814+
Expiration = integer_to_binary(Ttl),
815+
[{<<"original-expiration">>, longstr, Expiration} | L0]
816+
end,
817+
{table, L}.
766818

767819
%% headers that are added as annotations during conversions
768820
is_internal_header(<<"x-basic-", _/binary>>) ->

deps/rabbit/src/rabbit_dead_letter.erl

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,7 @@ publish(Msg0, Reason, #exchange{name = XName} = DLX, RK,
3131
_ ->
3232
[RK]
3333
end,
34-
Env = case rabbit_feature_flags:is_enabled(?FF_MC_DEATHS_V2) of
35-
true -> #{};
36-
false -> #{?FF_MC_DEATHS_V2 => false}
37-
end,
38-
Msg1 = mc:record_death(Reason, SourceQName, Msg0, Env),
34+
Msg1 = mc:record_death(Reason, SourceQName, Msg0, ?MC_ENV),
3935
{Ttl, Msg2} = mc:take_annotation(dead_letter_ttl, Msg1),
4036
Msg3 = mc:set_ttl(Ttl, Msg2),
4137
Msg4 = mc:set_annotation(?ANN_ROUTING_KEYS, DLRKeys, Msg3),

deps/rabbit/src/rabbit_fifo_dlx_worker.erl

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -324,11 +324,7 @@ forward(ConsumedMsg, ConsumedMsgId, ConsumedQRef, DLX, Reason,
324324
_ ->
325325
[RKey]
326326
end,
327-
Env = case rabbit_feature_flags:is_enabled(?FF_MC_DEATHS_V2) of
328-
true -> #{};
329-
false -> #{?FF_MC_DEATHS_V2 => false}
330-
end,
331-
Msg0 = mc:record_death(Reason, SourceQName, ConsumedMsg, Env),
327+
Msg0 = mc:record_death(Reason, SourceQName, ConsumedMsg, ?MC_ENV),
332328
Msg1 = mc:set_ttl(undefined, Msg0),
333329
Msg2 = mc:set_annotation(?ANN_ROUTING_KEYS, DLRKeys, Msg1),
334330
Msg = mc:set_annotation(?ANN_EXCHANGE, DLXName, Msg2),

0 commit comments

Comments
 (0)