diff --git a/datafusion/functions-aggregate/src/correlation.rs b/datafusion/functions-aggregate/src/correlation.rs index 538311dfa263..119f861a5760 100644 --- a/datafusion/functions-aggregate/src/correlation.rs +++ b/datafusion/functions-aggregate/src/correlation.rs @@ -411,11 +411,15 @@ impl GroupsAccumulator for CorrelationGroupsAccumulator { } fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef> { - let n = match emit_to { - EmitTo::All => self.count.len(), - EmitTo::First(n) => n, - }; - + // Drain the state vectors for the groups being emitted + let counts = emit_to.take_needed(&mut self.count); + let sum_xs = emit_to.take_needed(&mut self.sum_x); + let sum_ys = emit_to.take_needed(&mut self.sum_y); + let sum_xys = emit_to.take_needed(&mut self.sum_xy); + let sum_xxs = emit_to.take_needed(&mut self.sum_xx); + let sum_yys = emit_to.take_needed(&mut self.sum_yy); + + let n = counts.len(); let mut values = Vec::with_capacity(n); let mut nulls = NullBufferBuilder::new(n); @@ -427,14 +431,13 @@ impl GroupsAccumulator for CorrelationGroupsAccumulator { // result should be `Null` (according to PostgreSQL's behavior). // - However, if any of the accumulated values contain NaN, the result should // be NaN regardless of the count (even for single-row groups). 
- // for i in 0..n { - let count = self.count[i]; - let sum_x = self.sum_x[i]; - let sum_y = self.sum_y[i]; - let sum_xy = self.sum_xy[i]; - let sum_xx = self.sum_xx[i]; - let sum_yy = self.sum_yy[i]; + let count = counts[i]; + let sum_x = sum_xs[i]; + let sum_y = sum_ys[i]; + let sum_xy = sum_xys[i]; + let sum_xx = sum_xxs[i]; + let sum_yy = sum_yys[i]; // If BOTH sum_x AND sum_y are NaN, then both input values are NaN → return NaN // If only ONE of them is NaN, then only one input value is NaN → return NULL @@ -470,18 +473,21 @@ impl GroupsAccumulator for CorrelationGroupsAccumulator { } fn state(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> { - let n = match emit_to { - EmitTo::All => self.count.len(), - EmitTo::First(n) => n, - }; + // Drain the state vectors for the groups being emitted + let count = emit_to.take_needed(&mut self.count); + let sum_x = emit_to.take_needed(&mut self.sum_x); + let sum_y = emit_to.take_needed(&mut self.sum_y); + let sum_xy = emit_to.take_needed(&mut self.sum_xy); + let sum_xx = emit_to.take_needed(&mut self.sum_xx); + let sum_yy = emit_to.take_needed(&mut self.sum_yy); Ok(vec![ - Arc::new(UInt64Array::from(self.count[0..n].to_vec())), - Arc::new(Float64Array::from(self.sum_x[0..n].to_vec())), - Arc::new(Float64Array::from(self.sum_y[0..n].to_vec())), - Arc::new(Float64Array::from(self.sum_xy[0..n].to_vec())), - Arc::new(Float64Array::from(self.sum_xx[0..n].to_vec())), - Arc::new(Float64Array::from(self.sum_yy[0..n].to_vec())), + Arc::new(UInt64Array::from(count)), + Arc::new(Float64Array::from(sum_x)), + Arc::new(Float64Array::from(sum_y)), + Arc::new(Float64Array::from(sum_xy)), + Arc::new(Float64Array::from(sum_xx)), + Arc::new(Float64Array::from(sum_yy)), ]) } @@ -537,12 +543,12 @@ impl GroupsAccumulator for CorrelationGroupsAccumulator { } fn size(&self) -> usize { - size_of_val(&self.count) - + size_of_val(&self.sum_x) - + size_of_val(&self.sum_y) - + size_of_val(&self.sum_xy) - + size_of_val(&self.sum_xx) - + 
size_of_val(&self.sum_yy) + self.count.capacity() * size_of::<u64>() + + self.sum_x.capacity() * size_of::<f64>() + + self.sum_y.capacity() * size_of::<f64>() + + self.sum_xy.capacity() * size_of::<f64>() + + self.sum_xx.capacity() * size_of::<f64>() + + self.sum_yy.capacity() * size_of::<f64>() } } diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt index 3c962a0f87f3..a5f3ef04139f 100644 --- a/datafusion/sqllogictest/test_files/aggregate.slt +++ b/datafusion/sqllogictest/test_files/aggregate.slt @@ -8387,3 +8387,284 @@ ORDER BY grp, id; statement ok DROP TABLE string_agg_window_test; + +# Enable streaming aggregation by limiting partitions and ensuring sorted input +statement ok +set datafusion.execution.target_partitions = 1; + +# Setup data +statement ok +CREATE TABLE stream_test ( + g INT, + x DOUBLE, + y DOUBLE, + i INT, + b BOOLEAN, + s VARCHAR +) AS VALUES +(1, 1.0, 1.0, 1, true, 'a'), (1, 2.0, 2.0, 2, true, 'b'), +(2, 1.0, 5.0, 3, false, 'c'), (2, 2.0, 5.0, 4, true, 'd'), +(3, 1.0, 1.0, 7, false, 'e'), (3, 2.0, 2.0, 8, false, 'f'); + +# Test comprehensive aggregates with streaming +# This verifies that CORR and other aggregates work together in a streaming plan (ordering_mode=Sorted) + +# Basic Aggregates +query TT +EXPLAIN SELECT + g, + COUNT(*), + SUM(x), + AVG(x), + MEAN(x), + MIN(x), + MAX(y), + BIT_AND(i), + BIT_OR(i), + BIT_XOR(i), + BOOL_AND(b), + BOOL_OR(b), + MEDIAN(x), + GROUPING(g), + VAR(x), + VAR_SAMP(x), + VAR_POP(x), + VAR_SAMPLE(x), + VAR_POPULATION(x), + STDDEV(x), + STDDEV_SAMP(x), + STDDEV_POP(x) +FROM (SELECT * FROM stream_test ORDER BY g LIMIT 10000) +GROUP BY g +ORDER BY g; +---- +logical_plan +01)Sort: stream_test.g ASC NULLS LAST +02)--Projection: stream_test.g, count(Int64(1)) AS count(*), sum(stream_test.x), avg(stream_test.x), avg(stream_test.x) AS mean(stream_test.x), min(stream_test.x), max(stream_test.y), bit_and(stream_test.i), bit_or(stream_test.i), bit_xor(stream_test.i), 
bool_and(stream_test.b), bool_or(stream_test.b), median(stream_test.x), Int32(0) AS grouping(stream_test.g), var(stream_test.x), var(stream_test.x) AS var_samp(stream_test.x), var_pop(stream_test.x), var(stream_test.x) AS var_sample(stream_test.x), var_pop(stream_test.x) AS var_population(stream_test.x), stddev(stream_test.x), stddev(stream_test.x) AS stddev_samp(stream_test.x), stddev_pop(stream_test.x) +03)----Aggregate: groupBy=[[stream_test.g]], aggr=[[count(Int64(1)), sum(stream_test.x), avg(stream_test.x), min(stream_test.x), max(stream_test.y), bit_and(stream_test.i), bit_or(stream_test.i), bit_xor(stream_test.i), bool_and(stream_test.b), bool_or(stream_test.b), median(stream_test.x), var(stream_test.x), var_pop(stream_test.x), stddev(stream_test.x), stddev_pop(stream_test.x)]] +04)------Sort: stream_test.g ASC NULLS LAST, fetch=10000 +05)--------TableScan: stream_test projection=[g, x, y, i, b] +physical_plan +01)ProjectionExec: expr=[g@0 as g, count(Int64(1))@1 as count(*), sum(stream_test.x)@2 as sum(stream_test.x), avg(stream_test.x)@3 as avg(stream_test.x), avg(stream_test.x)@3 as mean(stream_test.x), min(stream_test.x)@4 as min(stream_test.x), max(stream_test.y)@5 as max(stream_test.y), bit_and(stream_test.i)@6 as bit_and(stream_test.i), bit_or(stream_test.i)@7 as bit_or(stream_test.i), bit_xor(stream_test.i)@8 as bit_xor(stream_test.i), bool_and(stream_test.b)@9 as bool_and(stream_test.b), bool_or(stream_test.b)@10 as bool_or(stream_test.b), median(stream_test.x)@11 as median(stream_test.x), 0 as grouping(stream_test.g), var(stream_test.x)@12 as var(stream_test.x), var(stream_test.x)@12 as var_samp(stream_test.x), var_pop(stream_test.x)@13 as var_pop(stream_test.x), var(stream_test.x)@12 as var_sample(stream_test.x), var_pop(stream_test.x)@13 as var_population(stream_test.x), stddev(stream_test.x)@14 as stddev(stream_test.x), stddev(stream_test.x)@14 as stddev_samp(stream_test.x), stddev_pop(stream_test.x)@15 as stddev_pop(stream_test.x)] 
+02)--AggregateExec: mode=Single, gby=[g@0 as g], aggr=[count(Int64(1)), sum(stream_test.x), avg(stream_test.x), min(stream_test.x), max(stream_test.y), bit_and(stream_test.i), bit_or(stream_test.i), bit_xor(stream_test.i), bool_and(stream_test.b), bool_or(stream_test.b), median(stream_test.x), var(stream_test.x), var_pop(stream_test.x), stddev(stream_test.x), stddev_pop(stream_test.x)], ordering_mode=Sorted +03)----SortExec: TopK(fetch=10000), expr=[g@0 ASC NULLS LAST], preserve_partitioning=[false] +04)------DataSourceExec: partitions=1, partition_sizes=[1] + +query IIRRRRRIIIBBRIRRRRRRRR +SELECT + g, + COUNT(*), + SUM(x), + AVG(x), + MEAN(x), + MIN(x), + MAX(y), + BIT_AND(i), + BIT_OR(i), + BIT_XOR(i), + BOOL_AND(b), + BOOL_OR(b), + MEDIAN(x), + GROUPING(g), + VAR(x), + VAR_SAMP(x), + VAR_POP(x), + VAR_SAMPLE(x), + VAR_POPULATION(x), + STDDEV(x), + STDDEV_SAMP(x), + STDDEV_POP(x) +FROM (SELECT * FROM stream_test ORDER BY g LIMIT 10000) +GROUP BY g +ORDER BY g; +---- +1 2 3 1.5 1.5 1 2 0 3 3 true true 1.5 0 0.5 0.5 0.25 0.5 0.25 0.707106781187 0.707106781187 0.5 +2 2 3 1.5 1.5 1 5 0 7 7 false true 1.5 0 0.5 0.5 0.25 0.5 0.25 0.707106781187 0.707106781187 0.5 +3 2 3 1.5 1.5 1 2 0 15 15 false false 1.5 0 0.5 0.5 0.25 0.5 0.25 0.707106781187 0.707106781187 0.5 + +# Ordered Aggregates (by x) +query TT +EXPLAIN SELECT + g, + ARRAY_AGG(x ORDER BY x), + ARRAY_AGG(DISTINCT x ORDER BY x), + FIRST_VALUE(x ORDER BY x), + LAST_VALUE(x ORDER BY x), + NTH_VALUE(x, 1 ORDER BY x) +FROM (SELECT * FROM stream_test ORDER BY g LIMIT 10000) +GROUP BY g +ORDER BY g; +---- +logical_plan +01)Sort: stream_test.g ASC NULLS LAST +02)--Aggregate: groupBy=[[stream_test.g]], aggr=[[array_agg(stream_test.x) ORDER BY [stream_test.x ASC NULLS LAST], array_agg(DISTINCT stream_test.x) ORDER BY [stream_test.x ASC NULLS LAST], first_value(stream_test.x) ORDER BY [stream_test.x ASC NULLS LAST], last_value(stream_test.x) ORDER BY [stream_test.x ASC NULLS LAST], nth_value(stream_test.x, Int64(1)) ORDER 
BY [stream_test.x ASC NULLS LAST]]] +03)----Sort: stream_test.g ASC NULLS LAST, fetch=10000 +04)------TableScan: stream_test projection=[g, x] +physical_plan +01)AggregateExec: mode=Single, gby=[g@0 as g], aggr=[array_agg(stream_test.x) ORDER BY [stream_test.x ASC NULLS LAST], array_agg(DISTINCT stream_test.x) ORDER BY [stream_test.x ASC NULLS LAST], first_value(stream_test.x) ORDER BY [stream_test.x ASC NULLS LAST], last_value(stream_test.x) ORDER BY [stream_test.x ASC NULLS LAST], nth_value(stream_test.x,Int64(1)) ORDER BY [stream_test.x ASC NULLS LAST]], ordering_mode=Sorted +02)--SortExec: TopK(fetch=10000), expr=[g@0 ASC NULLS LAST, x@1 ASC NULLS LAST], preserve_partitioning=[false] +03)----DataSourceExec: partitions=1, partition_sizes=[1] + +query I??RRR +SELECT + g, + ARRAY_AGG(x ORDER BY x), + ARRAY_AGG(DISTINCT x ORDER BY x), + FIRST_VALUE(x ORDER BY x), + LAST_VALUE(x ORDER BY x), + NTH_VALUE(x, 1 ORDER BY x) +FROM (SELECT * FROM stream_test ORDER BY g LIMIT 10000) +GROUP BY g +ORDER BY g; +---- +1 [1.0, 2.0] [1.0, 2.0] 1 2 1 +2 [1.0, 2.0] [1.0, 2.0] 1 2 1 +3 [1.0, 2.0] [1.0, 2.0] 1 2 1 + +# Ordered Aggregates (by s) +query TT +EXPLAIN SELECT + g, + ARRAY_AGG(s ORDER BY s), + STRING_AGG(s, '|' ORDER BY s), + STRING_AGG(DISTINCT s, '|' ORDER BY s) +FROM (SELECT * FROM stream_test ORDER BY g LIMIT 10000) +GROUP BY g +ORDER BY g; +---- +logical_plan +01)Sort: stream_test.g ASC NULLS LAST +02)--Aggregate: groupBy=[[stream_test.g]], aggr=[[array_agg(stream_test.s) ORDER BY [stream_test.s ASC NULLS LAST], string_agg(stream_test.s, Utf8("|")) ORDER BY [stream_test.s ASC NULLS LAST], string_agg(DISTINCT stream_test.s, Utf8("|")) ORDER BY [stream_test.s ASC NULLS LAST]]] +03)----Sort: stream_test.g ASC NULLS LAST, fetch=10000 +04)------TableScan: stream_test projection=[g, s] +physical_plan +01)AggregateExec: mode=Single, gby=[g@0 as g], aggr=[array_agg(stream_test.s) ORDER BY [stream_test.s ASC NULLS LAST], string_agg(stream_test.s,Utf8("|")) ORDER BY 
[stream_test.s ASC NULLS LAST], string_agg(DISTINCT stream_test.s,Utf8("|")) ORDER BY [stream_test.s ASC NULLS LAST]], ordering_mode=Sorted +02)--SortExec: TopK(fetch=10000), expr=[g@0 ASC NULLS LAST, s@1 ASC NULLS LAST], preserve_partitioning=[false] +03)----DataSourceExec: partitions=1, partition_sizes=[1] + +query I?TT +SELECT + g, + ARRAY_AGG(s ORDER BY s), + STRING_AGG(s, '|' ORDER BY s), + STRING_AGG(DISTINCT s, '|' ORDER BY s) +FROM (SELECT * FROM stream_test ORDER BY g LIMIT 10000) +GROUP BY g +ORDER BY g; +---- +1 [a, b] a|b a|b +2 [c, d] c|d c|d +3 [e, f] e|f e|f + +# Statistical & Regression Aggregates +query TT +EXPLAIN SELECT + g, + CORR(x, y), + COVAR(x, y), + COVAR_SAMP(x, y), + COVAR_POP(x, y), + REGR_SXX(x, y), + REGR_SXY(x, y), + REGR_SYY(x, y), + REGR_AVGX(x, y), + REGR_AVGY(x, y), + REGR_COUNT(x, y), + REGR_SLOPE(x, y), + REGR_INTERCEPT(x, y), + REGR_R2(x, y) +FROM (SELECT * FROM stream_test ORDER BY g LIMIT 10000) +GROUP BY g +ORDER BY g; +---- +logical_plan +01)Sort: stream_test.g ASC NULLS LAST +02)--Projection: stream_test.g, corr(stream_test.x,stream_test.y), covar_samp(stream_test.x,stream_test.y) AS covar(stream_test.x,stream_test.y), covar_samp(stream_test.x,stream_test.y), covar_pop(stream_test.x,stream_test.y), regr_sxx(stream_test.x,stream_test.y), regr_sxy(stream_test.x,stream_test.y), regr_syy(stream_test.x,stream_test.y), regr_avgx(stream_test.x,stream_test.y), regr_avgy(stream_test.x,stream_test.y), regr_count(stream_test.x,stream_test.y), regr_slope(stream_test.x,stream_test.y), regr_intercept(stream_test.x,stream_test.y), regr_r2(stream_test.x,stream_test.y) +03)----Aggregate: groupBy=[[stream_test.g]], aggr=[[corr(stream_test.x, stream_test.y), covar_samp(stream_test.x, stream_test.y), covar_pop(stream_test.x, stream_test.y), regr_sxx(stream_test.x, stream_test.y), regr_sxy(stream_test.x, stream_test.y), regr_syy(stream_test.x, stream_test.y), regr_avgx(stream_test.x, stream_test.y), regr_avgy(stream_test.x, stream_test.y), 
regr_count(stream_test.x, stream_test.y), regr_slope(stream_test.x, stream_test.y), regr_intercept(stream_test.x, stream_test.y), regr_r2(stream_test.x, stream_test.y)]] +04)------Sort: stream_test.g ASC NULLS LAST, fetch=10000 +05)--------TableScan: stream_test projection=[g, x, y] +physical_plan +01)ProjectionExec: expr=[g@0 as g, corr(stream_test.x,stream_test.y)@1 as corr(stream_test.x,stream_test.y), covar_samp(stream_test.x,stream_test.y)@2 as covar(stream_test.x,stream_test.y), covar_samp(stream_test.x,stream_test.y)@2 as covar_samp(stream_test.x,stream_test.y), covar_pop(stream_test.x,stream_test.y)@3 as covar_pop(stream_test.x,stream_test.y), regr_sxx(stream_test.x,stream_test.y)@4 as regr_sxx(stream_test.x,stream_test.y), regr_sxy(stream_test.x,stream_test.y)@5 as regr_sxy(stream_test.x,stream_test.y), regr_syy(stream_test.x,stream_test.y)@6 as regr_syy(stream_test.x,stream_test.y), regr_avgx(stream_test.x,stream_test.y)@7 as regr_avgx(stream_test.x,stream_test.y), regr_avgy(stream_test.x,stream_test.y)@8 as regr_avgy(stream_test.x,stream_test.y), regr_count(stream_test.x,stream_test.y)@9 as regr_count(stream_test.x,stream_test.y), regr_slope(stream_test.x,stream_test.y)@10 as regr_slope(stream_test.x,stream_test.y), regr_intercept(stream_test.x,stream_test.y)@11 as regr_intercept(stream_test.x,stream_test.y), regr_r2(stream_test.x,stream_test.y)@12 as regr_r2(stream_test.x,stream_test.y)] +02)--AggregateExec: mode=Single, gby=[g@0 as g], aggr=[corr(stream_test.x,stream_test.y), covar_samp(stream_test.x,stream_test.y), covar_pop(stream_test.x,stream_test.y), regr_sxx(stream_test.x,stream_test.y), regr_sxy(stream_test.x,stream_test.y), regr_syy(stream_test.x,stream_test.y), regr_avgx(stream_test.x,stream_test.y), regr_avgy(stream_test.x,stream_test.y), regr_count(stream_test.x,stream_test.y), regr_slope(stream_test.x,stream_test.y), regr_intercept(stream_test.x,stream_test.y), regr_r2(stream_test.x,stream_test.y)], ordering_mode=Sorted +03)----SortExec: 
TopK(fetch=10000), expr=[g@0 ASC NULLS LAST], preserve_partitioning=[false] +04)------DataSourceExec: partitions=1, partition_sizes=[1] + +query IRRRRRRRRRIRRR +SELECT + g, + CORR(x, y), + COVAR(x, y), + COVAR_SAMP(x, y), + COVAR_POP(x, y), + REGR_SXX(x, y), + REGR_SXY(x, y), + REGR_SYY(x, y), + REGR_AVGX(x, y), + REGR_AVGY(x, y), + REGR_COUNT(x, y), + REGR_SLOPE(x, y), + REGR_INTERCEPT(x, y), + REGR_R2(x, y) +FROM (SELECT * FROM stream_test ORDER BY g LIMIT 10000) +GROUP BY g +ORDER BY g; +---- +1 1 0.5 0.5 0.25 0.5 0.5 0.5 1.5 1.5 2 1 0 1 +2 NULL 0 0 0 0 0 0.5 5 1.5 2 NULL NULL NULL +3 1 0.5 0.5 0.25 0.5 0.5 0.5 1.5 1.5 2 1 0 1 + +# Approximate and Ordered-Set Aggregates +query TT +EXPLAIN SELECT + g, + APPROX_DISTINCT(i), + APPROX_MEDIAN(x), + PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY x), + QUANTILE_CONT(0.5) WITHIN GROUP (ORDER BY x), + APPROX_PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY x), + APPROX_PERCENTILE_CONT_WITH_WEIGHT(1.0, 0.5) WITHIN GROUP (ORDER BY x), + PERCENTILE_CONT(x, 0.5), + APPROX_PERCENTILE_CONT(x, 0.5), + APPROX_PERCENTILE_CONT_WITH_WEIGHT(x, 1.0, 0.5) +FROM (SELECT * FROM stream_test ORDER BY g LIMIT 10000) +GROUP BY g +ORDER BY g; +---- +logical_plan +01)Sort: stream_test.g ASC NULLS LAST +02)--Projection: stream_test.g, approx_distinct(stream_test.i), approx_median(stream_test.x), percentile_cont(Float64(0.5)) WITHIN GROUP [stream_test.x ASC NULLS LAST], percentile_cont(Float64(0.5)) WITHIN GROUP [stream_test.x ASC NULLS LAST] AS quantile_cont(stream_test.x,Float64(0.5)), approx_percentile_cont(Float64(0.5)) WITHIN GROUP [stream_test.x ASC NULLS LAST], approx_percentile_cont_with_weight(Float64(1),Float64(0.5)) WITHIN GROUP [stream_test.x ASC NULLS LAST], percentile_cont(stream_test.x,Float64(0.5)), approx_percentile_cont(stream_test.x,Float64(0.5)), approx_percentile_cont_with_weight(stream_test.x,Float64(1),Float64(0.5)) +03)----Aggregate: groupBy=[[stream_test.g]], aggr=[[approx_distinct(stream_test.i), approx_median(stream_test.x), 
percentile_cont(stream_test.x, Float64(0.5)) ORDER BY [stream_test.x ASC NULLS LAST], approx_percentile_cont(stream_test.x, Float64(0.5)) ORDER BY [stream_test.x ASC NULLS LAST], approx_percentile_cont_with_weight(stream_test.x, Float64(1), Float64(0.5)) ORDER BY [stream_test.x ASC NULLS LAST], percentile_cont(stream_test.x, Float64(0.5)), approx_percentile_cont(stream_test.x, Float64(0.5)), approx_percentile_cont_with_weight(stream_test.x, Float64(1), Float64(0.5))]] +04)------Sort: stream_test.g ASC NULLS LAST, fetch=10000 +05)--------TableScan: stream_test projection=[g, x, i] +physical_plan +01)ProjectionExec: expr=[g@0 as g, approx_distinct(stream_test.i)@1 as approx_distinct(stream_test.i), approx_median(stream_test.x)@2 as approx_median(stream_test.x), percentile_cont(Float64(0.5)) WITHIN GROUP [stream_test.x ASC NULLS LAST]@3 as percentile_cont(Float64(0.5)) WITHIN GROUP [stream_test.x ASC NULLS LAST], percentile_cont(Float64(0.5)) WITHIN GROUP [stream_test.x ASC NULLS LAST]@3 as quantile_cont(stream_test.x,Float64(0.5)), approx_percentile_cont(Float64(0.5)) WITHIN GROUP [stream_test.x ASC NULLS LAST]@4 as approx_percentile_cont(Float64(0.5)) WITHIN GROUP [stream_test.x ASC NULLS LAST], approx_percentile_cont_with_weight(Float64(1),Float64(0.5)) WITHIN GROUP [stream_test.x ASC NULLS LAST]@5 as approx_percentile_cont_with_weight(Float64(1),Float64(0.5)) WITHIN GROUP [stream_test.x ASC NULLS LAST], percentile_cont(stream_test.x,Float64(0.5))@6 as percentile_cont(stream_test.x,Float64(0.5)), approx_percentile_cont(stream_test.x,Float64(0.5))@7 as approx_percentile_cont(stream_test.x,Float64(0.5)), approx_percentile_cont_with_weight(stream_test.x,Float64(1),Float64(0.5))@8 as approx_percentile_cont_with_weight(stream_test.x,Float64(1),Float64(0.5))] +02)--AggregateExec: mode=Single, gby=[g@0 as g], aggr=[approx_distinct(stream_test.i), approx_median(stream_test.x), percentile_cont(Float64(0.5)) WITHIN GROUP [stream_test.x ASC NULLS LAST], 
approx_percentile_cont(Float64(0.5)) WITHIN GROUP [stream_test.x ASC NULLS LAST], approx_percentile_cont_with_weight(Float64(1),Float64(0.5)) WITHIN GROUP [stream_test.x ASC NULLS LAST], percentile_cont(stream_test.x,Float64(0.5)), approx_percentile_cont(stream_test.x,Float64(0.5)), approx_percentile_cont_with_weight(stream_test.x,Float64(1),Float64(0.5))], ordering_mode=Sorted +03)----SortExec: TopK(fetch=10000), expr=[g@0 ASC NULLS LAST], preserve_partitioning=[false] +04)------DataSourceExec: partitions=1, partition_sizes=[1] + +query IIRRRRRRRR +SELECT + g, + APPROX_DISTINCT(i), + APPROX_MEDIAN(x), + PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY x), + QUANTILE_CONT(0.5) WITHIN GROUP (ORDER BY x), + APPROX_PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY x), + APPROX_PERCENTILE_CONT_WITH_WEIGHT(1.0, 0.5) WITHIN GROUP (ORDER BY x), + PERCENTILE_CONT(x, 0.5), + APPROX_PERCENTILE_CONT(x, 0.5), + APPROX_PERCENTILE_CONT_WITH_WEIGHT(x, 1.0, 0.5) +FROM (SELECT * FROM stream_test ORDER BY g LIMIT 10000) +GROUP BY g +ORDER BY g; +---- +1 2 1.5 1.5 1.5 1.5 1.5 1.5 1.5 1.5 +2 2 1.5 1.5 1.5 1.5 1.5 1.5 1.5 1.5 +3 2 1.5 1.5 1.5 1.5 1.5 1.5 1.5 1.5 + +statement ok +DROP TABLE stream_test; + +# Restore default target partitions +statement ok +set datafusion.execution.target_partitions = 4;