Skip to content

Commit a918ce5

Browse files
committed
CQ: Tune reads from index based on message rate
We want to reduce the number of times we cross over to the next segment when reading. We do this by computing a threshold based on read rate, then reading a little more if we are close enough to the end of the current segment, and a little less if we are not reading enough into the next segment.
1 parent 5892211 commit a918ce5

File tree

3 files changed

+100
-57
lines changed

3 files changed

+100
-57
lines changed

deps/rabbit/src/rabbit_classic_queue_index_v2.erl

Lines changed: 58 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
-module(rabbit_classic_queue_index_v2).
99

1010
-export([erase/1, init/1, reset_state/1, recover/4,
11-
bounds/2, next_segment_boundary/1, info/1,
11+
bounds/2, tune_read/2, info/1,
1212
terminate/3, delete_and_terminate/1,
1313
publish/7, ack/2, read/3,
1414
sync/1, needs_sync/1]).
@@ -757,10 +757,11 @@ ack_delete_fold_fun(SeqId, Write, {Buffer, Updates, Deletes, SegmentEntryCount})
757757
Deletes, SegmentEntryCount}
758758
end.
759759

760-
%% A better interface for read/3 would be to request a maximum
761-
%% of N messages, rather than first call next_segment_boundary/3
762-
%% and then read from S1 to S2. This function could then return
763-
%% either N messages or less depending on the current state.
760+
%% Before calling read/3 it is recommended to call tune_read/2
761+
%% so that the index can inform the queue how to most efficiently
762+
%% read messages, as the index has knowledge of segment boundaries
763+
%% and can decide whether it is worth it to read from one or two
764+
%% segments at a time.
764765

