Skip to content

Commit ea94a77

Browse files
committed
Interpret x-death header from AMQP 0.9.1 client
Fixes #10709 Fixes #11331 This commit fixes the following regression which worked in 3.12.x, but stopped working in 3.13.0 - 3.13.2: ``` AMQP 0.9.1 client --publish--> Q --dead-letter--> DLQ --consume--> AMQP 0.9.1 client (death count is now 1) --republish-same-message-with-headers-as-just-received--> Q --dead-letter--> DLQ --consume --> AMQP 0.9.1 (death count is now 1, but should be 2) ``` The reason this behaviour stopped to work in 3.13.0 is that the broker won't specially interpret x-headers in general, and the x-death header specifically in this case anymore. In other words, the new desired 3.13 behaviour with message containers is that "x-headers belong to the broker". While this is correct, it does break client applications which depended on the previous use case. One simple fix is that the client application does not re-publish with the x-death header, but instead sets its own custom count header to determine the number of times it retries. This commit will only be packported to v3.13.x branch. In other words, 4.0 won't interpret x-headers as done in 3.13.0 - 3.13.2. The reason we backport this commit to v3.13.x is that the Spring documentation expliclity recommends re-publishing the message with x-death header being set: https://cloud.spring.io/spring-cloud-static/spring-cloud-stream-binder-rabbit/3.0.6.RELEASE/reference/html/spring-cloud-stream-binder-rabbit.html#_retry_with_the_rabbitmq_binder
1 parent 1f8b21d commit ea94a77

File tree

13 files changed

+266
-120
lines changed

13 files changed

+266
-120
lines changed

