@@ -37,6 +37,10 @@ level_zero_log_reader_impl::level_zero_log_reader_impl(
3737ss::future<model::record_batch_reader::storage_t >
3838level_zero_log_reader_impl::do_load_slice (
3939 model::timeout_clock::time_point deadline) {
40+ if (is_over_limit (0 )) {
41+ _current = state::end_of_stream_state;
42+ co_return chunked_circular_buffer<model::record_batch>{};
43+ }
4044 // We're only fetching from the record batch cache if the reader is in
4145 // the 'empty' state. It doesn't make any difference if the reader is in
4246 // the 'materialized' state. If we're in 'ready' state we risk to go out
@@ -80,13 +84,10 @@ level_zero_log_reader_impl::do_load_slice(
8084std::optional<chunked_circular_buffer<model::record_batch>>
8185level_zero_log_reader_impl::maybe_load_slices_from_cache () {
8286 chunked_circular_buffer<model::record_batch> ret;
83- size_t materialized_bytes = 0 ;
8487 auto current = _config.start_offset ;
85- while (materialized_bytes < _config.max_bytes
86- && current <= _config.max_offset ) {
88+ while (current <= _config.max_offset ) {
8789 auto batch = _ct_api->cache_get (
8890 _ctp->ntp (), kafka::offset_cast (current));
89- size_t batch_size = 0 ;
9091 if (!batch.has_value ()) {
9192 // We hit a gap in the cache and have to download objects
9293 // from S3.
@@ -105,12 +106,8 @@ level_zero_log_reader_impl::maybe_load_slices_from_cache() {
105106 batch.value ().term () > model::term_id{-1 },
106107 " Batch without term in the cache: {}" ,
107108 batch.value ().header ());
108- batch_size = batch.value ().size_bytes ();
109- if (
110- !ret.empty () && batch_size + materialized_bytes > _config.max_bytes ) {
111- // The batch will cause over the limit.
112- // We want to accept the oversized batch if the res is empty to
113- // avoid stalling the reader.
109+ auto batch_size = batch.value ().size_bytes ();
110+ if (is_over_limit (batch_size)) {
114111 break ;
115112 }
116113 vassert (
@@ -122,8 +119,7 @@ level_zero_log_reader_impl::maybe_load_slices_from_cache() {
122119 batch->last_offset (),
123120 current);
124121 ret.push_back (std::move (batch.value ()));
125- materialized_bytes += batch_size;
126- // Invariant: it's guaranteed that the 'ret' is not empty.
122+ _config.bytes_consumed += batch_size;
127123 current = model::offset_cast (
128124 model::next_offset (ret.back ().last_offset ()));
129125 }
0 commit comments