Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion deps/rabbit/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ PARALLEL_CT_SET_2_D = queue_length_limits queue_parallel quorum_queue_member_rec

PARALLEL_CT_SET_3_A = definition_import per_user_connection_channel_limit_partitions per_vhost_connection_limit_partitions policy priority_queue_recovery rabbit_fifo_v0 rabbit_stream_sac_coordinator_v4 rabbit_stream_sac_coordinator unit_credit_flow unit_queue_consumers unit_queue_location unit_quorum_queue
PARALLEL_CT_SET_3_B = cluster_upgrade list_consumers_sanity_check list_queues_online_and_offline logging lqueue maintenance_mode rabbit_fifo_q
PARALLEL_CT_SET_3_C = cli_forget_cluster_node feature_flags_v2 mc_unit message_containers_deaths_v2 message_size_limit metadata_store_migration
PARALLEL_CT_SET_3_C = cli_forget_cluster_node feature_flags_v2 mc_unit message_size_limit metadata_store_migration
PARALLEL_CT_SET_3_D = metadata_store_phase1 metrics mirrored_supervisor peer_discovery_classic_config proxy_protocol runtime_parameters unit_stats_and_metrics unit_supervisor2 unit_vm_memory_monitor

PARALLEL_CT_SET_4_A = clustering_events rabbit_local_random_exchange rabbit_msg_interceptor rabbitmq_4_0_deprecations unit_pg_local unit_plugin_directories unit_plugin_versioning unit_policy_validators unit_priority_queue
Expand Down
2 changes: 0 additions & 2 deletions deps/rabbit/include/mc.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@
-define(ANN_DURABLE, d).
-define(ANN_PRIORITY, p).

-define(FF_MC_DEATHS_V2, message_containers_deaths_v2).

-type death_key() :: {SourceQueue :: rabbit_misc:resource_name(), rabbit_dead_letter:reason()}.
-type death_anns() :: #{%% timestamp of the first time this message
%% was dead lettered from this queue for this reason
Expand Down
23 changes: 6 additions & 17 deletions deps/rabbit/src/mc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
convert/3,
protocol_state/1,
prepare/2,
record_death/4,
record_death/3,
is_death_cycle/2,
death_queue_names/1
]).
Expand Down Expand Up @@ -382,11 +382,8 @@ protocol_state(BasicMsg) ->

-spec record_death(rabbit_dead_letter:reason(),
rabbit_misc:resource_name(),
state(),
environment()) -> state().
record_death(Reason, SourceQueue,
#?MODULE{annotations = Anns0} = State,
Env)
state()) -> state().
record_death(Reason, SourceQueue, #?MODULE{annotations = Anns0} = State)
when is_atom(Reason) andalso
is_binary(SourceQueue) ->
Key = {SourceQueue, Reason},
Expand Down Expand Up @@ -438,25 +435,17 @@ record_death(Reason, SourceQueue,
<<"x-last-death-exchange">> => Exchange,
deaths := Deaths};
_ ->
Deaths = case Env of
#{?FF_MC_DEATHS_V2 := false} ->
#deaths{last = Key,
first = Key,
records = #{Key => NewDeath}};
_ ->
[{Key, NewDeath}]
end,
Anns0#{<<"x-first-death-reason">> => ReasonBin,
<<"x-first-death-queue">> => SourceQueue,
<<"x-first-death-exchange">> => Exchange,
<<"x-last-death-reason">> => ReasonBin,
<<"x-last-death-queue">> => SourceQueue,
<<"x-last-death-exchange">> => Exchange,
deaths => Deaths}
deaths => [{Key, NewDeath}]}
end,
State#?MODULE{annotations = Anns};
record_death(Reason, SourceQueue, BasicMsg, Env) ->
mc_compat:record_death(Reason, SourceQueue, BasicMsg, Env).
record_death(Reason, SourceQueue, BasicMsg) ->
mc_compat:record_death(Reason, SourceQueue, BasicMsg).

update_death(#death{count = Count,
anns = DeathAnns} = Death, Timestamp) ->
Expand Down
4 changes: 2 additions & 2 deletions deps/rabbit/src/mc_compat.erl
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
protocol_state/1,
%serialize/1,
prepare/2,
record_death/4,
record_death/3,
is_death_cycle/2,
%deaths/1,
death_queue_names/1
Expand Down Expand Up @@ -166,7 +166,7 @@ prepare(store, Msg) ->
record_death(Reason, SourceQueue,
#basic_message{content = Content,
exchange_name = Exchange,
routing_keys = RoutingKeys} = Msg, _Env) ->
routing_keys = RoutingKeys} = Msg) ->
% HeadersFun1 = fun (H) -> lists:keydelete(<<"CC">>, 1, H) end,
ReasonBin = atom_to_binary(Reason),
TimeSec = os:system_time(seconds),
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/src/rabbit_core_ff.erl
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@
{message_containers_deaths_v2,
#{desc => "Bug fix for dead letter cycle detection",
doc_url => "https://github.com/rabbitmq/rabbitmq-server/issues/11159",
stability => stable,
stability => required,
depends_on => [message_containers]
}}).

