@@ -249,7 +249,7 @@ impl ExecutionPlan for ProjectionExec {
249
249
Ok ( stats_projection (
250
250
input_stats,
251
251
self . expr . iter ( ) . map ( |( e, _) | Arc :: clone ( e) ) ,
252
- Arc :: clone ( & self . schema ) ,
252
+ Arc :: clone ( & self . input . schema ( ) ) ,
253
253
) )
254
254
}
255
255
@@ -1030,8 +1030,10 @@ mod tests {
1030
1030
1031
1031
use crate :: common:: collect;
1032
1032
use crate :: test;
1033
+ use crate :: test:: exec:: StatisticsExec ;
1033
1034
1034
- use arrow:: datatypes:: DataType ;
1035
+ use arrow:: datatypes:: { DataType , Field , Schema } ;
1036
+ use datafusion_common:: stats:: { ColumnStatistics , Precision , Statistics } ;
1035
1037
use datafusion_common:: ScalarValue ;
1036
1038
1037
1039
use datafusion_expr:: Operator ;
@@ -1230,4 +1232,86 @@ mod tests {
1230
1232
1231
1233
assert_eq ! ( result, expected) ;
1232
1234
}
1235
+
1236
+ #[ test]
1237
+ fn test_projection_statistics_uses_input_schema ( ) {
1238
+ let input_schema = Schema :: new ( vec ! [
1239
+ Field :: new( "a" , DataType :: Int32 , false ) ,
1240
+ Field :: new( "b" , DataType :: Int32 , false ) ,
1241
+ Field :: new( "c" , DataType :: Int32 , false ) ,
1242
+ Field :: new( "d" , DataType :: Int32 , false ) ,
1243
+ Field :: new( "e" , DataType :: Int32 , false ) ,
1244
+ Field :: new( "f" , DataType :: Int32 , false ) ,
1245
+ ] ) ;
1246
+
1247
+ let input_statistics = Statistics {
1248
+ num_rows : Precision :: Exact ( 10 ) ,
1249
+ column_statistics : vec ! [
1250
+ ColumnStatistics {
1251
+ min_value: Precision :: Exact ( ScalarValue :: Int32 ( Some ( 1 ) ) ) ,
1252
+ max_value: Precision :: Exact ( ScalarValue :: Int32 ( Some ( 100 ) ) ) ,
1253
+ ..Default :: default ( )
1254
+ } ,
1255
+ ColumnStatistics {
1256
+ min_value: Precision :: Exact ( ScalarValue :: Int32 ( Some ( 5 ) ) ) ,
1257
+ max_value: Precision :: Exact ( ScalarValue :: Int32 ( Some ( 50 ) ) ) ,
1258
+ ..Default :: default ( )
1259
+ } ,
1260
+ ColumnStatistics {
1261
+ min_value: Precision :: Exact ( ScalarValue :: Int32 ( Some ( 10 ) ) ) ,
1262
+ max_value: Precision :: Exact ( ScalarValue :: Int32 ( Some ( 40 ) ) ) ,
1263
+ ..Default :: default ( )
1264
+ } ,
1265
+ ColumnStatistics {
1266
+ min_value: Precision :: Exact ( ScalarValue :: Int32 ( Some ( 20 ) ) ) ,
1267
+ max_value: Precision :: Exact ( ScalarValue :: Int32 ( Some ( 30 ) ) ) ,
1268
+ ..Default :: default ( )
1269
+ } ,
1270
+ ColumnStatistics {
1271
+ min_value: Precision :: Exact ( ScalarValue :: Int32 ( Some ( 21 ) ) ) ,
1272
+ max_value: Precision :: Exact ( ScalarValue :: Int32 ( Some ( 29 ) ) ) ,
1273
+ ..Default :: default ( )
1274
+ } ,
1275
+ ColumnStatistics {
1276
+ min_value: Precision :: Exact ( ScalarValue :: Int32 ( Some ( 24 ) ) ) ,
1277
+ max_value: Precision :: Exact ( ScalarValue :: Int32 ( Some ( 26 ) ) ) ,
1278
+ ..Default :: default ( )
1279
+ } ,
1280
+ ] ,
1281
+ ..Default :: default ( )
1282
+ } ;
1283
+
1284
+ let input = Arc :: new ( StatisticsExec :: new ( input_statistics, input_schema) ) ;
1285
+
1286
+ // Create projection expressions that reference columns from the input schema and the length
1287
+ // of output schema columns < input schema columns and hence if we use the last few columns
1288
+ // from the input schema in the expressions here, bounds_check would fail on them if output
1289
+ // schema is supplied to the partitions_statistics method.
1290
+ let exprs: Vec < ( Arc < dyn PhysicalExpr > , String ) > = vec ! [
1291
+ (
1292
+ Arc :: new( Column :: new( "c" , 2 ) ) as Arc <dyn PhysicalExpr >,
1293
+ "c_renamed" . to_string( ) ,
1294
+ ) ,
1295
+ (
1296
+ Arc :: new( BinaryExpr :: new(
1297
+ Arc :: new( Column :: new( "e" , 4 ) ) ,
1298
+ Operator :: Plus ,
1299
+ Arc :: new( Column :: new( "f" , 5 ) ) ,
1300
+ ) ) as Arc <dyn PhysicalExpr >,
1301
+ "e_plus_f" . to_string( ) ,
1302
+ ) ,
1303
+ ] ;
1304
+
1305
+ let projection = ProjectionExec :: try_new ( exprs, input) . unwrap ( ) ;
1306
+
1307
+ let stats = projection. partition_statistics ( None ) . unwrap ( ) ;
1308
+
1309
+ assert_eq ! ( stats. num_rows, Precision :: Exact ( 10 ) ) ;
1310
+ assert_eq ! (
1311
+ stats. column_statistics. len( ) ,
1312
+ 2 ,
1313
+ "Expected 2 columns in projection statistics"
1314
+ ) ;
1315
+ assert ! ( stats. total_byte_size. is_exact( ) . unwrap_or( false ) ) ;
1316
+ }
1233
1317
}
0 commit comments