Skip to content

Commit 1547718

Browse files
authored
Merge pull request #14736 from rabbitmq/require-ff-message_containers_deaths_v2
Require feature flag `message_containers_deaths_v2`
2 parents e9f1cae + cb53a21 commit 1547718

File tree

11 files changed

+55
-235
lines changed

11 files changed

+55
-235
lines changed

deps/rabbit/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -274,7 +274,7 @@ PARALLEL_CT_SET_2_D = queue_length_limits queue_parallel quorum_queue_member_rec
274274

275275
PARALLEL_CT_SET_3_A = definition_import per_user_connection_channel_limit_partitions per_vhost_connection_limit_partitions policy priority_queue_recovery rabbit_fifo_v0 rabbit_stream_sac_coordinator_v4 rabbit_stream_sac_coordinator unit_credit_flow unit_queue_consumers unit_queue_location unit_quorum_queue
276276
PARALLEL_CT_SET_3_B = cluster_upgrade list_consumers_sanity_check list_queues_online_and_offline logging lqueue maintenance_mode rabbit_fifo_q
277-
PARALLEL_CT_SET_3_C = cli_forget_cluster_node feature_flags_v2 mc_unit message_containers_deaths_v2 message_size_limit metadata_store_migration
277+
PARALLEL_CT_SET_3_C = cli_forget_cluster_node feature_flags_v2 mc_unit message_size_limit metadata_store_migration
278278
PARALLEL_CT_SET_3_D = metadata_store_phase1 metrics mirrored_supervisor peer_discovery_classic_config proxy_protocol runtime_parameters unit_stats_and_metrics unit_supervisor2 unit_vm_memory_monitor
279279

280280
PARALLEL_CT_SET_4_A = clustering_events rabbit_local_random_exchange rabbit_msg_interceptor rabbitmq_4_0_deprecations unit_pg_local unit_plugin_directories unit_plugin_versioning unit_policy_validators unit_priority_queue

deps/rabbit/include/mc.hrl

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,6 @@
1313
-define(ANN_DURABLE, d).
1414
-define(ANN_PRIORITY, p).
1515

16-
-define(FF_MC_DEATHS_V2, message_containers_deaths_v2).
17-
1816
-type death_key() :: {SourceQueue :: rabbit_misc:resource_name(), rabbit_dead_letter:reason()}.
1917
-type death_anns() :: #{%% timestamp of the first time this message
2018
%% was dead lettered from this queue for this reason

deps/rabbit/src/mc.erl

Lines changed: 6 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
convert/3,
3636
protocol_state/1,
3737
prepare/2,
38-
record_death/4,
38+
record_death/3,
3939
is_death_cycle/2,
4040
death_queue_names/1
4141
]).
@@ -382,11 +382,8 @@ protocol_state(BasicMsg) ->
382382

383383
-spec record_death(rabbit_dead_letter:reason(),
384384
rabbit_misc:resource_name(),
385-
state(),
386-
environment()) -> state().
387-
record_death(Reason, SourceQueue,
388-
#?MODULE{annotations = Anns0} = State,
389-
Env)
385+
state()) -> state().
386+
record_death(Reason, SourceQueue, #?MODULE{annotations = Anns0} = State)
390387
when is_atom(Reason) andalso
391388
is_binary(SourceQueue) ->
392389
Key = {SourceQueue, Reason},
@@ -438,25 +435,17 @@ record_death(Reason, SourceQueue,
438435
<<"x-last-death-exchange">> => Exchange,
439436
deaths := Deaths};
440437
_ ->
441-
Deaths = case Env of
442-
#{?FF_MC_DEATHS_V2 := false} ->
443-
#deaths{last = Key,
444-
first = Key,
445-
records = #{Key => NewDeath}};
446-
_ ->
447-
[{Key, NewDeath}]
448-
end,
449438
Anns0#{<<"x-first-death-reason">> => ReasonBin,
450439
<<"x-first-death-queue">> => SourceQueue,
451440
<<"x-first-death-exchange">> => Exchange,
452441
<<"x-last-death-reason">> => ReasonBin,
453442
<<"x-last-death-queue">> => SourceQueue,
454443
<<"x-last-death-exchange">> => Exchange,
455-
deaths => Deaths}
444+
deaths => [{Key, NewDeath}]}
456445
end,
457446
State#?MODULE{annotations = Anns};
458-
record_death(Reason, SourceQueue, BasicMsg, Env) ->
459-
mc_compat:record_death(Reason, SourceQueue, BasicMsg, Env).
447+
record_death(Reason, SourceQueue, BasicMsg) ->
448+
mc_compat:record_death(Reason, SourceQueue, BasicMsg).
460449

