@@ -447,6 +447,11 @@ impl CompactionServiceImpl {
         Ok(())
     }
 }
+
+/// The batch size used by CompactionServiceImpl::compact. Based on MAX_BATCH_ROWS=4096 of the
+/// pre-DF-upgrade's MergeSortExec, but even smaller to be farther from potential i32 overflow.
+const COMPACT_BATCH_SIZE: usize = 2048;
+
 #[async_trait]
 impl CompactionService for CompactionServiceImpl {
     async fn compact(
@@ -600,9 +605,12 @@ impl CompactionService for CompactionServiceImpl {
             }
         }
 
+        // We use COMPACT_BATCH_SIZE instead of ROW_GROUP_SIZE for this write, to avoid i32
+        // Utf8 arrow array offset overflow in some (unusual) cases.
+        // TODO: Simply lowering the size is not great.
         let store = ParquetTableStore::new(
             index.get_row().clone(),
-            ROW_GROUP_SIZE,
+            COMPACT_BATCH_SIZE,
             self.metadata_cache_factory.clone(),
         );
         let old_partition_remote = match &new_chunk {
@@ -678,13 +686,12 @@ impl CompactionService for CompactionServiceImpl {
             .metadata_cache_factory
             .cache_factory()
             .make_session_config();
-        const MAX_BATCH_ROWS: usize = 2048;
         // Set batch size to 2048 to avoid overflow in case where, perhaps, we might get repeated
         // large string values, such that the default value, 8192, could produce an array too big
         // for i32 string array offsets in a SortPreservingMergeExecStream that is constructed in
         // `merge_chunks`. In pre-DF-upgrade Cubestore, MergeSortExec used a local variable,
         // MAX_BATCH_ROWS = 4096, which might be small enough.
-        let session_config = session_config.with_batch_size(MAX_BATCH_ROWS);
+        let session_config = session_config.with_batch_size(COMPACT_BATCH_SIZE);
 
         // Merge and write rows.
         let schema = Arc::new(arrow_schema(index.get_row()));
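
The hunk above and the constant introduced at the top of this diff hinge on the same fact: Arrow Utf8 arrays address their value buffer with i32 offsets, so a single array can hold roughly 2 GiB of string payload at most. A back-of-the-envelope sketch of where that limit bites for the batch sizes discussed in the comments (the helper name is made up for illustration; only the 8192 default, the 2048 constant, and the 16384 row group size come from the patch):

```rust
// A minimal sketch, not part of the patch: estimate the smallest average Utf8 value size at
// which a single Arrow array of `batch_rows` values would overflow its i32 value offsets.
fn min_avg_value_bytes_to_overflow(batch_rows: usize) -> usize {
    // The final offset equals the total value-buffer length and must fit in an i32.
    (i32::MAX as usize) / batch_rows + 1
}

fn main() {
    // DataFusion's default batch size of 8192 rows overflows once values average ~256 KiB.
    assert_eq!(min_avg_value_bytes_to_overflow(8192), 256 * 1024);
    // COMPACT_BATCH_SIZE = 2048 pushes that to ~1 MiB per value.
    assert_eq!(min_avg_value_bytes_to_overflow(2048), 1024 * 1024);
    // A 16384-row row group (mentioned in the logging comment below) overflows at ~128 KiB.
    assert_eq!(min_avg_value_bytes_to_overflow(16384), 128 * 1024);
}
```

The estimate ignores the offset and validity buffers, but the margins are what motivate dropping from the 8192-row default to 2048.
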
@@ -1302,15 +1309,15 @@ async fn write_to_files_impl(
 ) -> Result<(), CubeError> {
     let schema = Arc::new(store.arrow_schema());
     let writer_props = store.writer_props(table).await?;
-    let mut writers = files.into_iter().map(move |f| -> Result<_, CubeError> {
+    let mut writers = files.clone().into_iter().map(move |f| -> Result<_, CubeError> {
         Ok(ArrowWriter::try_new(
             File::create(f)?,
             schema.clone(),
             Some(writer_props.clone()),
         )?)
     });
 
-    let (write_tx, mut write_rx) = tokio::sync::mpsc::channel(1);
+    let (write_tx, mut write_rx) = tokio::sync::mpsc::channel::<(usize, RecordBatch)>(1);
     let io_job = cube_ext::spawn_blocking(move || -> Result<_, CubeError> {
         let mut writer = writers.next().transpose()?.unwrap();
         let mut current_writer_i = 0;
@@ -1330,27 +1337,57 @@ async fn write_to_files_impl(
         Ok(())
     });
 
-    let mut writer_i = 0;
-    let mut process_row_group = move |b: RecordBatch| -> Result<_, CubeError> {
+    let mut writer_i: usize = 0;
+    let mut process_row_group = move |b: RecordBatch| -> ((usize, RecordBatch), Option<RecordBatch>) {
         match pick_writer(&b) {
-            WriteBatchTo::Current => Ok(((writer_i, b), None)),
+            WriteBatchTo::Current => ((writer_i, b), None),
             WriteBatchTo::Next {
                 rows_for_current: n,
             } => {
                 let current_writer = writer_i;
                 writer_i += 1; // Next iteration will write into the next file.
-                Ok((
+                (
                     (current_writer, b.slice(0, n)),
                     Some(b.slice(n, b.num_rows() - n)),
-                ))
+                )
             }
         }
     };
-    let err = redistribute(records, ROW_GROUP_SIZE, move |b| {
+    let err = redistribute(records, store.row_group_size(), move |b| {
+        // See if any array uses more than 512 MB and log it. With COMPACT_BATCH_SIZE=2048, an
+        // array that large means the default batch size of 8192 might, and our row group size of
+        // 16384 really might, hit i32 offset overflow when used in an Arrow array.
+
+        // First figure out what to log. (Normally we don't allocate or log anything.)
+        let mut loggable_overlongs = Vec::new();
+        {
+            for (column, field) in b.columns().iter().zip(b.schema_ref().fields().iter()) {
+                let memory_size = column.get_buffer_memory_size();
+                if memory_size > 512 * 1024 * 1024 {
+                    loggable_overlongs.push((field.name().clone(), memory_size, column.len()));
+                }
+            }
+        }
+
         let r = process_row_group(b);
+
+        // Then, now that we know which files the rows would be written into, log anything we need to log.
+        for (column_name, memory_size, length) in loggable_overlongs {
+            // The "*out of bounds write index*" fallback provably can't happen (assuming pick_writer is correct), but let's not let logging break things.
+            let oob = "*out of bounds write index*";
+            match r {
+                ((write_i, _), None) => {
+                    log::warn!("Column {} has large memory size {} with length = {}, writing to file '#{}'", column_name, memory_size, length, files.get(write_i).map(String::as_str).unwrap_or(oob));
+                },
+                ((write_i, _), Some(_)) => {
+                    log::warn!("Column {} has large memory size {} with length = {}, writing across files '#{}' and '#{}'", column_name, memory_size, length, files.get(write_i).map(String::as_str).unwrap_or(oob), files.get(write_i + 1).map(String::as_str).unwrap_or(oob));
+                }
+            }
+        }
+
         let write_tx = write_tx.clone();
         async move {
-            let (to_write, to_return) = r?;
+            let (to_write, to_return) = r;
             write_tx.send(to_write).await?;
             return Ok(to_return);
         }
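
As a side note, the 512 MB check in the closure above can be exercised in isolation. A minimal sketch, assuming the `arrow` crate and made-up test data (only the threshold and the use of `get_buffer_memory_size` come from the patch; the function name is illustrative):

```rust
use std::sync::Arc;

use arrow::array::{Array, ArrayRef, StringArray};

/// Returns (column name, buffer bytes, row count) for every column whose Arrow buffers exceed
/// 512 MB, mirroring the check made before calling process_row_group in the patch above.
fn oversized_columns(columns: &[(String, ArrayRef)]) -> Vec<(String, usize, usize)> {
    let mut out = Vec::new();
    for (name, column) in columns {
        // get_buffer_memory_size() counts the bytes held by the array's buffers
        // (for Utf8 arrays: the i32 offsets plus the string values).
        let memory_size = column.get_buffer_memory_size();
        if memory_size > 512 * 1024 * 1024 {
            out.push((name.clone(), memory_size, column.len()));
        }
    }
    out
}

fn main() {
    // A tiny string column stays far below the threshold, so nothing gets reported.
    let small: ArrayRef = Arc::new(StringArray::from(vec!["a", "bb", "ccc"]));
    assert!(oversized_columns(&[("s".to_string(), small)]).is_empty());
}
```

In the patch, the same numbers are then attached to the destination file names so an overflowing partition can be tracked down from the warning logs.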