Expand Down
6 changes: 1 addition & 5 deletions deps/rabbit/src/rabbit_dead_letter.erl
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,7 @@ publish(Msg0, Reason, #exchange{name = XName} = DLX, RK,
_ ->
[RK]
end,
Env = case rabbit_feature_flags:is_enabled(?FF_MC_DEATHS_V2) of
true -> #{};
false -> #{?FF_MC_DEATHS_V2 => false}
end,
Msg1 = mc:record_death(Reason, SourceQName, Msg0, Env),
Msg1 = mc:record_death(Reason, SourceQName, Msg0),
{Ttl, Msg2} = mc:take_annotation(dead_letter_ttl, Msg1),
Msg3 = mc:set_ttl(Ttl, Msg2),
Msg4 = mc:set_annotation(?ANN_ROUTING_KEYS, DLRKeys, Msg3),
Expand Down
6 changes: 1 addition & 5 deletions deps/rabbit/src/rabbit_fifo_dlx_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -325,11 +325,7 @@ forward(ConsumedMsg, ConsumedMsgId, ConsumedQRef, DLX, Reason,
_ ->
[RKey]
end,
Env = case rabbit_feature_flags:is_enabled(?FF_MC_DEATHS_V2) of
true -> #{};
false -> #{?FF_MC_DEATHS_V2 => false}
end,
Msg0 = mc:record_death(Reason, SourceQName, ConsumedMsg, Env),
Msg0 = mc:record_death(Reason, SourceQName, ConsumedMsg),
Msg1 = mc:set_ttl(undefined, Msg0),
Msg2 = mc:set_annotation(?ANN_ROUTING_KEYS, DLRKeys, Msg1),
Msg = mc:set_annotation(?ANN_EXCHANGE, DLXName, Msg2),
Expand Down
62 changes: 25 additions & 37 deletions deps/rabbit/test/amqp_client_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -246,18 +246,11 @@ init_per_group(Group, Config) ->
rabbit_ct_client_helpers:setup_steps()).

end_per_group(_, Config) ->
rabbit_ct_helpers:run_teardown_steps(Config,
rabbit_ct_helpers:run_teardown_steps(
Config,
rabbit_ct_client_helpers:teardown_steps() ++
rabbit_ct_broker_helpers:teardown_steps()).

init_per_testcase(T = dead_letter_reject, Config) ->
case rabbit_ct_broker_helpers:enable_feature_flag(Config, message_containers_deaths_v2) of
ok ->
rabbit_ct_helpers:testcase_started(Config, T);
_ ->
{skip, "This test is known to fail with feature flag message_containers_deaths_v2 disabled "
"due bug https://github.com/rabbitmq/rabbitmq-server/issues/11159"}
end;
init_per_testcase(Testcase, Config) ->
rabbit_ct_helpers:testcase_started(Config, Testcase).

Expand Down Expand Up @@ -5116,34 +5109,29 @@ dead_letter_headers_exchange(Config) ->
?assertEqual(<<"m2">>, amqp10_msg:body_bin(Msg2)),
?assertEqual(#{message_id => <<"my ID">>}, amqp10_msg:properties(Msg1)),
?assertEqual(0, maps:size(amqp10_msg:properties(Msg2))),
case rpc(Config, rabbit_feature_flags, is_enabled, [message_containers_deaths_v2]) of
true ->
?assertMatch(
#{<<"x-first-death-queue">> := QName1,
<<"x-first-death-exchange">> := <<>>,
<<"x-first-death-reason">> := <<"expired">>,
<<"x-last-death-queue">> := QName1,
<<"x-last-death-exchange">> := <<>>,
<<"x-last-death-reason">> := <<"expired">>,
<<"x-opt-deaths">> := {array,
map,
[{map,
[
{{symbol, <<"queue">>}, {utf8, QName1}},
{{symbol, <<"reason">>}, {symbol, <<"expired">>}},
{{symbol, <<"count">>}, {ulong, 1}},
{{symbol, <<"first-time">>}, {timestamp, Timestamp}},
{{symbol, <<"last-time">>}, {timestamp, Timestamp}},
{{symbol, <<"exchange">>},{utf8, <<>>}},
{{symbol, <<"routing-keys">>}, {array, utf8, [{utf8, QName1}]}}
]}]}
} when is_integer(Timestamp) andalso
Timestamp > Now - 5000 andalso
Timestamp < Now + 5000,
amqp10_msg:message_annotations(Msg1));
false ->
ok
end,
?assertMatch(
#{<<"x-first-death-queue">> := QName1,
<<"x-first-death-exchange">> := <<>>,
<<"x-first-death-reason">> := <<"expired">>,
<<"x-last-death-queue">> := QName1,
<<"x-last-death-exchange">> := <<>>,
<<"x-last-death-reason">> := <<"expired">>,
<<"x-opt-deaths">> := {array,
map,
[{map,
[
{{symbol, <<"queue">>}, {utf8, QName1}},
{{symbol, <<"reason">>}, {symbol, <<"expired">>}},
{{symbol, <<"count">>}, {ulong, 1}},
{{symbol, <<"first-time">>}, {timestamp, Timestamp}},
{{symbol, <<"last-time">>}, {timestamp, Timestamp}},
{{symbol, <<"exchange">>},{utf8, <<>>}},
{{symbol, <<"routing-keys">>}, {array, utf8, [{utf8, QName1}]}}
]}]}
} when is_integer(Timestamp) andalso
Timestamp > Now - 5000 andalso
Timestamp < Now + 5000,
amqp10_msg:message_annotations(Msg1)),

%% We expect M3 and M4 to get dropped.
receive Unexp -> ct:fail({unexpected, Unexp})
Expand Down
11 changes: 0 additions & 11 deletions deps/rabbit/test/dead_lettering_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -176,18 +176,7 @@ end_per_group(Group, Config) ->
Config
end.

init_per_testcase(T, Config)
when T =:= dead_letter_reject_expire_expire orelse
T =:= stream ->
%% With feature flag message_containers_deaths_v2 disabled, test case:
%% * dead_letter_reject_expire_expire is known to fail due to https://github.com/rabbitmq/rabbitmq-server/issues/11159
%% * stream is known to fail due to https://github.com/rabbitmq/rabbitmq-server/issues/11173
ok = rabbit_ct_broker_helpers:enable_feature_flag(Config, message_containers_deaths_v2),
init_per_testcase0(T, Config);
init_per_testcase(Testcase, Config) ->
init_per_testcase0(Testcase, Config).

init_per_testcase0(Testcase, Config) ->
Group = proplists:get_value(name, ?config(tc_group_properties, Config)),
Q = rabbit_data_coercion:to_binary(io_lib:format("~p_~p", [Group, Testcase])),
Q2 = rabbit_data_coercion:to_binary(io_lib:format("~p_~p_2", [Group, Testcase])),
Expand Down
48 changes: 18 additions & 30 deletions deps/rabbit/test/mc_unit_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ all_tests() ->
amqpl_compat,
amqpl_table_x_header,
amqpl_table_x_header_array_of_tbls,
amqpl_death_v1_records,
amqpl_death_v2_records,
amqpl_death_records,
is_death_cycle,
amqpl_amqp_bin_amqpl,
amqpl_cc_amqp_bin_amqpl,
Expand Down Expand Up @@ -198,50 +197,39 @@ amqpl_table_x_header_array_of_tbls(_Config) ->
]},
mc:x_header(<<"x-fruit">>, Msg)).

