diff --git a/crates/fluss/src/client/table/log_fetch_buffer.rs b/crates/fluss/src/client/table/log_fetch_buffer.rs
index cee104e0..e9bac53f 100644
--- a/crates/fluss/src/client/table/log_fetch_buffer.rs
+++ b/crates/fluss/src/client/table/log_fetch_buffer.rs
@@ -15,12 +15,14 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use arrow::array::RecordBatch;
+use parking_lot::Mutex;
+
 use crate::error::Result;
 use crate::metadata::TableBucket;
 use crate::record::{
     LogRecordBatch, LogRecordIterator, LogRecordsBatches, ReadContext, ScanRecord,
 };
-use parking_lot::Mutex;
 use std::collections::{HashMap, VecDeque};
 use std::sync::Arc;
 use std::sync::atomic::{AtomicBool, Ordering};
@@ -31,6 +33,7 @@ use tokio::sync::Notify;
 pub trait CompletedFetch: Send + Sync {
     fn table_bucket(&self) -> &TableBucket;
     fn fetch_records(&mut self, max_records: usize) -> Result<Vec<ScanRecord>>;
+    fn fetch_batches(&mut self, max_batches: usize) -> Result<Vec<RecordBatch>>;
     fn is_consumed(&self) -> bool;
     fn drain(&mut self);
     fn size_in_bytes(&self) -> usize;
@@ -318,6 +321,38 @@ impl DefaultCompletedFetch {
             }
         }
     }
+
+    /// Get the next Arrow batch directly, without per-row iteration
+    fn next_fetched_batch(&mut self) -> Result<Option<RecordBatch>> {
+        loop {
+            let Some(log_batch) = self.log_record_batch.next() else {
+                self.drain();
+                return Ok(None);
+            };
+
+            let mut record_batch = log_batch.record_batch(&self.read_context)?;
+
+            // Skip empty batches
+            if record_batch.num_rows() == 0 {
+                continue;
+            }
+
+            // Truncate the batch if the fetch offset falls inside it
+            let base_offset = log_batch.base_log_offset();
+            if self.next_fetch_offset > base_offset {
+                let skip_count = (self.next_fetch_offset - base_offset) as usize;
+                if skip_count >= record_batch.num_rows() {
+                    continue;
+                }
+                // Slice the batch to skip the first skip_count rows
+                record_batch = record_batch.slice(skip_count, record_batch.num_rows() - skip_count);
+            }
+
+            self.next_fetch_offset = log_batch.next_log_offset();
+            self.records_read += record_batch.num_rows();
+            return Ok(Some(record_batch));
+        }
+    }
 }
 
 impl CompletedFetch for DefaultCompletedFetch {
@@ -346,6 +381,23 @@ impl CompletedFetch for DefaultCompletedFetch {
         Ok(scan_records)
     }
 
+    fn fetch_batches(&mut self, max_batches: usize) -> Result<Vec<RecordBatch>> {
+        if self.consumed {
+            return Ok(Vec::new());
+        }
+
+        let mut batches = Vec::with_capacity(max_batches.min(16));
+
+        for _ in 0..max_batches {
+            match self.next_fetched_batch()? {
+                Some(batch) => batches.push(batch),
+                None => break,
+            }
+        }
+
+        Ok(batches)
+    }
+
     fn is_consumed(&self) -> bool {
         self.consumed
     }
diff --git a/crates/fluss/src/client/table/mod.rs b/crates/fluss/src/client/table/mod.rs
index e2cf9e6d..26341d70 100644
--- a/crates/fluss/src/client/table/mod.rs
+++ b/crates/fluss/src/client/table/mod.rs
@@ -32,7 +32,7 @@ mod scanner;
 mod writer;
 
 pub use append::{AppendWriter, TableAppend};
-pub use scanner::{LogScanner, TableScan};
+pub use scanner::{LogScanner, RecordBatchLogScanner, TableScan};
 
 #[allow(dead_code)]
 pub struct FlussTable<'a> {
diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs
index 0acaac89..a1f9307c 100644
--- a/crates/fluss/src/client/table/scanner.rs
+++ b/crates/fluss/src/client/table/scanner.rs
@@ -15,6 +15,16 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use arrow::array::RecordBatch;
+use arrow_schema::SchemaRef;
+use log::{debug, error, warn};
+use parking_lot::{Mutex, RwLock};
+use std::collections::{HashMap, HashSet};
+use std::slice::from_ref;
+use std::sync::Arc;
+use std::time::Duration;
+use tempfile::TempDir;
+
 use crate::client::connection::FlussConnection;
 use crate::client::credentials::CredentialsCache;
 use crate::client::metadata::Metadata;
@@ -30,14 +40,6 @@ use crate::proto::{FetchLogRequest, PbFetchLogReqForBucket, PbFetchLogReqForTabl
 use crate::record::{LogRecordsBatches, ReadContext, ScanRecord, ScanRecords, to_arrow_schema};
 use crate::rpc::{RpcClient, message};
 use crate::util::FairBucketStatusMap;
-use arrow_schema::SchemaRef;
-use log::{debug, error, warn};
-use parking_lot::{Mutex, RwLock};
-use std::collections::{HashMap, HashSet};
-use std::slice::from_ref;
-use std::sync::Arc;
-use std::time::Duration;
-use tempfile::TempDir;
 
 const LOG_FETCH_MAX_BYTES: i32 = 16 * 1024 * 1024;
 #[allow(dead_code)]
@@ -134,16 +136,48 @@ impl<'a> TableScan<'a> {
     }
 
     pub fn create_log_scanner(self) -> Result<LogScanner> {
-        LogScanner::new(
+        let inner = LogScannerInner::new(
             &self.table_info,
             self.metadata.clone(),
             self.conn.get_connections(),
             self.projected_fields,
-        )
+        )?;
+        Ok(LogScanner {
+            inner: Arc::new(inner),
+        })
     }
+
+    pub fn create_record_batch_log_scanner(self) -> Result<RecordBatchLogScanner> {
+        let inner = LogScannerInner::new(
+            &self.table_info,
+            self.metadata.clone(),
+            self.conn.get_connections(),
+            self.projected_fields,
+        )?;
+        Ok(RecordBatchLogScanner {
+            inner: Arc::new(inner),
+        })
+    }
 }
 
+/// Scanner for reading log records one at a time with per-record metadata.
+///
+/// Use this scanner when you need access to individual record offsets and timestamps.
+/// For batch-level access, use [`RecordBatchLogScanner`] instead.
 pub struct LogScanner {
+    inner: Arc<LogScannerInner>,
+}
+
+/// Scanner for reading log data as Arrow RecordBatches.
+///
+/// More efficient than [`LogScanner`] for batch-level analytics where per-record
+/// metadata (offsets, timestamps) is not needed.
+pub struct RecordBatchLogScanner {
+    inner: Arc<LogScannerInner>,
+}
+
+/// Private shared implementation for both scanner types
+struct LogScannerInner {
     table_path: TablePath,
     table_id: i64,
     metadata: Arc<Metadata>,
@@ -151,8 +185,8 @@ pub struct LogScanner {
     log_fetcher: LogFetcher,
 }
 
-impl LogScanner {
-    pub fn new(
+impl LogScannerInner {
+    fn new(
         table_info: &TableInfo,
         metadata: Arc<Metadata>,
         connections: Arc,
@@ -174,7 +208,7 @@ impl LogScanner {
         })
     }
 
-    pub async fn poll(&self, timeout: Duration) -> Result<ScanRecords> {
+    async fn poll_records(&self, timeout: Duration) -> Result<ScanRecords> {
         let start = std::time::Instant::now();
         let deadline = start + timeout;
 
@@ -213,7 +247,7 @@ impl LogScanner {
         }
     }
 
-    pub async fn subscribe(&self, bucket: i32, offset: i64) -> Result<()> {
+    async fn subscribe(&self, bucket: i32, offset: i64) -> Result<()> {
         let table_bucket = TableBucket::new(self.table_id, bucket);
         self.metadata
             .check_and_update_table_metadata(from_ref(&self.table_path))
@@ -223,7 +257,7 @@ impl LogScanner {
         Ok(())
     }
 
-    pub async fn subscribe_batch(&self, bucket_offsets: &HashMap<i32, i64>) -> Result<()> {
+    async fn subscribe_batch(&self, bucket_offsets: &HashMap<i32, i64>) -> Result<()> {
         self.metadata
             .check_and_update_table_metadata(from_ref(&self.table_path))
             .await?;
@@ -257,6 +291,76 @@ impl LogScanner {
         // Collect completed fetches from buffer
         self.log_fetcher.collect_fetches()
     }
+
+    async fn poll_batches(&self, timeout: Duration) -> Result<Vec<RecordBatch>> {
+        let start = std::time::Instant::now();
+        let deadline = start + timeout;
+
+        loop {
+            let batches = self.poll_for_batches().await?;
+
+            if !batches.is_empty() {
+                self.log_fetcher.send_fetches().await?;
+                return Ok(batches);
+            }
+
+            let now = std::time::Instant::now();
+            if now >= deadline {
+                return Ok(Vec::new());
+            }
+
+            let remaining = deadline - now;
+            let has_data = self
+                .log_fetcher
+                .log_fetch_buffer
+                .await_not_empty(remaining)
+                .await;
+
+            if !has_data {
+                return Ok(Vec::new());
+            }
+        }
+    }
+
+    async fn poll_for_batches(&self) -> Result<Vec<RecordBatch>> {
+        let result = self.log_fetcher.collect_batches()?;
+        if !result.is_empty() {
+            return Ok(result);
+        }
+
+        self.log_fetcher.send_fetches().await?;
+        self.log_fetcher.collect_batches()
+    }
+}
+
+// Implementation for LogScanner (records mode)
+impl LogScanner {
+    pub async fn poll(&self, timeout: Duration) -> Result<ScanRecords> {
+        self.inner.poll_records(timeout).await
+    }
+
+    pub async fn subscribe(&self, bucket: i32, offset: i64) -> Result<()> {
+        self.inner.subscribe(bucket, offset).await
+    }
+
+    pub async fn subscribe_batch(&self, bucket_offsets: &HashMap<i32, i64>) -> Result<()> {
+        self.inner.subscribe_batch(bucket_offsets).await
+    }
+}
+
+// Implementation for RecordBatchLogScanner (batches mode)
+impl RecordBatchLogScanner {
+    pub async fn poll(&self, timeout: Duration) -> Result<Vec<RecordBatch>> {
+        self.inner.poll_batches(timeout).await
+    }
+
+    pub async fn subscribe(&self, bucket: i32, offset: i64) -> Result<()> {
+        self.inner.subscribe(bucket, offset).await
+    }
+
+    pub async fn subscribe_batch(&self, bucket_offsets: &HashMap<i32, i64>) -> Result<()> {
+        self.inner.subscribe_batch(bucket_offsets).await
+    }
 }
 
 struct LogFetcher {
@@ -719,6 +823,108 @@ impl LogFetcher {
         }
     }
 
+    /// Collect completed fetches as RecordBatches
+    fn collect_batches(&self) -> Result<Vec<RecordBatch>> {
+        // Limit memory usage with both batch count and byte size constraints.
+        // Max 100 batches per poll, but also check total bytes (soft cap ~64MB).
+        const MAX_BATCHES: usize = 100;
+        const MAX_BYTES: usize = 64 * 1024 * 1024; // 64MB soft cap
+        let mut result: Vec<RecordBatch> = Vec::new();
+        let mut batches_remaining = MAX_BATCHES;
+        let mut bytes_consumed: usize = 0;
+
+        while batches_remaining > 0 && bytes_consumed < MAX_BYTES {
+            let next_in_line = self.log_fetch_buffer.next_in_line_fetch();
+
+            match next_in_line {
+                Some(mut next_fetch) if !next_fetch.is_consumed() => {
+                    let batches =
+                        self.fetch_batches_from_fetch(&mut next_fetch, batches_remaining)?;
+                    let batch_count = batches.len();
+
+                    if !batches.is_empty() {
+                        // Track bytes consumed (soft cap; may exceed by one fetch)
+                        let batch_bytes: usize =
+                            batches.iter().map(|b| b.get_array_memory_size()).sum();
+                        bytes_consumed += batch_bytes;
+
+                        result.extend(batches);
+                        batches_remaining = batches_remaining.saturating_sub(batch_count);
+                    }
+
+                    if !next_fetch.is_consumed() {
+                        self.log_fetch_buffer
+                            .set_next_in_line_fetch(Some(next_fetch));
+                    }
+                }
+                _ => {
+                    if let Some(completed_fetch) = self.log_fetch_buffer.poll() {
+                        if !completed_fetch.is_initialized() {
+                            let size_in_bytes = completed_fetch.size_in_bytes();
+                            match self.initialize_fetch(completed_fetch) {
+                                Ok(initialized) => {
+                                    self.log_fetch_buffer.set_next_in_line_fetch(initialized);
+                                    continue;
+                                }
+                                Err(e) => {
+                                    if result.is_empty() && size_in_bytes == 0 {
+                                        continue;
+                                    }
+                                    return Err(e);
+                                }
+                            }
+                        } else {
+                            self.log_fetch_buffer
+                                .set_next_in_line_fetch(Some(completed_fetch));
+                        }
+                    } else {
+                        break;
+                    }
+                }
+            }
+        }
+
+        Ok(result)
+    }
+
+    fn fetch_batches_from_fetch(
+        &self,
+        next_in_line_fetch: &mut Box<dyn CompletedFetch>,
+        max_batches: usize,
+    ) -> Result<Vec<RecordBatch>> {
+        let table_bucket = next_in_line_fetch.table_bucket().clone();
+        let current_offset = self.log_scanner_status.get_bucket_offset(&table_bucket);
+
+        if current_offset.is_none() {
+            warn!(
+                "Ignoring fetched batches for {table_bucket:?} since the bucket has been unsubscribed"
+            );
+            next_in_line_fetch.drain();
+            return Ok(Vec::new());
+        }
+
+        let current_offset = current_offset.unwrap();
+        let fetch_offset = next_in_line_fetch.next_fetch_offset();
+
+        if fetch_offset == current_offset {
+            let batches = next_in_line_fetch.fetch_batches(max_batches)?;
+            let next_fetch_offset = next_in_line_fetch.next_fetch_offset();
+
+            if next_fetch_offset > current_offset {
+                self.log_scanner_status
+                    .update_offset(&table_bucket, next_fetch_offset);
+            }
+
+            Ok(batches)
+        } else {
+            warn!(
+                "Ignoring fetched batches for {table_bucket:?} at offset {fetch_offset} since the current offset is {current_offset}"
+            );
+            next_in_line_fetch.drain();
+            Ok(Vec::new())
+        }
+    }
+
     async fn prepare_fetch_log_requests(&self) -> HashMap {
         let mut fetch_log_req_for_buckets = HashMap::new();
         let mut table_id = None;
diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs
index 5a5115ed..89fb7b9c 100644
--- a/crates/fluss/src/record/arrow.rs
+++ b/crates/fluss/src/record/arrow.rs
@@ -546,6 +546,28 @@ impl LogRecordBatch {
         };
         Ok(log_record_iterator)
     }
+
+    /// Returns the record batch directly without creating an iterator.
+    /// This is more efficient when you need the entire batch rather than
+    /// iterating row-by-row.
+    pub fn record_batch(&self, read_context: &ReadContext) -> Result<RecordBatch> {
+        if self.record_count() == 0 {
+            // Return an empty batch with the correct schema
+            return Ok(RecordBatch::new_empty(read_context.target_schema.clone()));
+        }
+
+        let data = self.data.get(RECORDS_OFFSET..).ok_or_else(|| {
+            crate::error::Error::UnexpectedError {
+                message: format!(
+                    "Corrupt log record batch: data length {} is less than RECORDS_OFFSET {}",
+                    self.data.len(),
+                    RECORDS_OFFSET
+                ),
+                source: None,
+            }
+        })?;
+        read_context.record_batch(data)
+    }
 }
 
 /// Parse an Arrow IPC message from a byte slice.
diff --git a/crates/fluss/tests/integration/table.rs b/crates/fluss/tests/integration/table.rs
index 0ac34c76..4cba4699 100644
--- a/crates/fluss/tests/integration/table.rs
+++ b/crates/fluss/tests/integration/table.rs
@@ -469,4 +469,137 @@ mod table_test {
         records.sort_by_key(|r| r.offset());
         records
     }
+
+    #[tokio::test]
+    async fn test_poll_batches() {
+        let cluster = get_fluss_cluster();
+        let connection = cluster.get_fluss_connection().await;
+        let admin = connection.get_admin().await.expect("Failed to get admin");
+
+        let table_path = TablePath::new("fluss".to_string(), "test_poll_batches".to_string());
+        let schema = Schema::builder()
+            .column("id", DataTypes::int())
+            .column("name", DataTypes::string())
+            .build()
+            .unwrap();
+
+        create_table(
+            &admin,
+            &table_path,
+            &TableDescriptor::builder().schema(schema).build().unwrap(),
+        )
+        .await;
+        tokio::time::sleep(Duration::from_secs(1)).await;
+
+        let table = connection.get_table(&table_path).await.unwrap();
+        let scanner = table.new_scan().create_record_batch_log_scanner().unwrap();
+        scanner.subscribe(0, 0).await.unwrap();
+
+        // Test 1: An empty table should return an empty result
+        assert!(
+            scanner
+                .poll(Duration::from_millis(500))
+                .await
+                .unwrap()
+                .is_empty()
+        );
+
+        let writer = table.new_append().unwrap().create_writer();
+        writer
+            .append_arrow_batch(
+                record_batch!(("id", Int32, [1, 2]), ("name", Utf8, ["a", "b"])).unwrap(),
+            )
+            .await
+            .unwrap();
+        writer
+            .append_arrow_batch(
+                record_batch!(("id", Int32, [3, 4]), ("name", Utf8, ["c", "d"])).unwrap(),
+            )
+            .await
+            .unwrap();
+        writer
+            .append_arrow_batch(
+                record_batch!(("id", Int32, [5, 6]), ("name", Utf8, ["e", "f"])).unwrap(),
+            )
+            .await
+            .unwrap();
+        writer.flush().await.unwrap();
+
+        use arrow::array::Int32Array;
+        let batches = scanner.poll(Duration::from_secs(10)).await.unwrap();
+        let all_ids: Vec<i32> = batches
+            .iter()
+            .flat_map(|b| {
+                (0..b.num_rows()).map(move |i| {
+                    b.column(0)
+                        .as_any()
+                        .downcast_ref::<Int32Array>()
+                        .unwrap()
+                        .value(i)
+                })
+            })
+            .collect();
+
+        // Test 2: Order should be preserved across multiple batches
+        assert_eq!(all_ids, vec![1, 2, 3, 4, 5, 6]);
+
+        writer
+            .append_arrow_batch(
+                record_batch!(("id", Int32, [7, 8]), ("name", Utf8, ["g", "h"])).unwrap(),
+            )
+            .await
+            .unwrap();
+        writer.flush().await.unwrap();
+
+        let more = scanner.poll(Duration::from_secs(10)).await.unwrap();
+        let new_ids: Vec<i32> = more
+            .iter()
+            .flat_map(|b| {
+                (0..b.num_rows()).map(move |i| {
+                    b.column(0)
+                        .as_any()
+                        .downcast_ref::<Int32Array>()
+                        .unwrap()
+                        .value(i)
+                })
+            })
+            .collect();
+
+        // Test 3: Subsequent polls should not return duplicate data (offset continuation)
+        assert_eq!(new_ids, vec![7, 8]);
+
+        // Test 4: Subscribing from a mid-batch offset should truncate the batch (Arrow batch slicing).
+        // The server returns all records from the start of the batch, but the client truncates to the subscription offset.
+        let trunc_scanner = table.new_scan().create_record_batch_log_scanner().unwrap();
+        trunc_scanner.subscribe(0, 3).await.unwrap();
+        let trunc_batches = trunc_scanner.poll(Duration::from_secs(10)).await.unwrap();
+        let trunc_ids: Vec<i32> = trunc_batches
+            .iter()
+            .flat_map(|b| {
+                (0..b.num_rows()).map(move |i| {
+                    b.column(0)
+                        .as_any()
+                        .downcast_ref::<Int32Array>()
+                        .unwrap()
+                        .value(i)
+                })
+            })
+            .collect();
+
+        // Subscribing from offset 3 should return [4,5,6,7,8], not [1,2,3,4,5,6,7,8]
+        assert_eq!(trunc_ids, vec![4, 5, 6, 7, 8]);
+
+        // Test 5: Projection should only return the requested columns
+        let proj = table
+            .new_scan()
+            .project_by_name(&["id"])
+            .unwrap()
+            .create_record_batch_log_scanner()
+            .unwrap();
+        proj.subscribe(0, 0).await.unwrap();
+        let proj_batches = proj.poll(Duration::from_secs(10)).await.unwrap();
+
+        // The projected batch should have 1 column (id), not 2 (id, name)
+        assert_eq!(proj_batches[0].num_columns(), 1);
+    }
 }
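
For reviewers, here is a minimal sketch of how the new batch-mode scanner is intended to be consumed. It only uses calls that appear in this diff (`new_scan`, `create_record_batch_log_scanner`, `subscribe`, `poll`); the import paths and the surrounding driver function are assumptions for illustration, not part of the change.

```rust
use std::time::Duration;

// Assumed re-export paths; the diff defines these types in
// crates/fluss/src/client/table/ but does not show the public paths.
use fluss::client::table::FlussTable;
use fluss::error::Result;

// Hypothetical consumer loop over one bucket of an already-opened table.
async fn consume(table: &FlussTable<'_>) -> Result<()> {
    // Batch mode: poll() yields Vec<RecordBatch> instead of per-record ScanRecords.
    let scanner = table.new_scan().create_record_batch_log_scanner()?;
    scanner.subscribe(0, 0).await?; // bucket 0, starting at offset 0

    loop {
        // poll() returns an empty Vec when the timeout elapses with no new data.
        let batches = scanner.poll(Duration::from_secs(1)).await?;
        for batch in &batches {
            // Batches arrive in offset order within a bucket.
            println!("rows={} cols={}", batch.num_rows(), batch.num_columns());
        }
    }
}
```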
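The truncation rule in `next_fetched_batch` (exercised by Test 4 above) can also be illustrated standalone with plain arrow-rs. This is a sketch of the arithmetic only; the function name and offsets are illustrative, and `RecordBatch::slice` is the same zero-copy call the diff uses.

```rust
use std::sync::Arc;

use arrow::array::{Int32Array, RecordBatch};
use arrow::datatypes::{DataType, Field, Schema};

// If the subscription offset falls inside a batch, skip the first
// (next_fetch_offset - base_offset) rows via a zero-copy slice;
// if the whole batch lies before the offset, drop it entirely.
fn truncate(batch: &RecordBatch, base_offset: i64, next_fetch_offset: i64) -> Option<RecordBatch> {
    if next_fetch_offset <= base_offset {
        return Some(batch.clone()); // nothing to skip
    }
    let skip = (next_fetch_offset - base_offset) as usize;
    if skip >= batch.num_rows() {
        return None; // batch already fully consumed
    }
    Some(batch.slice(skip, batch.num_rows() - skip))
}

fn main() {
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        schema,
        vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8]))],
    )
    .unwrap();

    // Batch covers offsets [0, 8); subscribing at offset 3 leaves 5 rows,
    // mirroring the [4, 5, 6, 7, 8] expectation in Test 4.
    let sliced = truncate(&batch, 0, 3).unwrap();
    assert_eq!(sliced.num_rows(), 5);
}
```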