Skip to content

Commit c5fd19d

Browse files
committed
Make channel_operation_timeout_test_queue more resilient
Now the CQ state can be changed without breaking this test all the time. Only the #msg_status{} record remains a potential problem but thankfully it does not change often.
1 parent d0e6739 commit c5fd19d

File tree

2 files changed

+15
-146
lines changed

2 files changed

+15
-146
lines changed

deps/rabbit/src/rabbit_variable_queue.erl

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
-export([start/2, stop/1]).
2323

2424
%% exported for testing only
25-
-export([start_msg_store/3, stop_msg_store/1]).
25+
-export([start_msg_store/3, stop_msg_store/1, ram_pending_acks/1]).
2626

2727
-include("mc.hrl").
2828
-include_lib("stdlib/include/qlc.hrl").
@@ -1584,6 +1584,9 @@ prepare_to_store(Msg) ->
15841584
%% Internal gubbins for acks
15851585
%%----------------------------------------------------------------------------
15861586

1587+
ram_pending_acks(#vqstate{ ram_pending_ack = RPA }) ->
1588+
RPA.
1589+
15871590
record_pending_ack(#msg_status { seq_id = SeqId } = MsgStatus,
15881591
State = #vqstate { ram_pending_ack = RPA,
15891592
disk_pending_ack = DPA,

deps/rabbit/test/channel_operation_timeout_test_queue.erl

Lines changed: 11 additions & 145 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
66
%%
77

8-
%% @todo This module also needs to be updated when variable queue changes.
98
-module(channel_operation_timeout_test_queue).
109

1110
-export([init/3, terminate/2, delete_and_terminate/2, delete_crashed/1,
@@ -28,69 +27,11 @@
2827
%% the test message has been published, and is awaiting acknowledgement in the
2928
%% queue index. Test message is "timeout_test_msg!".
3029
%%
30+
%% Only access the #msg_status{} record needs to be updated if it changes.
3131
%%----------------------------------------------------------------------------
3232

3333
-behaviour(rabbit_backing_queue).
3434

35-
-record(vqstate,
36-
{ q_head,
37-
q_tail,
38-
next_seq_id,
39-
%% seq_id() of first undelivered message
40-
%% everything before this seq_id() was delivered at least once
41-
next_deliver_seq_id,
42-
ram_pending_ack, %% msgs still in RAM
43-
disk_pending_ack, %% msgs in store, paged out
44-
index_state,
45-
store_state,
46-
msg_store_clients,
47-
durable,
48-
transient_threshold,
49-
qi_embed_msgs_below,
50-
51-
bytes, %% w/o unacked
52-
unacked_bytes,
53-
persistent_count, %% w unacked
54-
persistent_bytes, %% w unacked
55-
56-
ram_msg_count, %% w/o unacked
57-
ram_msg_count_prev,
58-
ram_ack_count_prev,
59-
ram_bytes, %% w unacked
60-
out_counter,
61-
in_counter,
62-
rates,
63-
%% There are two confirms paths: either store/index produce confirms
64-
%% separately (v2 with per-vhost message store) or the confirms
65-
%% are produced all at once while syncing/flushing (v2 with per-queue
66-
%% message store). The latter is more efficient as it avoids many
67-
%% sets operations.
68-
msgs_on_disk,
69-
msg_indices_on_disk,
70-
unconfirmed,
71-
confirmed,
72-
ack_out_counter,
73-
ack_in_counter,
74-
%% Unlike the other counters these two do not feed into
75-
%% #rates{} and get reset
76-
disk_read_count,
77-
disk_write_count,
78-
79-
%% Fast path for confirms handling. Instead of having
80-
%% index/store keep track of confirms separately and
81-
%% doing intersect/subtract/union we just put the messages
82-
%% here and on sync move them to 'confirmed'.
83-
%%
84-
%% Note: This field used to be 'memory_reduction_run_count'.
85-
unconfirmed_simple,
86-
%% Queue data is grouped by VHost. We need to store it
87-
%% to work with queue index.
88-
virtual_host,
89-
waiting_bump = false
90-
}).
91-
92-
-record(rates, { in, out, ack_in, ack_out, timestamp }).
93-
9435
-record(msg_status,
9536
{ seq_id,
9637
msg_id,
@@ -103,79 +44,9 @@
10344
msg_props
10445
}).
10546

106-
-record(q_tail,
107-
{ start_seq_id, %% start_seq_id is inclusive
108-
count,
109-
end_seq_id %% end_seq_id is exclusive
110-
}).
111-
112-
11347
-include_lib("rabbit_common/include/rabbit.hrl").
114-
-define(QUEUE, lqueue).
11548
-define(TIMEOUT_TEST_MSG, <<"timeout_test_msg!">>).
11649

117-
%%----------------------------------------------------------------------------
118-
119-
-type seq_id() :: non_neg_integer().
120-
121-
-type rates() :: #rates { in :: float(),
122-
out :: float(),
123-
ack_in :: float(),
124-
ack_out :: float(),
125-
timestamp :: rabbit_types:timestamp()}.
126-
127-
-type q_tail() :: #q_tail { start_seq_id :: non_neg_integer(),
128-
count :: non_neg_integer(),
129-
end_seq_id :: non_neg_integer() }.
130-
131-
%% The compiler (rightfully) complains that ack() and state() are
132-
%% unused. For this reason we duplicate a -spec from
133-
%% rabbit_backing_queue with the only intent being to remove
134-
%% warnings. The problem here is that we can't parameterise the BQ
135-
%% behaviour by these two types as we would like to. We still leave
136-
%% these here for documentation purposes.
137-
-type ack() :: seq_id().
138-
-type state() :: #vqstate {
139-
q_head :: ?QUEUE:?QUEUE(),
140-
q_tail :: q_tail(),
141-
next_seq_id :: seq_id(),
142-
next_deliver_seq_id :: seq_id(),
143-
ram_pending_ack :: map(),
144-
disk_pending_ack :: map(),
145-
index_state :: any(),
146-
store_state :: any(),
147-
msg_store_clients :: 'undefined' | {{any(), binary()},
148-
{any(), binary()}},
149-
durable :: boolean(),
150-
transient_threshold :: non_neg_integer(),
151-
qi_embed_msgs_below :: non_neg_integer(),
152-
153-
bytes :: non_neg_integer(),
154-
unacked_bytes :: non_neg_integer(),
155-
persistent_count :: non_neg_integer(),
156-
persistent_bytes :: non_neg_integer(),
157-
158-
ram_msg_count :: non_neg_integer(),
159-
ram_msg_count_prev :: non_neg_integer(),
160-
ram_ack_count_prev :: non_neg_integer(),
161-
ram_bytes :: non_neg_integer(),
162-
out_counter :: non_neg_integer(),
163-
in_counter :: non_neg_integer(),
164-
rates :: rates(),
165-
msgs_on_disk :: gb_sets:set(),
166-
msg_indices_on_disk :: gb_sets:set(),
167-
unconfirmed :: gb_sets:set(),
168-
confirmed :: gb_sets:set(),
169-
ack_out_counter :: non_neg_integer(),
170-
ack_in_counter :: non_neg_integer(),
171-
disk_read_count :: non_neg_integer(),
172-
disk_write_count :: non_neg_integer(),
173-
174-
unconfirmed_simple :: sets:set()}.
175-
176-
%% Duplicated from rabbit_backing_queue
177-
-spec ack([ack()], state()) -> {[rabbit_guid:guid()], state()}.
178-
17950
%%----------------------------------------------------------------------------
18051
%% Public API
18152
%%----------------------------------------------------------------------------
@@ -198,12 +69,8 @@ delete_and_terminate(Reason, State) ->
19869
delete_crashed(Q) ->
19970
rabbit_variable_queue:delete_crashed(Q).
20071

201-
purge(State = #vqstate { ram_pending_ack= QPA }) ->
202-
maybe_delay(QPA),
203-
rabbit_variable_queue:purge(State);
204-
%% For v4.2.x because the state has changed.
20572
purge(State) ->
206-
QPA = element(9, State),
73+
QPA = ram_pending_acks(State),
20774
maybe_delay(QPA),
20875
rabbit_variable_queue:purge(State).
20976

@@ -236,24 +103,16 @@ drop(AckRequired, State) ->
236103
ack(List, State) ->
237104
rabbit_variable_queue:ack(List, State).
238105

239-
requeue(AckTags, #vqstate { ram_pending_ack = QPA } = State) ->
240-
maybe_delay(QPA),
241-
rabbit_variable_queue:requeue(AckTags, State);
242-
%% For v4.2.x because the state has changed.
243106
requeue(AckTags, State) ->
244-
QPA = element(9, State),
107+
QPA = ram_pending_acks(State),
245108
maybe_delay(QPA),
246109
rabbit_variable_queue:requeue(AckTags, State).
247110

248111
ackfold(MsgFun, Acc, State, AckTags) ->
249112
rabbit_variable_queue:ackfold(MsgFun, Acc, State, AckTags).
250113

251-
len(#vqstate { ram_pending_ack = QPA } = State) ->
252-
maybe_delay(QPA),
253-
rabbit_variable_queue:len(State);
254-
%% For v4.2.x because the state has changed.
255114
len(State) ->
256-
QPA = element(9, State),
115+
QPA = ram_pending_acks(State),
257116
maybe_delay(QPA),
258117
rabbit_variable_queue:len(State).
259118

@@ -298,6 +157,13 @@ set_queue_mode(_, State) ->
298157
set_queue_version(_, State) ->
299158
State.
300159

160+
ram_pending_acks(State) ->
161+
case erlang:function_exported(rabbit_variable_queue, ram_pending_acks, 1) of
162+
true -> rabbit_variable_queue:ram_pending_acks(State);
163+
%% For v4.2.x because the state has changed.
164+
false -> element(9, State)
165+
end.
166+
301167
%% Delay
302168
maybe_delay(QPA) ->
303169
%% The structure for ram_pending_acks has changed to maps in 3.12.

0 commit comments

Comments
 (0)