461450
update_death(#death{count = Count,
462451
anns = DeathAnns} = Death, Timestamp) ->

deps/rabbit/src/mc_compat.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
protocol_state/1,
2929
%serialize/1,
3030
prepare/2,
31-
record_death/4,
31+
record_death/3,
3232
is_death_cycle/2,
3333
%deaths/1,
3434
death_queue_names/1
@@ -166,7 +166,7 @@ prepare(store, Msg) ->
166166
record_death(Reason, SourceQueue,
167167
#basic_message{content = Content,
168168
exchange_name = Exchange,
169-
routing_keys = RoutingKeys} = Msg, _Env) ->
169+
routing_keys = RoutingKeys} = Msg) ->
170170
% HeadersFun1 = fun (H) -> lists:keydelete(<<"CC">>, 1, H) end,
171171
ReasonBin = atom_to_binary(Reason),
172172
TimeSec = os:system_time(seconds),

deps/rabbit/src/rabbit_core_ff.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@
187187
{message_containers_deaths_v2,
188188
#{desc => "Bug fix for dead letter cycle detection",
189189
doc_url => "https://github.com/rabbitmq/rabbitmq-server/issues/11159",
190-
stability => stable,
190+
stability => required,
191191
depends_on => [message_containers]
192192
}}).
193193

