From a98954caedc68bf329b748e9654fee9a3bd403f5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Fri, 4 Jul 2025 08:18:34 +0000 Subject: [PATCH 1/3] Return stream frame header binary in dispatch chunk callback This saves a system call by sending the frame header and the chunk header at the same time. References rabbitmq/osiris#192 (cherry picked from commit 885e89ebbbb2e4a1567ddd406bcc27265360615d) --- .../src/rabbit_stream_reader.erl | 26 +++++++------------ rabbitmq-components.mk | 2 +- 2 files changed, 10 insertions(+), 18 deletions(-) diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index 16870b416eb6..d8bc49c2ad27 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -3571,12 +3571,9 @@ subscription_exists(StreamSubscriptions, SubscriptionId) -> lists:any(fun(Id) -> Id =:= SubscriptionId end, SubscriptionIds). send_file_callback(?VERSION_1, - Transport, _Log, #consumer{configuration = - #consumer_configuration{socket = S, - subscription_id = - SubscriptionId, + #consumer_configuration{subscription_id = SubId, counters = Counters}}, Counter) -> fun(#{chunk_id := FirstOffsetInChunk, num_entries := NumEntries}, @@ -3587,19 +3584,16 @@ send_file_callback(?VERSION_1, ?REQUEST:1, ?COMMAND_DELIVER:15, ?VERSION_1:16, - SubscriptionId:8/unsigned>>, - Transport:send(S, FrameBeginning), + SubId:8/unsigned>>, atomics:add(Counter, 1, Size), increase_messages_consumed(Counters, NumEntries), - set_consumer_offset(Counters, FirstOffsetInChunk) + set_consumer_offset(Counters, FirstOffsetInChunk), + FrameBeginning end; send_file_callback(?VERSION_2, - Transport, Log, #consumer{configuration = - #consumer_configuration{socket = S, - subscription_id = - SubscriptionId, + #consumer_configuration{subscription_id = SubId, counters = Counters}}, Counter) -> fun(#{chunk_id := FirstOffsetInChunk, num_entries := NumEntries}, @@ -3611,12 +3605,12 @@ send_file_callback(?VERSION_2, ?REQUEST:1, ?COMMAND_DELIVER:15, ?VERSION_2:16, - SubscriptionId:8/unsigned, + SubId:8/unsigned, CommittedChunkId:64>>, - Transport:send(S, FrameBeginning), atomics:add(Counter, 1, Size), increase_messages_consumed(Counters, NumEntries), - set_consumer_offset(Counters, FirstOffsetInChunk) + set_consumer_offset(Counters, FirstOffsetInChunk), + FrameBeginning end. send_chunks(DeliverVersion, @@ -3686,9 +3680,7 @@ send_chunks(DeliverVersion, Retry, Counter) -> case osiris_log:send_file(Socket, Log, - send_file_callback(DeliverVersion, - Transport, - Log, + send_file_callback(DeliverVersion, Log, Consumer, Counter)) of diff --git a/rabbitmq-components.mk b/rabbitmq-components.mk index ba99e4337173..853176822fc2 100644 --- a/rabbitmq-components.mk +++ b/rabbitmq-components.mk @@ -49,7 +49,7 @@ dep_jose = hex 1.11.10 dep_khepri = hex 0.17.2 dep_khepri_mnesia_migration = hex 0.8.0 dep_meck = hex 1.0.0 -dep_osiris = git https://github.com/rabbitmq/osiris v1.9.0 +dep_osiris = git https://github.com/rabbitmq/osiris send-file-improvements dep_prometheus = hex 5.1.1 dep_ra = hex 2.17.1 dep_ranch = hex 2.2.0 From ac38eeb85cba7918532c42b06e13b75ff9d9ffd2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Tue, 9 Sep 2025 08:02:34 +0000 Subject: [PATCH 2/3] Adapt stream code to Osiris read ahead Osiris can read ahead data in case of small chunks. 
This saves system calls and increases consumption rate dramatically for some streams. This is transparent for the stream protocol, but requires a small tweak for the stream queue type implementation (passing in the previous iterator when creating a new one). The read ahead is on by default but can be deactivated with to the new stream.read_ahead configuration entry (true / false). Co-authored-by: Karl Nilsson References rabbitmq/osiris#192 (cherry picked from commit 9f162dfd01c1516d168e7d1fad39d33a929756e5) --- deps/rabbit/Makefile | 1 + deps/rabbit/priv/schema/rabbit.schema | 3 +++ deps/rabbit/src/rabbit_stream_queue.erl | 23 +++++++++++-------- .../config_schema_SUITE_data/rabbit.snippets | 22 ++++++++++++++++++ .../src/rabbit_stream_reader.erl | 13 +++++++---- .../test/rabbit_stream_SUITE.erl | 7 ++++++ 6 files changed, 55 insertions(+), 14 deletions(-) diff --git a/deps/rabbit/Makefile b/deps/rabbit/Makefile index 96c1312eb8a1..7119003fe711 100644 --- a/deps/rabbit/Makefile +++ b/deps/rabbit/Makefile @@ -119,6 +119,7 @@ define PROJECT_ENV {dead_letter_worker_consumer_prefetch, 32}, {dead_letter_worker_publisher_confirm_timeout, 180000}, {vhost_process_reconciliation_run_interval, 30}, + {stream_read_ahead, true}, %% for testing {vhost_process_reconciliation_enabled, true}, {license_line, "Licensed under the MPL 2.0. Website: https://rabbitmq.com"} diff --git a/deps/rabbit/priv/schema/rabbit.schema b/deps/rabbit/priv/schema/rabbit.schema index 603f8540a85e..620888253c06 100644 --- a/deps/rabbit/priv/schema/rabbit.schema +++ b/deps/rabbit/priv/schema/rabbit.schema @@ -2785,6 +2785,9 @@ fun(Conf) -> end end}. +{mapping, "stream.read_ahead", "rabbit.stream_read_ahead", + [{datatype, {enum, [true, false]}}]}. + {mapping, "cluster_tags.$tag", "rabbit.cluster_tags", [ {datatype, [binary]} ]}. diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl index 6fc36681555b..def01719f9df 100644 --- a/deps/rabbit/src/rabbit_stream_queue.erl +++ b/deps/rabbit/src/rabbit_stream_queue.erl @@ -51,6 +51,7 @@ -export([format_osiris_event/2]). -export([update_stream_conf/2]). -export([readers/1]). +-export([read_ahead_on/0]). -export([parse_offset_arg/1, filter_spec/1]). @@ -463,10 +464,11 @@ query_local_pid(#stream_client{stream_id = StreamId} = State) -> begin_stream(#stream_client{name = QName, readers = Readers0, local_pid = LocalPid} = State, - Tag, Offset, Mode, AckRequired, Filter, Options) + Tag, Offset, Mode, AckRequired, Filter, Options0) when is_pid(LocalPid) -> CounterSpec = {{?MODULE, QName, Tag, self()}, []}, - {ok, Seg0} = osiris:init_reader(LocalPid, Offset, CounterSpec, Options), + Options1 = Options0#{read_ahead => read_ahead_on()}, + {ok, Seg0} = osiris:init_reader(LocalPid, Offset, CounterSpec, Options1), NextOffset = osiris_log:next_offset(Seg0) - 1, osiris:register_offset_listener(LocalPid, NextOffset), StartOffset = case Offset of @@ -491,7 +493,7 @@ begin_stream(#stream_client{name = QName, last_consumed_offset = StartOffset, log = Seg0, filter = Filter, - reader_options = Options}, + reader_options = Options1}, {ok, State#stream_client{readers = Readers0#{Tag => Str0}}}. 
cancel(_Q, #{consumer_tag := ConsumerTag, @@ -659,8 +661,8 @@ handle_event(_QName, {stream_local_member_change, Pid}, osiris_log:close(Log0), CounterSpec = {{?MODULE, QName, self()}, []}, ?LOG_DEBUG("Re-creating Osiris reader for consumer ~tp at offset ~tp " - " with options ~tp", - [T, Offset, Options]), + " with options ~tp", + [T, Offset, Options]), {ok, Log1} = osiris:init_reader(Pid, Offset, CounterSpec, Options), NextOffset = osiris_log:next_offset(Log1) - 1, ?LOG_DEBUG("Registering offset listener at offset ~tp", [NextOffset]), @@ -1176,7 +1178,7 @@ stream_entries(QName, Name, CTag, LocalPid, credit = Credit} = Str0) -> case Credit > 0 of true -> - case chunk_iterator(Str0, LocalPid) of + case chunk_iterator(Str0, LocalPid, undefined) of {ok, Str} -> stream_entries(QName, Name, CTag, LocalPid, Str); {end_of_stream, Str} -> @@ -1229,7 +1231,7 @@ stream_entries(QName, Name, CTag, LocalPid, gen_server:cast(self(), queue_event(QName, {resume_filtering, CTag})), {Str0#stream{filtering_paused = true}, lists:reverse(Acc0)}; end_of_chunk -> - case chunk_iterator(Str0, LocalPid) of + case chunk_iterator(Str0, LocalPid, Iter0) of {ok, Str} -> stream_entries(QName, Name, CTag, LocalPid, Str, Acc0); {end_of_stream, Str} -> @@ -1294,8 +1296,8 @@ stream_entries(QName, Name, CTag, LocalPid, chunk_iterator(#stream{credit = Credit, listening_offset = LOffs, - log = Log0} = Str0, LocalPid) -> - case osiris_log:chunk_iterator(Log0, Credit) of + log = Log0} = Str0, LocalPid, PrevIterator) -> + case osiris_log:chunk_iterator(Log0, Credit, PrevIterator) of {ok, _ChunkHeader, Iter, Log} -> {ok, Str0#stream{chunk_iterator = Iter, log = Log}}; @@ -1527,3 +1529,6 @@ queue_vm_stats_sups() -> queue_vm_ets() -> {[], []}. + +read_ahead_on() -> + application:get_env(rabbit, stream_read_ahead, true). diff --git a/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets b/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets index 6345ed3c8dac..d6ce95623a21 100644 --- a/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets +++ b/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets @@ -1223,6 +1223,28 @@ credential_validator.regexp = ^abc\\d+", [{osiris, [ {port_range, {4100, 4600}} ]}], + []}, + + %% + %% Stream read ahead on/off + %% + + {stream_read_ahead, + " + stream.read_ahead = true + ", + [{rabbit, [ + {stream_read_ahead, true} + ]}], + []}, + + {stream_read_ahead, + " + stream.read_ahead = false + ", + [{rabbit, [ + {stream_read_ahead, false} + ]}], []} ]. 
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index d8bc49c2ad27..aac8bd4abd87 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -2812,11 +2812,14 @@ init_reader(ConnectionTransport, Properties, OffsetSpec) -> CounterSpec = {{?MODULE, QueueResource, SubscriptionId, self()}, []}, - Options = maps:merge(#{transport => ConnectionTransport, - chunk_selector => get_chunk_selector(Properties)}, - rabbit_stream_utils:filter_spec(Properties)), - {ok, Segment} = - osiris:init_reader(LocalMemberPid, OffsetSpec, CounterSpec, Options), + Options0 = #{transport => ConnectionTransport, + chunk_selector => get_chunk_selector(Properties), + read_ahead => rabbit_stream_queue:read_ahead_on()}, + + Options1 = maps:merge(Options0, + rabbit_stream_utils:filter_spec(Properties)), + {ok, Segment} = osiris:init_reader(LocalMemberPid, OffsetSpec, + CounterSpec, Options1), ?LOG_DEBUG("Next offset for subscription ~tp is ~tp", [SubscriptionId, osiris_log:next_offset(Segment)]), Segment. diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl index 7d8a6bfb45b0..14d712e42d56 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl @@ -19,6 +19,7 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("rabbit_common/include/rabbit.hrl"). +-include_lib("amqp10_common/include/amqp10_framing.hrl"). -include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl"). -include_lib("rabbitmq_stream_common/include/rabbit_stream.hrl"). @@ -1773,3 +1774,9 @@ request(CorrId, Cmd) -> rand_bin() -> base64:encode(rand:bytes(20)). + +generate_log(MsgSize, MsgsPerChunk, NumMessages, Directory) -> + Body = binary:copy(<<"a">>, MsgSize), + Data = #'v1_0.data'{content = Body}, + Bin = amqp10_framing:encode_bin(Data), + osiris_log:generate_log(Bin, MsgsPerChunk, NumMessages, Directory). From c3465ba5f83cc9e5d16b1503a26e2554b6457dd3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Mon, 22 Sep 2025 11:15:27 +0200 Subject: [PATCH 3/3] Bump Osiris to 1.10.0 (cherry picked from commit 025280e04c565921c49b9dac961282b6ff310758) --- rabbitmq-components.mk | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rabbitmq-components.mk b/rabbitmq-components.mk index 853176822fc2..c3bcf2d5d542 100644 --- a/rabbitmq-components.mk +++ b/rabbitmq-components.mk @@ -49,7 +49,7 @@ dep_jose = hex 1.11.10 dep_khepri = hex 0.17.2 dep_khepri_mnesia_migration = hex 0.8.0 dep_meck = hex 1.0.0 -dep_osiris = git https://github.com/rabbitmq/osiris send-file-improvements +dep_osiris = git https://github.com/rabbitmq/osiris v1.10.0 dep_prometheus = hex 5.1.1 dep_ra = hex 2.17.1 dep_ranch = hex 2.2.0
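
Note (editor's sketch, not part of the patches above): the new stream.read_ahead setting introduced in patch 2 maps to the {rabbit, [{stream_read_ahead, boolean()}]} application environment entry and defaults to true. The minimal module below illustrates how that flag is looked up and folded into the osiris reader options, mirroring rabbit_stream_queue:read_ahead_on/0 and the option maps passed to osiris:init_reader/4 in the diffs; the module name, the exported reader_options/1 helper, and the example option map are illustrative only, not RabbitMQ source.

    %% read_ahead_example.erl - standalone sketch, not part of the patches.
    -module(read_ahead_example).
    -export([read_ahead_on/0, reader_options/1]).

    %% Same lookup as rabbit_stream_queue:read_ahead_on/0 in patch 2:
    %% returns true when the application environment key is unset.
    read_ahead_on() ->
        application:get_env(rabbit, stream_read_ahead, true).

    %% Folds the flag into an existing osiris reader option map, as the
    %% patched begin_stream/7 and rabbit_stream_reader:init_reader do
    %% before calling osiris:init_reader/4.
    reader_options(Options0) when is_map(Options0) ->
        Options0#{read_ahead => read_ahead_on()}.

For example, read_ahead_example:reader_options(#{transport => tcp}) returns #{transport => tcp, read_ahead => true} on a node with default settings, and read_ahead => false once stream.read_ahead = false (that is, {rabbit, [{stream_read_ahead, false}]}) has been configured.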