@@ -14,6 +14,14 @@ use tracing::{debug, error, info, instrument, warn};
// 20% overhead accounts for DashMap internal structures, RwLock wrappers,
// Arc<Schema> refs, and Arrow buffer alignment padding
const MEMORY_OVERHEAD_MULTIPLIER: f64 = 1.2;
/// Headroom divisor for the hard memory limit:
/// hard_limit = max_bytes + max_bytes / HARD_LIMIT_MULTIPLIER, i.e. 120% of max,
/// which leaves room for in-flight writes while still preventing OOM.
/// NOTE(review): despite the name, this is used as a *divisor* at the call
/// site (`max_bytes / HARD_LIMIT_MULTIPLIER`) — consider renaming to
/// HARD_LIMIT_HEADROOM_DIVISOR in a follow-up that updates the use site too.
const HARD_LIMIT_MULTIPLIER: usize = 5; // max_bytes + max_bytes/5 = 120%
/// Maximum CAS retry attempts before memory reservation gives up and errors.
const MAX_CAS_RETRIES: u32 = 100;
/// Base backoff delay in microseconds for CAS retries; shifted left by the
/// attempt number (capped by CAS_BACKOFF_MAX_EXPONENT) to produce the sleep.
const CAS_BACKOFF_BASE_MICROS: u64 = 1;
/// Maximum backoff exponent: 1µs << 10 = 1024µs, so each retry sleeps ~1ms at most.
const CAS_BACKOFF_MAX_EXPONENT: u32 = 10;
1725
1826#[ derive( Debug , Default ) ]
1927pub struct RecoveryStats {
@@ -25,6 +33,13 @@ pub struct RecoveryStats {
2533 pub corrupted_entries_skipped : u64 ,
2634}
2735
/// Outcome of a forced flush pass over all buffered buckets.
///
/// `buckets_flushed + buckets_failed` equals the number of buckets attempted.
/// `total_rows` is counted from the buckets at the start of the pass, so it
/// includes rows from buckets whose flush subsequently failed.
///
/// Derives `Clone`/`Copy`/`PartialEq`/`Eq` so callers and tests can cheaply
/// snapshot and compare flush outcomes (three `u64`s — trivially copyable).
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct FlushStats {
    /// Buckets successfully written to Delta and drained from the buffer.
    pub buckets_flushed: u64,
    /// Buckets whose flush attempt returned an error.
    pub buckets_failed: u64,
    /// Total rows observed across all buckets when the flush pass began.
    pub total_rows: u64,
}
42+
2843/// Callback for writing batches to Delta Lake. The callback MUST:
2944/// - Complete the Delta commit (including S3 upload) before returning Ok
3045/// - Return Err if the commit fails for any reason
@@ -93,16 +108,15 @@ impl BufferedWriteLayer {
93108
94109 /// Try to reserve memory atomically before a write.
95110 /// Returns estimated batch size on success, or error if hard limit exceeded.
96- /// Callers MUST implement retry logic - hard failures may cause data loss .
111+ /// Uses exponential backoff to reduce CPU thrashing under contention .
97112 fn try_reserve_memory ( & self , batches : & [ RecordBatch ] ) -> anyhow:: Result < usize > {
98113 let batch_size: usize = batches. iter ( ) . map ( estimate_batch_size) . sum ( ) ;
99114 let estimated_size = ( batch_size as f64 * MEMORY_OVERHEAD_MULTIPLIER ) as usize ;
100115
101116 let max_bytes = self . max_memory_bytes ( ) ;
102- // Hard limit at 120% provides headroom for in-flight writes while preventing OOM
103- let hard_limit = max_bytes. saturating_add ( max_bytes / 5 ) ;
117+ let hard_limit = max_bytes. saturating_add ( max_bytes / HARD_LIMIT_MULTIPLIER ) ;
104118
105- for _ in 0 ..100 {
119+ for attempt in 0 ..MAX_CAS_RETRIES {
106120 let current_reserved = self . reserved_bytes . load ( Ordering :: Acquire ) ;
107121 let current_mem = self . mem_buffer . estimated_memory_bytes ( ) ;
108122 let new_total = current_mem + current_reserved + estimated_size;
@@ -123,8 +137,20 @@ impl BufferedWriteLayer {
123137 {
124138 return Ok ( estimated_size) ;
125139 }
140+
141+ // Exponential backoff: spin_loop for first few attempts, then brief sleep.
142+ // Note: Using std::thread::sleep in this sync function called from async context.
143+ // This is acceptable because: (1) max sleep is ~1ms, (2) only under high contention,
144+ // (3) converting to async would require spawn_blocking which adds more overhead.
145+ if attempt < 5 {
146+ std:: hint:: spin_loop ( ) ;
147+ } else {
148+ // Max backoff = 1μs << 10 = 1024μs ≈ 1ms
149+ let backoff_micros = CAS_BACKOFF_BASE_MICROS << attempt. min ( CAS_BACKOFF_MAX_EXPONENT ) ;
150+ std:: thread:: sleep ( std:: time:: Duration :: from_micros ( backoff_micros) ) ;
151+ }
126152 }
127- anyhow:: bail!( "Failed to reserve memory after 100 retries due to contention" )
153+ anyhow:: bail!( "Failed to reserve memory after {} retries due to contention" , MAX_CAS_RETRIES )
128154 }
129155
130156 fn release_reservation ( & self , size : usize ) {
@@ -169,6 +195,12 @@ impl BufferedWriteLayer {
169195 self . release_reservation ( reserved_size) ;
170196
171197 result?;
198+
199+ // Immediate flush mode: flush after every insert
200+ if self . config . buffer . flush_immediately ( ) {
201+ self . flush_all_now ( ) . await ?;
202+ }
203+
172204 debug ! ( "BufferedWriteLayer insert complete: project={}, table={}" , project_id, table_name) ;
173205 Ok ( ( ) )
174206 }
@@ -202,7 +234,7 @@ impl BufferedWriteLayer {
202234
203235 for entry in entries {
204236 match entry. operation {
205- WalOperation :: Insert => match WalManager :: deserialize_batch ( & entry. data ) {
237+ WalOperation :: Insert => match WalManager :: deserialize_batch ( & entry. data , & entry . table_name ) {
206238 Ok ( batch) => {
207239 self . mem_buffer . insert ( & entry. project_id , & entry. table_name , batch, entry. timestamp_micros ) ?;
208240 entries_replayed += 1 ;
@@ -332,7 +364,7 @@ impl BufferedWriteLayer {
332364 return Ok ( ( ) ) ;
333365 }
334366
335- info ! ( "Flushing {} buckets to Delta" , flushable. len( ) ) ;
367+ debug ! ( "Flushing {} buckets to Delta" , flushable. len( ) ) ;
336368
337369 // Flush buckets in parallel with bounded concurrency
338370 let parallelism = self . config . buffer . flush_parallelism ( ) ;
@@ -442,6 +474,35 @@ impl BufferedWriteLayer {
442474 Ok ( ( ) )
443475 }
444476
477+ /// Force flush all buffered data to Delta immediately.
478+ pub async fn flush_all_now ( & self ) -> anyhow:: Result < FlushStats > {
479+ let _flush_guard = self . flush_lock . lock ( ) . await ;
480+ let all_buckets = self . mem_buffer . get_all_buckets ( ) ;
481+ let mut stats = FlushStats {
482+ total_rows : all_buckets. iter ( ) . map ( |b| b. row_count as u64 ) . sum ( ) ,
483+ ..Default :: default ( )
484+ } ;
485+
486+ for bucket in all_buckets {
487+ match self . flush_bucket ( & bucket) . await {
488+ Ok ( ( ) ) => {
489+ self . checkpoint_and_drain ( & bucket) ;
490+ stats. buckets_flushed += 1 ;
491+ }
492+ Err ( e) => {
493+ error ! ( "flush_all_now: failed bucket {}: {}" , bucket. bucket_id, e) ;
494+ stats. buckets_failed += 1 ;
495+ }
496+ }
497+ }
498+ Ok ( stats)
499+ }
500+
501+ /// Check if buffer is empty (all data flushed).
502+ pub fn is_empty ( & self ) -> bool {
503+ self . mem_buffer . get_stats ( ) . total_rows == 0
504+ }
505+
/// Returns current statistics for the in-memory buffer.
/// Pure delegation to `MemBuffer::get_stats`; no state is modified.
pub fn get_stats(&self) -> MemBufferStats {
    self.mem_buffer.get_stats()
}
@@ -503,8 +564,8 @@ impl BufferedWriteLayer {
503564#[ cfg( test) ]
504565mod tests {
505566 use super :: * ;
506- use arrow :: array :: { Int64Array , StringArray } ;
507- use arrow :: datatypes :: { DataType , Field , Schema } ;
567+ use crate :: test_utils :: test_helpers :: { json_to_batch , test_span } ;
568+ use serial_test :: serial ;
508569 use std:: path:: PathBuf ;
509570 use tempfile:: tempdir;
510571
@@ -514,14 +575,14 @@ mod tests {
514575 Arc :: new ( cfg)
515576 }
516577
517- fn create_test_batch ( ) -> RecordBatch {
518- let schema = Arc :: new ( Schema :: new ( vec ! [
519- Field :: new ( "id" , DataType :: Int64 , false ) ,
520- Field :: new ( "name ", DataType :: Utf8 , false ) ,
521- ] ) ) ;
522- let id_array = Int64Array :: from ( vec ! [ 1 , 2 , 3 ] ) ;
523- let name_array = StringArray :: from ( vec ! [ "a" , "b" , "c" ] ) ;
524- RecordBatch :: try_new ( schema , vec ! [ Arc :: new ( id_array ) , Arc :: new ( name_array ) ] ) . unwrap ( )
578+ fn create_test_batch ( project_id : & str ) -> RecordBatch {
579+ // Use test_span helper which creates data matching the default schema
580+ json_to_batch ( vec ! [
581+ test_span ( "test1 ", "span1" , project_id ) ,
582+ test_span ( "test2" , "span2" , project_id ) ,
583+ test_span ( "test3" , "span3" , project_id ) ,
584+ ] )
585+ . unwrap ( )
525586 }
526587
527588 #[ tokio:: test]
@@ -535,7 +596,7 @@ mod tests {
535596 let table = format ! ( "t{}" , test_id) ;
536597
537598 let layer = BufferedWriteLayer :: with_config ( cfg) . unwrap ( ) ;
538- let batch = create_test_batch ( ) ;
599+ let batch = create_test_batch ( & project ) ;
539600
540601 layer. insert ( & project, & table, vec ! [ batch. clone( ) ] ) . await . unwrap ( ) ;
541602
@@ -544,15 +605,16 @@ mod tests {
544605 assert_eq ! ( results[ 0 ] . num_rows( ) , 3 ) ;
545606 }
546607
547- // NOTE: This test is ignored because walrus-rust creates new files for each instance
548- // rather than discovering existing files from previous instances in the same directory.
549- // This is a limitation of the walrus library, not our code.
550- #[ ignore]
608+ #[ serial]
551609 #[ tokio:: test]
552610 async fn test_recovery ( ) {
553611 let dir = tempdir ( ) . unwrap ( ) ;
554612 let cfg = create_test_config ( dir. path ( ) . to_path_buf ( ) ) ;
555613
614+ // SAFETY: walrus-rust reads WALRUS_DATA_DIR from environment. We use #[serial]
615+ // to prevent concurrent access to this process-global state.
616+ unsafe { std:: env:: set_var ( "WALRUS_DATA_DIR" , & cfg. core . walrus_data_dir ) } ;
617+
556618 // Use unique but short project/table names (walrus has metadata size limit)
557619 let test_id = & uuid:: Uuid :: new_v4 ( ) . to_string ( ) [ ..4 ] ;
558620 let project = format ! ( "r{}" , test_id) ;
@@ -561,10 +623,9 @@ mod tests {
561623 // First instance - write data
562624 {
563625 let layer = BufferedWriteLayer :: with_config ( Arc :: clone ( & cfg) ) . unwrap ( ) ;
564- let batch = create_test_batch ( ) ;
626+ let batch = create_test_batch ( & project ) ;
565627 layer. insert ( & project, & table, vec ! [ batch] ) . await . unwrap ( ) ;
566- // Shutdown to ensure WAL is synced
567- layer. shutdown ( ) . await . unwrap ( ) ;
628+ // Layer drops here - WAL data should be persisted
568629 }
569630
570631 // Second instance - recover from WAL
@@ -591,7 +652,7 @@ mod tests {
591652 let layer = BufferedWriteLayer :: with_config ( cfg) . unwrap ( ) ;
592653
593654 // First insert should succeed
594- let batch = create_test_batch ( ) ;
655+ let batch = create_test_batch ( & project ) ;
595656 layer. insert ( & project, & table, vec ! [ batch] ) . await . unwrap ( ) ;
596657
597658 // Verify reservation is released (should be 0 after successful insert)
0 commit comments