amqpl_death_v1_records(_Config) ->
ok = amqpl_death_records(#{?FF_MC_DEATHS_V2 => false}).

amqpl_death_v2_records(_Config) ->
ok = amqpl_death_records(#{?FF_MC_DEATHS_V2 => true}).

amqpl_death_records(Env) ->
amqpl_death_records(_Config) ->
Content = #content{class_id = 60,
properties = #'P_basic'{headers = []},
payload_fragments_rev = [<<"data">>]},
Msg0 = mc:prepare(store, mc:init(mc_amqpl, Content, annotations())),

Msg1 = mc:record_death(rejected, <<"q1">>, Msg0, Env),
?assertEqual([<<"q1">>], mc:death_queue_names(Msg1)),
?assertEqual(false, mc:is_death_cycle(<<"q1">>, Msg1)),
Msg1 = mc:record_death(rejected, <<"q">>, Msg0),
?assertEqual([<<"q">>], mc:death_queue_names(Msg1)),
?assertEqual(false, mc:is_death_cycle(<<"q">>, Msg1)),

#content{properties = #'P_basic'{headers = H1}} = mc:protocol_state(Msg1),
?assertMatch({_, array, [_]}, header(<<"x-death">>, H1)),
?assertMatch({_, longstr, <<"q1">>}, header(<<"x-first-death-queue">>, H1)),
?assertMatch({_, longstr, <<"q">>}, header(<<"x-first-death-queue">>, H1)),
?assertMatch({_, longstr, <<"exch">>}, header(<<"x-first-death-exchange">>, H1)),
?assertMatch({_, longstr, <<"rejected">>}, header(<<"x-first-death-reason">>, H1)),
?assertMatch({_, longstr, <<"q1">>}, header(<<"x-last-death-queue">>, H1)),
?assertMatch({_, longstr, <<"q">>}, header(<<"x-last-death-queue">>, H1)),
?assertMatch({_, longstr, <<"exch">>}, header(<<"x-last-death-exchange">>, H1)),
?assertMatch({_, longstr, <<"rejected">>}, header(<<"x-last-death-reason">>, H1)),
{_, array, [{table, T1}]} = header(<<"x-death">>, H1),
?assertMatch({_, long, 1}, header(<<"count">>, T1)),
?assertMatch({_, longstr, <<"rejected">>}, header(<<"reason">>, T1)),
?assertMatch({_, longstr, <<"q1">>}, header(<<"queue">>, T1)),
?assertMatch({_, longstr, <<"q">>}, header(<<"queue">>, T1)),
?assertMatch({_, longstr, <<"exch">>}, header(<<"exchange">>, T1)),
?assertMatch({_, timestamp, _}, header(<<"time">>, T1)),
?assertMatch({_, array, [{longstr, <<"apple">>}]}, header(<<"routing-keys">>, T1)),


%% second dead letter, e.g. an expired reason returning to source queue

%% record_death uses a timestamp for death record ordering, ensure
%% it is definitely higher than the last timestamp taken
timer:sleep(2),
Msg2 = mc:record_death(expired, <<"dl">>, Msg1, Env),

%% 2nd dead letter, e.g. an expired reason
Msg2 = mc:record_death(expired, <<"dead letter queue">>, Msg1),
#content{properties = #'P_basic'{headers = H2}} = mc:protocol_state(Msg2),
%% We expect deaths to be ordered by recency
{_, array, [{table, T2a}, {table, T2b}]} = header(<<"x-death">>, H2),
?assertMatch({_, longstr, <<"dl">>}, header(<<"queue">>, T2a)),
?assertMatch({_, longstr, <<"q1">>}, header(<<"queue">>, T2b)),
?assertMatch({_, longstr, <<"dead letter queue">>}, header(<<"queue">>, T2a)),
?assertMatch({_, longstr, <<"q">>}, header(<<"queue">>, T2b)),
ok.

is_death_cycle(_Config) ->
Expand All @@ -254,29 +242,29 @@ is_death_cycle(_Config) ->
%% Q1 --rejected--> Q2 --expired--> Q3 --expired-->
%% Q1 --rejected--> Q2 --expired--> Q3

Msg1 = mc:record_death(rejected, <<"q1">>, Msg0, #{}),
Msg1 = mc:record_death(rejected, <<"q1">>, Msg0),
?assertNot(mc:is_death_cycle(<<"q1">>, Msg1),
"A queue that dead letters to itself due to rejected is not considered a cycle."),
?assertNot(mc:is_death_cycle(<<"q2">>, Msg1)),
?assertNot(mc:is_death_cycle(<<"q3">>, Msg1)),

Msg2 = mc:record_death(expired, <<"q2">>, Msg1, #{}),
Msg2 = mc:record_death(expired, <<"q2">>, Msg1),
?assertNot(mc:is_death_cycle(<<"q1">>, Msg2)),
?assert(mc:is_death_cycle(<<"q2">>, Msg2),
"A queue that dead letters to itself due to expired is considered a cycle."),
?assertNot(mc:is_death_cycle(<<"q3">>, Msg2)),

Msg3 = mc:record_death(expired, <<"q3">>, Msg2, #{}),
Msg3 = mc:record_death(expired, <<"q3">>, Msg2),
?assertNot(mc:is_death_cycle(<<"q1">>, Msg3)),
?assert(mc:is_death_cycle(<<"q2">>, Msg3)),
?assert(mc:is_death_cycle(<<"q3">>, Msg3)),

Msg4 = mc:record_death(rejected, <<"q1">>, Msg3, #{}),
Msg4 = mc:record_death(rejected, <<"q1">>, Msg3),
?assertNot(mc:is_death_cycle(<<"q1">>, Msg4)),
?assertNot(mc:is_death_cycle(<<"q2">>, Msg4)),
?assertNot(mc:is_death_cycle(<<"q3">>, Msg4)),

Msg5 = mc:record_death(expired, <<"q2">>, Msg4, #{}),
Msg5 = mc:record_death(expired, <<"q2">>, Msg4),
?assertNot(mc:is_death_cycle(<<"q1">>, Msg5)),
?assert(mc:is_death_cycle(<<"q2">>, Msg5)),
?assertNot(mc:is_death_cycle(<<"q3">>, Msg5)),
Expand Down
Loading
Loading