43 changes: 42 additions & 1 deletion crates/fluss/src/client/table/log_fetch_buffer.rs
@@ -15,12 +15,14 @@
// specific language governing permissions and limitations
// under the License.

use arrow::array::RecordBatch;
use parking_lot::Mutex;

use crate::error::Result;
use crate::metadata::TableBucket;
use crate::record::{
LogRecordBatch, LogRecordIterator, LogRecordsBatches, ReadContext, ScanRecord,
};
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
@@ -31,6 +33,7 @@ use tokio::sync::Notify;
pub trait CompletedFetch: Send + Sync {
fn table_bucket(&self) -> &TableBucket;
fn fetch_records(&mut self, max_records: usize) -> Result<Vec<ScanRecord>>;
fn fetch_batches(&mut self, max_batches: usize) -> Result<Vec<RecordBatch>>;
fn is_consumed(&self) -> bool;
fn drain(&mut self);
fn size_in_bytes(&self) -> usize;
@@ -318,6 +321,27 @@ impl DefaultCompletedFetch {
}
}
}

/// Get the next non-empty batch directly, without per-row iteration
fn next_fetched_batch(&mut self) -> Result<Option<RecordBatch>> {
loop {
let Some(log_batch) = self.log_record_batch.next() else {
self.drain();
return Ok(None);
};

let record_batch = log_batch.record_batch(&self.read_context)?;

// Skip empty batches
if record_batch.num_rows() == 0 {
continue;
}

self.next_fetch_offset = log_batch.next_log_offset();
self.records_read += record_batch.num_rows();
return Ok(Some(record_batch));
}
}
}
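// A hedged, standalone sketch (not part of the PR) of the fetch_batches
// contract implemented below: pull up to max_batches per call, stop early
// once the fetch drains, and return nothing after it is consumed. MockFetch
// is a hypothetical stand-in for DefaultCompletedFetch.
#[cfg(test)]
mod fetch_contract_sketch {
    use std::collections::VecDeque;

    struct MockFetch {
        queued: VecDeque<usize>, // stand-in for decoded Arrow batches
        consumed: bool,
    }

    impl MockFetch {
        // Mirrors next_fetched_batch: running dry marks the fetch consumed.
        fn next_fetched_batch(&mut self) -> Option<usize> {
            let next = self.queued.pop_front();
            if next.is_none() {
                self.consumed = true;
            }
            next
        }

        // Mirrors fetch_batches: at most max_batches per call.
        fn fetch_batches(&mut self, max_batches: usize) -> Vec<usize> {
            if self.consumed {
                return Vec::new();
            }
            let mut out = Vec::with_capacity(max_batches.min(16));
            for _ in 0..max_batches {
                match self.next_fetched_batch() {
                    Some(batch) => out.push(batch),
                    None => break,
                }
            }
            out
        }
    }

    #[test]
    fn caps_then_drains() {
        let mut fetch = MockFetch { queued: (0..5).collect(), consumed: false };
        assert_eq!(fetch.fetch_batches(3).len(), 3); // capped by max_batches
        assert_eq!(fetch.fetch_batches(3).len(), 2); // remainder, then drained
        assert!(fetch.consumed);
        assert!(fetch.fetch_batches(3).is_empty()); // consumed => empty
    }
}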

impl CompletedFetch for DefaultCompletedFetch {
@@ -346,6 +370,23 @@ impl CompletedFetch for DefaultCompletedFetch {
Ok(scan_records)
}

fn fetch_batches(&mut self, max_batches: usize) -> Result<Vec<RecordBatch>> {
if self.consumed {
return Ok(Vec::new());
}

let mut batches = Vec::with_capacity(max_batches.min(16));

for _ in 0..max_batches {
match self.next_fetched_batch()? {
Some(batch) => batches.push(batch),
None => break,
}
}

Ok(batches)
}

fn is_consumed(&self) -> bool {
self.consumed
}
2 changes: 1 addition & 1 deletion crates/fluss/src/client/table/mod.rs
@@ -32,7 +32,7 @@ mod scanner;
mod writer;

pub use append::{AppendWriter, TableAppend};
pub use scanner::{LogScanner, TableScan};
pub use scanner::{LogScanner, RecordBatchLogScanner, TableScan};

#[allow(dead_code)]
pub struct FlussTable<'a> {
236 changes: 221 additions & 15 deletions crates/fluss/src/client/table/scanner.rs
@@ -15,6 +15,16 @@
// specific language governing permissions and limitations
// under the License.

use arrow::array::RecordBatch;
use arrow_schema::SchemaRef;
use log::{debug, error, warn};
use parking_lot::{Mutex, RwLock};
use std::collections::{HashMap, HashSet};
use std::slice::from_ref;
use std::sync::Arc;
use std::time::Duration;
use tempfile::TempDir;

use crate::client::connection::FlussConnection;
use crate::client::credentials::CredentialsCache;
use crate::client::metadata::Metadata;
@@ -30,14 +40,6 @@ use crate::proto::{FetchLogRequest, PbFetchLogReqForBucket, PbFetchLogReqForTabl
use crate::record::{LogRecordsBatches, ReadContext, ScanRecord, ScanRecords, to_arrow_schema};
use crate::rpc::{RpcClient, message};
use crate::util::FairBucketStatusMap;

const LOG_FETCH_MAX_BYTES: i32 = 16 * 1024 * 1024;
#[allow(dead_code)]
@@ -134,25 +136,57 @@ impl<'a> TableScan<'a> {
}

pub fn create_log_scanner(self) -> Result<LogScanner> {
let inner = LogScannerInner::new(
&self.table_info,
self.metadata.clone(),
self.conn.get_connections(),
self.projected_fields,
)?;
Ok(LogScanner {
inner: Arc::new(inner),
})
}

pub fn create_record_batch_log_scanner(self) -> Result<RecordBatchLogScanner> {
let inner = LogScannerInner::new(
&self.table_info,
self.metadata.clone(),
self.conn.get_connections(),
self.projected_fields,
)?;
Ok(RecordBatchLogScanner {
inner: Arc::new(inner),
})
}
}

/// Scanner for reading log records one at a time with per-record metadata.
///
/// Use this scanner when you need access to individual record offsets and timestamps.
/// For batch-level access, use [`RecordBatchLogScanner`] instead.
pub struct LogScanner {
inner: Arc<LogScannerInner>,
}

/// Scanner for reading log data as Arrow RecordBatches.
///
/// More efficient than [`LogScanner`] for batch-level analytics where per-record
/// metadata (offsets, timestamps) is not needed.
pub struct RecordBatchLogScanner {
inner: Arc<LogScannerInner>,
}

/// Private shared implementation for both scanner types
struct LogScannerInner {
table_path: TablePath,
table_id: i64,
metadata: Arc<Metadata>,
log_scanner_status: Arc<LogScannerStatus>,
log_fetcher: LogFetcher,
}

impl LogScannerInner {
fn new(
table_info: &TableInfo,
metadata: Arc<Metadata>,
connections: Arc<RpcClient>,
@@ -174,7 +208,7 @@ impl LogScanner {
})
}

async fn poll_records(&self, timeout: Duration) -> Result<ScanRecords> {
let start = std::time::Instant::now();
let deadline = start + timeout;

@@ -213,7 +247,7 @@ impl LogScanner {
}
}

async fn subscribe(&self, bucket: i32, offset: i64) -> Result<()> {
let table_bucket = TableBucket::new(self.table_id, bucket);
self.metadata
.check_and_update_table_metadata(from_ref(&self.table_path))
@@ -223,7 +257,7 @@ impl LogScanner {
Ok(())
}

async fn subscribe_batch(&self, bucket_offsets: &HashMap<i32, i64>) -> Result<()> {
self.metadata
.check_and_update_table_metadata(from_ref(&self.table_path))
.await?;
@@ -257,6 +291,76 @@ impl LogScanner {
// Collect completed fetches from buffer
self.log_fetcher.collect_fetches()
}

async fn poll_batches(&self, timeout: Duration) -> Result<Vec<RecordBatch>> {
let start = std::time::Instant::now();
let deadline = start + timeout;

loop {
let batches = self.poll_for_batches().await?;

if !batches.is_empty() {
self.log_fetcher.send_fetches().await?;
return Ok(batches);
}

let now = std::time::Instant::now();
if now >= deadline {
return Ok(Vec::new());
}

let remaining = deadline - now;
let has_data = self
.log_fetcher
.log_fetch_buffer
.await_not_empty(remaining)
.await;

if !has_data {
return Ok(Vec::new());
}
}
}

async fn poll_for_batches(&self) -> Result<Vec<RecordBatch>> {
let result = self.log_fetcher.collect_batches()?;
if !result.is_empty() {
return Ok(result);
}

self.log_fetcher.send_fetches().await?;
self.log_fetcher.collect_batches()
}
}

// Implementation for LogScanner (records mode)
impl LogScanner {
pub async fn poll(&self, timeout: Duration) -> Result<ScanRecords> {
self.inner.poll_records(timeout).await
}

pub async fn subscribe(&self, bucket: i32, offset: i64) -> Result<()> {
self.inner.subscribe(bucket, offset).await
}

pub async fn subscribe_batch(&self, bucket_offsets: &HashMap<i32, i64>) -> Result<()> {
self.inner.subscribe_batch(bucket_offsets).await
}
}

// Implementation for RecordBatchLogScanner (batches mode)
impl RecordBatchLogScanner {
pub async fn poll(&self, timeout: Duration) -> Result<Vec<RecordBatch>> {
self.inner.poll_batches(timeout).await
}

pub async fn subscribe(&self, bucket: i32, offset: i64) -> Result<()> {
self.inner.subscribe(bucket, offset).await
}

pub async fn subscribe_batch(&self, bucket_offsets: &HashMap<i32, i64>) -> Result<()> {
self.inner.subscribe_batch(bucket_offsets).await
}
}
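
// A hedged usage sketch (not part of the PR) of how the batch-mode scanner
// is expected to be driven. It assumes an already-built TableScan `scan`;
// obtaining one from FlussTable is outside this diff.
#[allow(dead_code)]
async fn example_drain_batches(scan: TableScan<'_>) -> Result<()> {
    let scanner = scan.create_record_batch_log_scanner()?;
    scanner.subscribe(0, 0).await?; // bucket 0, starting at offset 0
    // poll returns whatever batches arrived within the timeout (possibly none)
    let batches = scanner.poll(Duration::from_secs(5)).await?;
    for batch in &batches {
        println!("fetched {} rows x {} columns", batch.num_rows(), batch.num_columns());
    }
    Ok(())
}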

struct LogFetcher {
@@ -719,6 +823,108 @@ impl LogFetcher {
}
}

/// Collect completed fetches as RecordBatches
fn collect_batches(&self) -> Result<Vec<RecordBatch>> {
// Limit memory usage with both batch count and byte size constraints.
// Max 100 batches per poll, but also check total bytes (soft cap ~64MB).
const MAX_BATCHES: usize = 100;
const MAX_BYTES: usize = 64 * 1024 * 1024; // 64MB soft cap
let mut result: Vec<RecordBatch> = Vec::new();
let mut batches_remaining = MAX_BATCHES;
let mut bytes_consumed: usize = 0;

while batches_remaining > 0 && bytes_consumed < MAX_BYTES {
let next_in_line = self.log_fetch_buffer.next_in_line_fetch();

match next_in_line {
Some(mut next_fetch) if !next_fetch.is_consumed() => {
let batches =
self.fetch_batches_from_fetch(&mut next_fetch, batches_remaining)?;
let batch_count = batches.len();

if !batches.is_empty() {
// Track bytes consumed (soft cap - may exceed by one fetch)
let batch_bytes: usize =
batches.iter().map(|b| b.get_array_memory_size()).sum();
bytes_consumed += batch_bytes;

result.extend(batches);
batches_remaining = batches_remaining.saturating_sub(batch_count);
}

if !next_fetch.is_consumed() {
self.log_fetch_buffer
.set_next_in_line_fetch(Some(next_fetch));
}
}
_ => {
if let Some(completed_fetch) = self.log_fetch_buffer.poll() {
if !completed_fetch.is_initialized() {
let size_in_bytes = completed_fetch.size_in_bytes();
match self.initialize_fetch(completed_fetch) {
Ok(initialized) => {
self.log_fetch_buffer.set_next_in_line_fetch(initialized);
continue;
}
Err(e) => {
if result.is_empty() && size_in_bytes == 0 {
continue;
}
return Err(e);
}
}
} else {
self.log_fetch_buffer
.set_next_in_line_fetch(Some(completed_fetch));
}
} else {
break;
}
}
}
}

Ok(result)
}
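
// A hedged, illustration-only sketch (not part of the PR) of the soft byte
// cap above: the budget is checked before each pull, so one poll may exceed
// MAX_BYTES by at most the final fetch it admits. With sizes [4, 4, 6] and a
// cap of 10, all three are taken (8 < 10 at the third pull), totalling 14.
#[allow(dead_code)]
fn soft_cap_sketch(sizes: &[usize], max_bytes: usize) -> Vec<usize> {
    let mut taken = Vec::new();
    let mut consumed = 0usize;
    for &size in sizes {
        if consumed >= max_bytes {
            break; // budget checked before the pull, as in collect_batches
        }
        consumed += size;
        taken.push(size);
    }
    taken
}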

fn fetch_batches_from_fetch(
&self,
next_in_line_fetch: &mut Box<dyn CompletedFetch>,
max_batches: usize,
) -> Result<Vec<RecordBatch>> {
let table_bucket = next_in_line_fetch.table_bucket().clone();
let current_offset = self.log_scanner_status.get_bucket_offset(&table_bucket);

if current_offset.is_none() {
warn!(
"Ignoring fetched batches for {table_bucket:?} since the bucket has been unsubscribed"
);
next_in_line_fetch.drain();
return Ok(Vec::new());
}

let current_offset = current_offset.unwrap();
let fetch_offset = next_in_line_fetch.next_fetch_offset();

if fetch_offset == current_offset {
let batches = next_in_line_fetch.fetch_batches(max_batches)?;
let next_fetch_offset = next_in_line_fetch.next_fetch_offset();

if next_fetch_offset > current_offset {
self.log_scanner_status
.update_offset(&table_bucket, next_fetch_offset);
}

Ok(batches)
} else {
warn!(
"Ignoring fetched batches for {table_bucket:?} at offset {fetch_offset} since the current offset is {current_offset}"
);
next_in_line_fetch.drain();
Ok(Vec::new())
}
}

async fn prepare_fetch_log_requests(&self) -> HashMap<i32, FetchLogRequest> {
let mut fetch_log_req_for_buckets = HashMap::new();
let mut table_id = None;