765766
-spec read(rabbit_variable_queue:seq_id(),
766767
rabbit_variable_queue:seq_id(),
@@ -1064,13 +1065,62 @@ bounds(State = #qi{ segments = Segments }, NextSeqIdHint) ->
10641065
State}
10651066
end.
10661067

1068+
%% We tune the read request so that we only cross over segment
1069+
%% boundaries when it makes sense. We compute a threshold
1070+
%% based on the number of messages requested, and look whether
1071+
%% the number of messages to be read from the next segment is
1072+
%% higher than the threshold, otherwise we stop at the segment
1073+
%% boundary. Similarly, if the number of messages requested
1074+
%% almost reaches the segment boundary, we read a few more
1075+
%% messages up to the segment boundary.
1076+
%%
1077+
%% This is meant to reduce the number of file reads when the
1078+
%% outgoing message rate is moderate, while still sticking to
1079+
%% segment file boundaries when the message rate is maxed.
1080+
1081+
-spec tune_read(rabbit_variable_queue:seq_id(), rabbit_variable_queue:seq_id())
1082+
-> rabbit_variable_queue:seq_id().
1083+
1084+
%% Adjusts the exclusive upper bound of a read request so that segment
%% boundaries are only crossed when it is worth it. Returns the upper
%% bound that should be given to read/3 in place of ToSeqId.
tune_read(SeqId, SeqId) ->
    %% Empty range: From is inclusive and To is exclusive, so there is
    %% nothing to read and nothing to tune.
    SeqId;
tune_read(FromSeqId, ToSeqId) ->
    %% Number of messages the caller asked for.
    Requested = ToSeqId - FromSeqId,
    %% First sequence id of the following segment, and how many entries
    %% separate FromSeqId from it.
    Boundary = next_segment_boundary(FromSeqId),
    Remaining = Boundary - FromSeqId,
    %% Number of messages we accept to add to, or drop from, the request
    %% in order to stop exactly on the segment boundary.
    Slack = max(1, Requested div 7),
    %% Signed distance between the end of the request and the boundary:
    %% zero or positive when the request stays within the current
    %% segment, negative when it spills into the next one.
    case Remaining - Requested of
        Short when Short >= 0, Short =< Slack ->
            %% The request ends just before (or on) the boundary:
            %% extend it up to the end of the segment.
            Boundary;
        Short when Short >= 0 ->
            %% The boundary is too far away to be worth the extra
            %% messages: keep the request as is.
            ToSeqId;
        Over when -Over =< Slack ->
            %% The request only takes a few messages from the next
            %% segment: shorten it to avoid reading from two segments.
            Boundary;
        _ ->
            %% The request takes enough from the next segment to
            %% justify crossing over: keep the request as is.
            ToSeqId
    end.
1119+
10671120
%% The next_segment_boundary/1 function is used internally when
10681121
%% reading. It should not be called from rabbit_variable_queue.
10691122

1070-
-spec next_segment_boundary(SeqId) -> SeqId when SeqId::rabbit_variable_queue:seq_id().
1071-
10721123
next_segment_boundary(SeqId) ->
1073-
?DEBUG("~0p", [SeqId]),
10741124
SegmentEntryCount = segment_entry_count(),
10751125
(1 + (SeqId div SegmentEntryCount)) * SegmentEntryCount.
10761126

deps/rabbit/src/rabbit_variable_queue.erl

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1866,9 +1866,7 @@ read_from_q_tail(DelsAndAcksFun,
18661866
%% messages we read.
18671867
%% @todo Simply ask for N messages instead of low/high bounds.
18681868
QTailSeqLimit = QTailSeqId + MemoryLimit,
1869-
QTailSeqId1 =
1870-
lists:min([rabbit_classic_queue_index_v2:next_segment_boundary(QTailSeqId),
1871-
QTailSeqLimit, QTailSeqIdEnd]),
1869+
QTailSeqId1 = rabbit_classic_queue_index_v2:tune_read(QTailSeqId, min(QTailSeqLimit, QTailSeqIdEnd)),
18721870
{List0, IndexState1} = rabbit_classic_queue_index_v2:read(QTailSeqId, QTailSeqId1, IndexState),
18731871
{List, StoreState3, MCStateP3, MCStateT3} = case WhatToRead of
18741872
messages ->

deps/rabbit/test/backing_queue_SUITE.erl

Lines changed: 41 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -796,9 +796,12 @@ bq_queue_index(Config) ->
796796
index_mod() ->
797797
rabbit_classic_queue_index_v2.
798798

799+
segment_entry_count() ->
800+
persistent_term:get(classic_queue_index_v2_segment_entry_count, 4096).
801+
799802
bq_queue_index1(_Config) ->
800803
IndexMod = index_mod(),
801-
SegmentSize = IndexMod:next_segment_boundary(0),
804+
SegmentSize = segment_entry_count(),
802805
TwoSegs = SegmentSize + SegmentSize,
803806
MostOfASegment = trunc(SegmentSize*0.75),
804807
SeqIdsA = lists:seq(0, MostOfASegment-1),
@@ -993,7 +996,7 @@ v2_delete_segment_file_completely_acked(Config) ->
993996

994997
v2_delete_segment_file_completely_acked1(_Config) ->
995998
IndexMod = rabbit_classic_queue_index_v2,
996-
SegmentSize = IndexMod:next_segment_boundary(0),
999+
SegmentSize = segment_entry_count(),
9971000
SeqIds = lists:seq(0, SegmentSize - 1),
9981001

9991002
with_empty_test_queue(
@@ -1020,7 +1023,7 @@ v2_delete_segment_file_partially_acked(Config) ->
10201023

10211024
v2_delete_segment_file_partially_acked1(_Config) ->
10221025
IndexMod = rabbit_classic_queue_index_v2,
1023-
SegmentSize = IndexMod:next_segment_boundary(0),
1026+
SegmentSize = segment_entry_count(),
10241027
SeqIds = lists:seq(0, SegmentSize div 2),
10251028
SeqIdsLen = length(SeqIds),
10261029

@@ -1048,7 +1051,7 @@ v2_delete_segment_file_partially_acked_with_holes(Config) ->
10481051

10491052
v2_delete_segment_file_partially_acked_with_holes1(_Config) ->
10501053
IndexMod = rabbit_classic_queue_index_v2,
1051-
SegmentSize = IndexMod:next_segment_boundary(0),
1054+
SegmentSize = segment_entry_count(),
10521055
SeqIdsA = lists:seq(0, SegmentSize div 2),
10531056
SeqIdsB = lists:seq(11 + SegmentSize div 2, SegmentSize - 1),
10541057
SeqIdsLen = length(SeqIdsA) + length(SeqIdsB),
@@ -1107,8 +1110,7 @@ bq_queue_recover(Config) ->
11071110
?MODULE, bq_queue_recover1, [Config]).
11081111

11091112
bq_queue_recover1(Config) ->
1110-
IndexMod = index_mod(),
1111-
Count = 2 * IndexMod:next_segment_boundary(0),
1113+
Count = 2 * segment_entry_count(),
11121114
QName0 = queue_name(Config, <<"bq_queue_recover-q">>),
11131115
{new, Q} = rabbit_amqqueue:declare(QName0, true, false, [], none, <<"acting-user">>),
11141116
QName = amqqueue:get_name(Q),
@@ -1164,14 +1166,13 @@ get_queue_sup_pid([], _QueuePid) ->
11641166

11651167
variable_queue_partial_segments_q_tail_thing(Config) ->
11661168
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
1167-
?MODULE, variable_queue_partial_segments_q_tail_thing1, [Config]).
1169+
?MODULE, variable_queue_partial_segments_q_tail_thing1, []).
11681170