deps/rabbit/BUILD.bazel

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -815,6 +815,9 @@ rabbitmq_suite(
815815
rabbitmq_suite(
816816
name = "mc_unit_SUITE",
817817
size = "small",
818+
runtime_deps = [
819+
"@meck//:erlang_app",
820+
],
818821
deps = [
819822
"//deps/amqp10_common:erlang_app",
820823
"//deps/rabbit_common:erlang_app",

deps/rabbit/app.bzl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1721,6 +1721,7 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
17211721
testonly = True,
17221722
srcs = ["test/unit_classic_mirrored_queue_sync_throttling_SUITE.erl"],
17231723
outs = ["test/unit_classic_mirrored_queue_sync_throttling_SUITE.beam"],
1724+
hdrs = ["include/mc.hrl"],
17241725
app_name = "rabbit",
17251726
erlc_opts = "//:test_erlc_opts",
17261727
deps = ["//deps/rabbit_common:erlang_app"],

deps/rabbit/include/mc.hrl

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,11 @@
1414
-define(ANN_PRIORITY, p).
1515

1616
-define(FF_MC_DEATHS_V2, message_containers_deaths_v2).
17+
-define(MC_ENV,
18+
case rabbit_feature_flags:is_enabled(?FF_MC_DEATHS_V2) of
19+
true -> #{};
20+
false -> #{?FF_MC_DEATHS_V2 => false}
21+
end).
1722

1823
-type death_key() :: {SourceQueue :: rabbit_misc:resource_name(), rabbit_dead_letter:reason()}.
1924
-type death_anns() :: #{%% timestamp of the first time this message

deps/rabbit/src/mc.erl

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@
9393
%% protocol specific init function
9494
%% returns a map of additional annotations to merge into the
9595
%% protocol generic annotations map, e.g. ttl, priority and durable
96-
-callback init(term()) ->
96+
-callback init(term(), environment()) ->
9797
{proto_state(), annotations()}.
9898

9999
%% the size of the payload and other meta data respectively
@@ -147,7 +147,7 @@ init(Proto, Data, Anns0, Env)
147147
when is_atom(Proto)
148148
andalso is_map(Anns0)
149149
andalso is_map(Env) ->
150-
{ProtoData, ProtoAnns} = Proto:init(Data),
150+
{ProtoData, ProtoAnns} = Proto:init(Data, Env),
151151
Anns = case maps:size(Env) == 0 of
152152
true ->
153153
Anns0;
@@ -389,9 +389,9 @@ record_death(Reason, SourceQueue,
389389
[{Key, NewDeath} | Deaths0]
390390
end
391391
end,
392-
Anns0#{<<"x-last-death-reason">> := atom_to_binary(Reason),
393-
<<"x-last-death-queue">> := SourceQueue,
394-
<<"x-last-death-exchange">> := Exchange,
392+
Anns0#{<<"x-last-death-reason">> => atom_to_binary(Reason),
393+
<<"x-last-death-queue">> => SourceQueue,
394+
<<"x-last-death-exchange">> => Exchange,
395395
deaths := Deaths};
396396
_ ->
397397
Deaths = case Env of

deps/rabbit/src/mc_amqp.erl

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
-include("mc.hrl").
66

77
-export([
8-
init/1,
8+
init/2,
99
size/1,
1010
x_header/2,
1111
property/2,
@@ -59,18 +59,18 @@
5959
]).
6060

6161
%% mc implementation
62-
init(Sections) when is_list(Sections) ->
62+
init(Sections, Env) when is_list(Sections) ->
6363
Msg = decode(Sections, #msg{}),
64-
init(Msg);
65-
init(#msg{} = Msg) ->
64+
init(Msg, Env);
65+
init(#msg{} = Msg, _Env) ->
6666
%% TODO: as the essential annotations, durable, priority, ttl and delivery_count
6767
%% is all we are interested in it isn't necessary to keep hold of the
6868
%% incoming AMQP header inside the state
6969
Anns = essential_properties(Msg),
7070
{Msg, Anns}.
7171

72-
convert_from(?MODULE, Sections, _Env) ->
73-
element(1, init(Sections));
72+
convert_from(?MODULE, Sections, Env) ->
73+
element(1, init(Sections, Env));
7474
convert_from(_SourceProto, _, _Env) ->
7575
not_implemented.
7676

deps/rabbit/src/mc_amqpl.erl

Lines changed: 128 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88

99
%% mc
1010
-export([
11-
init/1,
11+
init/2,
1212
size/1,
1313
x_header/2,
1414
routing_headers/2,
@@ -48,10 +48,10 @@
4848
]).
4949

5050
%% mc implementation
51-
init(#content{} = Content0) ->
51+
init(#content{} = Content0, Env) ->
5252
Content1 = rabbit_binary_parser:ensure_content_decoded(Content0),
5353
%% project essential properties into annotations
54-
Anns = essential_properties(Content1),
54+
Anns = essential_properties(Content1, Env),
5555
Content = strip_header(Content1, ?DELETED_HEADER),
5656
{Content, Anns}.
5757

@@ -483,7 +483,8 @@ message(#resource{name = ExchangeNameBin}, RoutingKey,
483483
{ok, mc:init(?MODULE,
484484
Content,
485485
Anns#{?ANN_ROUTING_KEYS => [RoutingKey | HeaderRoutes],
486-
?ANN_EXCHANGE => ExchangeNameBin})}
486+
?ANN_EXCHANGE => ExchangeNameBin},
487+
?MC_ENV)}
487488
end;
488489
message(#resource{} = XName, RoutingKey,
489490
#content{} = Content, Anns, false) ->
@@ -514,75 +515,6 @@ from_basic_message(#basic_message{content = Content,
514515

515516
%% Internal
516517

517-
deaths_to_headers(Deaths, Headers0) ->
518-
Infos = case Deaths of
519-
#deaths{records = Records} ->
520-
%% sort records by the last timestamp
521-
List = lists:sort(
522-
fun({_, #death{anns = #{last_time := L1}}},
523-
{_, #death{anns = #{last_time := L2}}}) ->
524-
L1 =< L2
525-
end, maps:to_list(Records)),
526-
lists:foldl(fun(Record, Acc) ->
527-
Table = death_table(Record),
528-
[Table | Acc]
529-
end, [], List);
530-
_ ->
531-
lists:map(fun death_table/1, Deaths)
532-
end,
533-
rabbit_misc:set_table_value(Headers0, <<"x-death">>, array, Infos).
534-
535-
convert_from_amqp_deaths({array, map, Maps}) ->
536-
L = lists:map(
537-
fun({map, KvList}) ->
538-
{Ttl, KvList1} = case KvList of
539-
[{{symbol, <<"ttl">>}, {uint, Ttl0}} | Tail] ->
540-
{Ttl0, Tail};
541-
_ ->
542-
{undefined, KvList}
543-
end,
544-
[
545-
{{symbol, <<"queue">>}, {utf8, Queue}},
546-
{{symbol, <<"reason">>}, {symbol, Reason}},
547-
{{symbol, <<"count">>}, {ulong, Count}},
548-
{{symbol, <<"first-time">>}, {timestamp, FirstTime}},
549-
{{symbol, <<"last-time">>}, {timestamp, _LastTime}},
550-
{{symbol, <<"exchange">>}, {utf8, Exchange}},
551-
{{symbol, <<"routing-keys">>}, {array, utf8, RKeys0}}
552-
] = KvList1,
553-
RKeys = [Key || {utf8, Key} <- RKeys0],
554-
death_table(Queue, Reason, Exchange, RKeys, Count, FirstTime, Ttl)
555-
end, Maps),
556-
{true, {<<"x-death">>, array, L}};
557-
convert_from_amqp_deaths(_IgnoreUnknownValue) ->
558-
false.
559-
560-
death_table({{QName, Reason},
561-
#death{exchange = Exchange,
562-
routing_keys = RoutingKeys,
563-
count = Count,
564-
anns = DeathAnns = #{first_time := FirstTime}}}) ->
565-
death_table(QName, Reason, Exchange, RoutingKeys, Count, FirstTime,
566-
maps:get(ttl, DeathAnns, undefined)).
567-
568-
death_table(QName, Reason, Exchange, RoutingKeys, Count, FirstTime, Ttl) ->
569-
L0 = [
570-
{<<"count">>, long, Count},
571-
{<<"reason">>, longstr, rabbit_data_coercion:to_binary(Reason)},
572-
{<<"queue">>, longstr, QName},
573-
{<<"time">>, timestamp, FirstTime div 1000},
574-
{<<"exchange">>, longstr, Exchange},
575-
{<<"routing-keys">>, array, [{longstr, Key} || Key <- RoutingKeys]}
576-
],
577-
L = case Ttl of
578-
undefined ->
579-
L0;
580-
_ ->
581-
Expiration = integer_to_binary(Ttl),
582-
[{<<"original-expiration">>, longstr, Expiration} | L0]
583-
end,
584-
{table, L}.
585-
586518
strip_header(#content{properties = #'P_basic'{headers = undefined}}
587519
= DecodedContent, _Key) ->
588520
DecodedContent;
@@ -732,11 +664,11 @@ message_id({utf8, S}, HKey, H0) ->
732664
message_id(undefined, _HKey, H) ->
733665
{H, undefined}.
734666

735-
essential_properties(#content{} = C) ->
667+
essential_properties(#content{properties = Props}, Env) ->
736668
#'P_basic'{delivery_mode = Mode,
737669
priority = Priority,
738670
timestamp = TimestampRaw,
739-
headers = Headers} = Props = C#content.properties,
671+
headers = Headers} = Props,
740672
{ok, MsgTTL} = rabbit_basic:parse_expiration(Props),
741673
Timestamp = case TimestampRaw of
742674
undefined ->
@@ -752,6 +684,8 @@ essential_properties(#content{} = C) ->
752684
_ ->
753685
undefined
754686
end,
687+
Deaths = headers_to_deaths(Headers, Env),
688+
755689
maps_put_truthy(
756690
?ANN_PRIORITY, Priority,
757691
maps_put_truthy(
@@ -762,7 +696,125 @@ essential_properties(#content{} = C) ->
762696
?ANN_DURABLE, Durable,
763697
maps_put_truthy(
764698
bcc, BccKeys,
765-
#{}))))).
699+
maps_put_truthy(
700+
deaths, Deaths,
701+
#{})))))).
702+
703+
headers_to_deaths(_, #{?FF_MC_DEATHS_V2 := false}) ->
704+
undefined;
705+
headers_to_deaths(undefined, _) ->
706+
undefined;
707+
headers_to_deaths(Headers, _) ->
708+
case lists:keymember(<<"x-death">>, 1, Headers) of
709+
true ->
710+
case rabbit_misc:amqp_table(Headers) of
711+
#{<<"x-death">> := XDeathList}
712+
when is_list(XDeathList) ->
713+
recover_deaths(XDeathList, []);
714+
_ ->
715+
undefined
716+
end;
717+
false ->
718+
undefined
719+
end.
720+
721+
recover_deaths([], Acc) ->
722+
lists:reverse(Acc);
723+
recover_deaths([Map = #{<<"exchange">> := Exchange,
724+
<<"queue">> := Queue,
725+
<<"routing-keys">> := RKeys,
726+
<<"reason">> := ReasonBin,
727+
<<"count">> := Count,
728+
<<"time">> := Ts} | Rem], Acc0) ->
729+
Reason = binary_to_existing_atom(ReasonBin),
730+
DeathAnns0 = #{first_time => Ts,
731+
%% Given that this timestamp is absent in the AMQP 0.9.1
732+
%% x-death header, the last_time we set here is incorrect
733+
%% if the message was dead lettered more than one time.
734+
last_time => Ts},
735+
DeathAnns = case Map of
736+
#{<<"original-expiration">> := Exp} ->
737+
DeathAnns0#{ttl => binary_to_integer(Exp)};
738+
_ ->
739+
DeathAnns0
740+
end,
741+
Acc = [{{Queue, Reason},
742+
#death{anns = DeathAnns,
743+
exchange = Exchange,
744+
count = Count,
745+
routing_keys = RKeys}} | Acc0],
746+
recover_deaths(Rem, Acc);
747+
recover_deaths([_IgnoreInvalid | Rem], Acc) ->
748+
recover_deaths(Rem, Acc).
749+
750+
deaths_to_headers(Deaths, Headers0) ->
751+
Infos = case Deaths of
752+
#deaths{records = Records} ->
753+
%% sort records by the last timestamp
754+
List = lists:sort(
755+
fun({_, #death{anns = #{last_time := L1}}},
756+
{_, #death{anns = #{last_time := L2}}}) ->
757+
L1 =< L2
758+
end, maps:to_list(Records)),
759+
lists:foldl(fun(Record, Acc) ->
760+
Table = death_table(Record),
761+
[Table | Acc]
762+
end, [], List);
763+
_ ->
764+
lists:map(fun death_table/1, Deaths)
765+
end,
766+
rabbit_misc:set_table_value(Headers0, <<"x-death">>, array, Infos).
767+
768+
convert_from_amqp_deaths({array, map, Maps}) ->
769+
L = lists:map(
770+
fun({map, KvList}) ->
771+
{Ttl, KvList1} = case KvList of
772+
[{{symbol, <<"ttl">>}, {uint, Ttl0}} | Tail] ->
773+
{Ttl0, Tail};
774+
_ ->
775+
{undefined, KvList}
776+
end,
777+
[
778+
{{symbol, <<"queue">>}, {utf8, Queue}},
779+
{{symbol, <<"reason">>}, {symbol, Reason}},
780+
{{symbol, <<"count">>}, {ulong, Count}},
781+
{{symbol, <<"first-time">>}, {timestamp, FirstTime}},
782+
{{symbol, <<"last-time">>}, {timestamp, _LastTime}},
783+
{{symbol, <<"exchange">>}, {utf8, Exchange}},
784+
{{symbol, <<"routing-keys">>}, {array, utf8, RKeys0}}
785+
] = KvList1,
786+
RKeys = [Key || {utf8, Key} <- RKeys0],
787+
death_table(Queue, Reason, Exchange, RKeys, Count, FirstTime, Ttl)
788+
end, Maps),
789+
{true, {<<"x-death">>, array, L}};
790+
convert_from_amqp_deaths(_IgnoreUnknownValue) ->
791+
false.
792+
793+
death_table({{QName, Reason},
794+
#death{exchange = Exchange,
795+
routing_keys = RoutingKeys,
796+
count = Count,
797+
anns = DeathAnns = #{first_time := FirstTime}}}) ->
798+
death_table(QName, Reason, Exchange, RoutingKeys, Count, FirstTime,
799+
maps:get(ttl, DeathAnns, undefined)).
800+
801+
death_table(QName, Reason, Exchange, RoutingKeys, Count, FirstTime, Ttl) ->
802+
L0 = [
803+
{<<"count">>, long, Count},
804+
{<<"reason">>, longstr, rabbit_data_coercion:to_binary(Reason)},
805+
{<<"queue">>, longstr, QName},
806+
{<<"time">>, timestamp, FirstTime div 1000},
807+
{<<"exchange">>, longstr, Exchange},
808+
{<<"routing-keys">>, array, [{longstr, Key} || Key <- RoutingKeys]}
809+
],
810+
L = case Ttl of
811+
undefined ->
812+
L0;
813+
_ ->
814+
Expiration = integer_to_binary(Ttl),
815+
[{<<"original-expiration">>, longstr, Expiration} | L0]
816+
end,
817+
{table, L}.
766818

767819
%% headers that are added as annotations during conversions
768820
is_internal_header(<<"x-basic-", _/binary>>) ->

deps/rabbit/src/rabbit_dead_letter.erl

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,7 @@ publish(Msg0, Reason, #exchange{name = XName} = DLX, RK,
3131
_ ->
3232
[RK]
3333
end,
34-
Env = case rabbit_feature_flags:is_enabled(?FF_MC_DEATHS_V2) of
35-
true -> #{};
36-
false -> #{?FF_MC_DEATHS_V2 => false}
37-
end,
38-
Msg1 = mc:record_death(Reason, SourceQName, Msg0, Env),
34+
Msg1 = mc:record_death(Reason, SourceQName, Msg0, ?MC_ENV),
3935
{Ttl, Msg2} = mc:take_annotation(dead_letter_ttl, Msg1),
4036
Msg3 = mc:set_ttl(Ttl, Msg2),
4137
Msg4 = mc:set_annotation(?ANN_ROUTING_KEYS, DLRKeys, Msg3),

deps/rabbit/src/rabbit_fifo_dlx_worker.erl

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -324,11 +324,7 @@ forward(ConsumedMsg, ConsumedMsgId, ConsumedQRef, DLX, Reason,
324324
_ ->
325325
[RKey]
326326
end,
327-
Env = case rabbit_feature_flags:is_enabled(?FF_MC_DEATHS_V2) of
328-
true -> #{};
329-
false -> #{?FF_MC_DEATHS_V2 => false}
330-
end,
331-
Msg0 = mc:record_death(Reason, SourceQName, ConsumedMsg, Env),
327+
Msg0 = mc:record_death(Reason, SourceQName, ConsumedMsg, ?MC_ENV),
332328
Msg1 = mc:set_ttl(undefined, Msg0),
333329
Msg2 = mc:set_annotation(?ANN_ROUTING_KEYS, DLRKeys, Msg1),
334330
Msg = mc:set_annotation(?ANN_EXCHANGE, DLXName, Msg2),

0 commit comments

Comments
 (0)