Skip to content

Commit 53db29b

Browse files
the-mikedavismergify[bot]
authored andcommitted
Add stream.read_ahead_limit config option
(cherry picked from commit f40cecd) # Conflicts: # deps/rabbit/src/rabbit_stream_queue.erl
1 parent c75ef6d commit 53db29b

File tree

4 files changed

+53
-4
lines changed

4 files changed

+53
-4
lines changed

deps/rabbit/priv/schema/rabbit.schema

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2779,6 +2779,11 @@ end}.
27792779
{mapping, "stream.read_ahead", "rabbit.stream_read_ahead",
27802780
[{datatype, {enum, [true, false]}}]}.
27812781

2782+
{mapping, "stream.read_ahead_limit", "rabbit.stream_read_ahead_limit", [
2783+
{datatype, [integer, string]},
2784+
{validators, ["is_supported_information_unit"]}
2785+
]}.
2786+
27822787
{mapping, "cluster_tags.$tag", "rabbit.cluster_tags", [
27832788
{datatype, [binary]}
27842789
]}.

deps/rabbit/src/rabbit_stream_queue.erl

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@
5050
-export([format_osiris_event/2]).
5151
-export([update_stream_conf/2]).
5252
-export([readers/1]).
53-
-export([read_ahead_on/0]).
53+
-export([read_ahead_on/0, read_ahead_limit/0]).
5454

5555
-export([parse_offset_arg/1,
5656
filter_spec/1]).
@@ -468,7 +468,8 @@ begin_stream(#stream_client{name = QName,
468468
Tag, Offset, Mode, AckRequired, Filter, Options0)
469469
when is_pid(LocalPid) ->
470470
CounterSpec = {{?MODULE, QName, Tag, self()}, []},
471-
Options1 = Options0#{read_ahead => read_ahead_on()},
471+
Options1 = Options0#{read_ahead => read_ahead_on(),
472+
read_ahead_limit => read_ahead_limit()},
472473
{ok, Seg0} = osiris:init_reader(LocalPid, Offset, CounterSpec, Options1),
473474
NextOffset = osiris_log:next_offset(Seg0) - 1,
474475
osiris:register_offset_listener(LocalPid, NextOffset),
@@ -1531,4 +1532,26 @@ shrink_all(_Node) ->
15311532
{error, not_quorum_queue}.
15321533

15331534
read_ahead_on() ->
1534-
application:get_env(rabbit, stream_read_ahead, true).
1535+
<<<<<<< HEAD
1536+
application:get_env(rabbit, stream_read_ahead, true).
1537+
=======
1538+
application:get_env(rabbit, stream_read_ahead, true).
1539+
1540+
-spec read_ahead_limit() -> integer() | undefined.
1541+
read_ahead_limit() ->
1542+
case application:get_env(rabbit, stream_read_ahead_limit, undefined) of
1543+
undefined ->
1544+
undefined;
1545+
Bytes when is_integer(Bytes) ->
1546+
Bytes;
1547+
Limit when is_list(Limit) ->
1548+
case rabbit_resource_monitor_misc:parse_information_unit(Limit) of
1549+
{ok, ParsedLimit} ->
1550+
ParsedLimit;
1551+
{error, parse_error} ->
1552+
?LOG_ERROR("Unable to parse stream read ahead limit value "
1553+
"~tp", [Limit]),
1554+
undefined
1555+
end
1556+
end.
1557+
>>>>>>> f40cecd0a (Add `stream.read_ahead_limit` config option)

deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1266,6 +1266,26 @@ credential_validator.regexp = ^abc\\d+",
12661266
[{rabbit, [
12671267
{stream_read_ahead, false}
12681268
]}],
1269+
[]},
1270+
1271+
%%
1272+
%% Stream read limit
1273+
%%
1274+
{stream_read_ahead_limit_bytes,
1275+
"
1276+
stream.read_ahead_limit = 8192
1277+
",
1278+
[{rabbit, [
1279+
{stream_read_ahead_limit, 8192}
1280+
]}],
1281+
[]},
1282+
{stream_read_ahead_limit_information_unit,
1283+
"
1284+
stream.read_ahead_limit = 8KiB
1285+
",
1286+
[{rabbit, [
1287+
{stream_read_ahead_limit, "8KiB"}
1288+
]}],
12691289
[]}
12701290

12711291
].

deps/rabbitmq_stream/src/rabbit_stream_reader.erl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2814,7 +2814,8 @@ init_reader(ConnectionTransport,
28142814
CounterSpec = {{?MODULE, QueueResource, SubscriptionId, self()}, []},
28152815
Options0 = #{transport => ConnectionTransport,
28162816
chunk_selector => get_chunk_selector(Properties),
2817-
read_ahead => rabbit_stream_queue:read_ahead_on()},
2817+
read_ahead => rabbit_stream_queue:read_ahead_on(),
2818+
read_ahead_limit => rabbit_stream_queue:read_ahead_limit()},
28182819

28192820
Options1 = maps:merge(Options0,
28202821
rabbit_stream_utils:filter_spec(Properties)),

0 commit comments

Comments
 (0)