deps/rabbit/src/rabbit_dead_letter.erl

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,7 @@ publish(Msg0, Reason, #exchange{name = XName} = DLX, RK,
3232
_ ->
3333
[RK]
3434
end,
35-
Env = case rabbit_feature_flags:is_enabled(?FF_MC_DEATHS_V2) of
36-
true -> #{};
37-
false -> #{?FF_MC_DEATHS_V2 => false}
38-
end,
39-
Msg1 = mc:record_death(Reason, SourceQName, Msg0, Env),
35+
Msg1 = mc:record_death(Reason, SourceQName, Msg0),
4036
{Ttl, Msg2} = mc:take_annotation(dead_letter_ttl, Msg1),
4137
Msg3 = mc:set_ttl(Ttl, Msg2),
4238
Msg4 = mc:set_annotation(?ANN_ROUTING_KEYS, DLRKeys, Msg3),

deps/rabbit/src/rabbit_fifo_dlx_worker.erl

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -325,11 +325,7 @@ forward(ConsumedMsg, ConsumedMsgId, ConsumedQRef, DLX, Reason,
325325
_ ->
326326
[RKey]
327327
end,
328-
Env = case rabbit_feature_flags:is_enabled(?FF_MC_DEATHS_V2) of
329-
true -> #{};
330-
false -> #{?FF_MC_DEATHS_V2 => false}
331-
end,
332-
Msg0 = mc:record_death(Reason, SourceQName, ConsumedMsg, Env),
328+
Msg0 = mc:record_death(Reason, SourceQName, ConsumedMsg),
333329
Msg1 = mc:set_ttl(undefined, Msg0),
334330
Msg2 = mc:set_annotation(?ANN_ROUTING_KEYS, DLRKeys, Msg1),
335331
Msg = mc:set_annotation(?ANN_EXCHANGE, DLXName, Msg2),

deps/rabbit/test/amqp_client_SUITE.erl

Lines changed: 25 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -246,18 +246,11 @@ init_per_group(Group, Config) ->
246246
rabbit_ct_client_helpers:setup_steps()).
247247

248248
end_per_group(_, Config) ->
249-
rabbit_ct_helpers:run_teardown_steps(Config,
249+
rabbit_ct_helpers:run_teardown_steps(
250+
Config,
250251
rabbit_ct_client_helpers:teardown_steps() ++
251252
rabbit_ct_broker_helpers:teardown_steps()).
252253

253-
init_per_testcase(T = dead_letter_reject, Config) ->
254-
case rabbit_ct_broker_helpers:enable_feature_flag(Config, message_containers_deaths_v2) of
255-
ok ->
256-
rabbit_ct_helpers:testcase_started(Config, T);
257-
_ ->
258-
{skip, "This test is known to fail with feature flag message_containers_deaths_v2 disabled "
259-
"due bug https://github.com/rabbitmq/rabbitmq-server/issues/11159"}
260-
end;
261254
init_per_testcase(Testcase, Config) ->
262255
rabbit_ct_helpers:testcase_started(Config, Testcase).
263256

@@ -5116,34 +5109,29 @@ dead_letter_headers_exchange(Config) ->
51165109
?assertEqual(<<"m2">>, amqp10_msg:body_bin(Msg2)),
51175110
?assertEqual(#{message_id => <<"my ID">>}, amqp10_msg:properties(Msg1)),
51185111
?assertEqual(0, maps:size(amqp10_msg:properties(Msg2))),
5119-
case rpc(Config, rabbit_feature_flags, is_enabled, [message_containers_deaths_v2]) of
5120-
true ->
5121-
?assertMatch(
5122-
#{<<"x-first-death-queue">> := QName1,
5123-
<<"x-first-death-exchange">> := <<>>,
5124-
<<"x-first-death-reason">> := <<"expired">>,
5125-
<<"x-last-death-queue">> := QName1,
5126-
<<"x-last-death-exchange">> := <<>>,
5127-
<<"x-last-death-reason">> := <<"expired">>,
5128-
<<"x-opt-deaths">> := {array,
5129-
map,
5130-
[{map,
5131-
[
5132-
{{symbol, <<"queue">>}, {utf8, QName1}},
5133-
{{symbol, <<"reason">>}, {symbol, <<"expired">>}},
5134-
{{symbol, <<"count">>}, {ulong, 1}},
5135-
{{symbol, <<"first-time">>}, {timestamp, Timestamp}},
5136-
{{symbol, <<"last-time">>}, {timestamp, Timestamp}},
5137-
{{symbol, <<"exchange">>},{utf8, <<>>}},
5138-
{{symbol, <<"routing-keys">>}, {array, utf8, [{utf8, QName1}]}}
5139-
]}]}
5140-
} when is_integer(Timestamp) andalso
5141-
Timestamp > Now - 5000 andalso
5142-
Timestamp < Now + 5000,
5143-
amqp10_msg:message_annotations(Msg1));
5144-
false ->
5145-
ok
5146-
end,
5112+
?assertMatch(
5113+
#{<<"x-first-death-queue">> := QName1,
5114+
<<"x-first-death-exchange">> := <<>>,
5115+
<<"x-first-death-reason">> := <<"expired">>,
5116+
<<"x-last-death-queue">> := QName1,
5117+
<<"x-last-death-exchange">> := <<>>,
5118+
<<"x-last-death-reason">> := <<"expired">>,
5119+
<<"x-opt-deaths">> := {array,
5120+
map,
5121+
[{map,
5122+
[
5123+
{{symbol, <<"queue">>}, {utf8, QName1}},
5124+
{{symbol, <<"reason">>}, {symbol, <<"expired">>}},
5125+
{{symbol, <<"count">>}, {ulong, 1}},
5126+
{{symbol, <<"first-time">>}, {timestamp, Timestamp}},
5127+
{{symbol, <<"last-time">>}, {timestamp, Timestamp}},
5128+
{{symbol, <<"exchange">>},{utf8, <<>>}},
5129+
{{symbol, <<"routing-keys">>}, {array, utf8, [{utf8, QName1}]}}
5130+
]}]}
5131+
} when is_integer(Timestamp) andalso
5132+
Timestamp > Now - 5000 andalso
5133+
Timestamp < Now + 5000,
5134+
amqp10_msg:message_annotations(Msg1)),
51475135

51485136
%% We expect M3 and M4 to get dropped.
51495137
receive Unexp -> ct:fail({unexpected, Unexp})

deps/rabbit/test/dead_lettering_SUITE.erl

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -176,18 +176,7 @@ end_per_group(Group, Config) ->
176176
Config
177177
end.
178178

179-
init_per_testcase(T, Config)
180-
when T =:= dead_letter_reject_expire_expire orelse
181-
T =:= stream ->
182-
%% With feature flag message_containers_deaths_v2 disabled, test case:
183-
%% * dead_letter_reject_expire_expire is known to fail due to https://github.com/rabbitmq/rabbitmq-server/issues/11159
184-
%% * stream is known to fail due to https://github.com/rabbitmq/rabbitmq-server/issues/11173
185-
ok = rabbit_ct_broker_helpers:enable_feature_flag(Config, message_containers_deaths_v2),
186-
init_per_testcase0(T, Config);
187179
init_per_testcase(Testcase, Config) ->
188-
init_per_testcase0(Testcase, Config).
189-
190-
init_per_testcase0(Testcase, Config) ->
191180
Group = proplists:get_value(name, ?config(tc_group_properties, Config)),
192181
Q = rabbit_data_coercion:to_binary(io_lib:format("~p_~p", [Group, Testcase])),
193182
Q2 = rabbit_data_coercion:to_binary(io_lib:format("~p_~p_2", [Group, Testcase])),

deps/rabbit/test/mc_unit_SUITE.erl

Lines changed: 18 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,7 @@ all_tests() ->
2929
amqpl_compat,
3030
amqpl_table_x_header,
3131
amqpl_table_x_header_array_of_tbls,
32-
amqpl_death_v1_records,
33-
amqpl_death_v2_records,
32+
amqpl_death_records,
3433
is_death_cycle,
3534
amqpl_amqp_bin_amqpl,
3635
amqpl_cc_amqp_bin_amqpl,
@@ -198,50 +197,39 @@ amqpl_table_x_header_array_of_tbls(_Config) ->
198197
]},
199198
mc:x_header(<<"x-fruit">>, Msg)).
200199

