Skip to content

Commit 6962c73

Browse files
feat: log scanner support poll record batch directly
1 parent 4b95fc2 commit 6962c73

File tree

5 files changed

+430
-17
lines changed

5 files changed

+430
-17
lines changed

crates/fluss/src/client/table/log_fetch_buffer.rs

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,14 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
use arrow::array::RecordBatch;
19+
use parking_lot::Mutex;
20+
1821
use crate::error::Result;
1922
use crate::metadata::TableBucket;
2023
use crate::record::{
2124
LogRecordBatch, LogRecordIterator, LogRecordsBatches, ReadContext, ScanRecord,
2225
};
23-
use parking_lot::Mutex;
2426
use std::collections::{HashMap, VecDeque};
2527
use std::sync::Arc;
2628
use std::sync::atomic::{AtomicBool, Ordering};
@@ -31,6 +33,7 @@ use tokio::sync::Notify;
3133
pub trait CompletedFetch: Send + Sync {
3234
fn table_bucket(&self) -> &TableBucket;
3335
fn fetch_records(&mut self, max_records: usize) -> Result<Vec<ScanRecord>>;
36+
fn fetch_batches(&mut self, max_batches: usize) -> Result<Vec<RecordBatch>>;
3437
fn is_consumed(&self) -> bool;
3538
fn drain(&mut self);
3639
fn size_in_bytes(&self) -> usize;
@@ -318,6 +321,38 @@ impl DefaultCompletedFetch {
318321
}
319322
}
320323
}
324+
325+
/// Get the next batch directly without row iteration
326+
fn next_fetched_batch(&mut self) -> Result<Option<RecordBatch>> {
327+
loop {
328+
let Some(log_batch) = self.log_record_batch.next() else {
329+
self.drain();
330+
return Ok(None);
331+
};
332+
333+
let mut record_batch = log_batch.record_batch(&self.read_context)?;
334+
335+
// Skip empty batches
336+
if record_batch.num_rows() == 0 {
337+
continue;
338+
}
339+
340+
// Truncate batch
341+
let base_offset = log_batch.base_log_offset();
342+
if self.next_fetch_offset > base_offset {
343+
let skip_count = (self.next_fetch_offset - base_offset) as usize;
344+
if skip_count >= record_batch.num_rows() {
345+
continue;
346+
}
347+
// Slice the batch to skip the first skip_count rows
348+
record_batch = record_batch.slice(skip_count, record_batch.num_rows() - skip_count);
349+
}
350+
351+
self.next_fetch_offset = log_batch.next_log_offset();
352+
self.records_read += record_batch.num_rows();
353+
return Ok(Some(record_batch));
354+
}
355+
}
321356
}
322357

323358
impl CompletedFetch for DefaultCompletedFetch {
@@ -346,6 +381,23 @@ impl CompletedFetch for DefaultCompletedFetch {
346381
Ok(scan_records)
347382
}
348383

384+
fn fetch_batches(&mut self, max_batches: usize) -> Result<Vec<RecordBatch>> {
385+
if self.consumed {
386+
return Ok(Vec::new());
387+
}
388+
389+
let mut batches = Vec::with_capacity(max_batches.min(16));
390+
391+
for _ in 0..max_batches {
392+
match self.next_fetched_batch()? {
393+
Some(batch) => batches.push(batch),
394+
None => break,
395+
}
396+
}
397+
398+
Ok(batches)
399+
}
400+
349401
fn is_consumed(&self) -> bool {
350402
self.consumed
351403
}

crates/fluss/src/client/table/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ mod scanner;
3232
mod writer;
3333

3434
pub use append::{AppendWriter, TableAppend};
35-
pub use scanner::{LogScanner, TableScan};
35+
pub use scanner::{LogScanner, RecordBatchLogScanner, TableScan};
3636

3737
#[allow(dead_code)]
3838
pub struct FlussTable<'a> {

0 commit comments

Comments
 (0)