1169-
variable_queue_partial_segments_q_tail_thing1(Config) ->
1171+
variable_queue_partial_segments_q_tail_thing1() ->
11701172
with_fresh_variable_queue(fun variable_queue_partial_segments_q_tail_thing2/2).
11711173

11721174
variable_queue_partial_segments_q_tail_thing2(VQ0, _QName) ->
1173-
IndexMod = index_mod(),
1174-
SegmentSize = IndexMod:next_segment_boundary(0),
1175+
SegmentSize = segment_entry_count(),
11751176
HalfSegment = SegmentSize div 2,
11761177
OneAndAHalfSegment = SegmentSize + HalfSegment,
11771178
VQ1 = variable_queue_publish(true, OneAndAHalfSegment, VQ0),
@@ -1194,12 +1195,9 @@ variable_queue_partial_segments_q_tail_thing2(VQ0, _QName) ->
11941195
SegmentSize + HalfSegment + 1, VQ5),
11951196
VQ7 = check_variable_queue_status(
11961197
VQ6,
1197-
%% We only read from q_tail up to the end of the segment, so
1198-
%% after fetching exactly one segment, we should have no
1199-
%% messages in memory.
1200-
[{q_head, 0},
1201-
{q_tail, {q_tail, SegmentSize, HalfSegment + 1, OneAndAHalfSegment + 1}},
1202-
{len, HalfSegment + 1}]),
1198+
%% The length is the only predictable stat we have since
1199+
%% the contents of q_head and q_tail depend on the rate.
1200+
[{len, HalfSegment + 1}]),
12031201
{VQ8, AckTags1} = variable_queue_fetch(HalfSegment + 1, true, false,
12041202
HalfSegment + 1, VQ7),
12051203
{_Guids, VQ9} = rabbit_variable_queue:ack(AckTags ++ AckTags1, VQ8),
@@ -1209,14 +1207,13 @@ variable_queue_partial_segments_q_tail_thing2(VQ0, _QName) ->
12091207

12101208
variable_queue_all_the_bits_not_covered_elsewhere_A(Config) ->
12111209
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
1212-
?MODULE, variable_queue_all_the_bits_not_covered_elsewhere_A1, [Config]).
1210+
?MODULE, variable_queue_all_the_bits_not_covered_elsewhere_A1, []).
12131211

1214-
variable_queue_all_the_bits_not_covered_elsewhere_A1(Config) ->
1212+
variable_queue_all_the_bits_not_covered_elsewhere_A1() ->
12151213
with_fresh_variable_queue(fun variable_queue_all_the_bits_not_covered_elsewhere_A2/2).
12161214

12171215
variable_queue_all_the_bits_not_covered_elsewhere_A2(VQ0, QName) ->
1218-
IndexMod = index_mod(),
1219-
Count = 2 * IndexMod:next_segment_boundary(0),
1216+
Count = 2 * segment_entry_count(),
12201217
VQ1 = variable_queue_publish(true, Count, VQ0),
12211218
VQ2 = variable_queue_publish(false, Count, VQ1),
12221219
{VQ4, _AckTags} = variable_queue_fetch(Count, true, false,
@@ -1234,9 +1231,9 @@ variable_queue_all_the_bits_not_covered_elsewhere_A2(VQ0, QName) ->
12341231

12351232
variable_queue_all_the_bits_not_covered_elsewhere_B(Config) ->
12361233
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
1237-
?MODULE, variable_queue_all_the_bits_not_covered_elsewhere_B1, [Config]).
1234+
?MODULE, variable_queue_all_the_bits_not_covered_elsewhere_B1, []).
12381235