201-
amqpl_death_v1_records(_Config) ->
202-
ok = amqpl_death_records(#{?FF_MC_DEATHS_V2 => false}).
203-
204-
amqpl_death_v2_records(_Config) ->
205-
ok = amqpl_death_records(#{?FF_MC_DEATHS_V2 => true}).
206-
207-
amqpl_death_records(Env) ->
200+
amqpl_death_records(_Config) ->
208201
Content = #content{class_id = 60,
209202
properties = #'P_basic'{headers = []},
210203
payload_fragments_rev = [<<"data">>]},
211204
Msg0 = mc:prepare(store, mc:init(mc_amqpl, Content, annotations())),
212205

213-
Msg1 = mc:record_death(rejected, <<"q1">>, Msg0, Env),
214-
?assertEqual([<<"q1">>], mc:death_queue_names(Msg1)),
215-
?assertEqual(false, mc:is_death_cycle(<<"q1">>, Msg1)),
206+
Msg1 = mc:record_death(rejected, <<"q">>, Msg0),
207+
?assertEqual([<<"q">>], mc:death_queue_names(Msg1)),
208+
?assertEqual(false, mc:is_death_cycle(<<"q">>, Msg1)),
216209

217210
#content{properties = #'P_basic'{headers = H1}} = mc:protocol_state(Msg1),
218211
?assertMatch({_, array, [_]}, header(<<"x-death">>, H1)),
219-
?assertMatch({_, longstr, <<"q1">>}, header(<<"x-first-death-queue">>, H1)),
212+
?assertMatch({_, longstr, <<"q">>}, header(<<"x-first-death-queue">>, H1)),
220213
?assertMatch({_, longstr, <<"exch">>}, header(<<"x-first-death-exchange">>, H1)),
221214
?assertMatch({_, longstr, <<"rejected">>}, header(<<"x-first-death-reason">>, H1)),
222-
?assertMatch({_, longstr, <<"q1">>}, header(<<"x-last-death-queue">>, H1)),
215+
?assertMatch({_, longstr, <<"q">>}, header(<<"x-last-death-queue">>, H1)),
223216
?assertMatch({_, longstr, <<"exch">>}, header(<<"x-last-death-exchange">>, H1)),
224217
?assertMatch({_, longstr, <<"rejected">>}, header(<<"x-last-death-reason">>, H1)),
225218
{_, array, [{table, T1}]} = header(<<"x-death">>, H1),
226219
?assertMatch({_, long, 1}, header(<<"count">>, T1)),
227220
?assertMatch({_, longstr, <<"rejected">>}, header(<<"reason">>, T1)),
228-
?assertMatch({_, longstr, <<"q1">>}, header(<<"queue">>, T1)),
221+
?assertMatch({_, longstr, <<"q">>}, header(<<"queue">>, T1)),
229222
?assertMatch({_, longstr, <<"exch">>}, header(<<"exchange">>, T1)),
230223
?assertMatch({_, timestamp, _}, header(<<"time">>, T1)),
231224
?assertMatch({_, array, [{longstr, <<"apple">>}]}, header(<<"routing-keys">>, T1)),
232225

233-
234-
%% second dead letter, e.g. an expired reason returning to source queue
235-
236-
%% record_death uses a timestamp for death record ordering, ensure
237-
%% it is definitely higher than the last timestamp taken
238-
timer:sleep(2),
239-
Msg2 = mc:record_death(expired, <<"dl">>, Msg1, Env),
240-
226+
%% 2nd dead letter, e.g. an expired reason
227+
Msg2 = mc:record_death(expired, <<"dead letter queue">>, Msg1),
241228
#content{properties = #'P_basic'{headers = H2}} = mc:protocol_state(Msg2),
229+
%% We expect deaths to be ordered by recency
242230
{_, array, [{table, T2a}, {table, T2b}]} = header(<<"x-death">>, H2),
243-
?assertMatch({_, longstr, <<"dl">>}, header(<<"queue">>, T2a)),
244-
?assertMatch({_, longstr, <<"q1">>}, header(<<"queue">>, T2b)),
231+
?assertMatch({_, longstr, <<"dead letter queue">>}, header(<<"queue">>, T2a)),
232+
?assertMatch({_, longstr, <<"q">>}, header(<<"queue">>, T2b)),
245233
ok.
246234

247235
is_death_cycle(_Config) ->
@@ -254,29 +242,29 @@ is_death_cycle(_Config) ->
254242
%% Q1 --rejected--> Q2 --expired--> Q3 --expired-->
255243
%% Q1 --rejected--> Q2 --expired--> Q3
256244

257-
Msg1 = mc:record_death(rejected, <<"q1">>, Msg0, #{}),
245+
Msg1 = mc:record_death(rejected, <<"q1">>, Msg0),
258246
?assertNot(mc:is_death_cycle(<<"q1">>, Msg1),
259247
"A queue that dead letters to itself due to rejected is not considered a cycle."),
260248
?assertNot(mc:is_death_cycle(<<"q2">>, Msg1)),
261249
?assertNot(mc:is_death_cycle(<<"q3">>, Msg1)),
262250

263-
Msg2 = mc:record_death(expired, <<"q2">>, Msg1, #{}),
251+
Msg2 = mc:record_death(expired, <<"q2">>, Msg1),
264252
?assertNot(mc:is_death_cycle(<<"q1">>, Msg2)),
265253
?assert(mc:is_death_cycle(<<"q2">>, Msg2),
266254
"A queue that dead letters to itself due to expired is considered a cycle."),
267255
?assertNot(mc:is_death_cycle(<<"q3">>, Msg2)),
268256

269-
Msg3 = mc:record_death(expired, <<"q3">>, Msg2, #{}),
257+
Msg3 = mc:record_death(expired, <<"q3">>, Msg2),
270258
?assertNot(mc:is_death_cycle(<<"q1">>, Msg3)),
271259
?assert(mc:is_death_cycle(<<"q2">>, Msg3)),
272260
?assert(mc:is_death_cycle(<<"q3">>, Msg3)),
273261

274-
Msg4 = mc:record_death(rejected, <<"q1">>, Msg3, #{}),
262+
Msg4 = mc:record_death(rejected, <<"q1">>, Msg3),
275263
?assertNot(mc:is_death_cycle(<<"q1">>, Msg4)),
276264
?assertNot(mc:is_death_cycle(<<"q2">>, Msg4)),
277265
?assertNot(mc:is_death_cycle(<<"q3">>, Msg4)),
278266

279-
Msg5 = mc:record_death(expired, <<"q2">>, Msg4, #{}),
267+
Msg5 = mc:record_death(expired, <<"q2">>, Msg4),
280268
?assertNot(mc:is_death_cycle(<<"q1">>, Msg5)),
281269
?assert(mc:is_death_cycle(<<"q2">>, Msg5)),
282270
?assertNot(mc:is_death_cycle(<<"q3">>, Msg5)),

0 commit comments

Comments
 (0)