@@ -24,9 +24,7 @@ use std::sync::{Arc, LazyLock};
2424#[ cfg( feature = "extended_tests" ) ]
2525mod memory_limit_validation;
2626mod repartition_mem_limit;
27- use arrow:: array:: {
28- ArrayRef , DictionaryArray , Int32Array , Int64Array , RecordBatch , StringViewArray ,
29- } ;
27+ use arrow:: array:: { ArrayRef , DictionaryArray , Int32Array , RecordBatch , StringViewArray } ;
3028use arrow:: compute:: SortOptions ;
3129use arrow:: datatypes:: { Int32Type , SchemaRef } ;
3230use arrow_schema:: { DataType , Field , Schema } ;
@@ -58,7 +56,6 @@ use datafusion_physical_plan::collect as collect_batches;
5856use datafusion_physical_plan:: common:: collect;
5957use datafusion_physical_plan:: spill:: get_record_batch_memory_size;
6058use rand:: Rng ;
61- use std:: collections:: HashSet ;
6259use test_utils:: AccessLogGenerator ;
6360
6461use async_trait:: async_trait;
@@ -1175,123 +1172,3 @@ impl TableProvider for SortedTableProvider {
11751172 Ok ( DataSourceExec :: from_data_source ( mem_conf) )
11761173 }
11771174}
1178-
1179- // ============================================================================
1180- // Regression tests for https://github.com/apache/datafusion/issues/20724
1181- //
1182- // When hash aggregation spills and switches to streaming merge,
1183- // `group_values` must be recreated with the streaming variant.
1184- // Otherwise `vectorized_intern` can produce non-monotonic group indices
1185- // under hash collisions, causing `GroupOrderingFull` to prematurely
1186- // emit groups → duplicate keys in output.
1187- // ============================================================================
1188-
1189- /// Helper: set up a session that forces spilling during aggregation.
1190- async fn setup_spill_agg_context (
1191- memory_limit : usize ,
1192- batch_size : usize ,
1193- ) -> Result < SessionContext > {
1194- let runtime = RuntimeEnvBuilder :: new ( )
1195- . with_memory_pool ( Arc :: new ( FairSpillPool :: new ( memory_limit) ) )
1196- . with_disk_manager_builder (
1197- DiskManagerBuilder :: default ( ) . with_mode ( DiskManagerMode :: OsTmpDirectory ) ,
1198- )
1199- . build_arc ( )
1200- . unwrap ( ) ;
1201-
1202- let config = SessionConfig :: new ( )
1203- . with_sort_spill_reservation_bytes ( 64 * 1024 )
1204- . with_sort_in_place_threshold_bytes ( 0 )
1205- . with_spill_compression ( SpillCompression :: Uncompressed )
1206- . with_batch_size ( batch_size)
1207- . with_target_partitions ( 1 ) ;
1208-
1209- Ok ( SessionContext :: new_with_config_rt ( config, runtime) )
1210- }
1211-
1212- /// Regression test for https://github.com/apache/datafusion/issues/20724
1213- ///
1214- /// When hash aggregation spills and switches to streaming merge,
1215- /// `group_values` (GroupValuesColumn<false>) is not recreated with the
1216- /// streaming variant (<true>). This means `vectorized_intern` is used
1217- /// post-spill, which can produce non-monotonic group indices under hash
1218- /// collisions, causing `GroupOrderingFull` to prematurely emit groups
1219- /// and create duplicate keys in the output.
1220- ///
1221- /// Requirements to trigger:
1222- /// - Two-column GROUP BY → forces `GroupValuesColumn` (not `GroupValuesPrimitive`)
1223- /// - `force_hash_partial_collisions` feature → truncated hashes create the mix
1224- /// of colliding/non-colliding keys needed for non-monotonic indices
1225- /// - `batch_size=50` → not a multiple of rows-per-group in the merged stream,
1226- /// so groups span batch boundaries and premature emission causes duplicates
1227- #[ tokio:: test]
1228- async fn test_no_duplicate_groups_after_spill ( ) -> Result < ( ) > {
1229- let num_keys: i64 = 5000 ;
1230- let rows_per_key: i64 = 4 ;
1231- let total_rows = ( num_keys * rows_per_key) as usize ;
1232-
1233- let schema = Arc :: new ( Schema :: new ( vec ! [
1234- Field :: new( "key_a" , DataType :: Int64 , false ) ,
1235- Field :: new( "key_b" , DataType :: Int64 , false ) ,
1236- Field :: new( "value" , DataType :: Int64 , false ) ,
1237- ] ) ) ;
1238-
1239- let mut keys_a = Vec :: with_capacity ( total_rows) ;
1240- let mut keys_b = Vec :: with_capacity ( total_rows) ;
1241- let mut vals = Vec :: with_capacity ( total_rows) ;
1242- for r in 0 ..rows_per_key {
1243- for k in 0 ..num_keys {
1244- keys_a. push ( k / 100 ) ;
1245- keys_b. push ( k % 100 ) ;
1246- vals. push ( r * num_keys + k) ;
1247- }
1248- }
1249-
1250- let mut batches = Vec :: new ( ) ;
1251- for start in ( 0 ..total_rows) . step_by ( 500 ) {
1252- let end = ( start + 500 ) . min ( total_rows) ;
1253- batches. push ( RecordBatch :: try_new (
1254- Arc :: clone ( & schema) ,
1255- vec ! [
1256- Arc :: new( Int64Array :: from( keys_a[ start..end] . to_vec( ) ) ) ,
1257- Arc :: new( Int64Array :: from( keys_b[ start..end] . to_vec( ) ) ) ,
1258- Arc :: new( Int64Array :: from( vals[ start..end] . to_vec( ) ) ) ,
1259- ] ,
1260- ) ?) ;
1261- }
1262-
1263- let ctx = setup_spill_agg_context ( 128 * 1024 , 50 ) . await ?;
1264- let table = MemTable :: try_new ( schema, vec ! [ batches] ) ?;
1265- ctx. register_table ( "t" , Arc :: new ( table) ) ?;
1266-
1267- let df = ctx
1268- . sql ( "SELECT key_a, key_b, COUNT(*) as cnt FROM t GROUP BY key_a, key_b" )
1269- . await ?;
1270- let results =
1271- collect_batches ( df. create_physical_plan ( ) . await ?, ctx. task_ctx ( ) ) . await ?;
1272-
1273- let mut seen = HashSet :: new ( ) ;
1274- for batch in & results {
1275- let ka = batch
1276- . column ( 0 )
1277- . as_any ( )
1278- . downcast_ref :: < Int64Array > ( )
1279- . unwrap ( ) ;
1280- let kb = batch
1281- . column ( 1 )
1282- . as_any ( )
1283- . downcast_ref :: < Int64Array > ( )
1284- . unwrap ( ) ;
1285- for i in 0 ..batch. num_rows ( ) {
1286- assert ! (
1287- seen. insert( ( ka. value( i) , kb. value( i) ) ) ,
1288- "DUPLICATE group key ({}, {}). \
1289- Bug #20724: group_values not recreated for streaming merge.",
1290- ka. value( i) ,
1291- kb. value( i) ,
1292- ) ;
1293- }
1294- }
1295- assert_eq ! ( seen. len( ) , num_keys as usize ) ;
1296- Ok ( ( ) )
1297- }
0 commit comments