@@ -573,102 +573,3 @@ impl GroupsAccumulator for SumIntGroupsAccumulator {
573573 size_of_val ( self )
574574 }
575575}
576-
577- #[ cfg( test) ]
578- mod tests {
579- use super :: * ;
580- use arrow:: array:: builder:: StringBuilder ;
581- use arrow:: array:: { Int64Builder , RecordBatch } ;
582- use arrow:: datatypes:: DataType :: Int64 ;
583- use arrow:: datatypes:: * ;
584- use datafusion:: common:: Result ;
585- use datafusion:: datasource:: memory:: MemorySourceConfig ;
586- use datafusion:: datasource:: source:: DataSourceExec ;
587- use datafusion:: execution:: TaskContext ;
588- use datafusion:: logical_expr:: AggregateUDF ;
589- use datafusion:: physical_expr:: aggregate:: AggregateExprBuilder ;
590- use datafusion:: physical_expr:: expressions:: Column ;
591- use datafusion:: physical_expr:: PhysicalExpr ;
592- use datafusion:: physical_plan:: aggregates:: { AggregateExec , AggregateMode , PhysicalGroupBy } ;
593- use datafusion:: physical_plan:: ExecutionPlan ;
594- use futures:: StreamExt ;
595-
596- #[ test]
597- fn invalid_data_type ( ) {
598- assert ! ( SumInteger :: try_new( DataType :: Date32 , EvalMode :: Legacy ) . is_err( ) ) ;
599- }
600-
601- #[ tokio:: test]
602- async fn sum_no_overflow ( ) -> Result < ( ) > {
603- let num_rows = 8192 ;
604- let batch = create_record_batch ( num_rows) ;
605- let mut batches = Vec :: new ( ) ;
606- for _ in 0 ..10 {
607- batches. push ( batch. clone ( ) ) ;
608- }
609- let partitions = & [ batches] ;
610- let c0: Arc < dyn PhysicalExpr > = Arc :: new ( Column :: new ( "c0" , 0 ) ) ;
611- let c1: Arc < dyn PhysicalExpr > = Arc :: new ( Column :: new ( "c1" , 1 ) ) ;
612-
613- let data_type = Int64 ;
614- let schema = Arc :: clone ( & partitions[ 0 ] [ 0 ] . schema ( ) ) ;
615- let scan: Arc < dyn ExecutionPlan > = Arc :: new ( DataSourceExec :: new ( Arc :: new (
616- MemorySourceConfig :: try_new ( partitions, Arc :: clone ( & schema) , None ) ?,
617- ) ) ) ;
618-
619- let aggregate_udf = Arc :: new ( AggregateUDF :: new_from_impl ( SumInteger :: try_new (
620- data_type. clone ( ) ,
621- EvalMode :: Legacy ,
622- ) ?) ) ;
623-
624- let aggr_expr = AggregateExprBuilder :: new ( aggregate_udf, vec ! [ c1] )
625- . schema ( Arc :: clone ( & schema) )
626- . alias ( "sum" )
627- . with_ignore_nulls ( false )
628- . with_distinct ( false )
629- . build ( ) ?;
630-
631- let aggregate = Arc :: new ( AggregateExec :: try_new (
632- AggregateMode :: Partial ,
633- PhysicalGroupBy :: new_single ( vec ! [ ( c0, "c0" . to_string( ) ) ] ) ,
634- vec ! [ aggr_expr. into( ) ] ,
635- vec ! [ None ] , // no filter expressions
636- scan,
637- Arc :: clone ( & schema) ,
638- ) ?) ;
639-
640- let mut stream = aggregate
641- . execute ( 0 , Arc :: new ( TaskContext :: default ( ) ) )
642- . unwrap ( ) ;
643- while let Some ( batch) = stream. next ( ) . await {
644- let _batch = batch?;
645- }
646-
647- Ok ( ( ) )
648- }
649-
650- fn create_record_batch ( num_rows : usize ) -> RecordBatch {
651- let mut int_builder = Int64Builder :: with_capacity ( num_rows) ;
652- let mut string_builder = StringBuilder :: with_capacity ( num_rows, num_rows * 32 ) ;
653- for i in 0 ..num_rows {
654- int_builder. append_value ( i as i64 ) ;
655- string_builder. append_value ( format ! ( "this is string #{}" , i % 1024 ) ) ;
656- }
657- let int_array = Arc :: new ( int_builder. finish ( ) ) ;
658- let string_array = Arc :: new ( string_builder. finish ( ) ) ;
659-
660- let mut fields = vec ! [ ] ;
661- let mut columns: Vec < ArrayRef > = vec ! [ ] ;
662-
663- // string column
664- fields. push ( Field :: new ( "c0" , DataType :: Utf8 , false ) ) ;
665- columns. push ( string_array) ;
666-
667- // decimal column
668- fields. push ( Field :: new ( "c1" , DataType :: Int64 , false ) ) ;
669- columns. push ( int_array) ;
670-
671- let schema = Schema :: new ( fields) ;
672- RecordBatch :: try_new ( Arc :: new ( schema) , columns) . unwrap ( )
673- }
674- }
0 commit comments