1239-
variable_queue_all_the_bits_not_covered_elsewhere_B1(Config) ->
1236+
variable_queue_all_the_bits_not_covered_elsewhere_B1() ->
12401237
with_fresh_variable_queue(fun variable_queue_all_the_bits_not_covered_elsewhere_B2/2).
12411238

12421239
variable_queue_all_the_bits_not_covered_elsewhere_B2(VQ1, QName) ->
@@ -1252,9 +1249,9 @@ variable_queue_all_the_bits_not_covered_elsewhere_B2(VQ1, QName) ->
12521249

12531250
variable_queue_drop(Config) ->
12541251
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
1255-
?MODULE, variable_queue_drop1, [Config]).
1252+
?MODULE, variable_queue_drop1, []).
12561253

1257-
variable_queue_drop1(Config) ->
1254+
variable_queue_drop1() ->
12581255
with_fresh_variable_queue(fun variable_queue_drop2/2).
12591256

12601257
variable_queue_drop2(VQ0, _QName) ->
@@ -1275,9 +1272,9 @@ variable_queue_drop2(VQ0, _QName) ->
12751272

12761273
variable_queue_fold_msg_on_disk(Config) ->
12771274
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
1278-
?MODULE, variable_queue_fold_msg_on_disk1, [Config]).
1275+
?MODULE, variable_queue_fold_msg_on_disk1, []).
12791276

1280-
variable_queue_fold_msg_on_disk1(Config) ->
1277+
variable_queue_fold_msg_on_disk1() ->
12811278
with_fresh_variable_queue(fun variable_queue_fold_msg_on_disk2/2).
12821279

12831280
variable_queue_fold_msg_on_disk2(VQ0, _QName) ->
@@ -1289,9 +1286,9 @@ variable_queue_fold_msg_on_disk2(VQ0, _QName) ->
12891286

12901287
variable_queue_dropfetchwhile(Config) ->
12911288
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
1292-
?MODULE, variable_queue_dropfetchwhile1, [Config]).
1289+
?MODULE, variable_queue_dropfetchwhile1, []).
12931290

1294-
variable_queue_dropfetchwhile1(Config) ->
1291+
variable_queue_dropfetchwhile1() ->
12951292
with_fresh_variable_queue(fun variable_queue_dropfetchwhile2/2).
12961293

12971294
variable_queue_dropfetchwhile2(VQ0, _QName) ->
@@ -1335,9 +1332,9 @@ variable_queue_dropfetchwhile2(VQ0, _QName) ->
13351332

13361333
variable_queue_dropwhile_restart(Config) ->
13371334
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
1338-
?MODULE, variable_queue_dropwhile_restart1, [Config]).
1335+
?MODULE, variable_queue_dropwhile_restart1, []).
13391336

1340-
variable_queue_dropwhile_restart1(Config) ->
1337+
variable_queue_dropwhile_restart1() ->
13411338
with_fresh_variable_queue(fun variable_queue_dropwhile_restart2/2).
13421339

13431340
variable_queue_dropwhile_restart2(VQ0, QName) ->
@@ -1372,9 +1369,9 @@ variable_queue_dropwhile_restart2(VQ0, QName) ->
13721369

13731370
variable_queue_dropwhile_sync_restart(Config) ->
13741371
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
1375-
?MODULE, variable_queue_dropwhile_sync_restart1, [Config]).
1372+
?MODULE, variable_queue_dropwhile_sync_restart1, []).
13761373

1377-
variable_queue_dropwhile_sync_restart1(Config) ->
1374+
variable_queue_dropwhile_sync_restart1() ->
13781375
with_fresh_variable_queue(fun variable_queue_dropwhile_sync_restart2/2).
13791376

13801377
variable_queue_dropwhile_sync_restart2(VQ0, QName) ->
@@ -1412,9 +1409,9 @@ variable_queue_dropwhile_sync_restart2(VQ0, QName) ->
14121409

