diff --git a/crates/fluss/src/client/write/accumulator.rs b/crates/fluss/src/client/write/accumulator.rs index 215adbe6..beae0caa 100644 --- a/crates/fluss/src/client/write/accumulator.rs +++ b/crates/fluss/src/client/write/accumulator.rs @@ -299,34 +299,36 @@ impl RecordAccumulator { .batches .get(&table_bucket.bucket_id()) { - let mut batch = { + let mut maybe_batch = None; + { let mut batch_lock = deque.lock().await; - if batch_lock.is_empty() { - continue; + if !batch_lock.is_empty() { + let first_batch = batch_lock.front().unwrap(); + + if size + first_batch.estimated_size_in_bytes() > max_size as i64 + && !ready.is_empty() + { + // there is a rare case that a single batch size is larger than the request size + // due to compression; in this case we will still eventually send this batch in + // a single request. + break; + } + + maybe_batch = Some(batch_lock.pop_front().unwrap()); } - let first_batch = batch_lock.front().unwrap(); - - if size + first_batch.estimated_size_in_bytes() > max_size as i64 - && !ready.is_empty() - { - // there is a rare case that a single batch size is larger than the request size - // due to compression; in this case we will still eventually send this batch in - // a single request. - break; - } - - batch_lock.pop_front().unwrap() - }; + } - let current_batch_size = batch.estimated_size_in_bytes(); - size += current_batch_size; + if let Some(mut batch) = maybe_batch { + let current_batch_size = batch.estimated_size_in_bytes(); + size += current_batch_size; - // mark the batch as drained. - batch.drained(current_time_ms()); - ready.push(Arc::new(ReadyWriteBatch { - table_bucket, - write_batch: batch, - })); + // mark the batch as drained. + batch.drained(current_time_ms()); + ready.push(Arc::new(ReadyWriteBatch { + table_bucket, + write_batch: batch, + })); + } } } if current_index == start {