Skip to content

Commit 37e418b

Browse files
committed
Merge stream.read_ahead and stream.read_ahead_limit options
Read-ahead can be controlled by a single option, with zero acting the same as `false`.
1 parent 04b9c50 commit 37e418b

File tree

5 files changed

+18
-32
lines changed

5 files changed

+18
-32
lines changed

deps/rabbit/priv/schema/rabbit.schema

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2777,12 +2777,7 @@ fun(Conf) ->
27772777
end}.
27782778

27792779
{mapping, "stream.read_ahead", "rabbit.stream_read_ahead",
2780-
[{datatype, {enum, [true, false]}}]}.
2781-
2782-
{mapping, "stream.read_ahead_limit", "rabbit.stream_read_ahead_limit", [
2783-
{datatype, [integer, string]},
2784-
{validators, ["is_supported_information_unit"]}
2785-
]}.
2780+
[{datatype, [{enum, [true, false]}, integer, string]}]}.
27862781

27872782
{mapping, "cluster_tags.$tag", "rabbit.cluster_tags", [
27882783
{datatype, [binary]}

deps/rabbit/src/rabbit_stream_queue.erl

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@
4949
-export([format_osiris_event/2]).
5050
-export([update_stream_conf/2]).
5151
-export([readers/1]).
52-
-export([read_ahead_on/0, read_ahead_limit/0]).
52+
-export([read_ahead/0]).
5353

5454
-export([parse_offset_arg/1,
5555
filter_spec/1]).
@@ -465,8 +465,7 @@ begin_stream(#stream_client{name = QName,
465465
Tag, Offset, Mode, AckRequired, Filter, Options0)
466466
when is_pid(LocalPid) ->
467467
CounterSpec = {{?MODULE, QName, Tag, self()}, []},
468-
Options1 = Options0#{read_ahead => read_ahead_on(),
469-
read_ahead_limit => read_ahead_limit()},
468+
Options1 = Options0#{read_ahead => read_ahead()},
470469
{ok, Seg0} = osiris:init_reader(LocalPid, Offset, CounterSpec, Options1),
471470
NextOffset = osiris_log:next_offset(Seg0) - 1,
472471
osiris:register_offset_listener(LocalPid, NextOffset),
@@ -1521,23 +1520,20 @@ queue_vm_ets() ->
15211520
{[],
15221521
[]}.
15231522

1524-
read_ahead_on() ->
1525-
application:get_env(rabbit, stream_read_ahead, true).
1526-
1527-
-spec read_ahead_limit() -> integer() | undefined.
1528-
read_ahead_limit() ->
1529-
case application:get_env(rabbit, stream_read_ahead_limit, undefined) of
1530-
undefined ->
1531-
undefined;
1532-
Bytes when is_integer(Bytes) ->
1533-
Bytes;
1523+
-spec read_ahead() -> boolean() | non_neg_integer().
1524+
read_ahead() ->
1525+
case application:get_env(rabbit, stream_read_ahead, true) of
1526+
Toggle when is_boolean(Toggle) ->
1527+
Toggle;
1528+
LimitBytes when is_integer(LimitBytes) ->
1529+
LimitBytes;
15341530
Limit when is_list(Limit) ->
15351531
case rabbit_resource_monitor_misc:parse_information_unit(Limit) of
15361532
{ok, ParsedLimit} ->
15371533
ParsedLimit;
15381534
{error, parse_error} ->
15391535
?LOG_ERROR("Unable to parse stream read ahead limit value "
15401536
"~tp", [Limit]),
1541-
undefined
1537+
true
15421538
end
15431539
end.

deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1234,7 +1234,7 @@ credential_validator.regexp = ^abc\\d+",
12341234
[]},
12351235

12361236
%%
1237-
%% Stream read ahead on/off
1237+
%% Stream read ahead on/off/limit
12381238
%%
12391239

12401240
{stream_read_ahead,
@@ -1254,24 +1254,20 @@ credential_validator.regexp = ^abc\\d+",
12541254
{stream_read_ahead, false}
12551255
]}],
12561256
[]},
1257-
1258-
%%
1259-
%% Stream read limit
1260-
%%
12611257
{stream_read_ahead_limit_bytes,
12621258
"
1263-
stream.read_ahead_limit = 8192
1259+
stream.read_ahead = 8192
12641260
",
12651261
[{rabbit, [
1266-
{stream_read_ahead_limit, 8192}
1262+
{stream_read_ahead, 8192}
12671263
]}],
12681264
[]},
12691265
{stream_read_ahead_limit_information_unit,
12701266
"
1271-
stream.read_ahead_limit = 8KiB
1267+
stream.read_ahead = 8KiB
12721268
",
12731269
[{rabbit, [
1274-
{stream_read_ahead_limit, "8KiB"}
1270+
{stream_read_ahead, "8KiB"}
12751271
]}],
12761272
[]}
12771273

deps/rabbitmq_stream/src/rabbit_stream_reader.erl

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2814,8 +2814,7 @@ init_reader(ConnectionTransport,
28142814
CounterSpec = {{?MODULE, QueueResource, SubscriptionId, self()}, []},
28152815
Options0 = #{transport => ConnectionTransport,
28162816
chunk_selector => get_chunk_selector(Properties),
2817-
read_ahead => rabbit_stream_queue:read_ahead_on(),
2818-
read_ahead_limit => rabbit_stream_queue:read_ahead_limit()},
2817+
read_ahead => rabbit_stream_queue:read_ahead()},
28192818

28202819
Options1 = maps:merge(Options0,
28212820
rabbit_stream_utils:filter_spec(Properties)),

rabbitmq-components.mk

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ dep_jose = hex 1.11.10
4949
dep_khepri = hex 0.17.2
5050
dep_khepri_mnesia_migration = hex 0.8.0
5151
dep_meck = hex 1.0.0
52-
dep_osiris = git https://github.com/rabbitmq/osiris v1.10.1
52+
dep_osiris = git https://github.com/rabbitmq/osiris f384e462d58d9ecea993a9a10d9ec35eb20ebd71
5353
dep_prometheus = hex 5.1.1
5454
dep_ra = hex 2.17.1
5555
dep_ranch = hex 2.2.0

0 commit comments

Comments
 (0)