diff --git a/deps/rabbit/Makefile b/deps/rabbit/Makefile index 96c1312eb8a1..7119003fe711 100644 --- a/deps/rabbit/Makefile +++ b/deps/rabbit/Makefile @@ -119,6 +119,7 @@ define PROJECT_ENV {dead_letter_worker_consumer_prefetch, 32}, {dead_letter_worker_publisher_confirm_timeout, 180000}, {vhost_process_reconciliation_run_interval, 30}, + {stream_read_ahead, true}, %% for testing {vhost_process_reconciliation_enabled, true}, {license_line, "Licensed under the MPL 2.0. Website: https://rabbitmq.com"} diff --git a/deps/rabbit/priv/schema/rabbit.schema b/deps/rabbit/priv/schema/rabbit.schema index 603f8540a85e..620888253c06 100644 --- a/deps/rabbit/priv/schema/rabbit.schema +++ b/deps/rabbit/priv/schema/rabbit.schema @@ -2785,6 +2785,9 @@ fun(Conf) -> end end}. +{mapping, "stream.read_ahead", "rabbit.stream_read_ahead", + [{datatype, {enum, [true, false]}}]}. + {mapping, "cluster_tags.$tag", "rabbit.cluster_tags", [ {datatype, [binary]} ]}. diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl index 6fc36681555b..def01719f9df 100644 --- a/deps/rabbit/src/rabbit_stream_queue.erl +++ b/deps/rabbit/src/rabbit_stream_queue.erl @@ -51,6 +51,7 @@ -export([format_osiris_event/2]). -export([update_stream_conf/2]). -export([readers/1]). +-export([read_ahead_on/0]). -export([parse_offset_arg/1, filter_spec/1]). @@ -463,10 +464,11 @@ query_local_pid(#stream_client{stream_id = StreamId} = State) -> begin_stream(#stream_client{name = QName, readers = Readers0, local_pid = LocalPid} = State, - Tag, Offset, Mode, AckRequired, Filter, Options) + Tag, Offset, Mode, AckRequired, Filter, Options0) when is_pid(LocalPid) -> CounterSpec = {{?MODULE, QName, Tag, self()}, []}, - {ok, Seg0} = osiris:init_reader(LocalPid, Offset, CounterSpec, Options), + Options1 = Options0#{read_ahead => read_ahead_on()}, + {ok, Seg0} = osiris:init_reader(LocalPid, Offset, CounterSpec, Options1), NextOffset = osiris_log:next_offset(Seg0) - 1, osiris:register_offset_listener(LocalPid, NextOffset), StartOffset = case Offset of @@ -491,7 +493,7 @@ begin_stream(#stream_client{name = QName, last_consumed_offset = StartOffset, log = Seg0, filter = Filter, - reader_options = Options}, + reader_options = Options1}, {ok, State#stream_client{readers = Readers0#{Tag => Str0}}}. cancel(_Q, #{consumer_tag := ConsumerTag, @@ -659,8 +661,8 @@ handle_event(_QName, {stream_local_member_change, Pid}, osiris_log:close(Log0), CounterSpec = {{?MODULE, QName, self()}, []}, ?LOG_DEBUG("Re-creating Osiris reader for consumer ~tp at offset ~tp " - " with options ~tp", - [T, Offset, Options]), + " with options ~tp", + [T, Offset, Options]), {ok, Log1} = osiris:init_reader(Pid, Offset, CounterSpec, Options), NextOffset = osiris_log:next_offset(Log1) - 1, ?LOG_DEBUG("Registering offset listener at offset ~tp", [NextOffset]), @@ -1176,7 +1178,7 @@ stream_entries(QName, Name, CTag, LocalPid, credit = Credit} = Str0) -> case Credit > 0 of true -> - case chunk_iterator(Str0, LocalPid) of + case chunk_iterator(Str0, LocalPid, undefined) of {ok, Str} -> stream_entries(QName, Name, CTag, LocalPid, Str); {end_of_stream, Str} -> @@ -1229,7 +1231,7 @@ stream_entries(QName, Name, CTag, LocalPid, gen_server:cast(self(), queue_event(QName, {resume_filtering, CTag})), {Str0#stream{filtering_paused = true}, lists:reverse(Acc0)}; end_of_chunk -> - case chunk_iterator(Str0, LocalPid) of + case chunk_iterator(Str0, LocalPid, Iter0) of {ok, Str} -> stream_entries(QName, Name, CTag, LocalPid, Str, Acc0); {end_of_stream, Str} -> @@ -1294,8 +1296,8 @@ stream_entries(QName, Name, CTag, LocalPid, chunk_iterator(#stream{credit = Credit, listening_offset = LOffs, - log = Log0} = Str0, LocalPid) -> - case osiris_log:chunk_iterator(Log0, Credit) of + log = Log0} = Str0, LocalPid, PrevIterator) -> + case osiris_log:chunk_iterator(Log0, Credit, PrevIterator) of {ok, _ChunkHeader, Iter, Log} -> {ok, Str0#stream{chunk_iterator = Iter, log = Log}}; @@ -1527,3 +1529,6 @@ queue_vm_stats_sups() -> queue_vm_ets() -> {[], []}. + +read_ahead_on() -> + application:get_env(rabbit, stream_read_ahead, true). diff --git a/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets b/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets index 6345ed3c8dac..d6ce95623a21 100644 --- a/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets +++ b/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets @@ -1223,6 +1223,28 @@ credential_validator.regexp = ^abc\\d+", [{osiris, [ {port_range, {4100, 4600}} ]}], + []}, + + %% + %% Stream read ahead on/off + %% + + {stream_read_ahead, + " + stream.read_ahead = true + ", + [{rabbit, [ + {stream_read_ahead, true} + ]}], + []}, + + {stream_read_ahead, + " + stream.read_ahead = false + ", + [{rabbit, [ + {stream_read_ahead, false} + ]}], []} ]. diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index 16870b416eb6..aac8bd4abd87 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -2812,11 +2812,14 @@ init_reader(ConnectionTransport, Properties, OffsetSpec) -> CounterSpec = {{?MODULE, QueueResource, SubscriptionId, self()}, []}, - Options = maps:merge(#{transport => ConnectionTransport, - chunk_selector => get_chunk_selector(Properties)}, - rabbit_stream_utils:filter_spec(Properties)), - {ok, Segment} = - osiris:init_reader(LocalMemberPid, OffsetSpec, CounterSpec, Options), + Options0 = #{transport => ConnectionTransport, + chunk_selector => get_chunk_selector(Properties), + read_ahead => rabbit_stream_queue:read_ahead_on()}, + + Options1 = maps:merge(Options0, + rabbit_stream_utils:filter_spec(Properties)), + {ok, Segment} = osiris:init_reader(LocalMemberPid, OffsetSpec, + CounterSpec, Options1), ?LOG_DEBUG("Next offset for subscription ~tp is ~tp", [SubscriptionId, osiris_log:next_offset(Segment)]), Segment. @@ -3571,12 +3574,9 @@ subscription_exists(StreamSubscriptions, SubscriptionId) -> lists:any(fun(Id) -> Id =:= SubscriptionId end, SubscriptionIds). send_file_callback(?VERSION_1, - Transport, _Log, #consumer{configuration = - #consumer_configuration{socket = S, - subscription_id = - SubscriptionId, + #consumer_configuration{subscription_id = SubId, counters = Counters}}, Counter) -> fun(#{chunk_id := FirstOffsetInChunk, num_entries := NumEntries}, @@ -3587,19 +3587,16 @@ send_file_callback(?VERSION_1, ?REQUEST:1, ?COMMAND_DELIVER:15, ?VERSION_1:16, - SubscriptionId:8/unsigned>>, - Transport:send(S, FrameBeginning), + SubId:8/unsigned>>, atomics:add(Counter, 1, Size), increase_messages_consumed(Counters, NumEntries), - set_consumer_offset(Counters, FirstOffsetInChunk) + set_consumer_offset(Counters, FirstOffsetInChunk), + FrameBeginning end; send_file_callback(?VERSION_2, - Transport, Log, #consumer{configuration = - #consumer_configuration{socket = S, - subscription_id = - SubscriptionId, + #consumer_configuration{subscription_id = SubId, counters = Counters}}, Counter) -> fun(#{chunk_id := FirstOffsetInChunk, num_entries := NumEntries}, @@ -3611,12 +3608,12 @@ send_file_callback(?VERSION_2, ?REQUEST:1, ?COMMAND_DELIVER:15, ?VERSION_2:16, - SubscriptionId:8/unsigned, + SubId:8/unsigned, CommittedChunkId:64>>, - Transport:send(S, FrameBeginning), atomics:add(Counter, 1, Size), increase_messages_consumed(Counters, NumEntries), - set_consumer_offset(Counters, FirstOffsetInChunk) + set_consumer_offset(Counters, FirstOffsetInChunk), + FrameBeginning end. send_chunks(DeliverVersion, @@ -3686,9 +3683,7 @@ send_chunks(DeliverVersion, Retry, Counter) -> case osiris_log:send_file(Socket, Log, - send_file_callback(DeliverVersion, - Transport, - Log, + send_file_callback(DeliverVersion, Log, Consumer, Counter)) of diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl index 7d8a6bfb45b0..14d712e42d56 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl @@ -19,6 +19,7 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("rabbit_common/include/rabbit.hrl"). +-include_lib("amqp10_common/include/amqp10_framing.hrl"). -include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl"). -include_lib("rabbitmq_stream_common/include/rabbit_stream.hrl"). @@ -1773,3 +1774,9 @@ request(CorrId, Cmd) -> rand_bin() -> base64:encode(rand:bytes(20)). + +generate_log(MsgSize, MsgsPerChunk, NumMessages, Directory) -> + Body = binary:copy(<<"a">>, MsgSize), + Data = #'v1_0.data'{content = Body}, + Bin = amqp10_framing:encode_bin(Data), + osiris_log:generate_log(Bin, MsgsPerChunk, NumMessages, Directory). diff --git a/rabbitmq-components.mk b/rabbitmq-components.mk index ba99e4337173..c3bcf2d5d542 100644 --- a/rabbitmq-components.mk +++ b/rabbitmq-components.mk @@ -49,7 +49,7 @@ dep_jose = hex 1.11.10 dep_khepri = hex 0.17.2 dep_khepri_mnesia_migration = hex 0.8.0 dep_meck = hex 1.0.0 -dep_osiris = git https://github.com/rabbitmq/osiris v1.9.0 +dep_osiris = git https://github.com/rabbitmq/osiris v1.10.0 dep_prometheus = hex 5.1.1 dep_ra = hex 2.17.1 dep_ranch = hex 2.2.0