@@ -447,6 +447,11 @@ impl CompactionServiceImpl {
447447 Ok ( ( ) )
448448 }
449449}
450+
451+ /// The batch size used by CompactionServiceImpl::compact. Based on the MAX_BATCH_ROWS = 4096 of
452+ /// the pre-DataFusion-upgrade MergeSortExec, but halved again to stay farther from potential i32 offset overflow.
453+ const COMPACT_BATCH_SIZE : usize = 2048 ;
454+
450455#[ async_trait]
451456impl CompactionService for CompactionServiceImpl {
452457 async fn compact (
@@ -600,9 +605,12 @@ impl CompactionService for CompactionServiceImpl {
600605 }
601606 }
602607
608+ // We use COMPACT_BATCH_SIZE instead of ROW_GROUP_SIZE for this write, to avoid i32 Utf8 Arrow
609+ // array offset overflow in some (unusual) cases.
610+ // TODO: simply lowering the batch size is a workaround, not a real fix for the overflow risk.
603611 let store = ParquetTableStore :: new (
604612 index. get_row ( ) . clone ( ) ,
605- ROW_GROUP_SIZE ,
613+ COMPACT_BATCH_SIZE ,
606614 self . metadata_cache_factory . clone ( ) ,
607615 ) ;
608616 let old_partition_remote = match & new_chunk {
@@ -678,13 +686,12 @@ impl CompactionService for CompactionServiceImpl {
678686 . metadata_cache_factory
679687 . cache_factory ( )
680688 . make_session_config ( ) ;
681- const MAX_BATCH_ROWS : usize = 2048 ;
682689 // Set the batch size to COMPACT_BATCH_SIZE (2048) to avoid overflow in the case where, perhaps,
683690 // we might get repeated large string values, such that the default value, 8192, could produce an
684691 // array too big for i32 string array offsets in a SortPreservingMergeExecStream constructed in
685692 // `merge_chunks`. In pre-DF-upgrade Cubestore, MergeSortExec used a local variable,
686693 // MAX_BATCH_ROWS = 4096, which might be small enough.
687- let session_config = session_config. with_batch_size ( MAX_BATCH_ROWS ) ;
694+ let session_config = session_config. with_batch_size ( COMPACT_BATCH_SIZE ) ;
688695
689696 // Merge and write rows.
690697 let schema = Arc :: new ( arrow_schema ( index. get_row ( ) ) ) ;
@@ -1302,15 +1309,18 @@ async fn write_to_files_impl(
13021309) -> Result < ( ) , CubeError > {
13031310 let schema = Arc :: new ( store. arrow_schema ( ) ) ;
13041311 let writer_props = store. writer_props ( table) . await ?;
1305- let mut writers = files. into_iter ( ) . map ( move |f| -> Result < _ , CubeError > {
1306- Ok ( ArrowWriter :: try_new (
1307- File :: create ( f) ?,
1308- schema. clone ( ) ,
1309- Some ( writer_props. clone ( ) ) ,
1310- ) ?)
1311- } ) ;
1312+ let mut writers = files
1313+ . clone ( )
1314+ . into_iter ( )
1315+ . map ( move |f| -> Result < _ , CubeError > {
1316+ Ok ( ArrowWriter :: try_new (
1317+ File :: create ( f) ?,
1318+ schema. clone ( ) ,
1319+ Some ( writer_props. clone ( ) ) ,
1320+ ) ?)
1321+ } ) ;
13121322
1313- let ( write_tx, mut write_rx) = tokio:: sync:: mpsc:: channel ( 1 ) ;
1323+ let ( write_tx, mut write_rx) = tokio:: sync:: mpsc:: channel :: < ( usize , RecordBatch ) > ( 1 ) ;
13141324 let io_job = cube_ext:: spawn_blocking ( move || -> Result < _ , CubeError > {
13151325 let mut writer = writers. next ( ) . transpose ( ) ?. unwrap ( ) ;
13161326 let mut current_writer_i = 0 ;
@@ -1330,27 +1340,58 @@ async fn write_to_files_impl(
13301340 Ok ( ( ) )
13311341 } ) ;
13321342
1333- let mut writer_i = 0 ;
1334- let mut process_row_group = move |b : RecordBatch | -> Result < _ , CubeError > {
1335- match pick_writer ( & b) {
1336- WriteBatchTo :: Current => Ok ( ( ( writer_i, b) , None ) ) ,
1337- WriteBatchTo :: Next {
1338- rows_for_current : n,
1339- } => {
1340- let current_writer = writer_i;
1341- writer_i += 1 ; // Next iteration will write into the next file.
1342- Ok ( (
1343- ( current_writer, b. slice ( 0 , n) ) ,
1344- Some ( b. slice ( n, b. num_rows ( ) - n) ) ,
1345- ) )
1343+ let mut writer_i: usize = 0 ;
1344+ let mut process_row_group =
1345+ move |b : RecordBatch | -> ( ( usize , RecordBatch ) , Option < RecordBatch > ) {
1346+ match pick_writer ( & b) {
1347+ WriteBatchTo :: Current => ( ( writer_i, b) , None ) ,
1348+ WriteBatchTo :: Next {
1349+ rows_for_current : n,
1350+ } => {
1351+ let current_writer = writer_i;
1352+ writer_i += 1 ; // Next iteration will write into the next file.
1353+ (
1354+ ( current_writer, b. slice ( 0 , n) ) ,
1355+ Some ( b. slice ( n, b. num_rows ( ) - n) ) ,
1356+ )
1357+ }
1358+ }
1359+ } ;
1360+ let err = redistribute ( records, store. row_group_size ( ) , move |b| {
1361+ // See if any column array uses more than 512 MB and log it. With COMPACT_BATCH_SIZE = 2048,
1362+ // exceeding 512 MB here implies the default batch size of 8192 could, and our row group size of
1363+ // 16384 very likely would, overflow i32 offsets when used in an Arrow array.
1364+
1365+ // First figure out what to log. (Normally we don't allocate or log anything.)
1366+ let mut loggable_overlongs = Vec :: new ( ) ;
1367+ {
1368+ for ( column, field) in b. columns ( ) . iter ( ) . zip ( b. schema_ref ( ) . fields ( ) . iter ( ) ) {
1369+ let memory_size = column. get_buffer_memory_size ( ) ;
1370+ if memory_size > 512 * 1024 * 1024 {
1371+ loggable_overlongs. push ( ( field. name ( ) . clone ( ) , memory_size, column. len ( ) ) )
1372+ }
13461373 }
13471374 }
1348- } ;
1349- let err = redistribute ( records, ROW_GROUP_SIZE , move |b| {
1375+
13501376 let r = process_row_group ( b) ;
1377+
1378+ // Now that we know which files the rows will be written to, emit any pending log messages.
1379+ for ( column_name, memory_size, length) in loggable_overlongs {
1380+ // The "*out of bounds write index*" placeholder provably can't be hit (assuming pick_writer is correct), but logging must never panic, so we index defensively.
1381+ let oob = "*out of bounds write index*" ;
1382+ match r {
1383+ ( ( write_i, _) , None ) => {
1384+ log:: warn!( "Column {} has large memory size {} with length = {}, writing to file '#{}'" , column_name, memory_size, length, files. get( write_i) . map( String :: as_str) . unwrap_or( oob) ) ;
1385+ } ,
1386+ ( ( write_i, _) , Some ( _) ) => {
1387+ log:: warn!( "Column {} has large memory size {} with length = {}, writing across file '#{}' and '#{}'" , column_name, memory_size, length, files. get( write_i) . map( String :: as_str) . unwrap_or( oob) , files. get( write_i + 1 ) . map( String :: as_str) . unwrap_or( oob) ) ;
1388+ }
1389+ }
1390+ }
1391+
13511392 let write_tx = write_tx. clone ( ) ;
13521393 async move {
1353- let ( to_write, to_return) = r? ;
1394+ let ( to_write, to_return) = r;
13541395 write_tx. send ( to_write) . await ?;
13551396 return Ok ( to_return) ;
13561397 }
0 commit comments