14131410
variable_queue_restart_large_seq_id(Config) ->
14141411
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
1415-
?MODULE, variable_queue_restart_large_seq_id1, [Config]).
1412+
?MODULE, variable_queue_restart_large_seq_id1, []).
14161413

1417-
variable_queue_restart_large_seq_id1(Config) ->
1414+
variable_queue_restart_large_seq_id1() ->
14181415
with_fresh_variable_queue(fun variable_queue_restart_large_seq_id2/2).
14191416

14201417
variable_queue_restart_large_seq_id2(VQ0, QName) ->
@@ -1449,9 +1446,9 @@ variable_queue_restart_large_seq_id2(VQ0, QName) ->
14491446

14501447
variable_queue_ack_limiting(Config) ->
14511448
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
1452-
?MODULE, variable_queue_ack_limiting1, [Config]).
1449+
?MODULE, variable_queue_ack_limiting1, []).
14531450

1454-
variable_queue_ack_limiting1(Config) ->
1451+
variable_queue_ack_limiting1() ->
14551452
with_fresh_variable_queue(fun variable_queue_ack_limiting2/2).
14561453

14571454
variable_queue_ack_limiting2(VQ0, _Config) ->
@@ -1477,9 +1474,9 @@ variable_queue_ack_limiting2(VQ0, _Config) ->
14771474

14781475
variable_queue_purge(Config) ->
14791476
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
1480-
?MODULE, variable_queue_purge1, [Config]).
1477+
?MODULE, variable_queue_purge1, []).
14811478

1482-
variable_queue_purge1(Config) ->
1479+
variable_queue_purge1() ->
14831480
with_fresh_variable_queue(fun variable_queue_purge2/2).
14841481

14851482
variable_queue_purge2(VQ0, _Config) ->
@@ -1499,9 +1496,9 @@ variable_queue_purge2(VQ0, _Config) ->
14991496

15001497
variable_queue_requeue(Config) ->
15011498
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
1502-
?MODULE, variable_queue_requeue1, [Config]).
1499+
?MODULE, variable_queue_requeue1, []).
15031500

1504-
variable_queue_requeue1(Config) ->
1501+
variable_queue_requeue1() ->
15051502
with_fresh_variable_queue(fun variable_queue_requeue2/2).
15061503

15071504
variable_queue_requeue2(VQ0, _Config) ->
@@ -1525,14 +1522,13 @@ variable_queue_requeue2(VQ0, _Config) ->
15251522
%% requeue from ram_pending_ack into q_head, move to q_tail and then empty queue
15261523
variable_queue_requeue_ram_beta(Config) ->
15271524
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
1528-
?MODULE, variable_queue_requeue_ram_beta1, [Config]).
1525+
?MODULE, variable_queue_requeue_ram_beta1, []).
15291526

1530-
variable_queue_requeue_ram_beta1(Config) ->
1527+
variable_queue_requeue_ram_beta1() ->
15311528
with_fresh_variable_queue(fun variable_queue_requeue_ram_beta2/2).
15321529

15331530
variable_queue_requeue_ram_beta2(VQ0, _Config) ->
1534-
IndexMod = index_mod(),
1535-
Count = IndexMod:next_segment_boundary(0)*2 + 2,
1531+
Count = 2 + 2 * segment_entry_count(),
15361532
VQ1 = variable_queue_publish(false, Count, VQ0),
15371533
{VQ2, AcksR} = variable_queue_fetch(Count, false, false, Count, VQ1),
15381534
{Back, Front} = lists:split(Count div 2, AcksR),
@@ -1808,8 +1804,7 @@ requeue_one_by_one(Acks, VQ) ->
18081804
%% internal queues. Kept for completeness.
18091805
variable_queue_with_holes(VQ0) ->
18101806
Interval = 2048, %% should match vq:IO_BATCH_SIZE
1811-
IndexMod = index_mod(),
1812-
Count = IndexMod:next_segment_boundary(0)*2 + 2 * Interval,
1807+
Count = 2 * Interval + 2 * segment_entry_count(),
18131808
Seq = lists:seq(1, Count),
18141809
VQ1 = variable_queue_publish(
18151810
false, 1, Count,

0 commit comments

Comments
 (0)