
Commit 3c1fc06

fix: remove code duplication in native_datafusion and native_iceberg_compat implementations (#1443)
* fix: remove code duplication in native_datafusion and native_iceberg_compat implementations
1 parent 59fae94 commit 3c1fc06

File tree: 3 files changed, +248 -156 lines

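Only two of the three changed files appear below; the third is presumably the new `native/core/src/parquet/parquet_exec.rs` module (declared via `pub mod parquet_exec;` in this diff) that both scan paths now share. Previously, the `native_datafusion` planner (`planner.rs`) and the `native_iceberg_compat` JNI reader (`parquet/mod.rs`) each built their own `ParquetSource`, `FileScanConfig`, and `DataSourceExec`; both now call a single `init_datasource_exec` helper. The sketch below is a reconstruction of what that helper plausibly looks like, assembled from the code deleted in the two diffs; the function name and argument order come from the new call sites, but the exact signature, option handling, and error type in the committed file may differ.

```rust
// A minimal sketch only, reconstructed from the code removed below; it is NOT
// the committed parquet_exec.rs, whose signature and details may differ.
use std::sync::Arc;

use arrow::datatypes::{Field, SchemaRef};
use datafusion::common::config::TableParquetOptions;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::physical_plan::{FileScanConfig, ParquetSource};
use datafusion::datasource::source::DataSourceExec;
use datafusion::execution::object_store::ObjectStoreUrl;
use datafusion::logical_expr::Operator;
use datafusion::physical_expr::expressions::BinaryExpr;
use datafusion::physical_expr::PhysicalExpr;
use datafusion_comet_spark_expr::EvalMode;

use crate::execution::operators::ExecutionError;
use crate::parquet::parquet_support::SparkParquetOptions;
use crate::parquet::schema_adapter::SparkSchemaAdapterFactory;

#[allow(clippy::too_many_arguments)]
pub(crate) fn init_datasource_exec(
    required_schema: SchemaRef,
    data_schema: Option<SchemaRef>,
    partition_schema: Option<SchemaRef>,
    partition_fields: Option<Vec<Field>>,
    object_store_url: ObjectStoreUrl,
    file_groups: Vec<Vec<PartitionedFile>>,
    projection_vector: Option<Vec<usize>>,
    data_filters: Option<Vec<Arc<dyn PhysicalExpr>>>,
    session_timezone: &str,
) -> Result<Arc<DataSourceExec>, ExecutionError> {
    // The Parquet and Spark-compat options that both call sites used to set by hand.
    let mut table_parquet_options = TableParquetOptions::new();
    table_parquet_options.global.pushdown_filters = true;
    table_parquet_options.global.reorder_filters = true;

    let mut spark_parquet_options =
        SparkParquetOptions::new(EvalMode::Legacy, session_timezone, false);
    spark_parquet_options.allow_cast_unsigned_ints = true;

    let mut parquet_source = ParquetSource::new(table_parquet_options)
        .with_schema_adapter_factory(Arc::new(SparkSchemaAdapterFactory::new(
            spark_parquet_options,
        )));

    // Fold the data filters into one conjunctive predicate, as the old
    // planner.rs code did, and push it into the Parquet source.
    if let (Some(file_schema), Some(filters)) = (&data_schema, data_filters) {
        if let Some(predicate) = filters
            .into_iter()
            .reduce(|left, right| Arc::new(BinaryExpr::new(left, Operator::And, right)))
        {
            parquet_source = parquet_source.with_predicate(Arc::clone(file_schema), predicate);
        }
    }

    // The planner passes the full data schema; the JNI reader only has the
    // required (output) schema.
    if let (Some(projection), Some(partition_schema)) = (&projection_vector, &partition_schema) {
        assert_eq!(
            projection.len(),
            required_schema.fields.len() + partition_schema.fields.len()
        );
    }
    let file_schema = data_schema.unwrap_or(required_schema);

    let mut file_scan_config =
        FileScanConfig::new(object_store_url, file_schema, Arc::new(parquet_source))
            .with_file_groups(file_groups);
    if let Some(partition_fields) = partition_fields {
        file_scan_config = file_scan_config.with_table_partition_cols(partition_fields);
    }
    if let Some(projection_vector) = projection_vector {
        file_scan_config = file_scan_config.with_projection(Some(projection_vector));
    }

    Ok(Arc::new(DataSourceExec::new(Arc::new(file_scan_config))))
}
```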

native/core/src/execution/planner.rs

Lines changed: 71 additions & 103 deletions
```diff
@@ -72,19 +72,17 @@ use datafusion::{
 };
 use datafusion_comet_spark_expr::{create_comet_physical_fun, create_negate_expr};
 
+use crate::execution::operators::ExecutionError::GeneralError;
 use crate::execution::shuffle::CompressionCodec;
 use crate::execution::spark_plan::SparkPlan;
-use crate::parquet::parquet_support::{prepare_object_store, SparkParquetOptions};
-use crate::parquet::schema_adapter::SparkSchemaAdapterFactory;
-use datafusion::common::config::TableParquetOptions;
+use crate::parquet::parquet_exec::init_datasource_exec;
+use crate::parquet::parquet_support::prepare_object_store;
 use datafusion::common::scalar::ScalarStructBuilder;
 use datafusion::common::{
     tree_node::{Transformed, TransformedResult, TreeNode, TreeNodeRecursion, TreeNodeRewriter},
     JoinType as DFJoinType, ScalarValue,
 };
 use datafusion::datasource::listing::PartitionedFile;
-use datafusion::datasource::physical_plan::{FileScanConfig, ParquetSource};
-use datafusion::datasource::source::DataSourceExec;
 use datafusion::functions_nested::array_has::ArrayHas;
 use datafusion::logical_expr::type_coercion::other::get_coerce_type_for_case_expression;
 use datafusion::logical_expr::{
@@ -94,8 +92,10 @@ use datafusion::logical_expr::{
 use datafusion::physical_expr::expressions::{Literal, StatsType};
 use datafusion::physical_expr::window::WindowExpr;
 use datafusion::physical_expr::LexOrdering;
+
 use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
 use datafusion::physical_plan::filter::FilterExec as DataFusionFilterExec;
+use datafusion_comet_proto::spark_operator::SparkFilePartition;
 use datafusion_comet_proto::{
     spark_expression::{
         self, agg_expr::ExprStruct as AggExprStruct, expr::ExprStruct, literal::Value, AggExpr,
@@ -181,6 +181,60 @@ impl PhysicalPlanner {
         }
     }
 
+    /// get DataFusion PartitionedFiles from a Spark FilePartition
+    fn get_partitioned_files(
+        &self,
+        partition: &SparkFilePartition,
+    ) -> Result<Vec<PartitionedFile>, ExecutionError> {
+        let mut files = Vec::with_capacity(partition.partitioned_file.len());
+        partition.partitioned_file.iter().try_for_each(|file| {
+            assert!(file.start + file.length <= file.file_size);
+
+            let mut partitioned_file = PartitionedFile::new_with_range(
+                String::new(), // Dummy file path.
+                file.file_size as u64,
+                file.start,
+                file.start + file.length,
+            );
+
+            // Spark sends the path over as URL-encoded, parse that first.
+            let url =
+                Url::parse(file.file_path.as_ref()).map_err(|e| GeneralError(e.to_string()))?;
+            // Convert that to a Path object to use in the PartitionedFile.
+            let path = Path::from_url_path(url.path()).map_err(|e| GeneralError(e.to_string()))?;
+            partitioned_file.object_meta.location = path;
+
+            // Process partition values
+            // Create an empty input schema for partition values because they are all literals.
+            let empty_schema = Arc::new(Schema::empty());
+            let partition_values: Result<Vec<_>, _> = file
+                .partition_values
+                .iter()
+                .map(|partition_value| {
+                    let literal =
+                        self.create_expr(partition_value, Arc::<Schema>::clone(&empty_schema))?;
+                    literal
+                        .as_any()
+                        .downcast_ref::<DataFusionLiteral>()
+                        .ok_or_else(|| {
+                            ExecutionError::GeneralError(
+                                "Expected literal of partition value".to_string(),
+                            )
+                        })
+                        .map(|literal| literal.value().clone())
+                })
+                .collect();
+            let partition_values = partition_values?;
+
+            partitioned_file.partition_values = partition_values;
+
+            files.push(partitioned_file);
+            Ok::<(), ExecutionError>(())
+        })?;
+
+        Ok(files)
+    }
+
     /// Create a DataFusion physical expression from Spark physical expression
     fn create_expr(
         &self,
@@ -1195,19 +1249,6 @@ impl PhysicalPlanner {
                     .map(|expr| self.create_expr(expr, Arc::clone(&required_schema)))
                     .collect();
 
-                // Create a conjunctive form of the vector because ParquetExecBuilder takes
-                // a single expression
-                let data_filters = data_filters?;
-                let cnf_data_filters = data_filters.clone().into_iter().reduce(|left, right| {
-                    Arc::new(BinaryExpr::new(
-                        left,
-                        datafusion::logical_expr::Operator::And,
-                        right,
-                    ))
-                });
-
-                // By default, local FS object store registered
-                // if `hdfs` feature enabled then HDFS file object store registered
                 // Get one file from the list of files
                 let one_file = scan
                     .file_partitions
@@ -1224,53 +1265,7 @@ impl PhysicalPlanner {
                 let mut file_groups: Vec<Vec<PartitionedFile>> =
                     Vec::with_capacity(partition_count);
                 scan.file_partitions.iter().try_for_each(|partition| {
-                    let mut files = Vec::with_capacity(partition.partitioned_file.len());
-                    partition.partitioned_file.iter().try_for_each(|file| {
-                        assert!(file.start + file.length <= file.file_size);
-
-                        let mut partitioned_file = PartitionedFile::new_with_range(
-                            String::new(), // Dummy file path.
-                            file.file_size as u64,
-                            file.start,
-                            file.start + file.length,
-                        );
-
-                        // Spark sends the path over as URL-encoded, parse that first.
-                        let url = Url::parse(file.file_path.as_ref()).unwrap();
-                        // Convert that to a Path object to use in the PartitionedFile.
-                        let path = Path::from_url_path(url.path()).unwrap();
-                        partitioned_file.object_meta.location = path;
-
-                        // Process partition values
-                        // Create an empty input schema for partition values because they are all literals.
-                        let empty_schema = Arc::new(Schema::empty());
-                        let partition_values: Result<Vec<_>, _> = file
-                            .partition_values
-                            .iter()
-                            .map(|partition_value| {
-                                let literal = self.create_expr(
-                                    partition_value,
-                                    Arc::<Schema>::clone(&empty_schema),
-                                )?;
-                                literal
-                                    .as_any()
-                                    .downcast_ref::<DataFusionLiteral>()
-                                    .ok_or_else(|| {
-                                        ExecutionError::GeneralError(
-                                            "Expected literal of partition value".to_string(),
-                                        )
-                                    })
-                                    .map(|literal| literal.value().clone())
-                            })
-                            .collect();
-                        let partition_values = partition_values?;
-
-                        partitioned_file.partition_values = partition_values;
-
-                        files.push(partitioned_file);
-                        Ok::<(), ExecutionError>(())
-                    })?;
-
+                    let files = self.get_partitioned_files(partition)?;
                    file_groups.push(files);
                    Ok::<(), ExecutionError>(())
                })?;
@@ -1284,47 +1279,20 @@ impl PhysicalPlanner {
                         Field::new(field.name(), field.data_type().clone(), field.is_nullable())
                     })
                     .collect_vec();
-
-                let mut table_parquet_options = TableParquetOptions::new();
-                // TODO: Maybe these are configs?
-                table_parquet_options.global.pushdown_filters = true;
-                table_parquet_options.global.reorder_filters = true;
-
-                let mut spark_parquet_options = SparkParquetOptions::new(
-                    EvalMode::Legacy,
-                    scan.session_timezone.as_str(),
-                    false,
-                );
-                spark_parquet_options.allow_cast_unsigned_ints = true;
-
-                let mut parquet_source = ParquetSource::new(table_parquet_options)
-                    .with_schema_adapter_factory(Arc::new(SparkSchemaAdapterFactory::new(
-                        spark_parquet_options,
-                    )));
-
-                if let Some(filter) = cnf_data_filters {
-                    parquet_source =
-                        parquet_source.with_predicate(Arc::clone(&data_schema), filter);
-                }
-
-                let mut file_scan_config = FileScanConfig::new(
+                let scan = init_datasource_exec(
+                    required_schema,
+                    Some(data_schema),
+                    Some(partition_schema),
+                    Some(partition_fields),
                     object_store_url,
-                    Arc::clone(&data_schema),
-                    Arc::new(parquet_source),
-                )
-                .with_file_groups(file_groups)
-                .with_table_partition_cols(partition_fields);
-
-                assert_eq!(
-                    projection_vector.len(),
-                    required_schema.fields.len() + partition_schema.fields.len()
-                );
-                file_scan_config = file_scan_config.with_projection(Some(projection_vector));
-
-                let scan = DataSourceExec::new(Arc::new(file_scan_config));
+                    file_groups,
+                    Some(projection_vector),
+                    Some(data_filters?),
+                    scan.session_timezone.as_str(),
+                )?;
                 Ok((
                     vec![],
-                    Arc::new(SparkPlan::new(spark_plan.plan_id, Arc::new(scan), vec![])),
+                    Arc::new(SparkPlan::new(spark_plan.plan_id, scan, vec![])),
                 ))
             }
             OpStruct::Scan(scan) => {
```

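Two details of the planner.rs change are worth noting: the conjunctive-predicate step that used to live here (folding `data_filters` together with `Operator::And` before attaching the result to the `ParquetSource`) is no longer built at the call site, which now just forwards `Some(data_filters?)` to `init_datasource_exec`; and the extracted `get_partitioned_files` helper replaces the old `unwrap()` calls on path parsing with `GeneralError`s. The path handling exists because Spark sends file paths URL-encoded. A small standalone illustration of that decoding step, with a made-up path and assuming the `url` and `object_store` crates as dependencies:

```rust
// Illustration only: how a URL-encoded Spark path becomes an object_store Path,
// mirroring the Url::parse + Path::from_url_path steps in get_partitioned_files.
// The file name is hypothetical.
use object_store::path::Path;
use url::Url;

fn main() {
    let spark_path = "file:///warehouse/sales%20db/part-00000.parquet";
    let url = Url::parse(spark_path).expect("valid URL");
    // from_url_path percent-decodes the path and strips the leading slash.
    let location = Path::from_url_path(url.path()).expect("valid object store path");
    assert_eq!(location.to_string(), "warehouse/sales db/part-00000.parquet");
}
```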
native/core/src/parquet/mod.rs

Lines changed: 38 additions & 53 deletions
```diff
@@ -21,6 +21,7 @@ pub use mutable_vector::*;
 
 #[macro_use]
 pub mod util;
+pub mod parquet_exec;
 pub mod parquet_support;
 pub mod read;
 pub mod schema_adapter;
@@ -46,23 +47,21 @@ use self::util::jni::TypePromotionInfo;
 use crate::execution::operators::ExecutionError;
 use crate::execution::utils::SparkArrowConvert;
 use crate::parquet::data_type::AsBytes;
-use crate::parquet::parquet_support::{prepare_object_store, SparkParquetOptions};
-use crate::parquet::schema_adapter::SparkSchemaAdapterFactory;
+use crate::parquet::parquet_exec::init_datasource_exec;
+use crate::parquet::parquet_support::prepare_object_store;
 use arrow::array::{Array, RecordBatch};
 use arrow::buffer::{Buffer, MutableBuffer};
-use datafusion::common::config::TableParquetOptions;
 use datafusion::datasource::listing::PartitionedFile;
-use datafusion::datasource::physical_plan::{FileScanConfig, ParquetSource};
-use datafusion::datasource::source::DataSourceExec;
 use datafusion::execution::{SendableRecordBatchStream, TaskContext};
 use datafusion::physical_plan::ExecutionPlan;
 use datafusion::prelude::SessionContext;
-use datafusion_comet_spark_expr::EvalMode;
 use futures::{poll, StreamExt};
 use jni::objects::{JBooleanArray, JByteArray, JLongArray, JPrimitiveArray, JString, ReleaseMode};
 use jni::sys::jstring;
+use object_store::path::Path;
 use read::ColumnReader;
 use util::jni::{convert_column_descriptor, convert_encoding, deserialize_schema};
+
 /// Parquet read context maintained across multiple JNI calls.
 struct Context {
     pub column_reader: ColumnReader,
@@ -620,12 +619,21 @@ fn get_batch_context<'a>(handle: jlong) -> Result<&'a mut BatchContext, CometError> {
     }
 }
 
-/*
-#[inline]
-fn get_batch_reader<'a>(handle: jlong) -> Result<&'a mut ParquetRecordBatchReader, CometError> {
-    Ok(&mut get_batch_context(handle)?.batch_reader.unwrap())
+fn get_file_groups_single_file(
+    path: &Path,
+    file_size: u64,
+    start: i64,
+    length: i64,
+) -> Vec<Vec<PartitionedFile>> {
+    let mut partitioned_file = PartitionedFile::new_with_range(
+        String::new(), // Dummy file path. We will override this with our path so that url encoding does not occur
+        file_size,
+        start,
+        start + length,
+    );
+    partitioned_file.object_meta.location = (*path).clone();
+    vec![vec![partitioned_file]]
 }
-*/
 
 /// # Safety
 /// This function is inherently unsafe since it deals with raw pointers passed from JNI.
@@ -645,65 +653,42 @@ pub unsafe extern "system" fn Java_org_apache_comet_parquet_Native_initRecordBat
         .get_string(&JString::from_raw(file_path))
         .unwrap()
         .into();
-    let batch_stream: Option<SendableRecordBatchStream>;
-    // TODO: (ARROW NATIVE) Use the common global runtime
+
     let runtime = tokio::runtime::Builder::new_multi_thread()
         .enable_all()
         .build()?;
     let session_ctx = SessionContext::new();
+
     let (object_store_url, object_store_path) =
         prepare_object_store(session_ctx.runtime_env(), path.clone())?;
 
-    // EXPERIMENTAL - BEGIN
-    // TODO: (ARROW NATIVE) - Remove code duplication between this and POC 1
-    // copy the input on-heap buffer to native
     let required_schema_array = JByteArray::from_raw(required_schema);
     let required_schema_buffer = env.convert_byte_array(&required_schema_array)?;
-    let required_schema_arrow = deserialize_schema(required_schema_buffer.as_bytes())?;
-    let mut partitioned_file = PartitionedFile::new_with_range(
-        String::new(), // Dummy file path. We will override this with our path so that url encoding does not occur
-        file_size as u64,
-        start,
-        start + length,
-    );
-    partitioned_file.object_meta.location = object_store_path;
+    let required_schema = Arc::new(deserialize_schema(required_schema_buffer.as_bytes())?);
+
+    let file_groups =
+        get_file_groups_single_file(&object_store_path, file_size as u64, start, length);
+
     let session_timezone: String = env
         .get_string(&JString::from_raw(session_timezone))
         .unwrap()
         .into();
 
-    let mut spark_parquet_options =
-        SparkParquetOptions::new(EvalMode::Legacy, session_timezone.as_str(), false);
-    spark_parquet_options.allow_cast_unsigned_ints = true;
-
-    let mut table_parquet_options = TableParquetOptions::new();
-    // TODO: Maybe these are configs?
-    table_parquet_options.global.pushdown_filters = true;
-    table_parquet_options.global.reorder_filters = true;
-
-    let parquet_source = ParquetSource::new(table_parquet_options).with_schema_adapter_factory(
-        Arc::new(SparkSchemaAdapterFactory::new(spark_parquet_options)),
-    );
-
-    // We build the file scan config with the *required* schema so that the reader knows
-    // the output schema we want
-    let file_scan_config = FileScanConfig::new(object_store_url, Arc::new(required_schema_arrow), Arc::new(parquet_source))
-        .with_file(partitioned_file)
-        // TODO: (ARROW NATIVE) - do partition columns in native
-        // - will need partition schema and partition values to do so
-        // .with_table_partition_cols(partition_fields)
-        ;
-
-    //TODO: (ARROW NATIVE) - predicate pushdown??
-    // builder = builder.with_predicate(filter);
-
-    let scan = Arc::new(DataSourceExec::new(Arc::new(file_scan_config)));
+    let scan = init_datasource_exec(
+        required_schema,
+        None,
+        None,
+        None,
+        object_store_url,
+        file_groups,
+        None,
+        None,
+        session_timezone.as_str(),
+    )?;
 
     let ctx = TaskContext::default();
     let partition_index: usize = 0;
-    batch_stream = Some(scan.execute(partition_index, Arc::new(ctx))?);
-
-    // EXPERIMENTAL - END
+    let batch_stream = Some(scan.execute(partition_index, Arc::new(ctx))?);
 
     let ctx = BatchContext {
         runtime,
```

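On the `native_iceberg_compat` side, the new `get_file_groups_single_file` produces the same `Vec<Vec<PartitionedFile>>` shape that `init_datasource_exec` accepts in the multi-partition planner case, but with a single file group containing one file that covers the requested byte range. A hypothetical call, with the path and sizes invented for illustration:

```rust
use object_store::path::Path;

// Hypothetical: a 4096-byte Parquet file, of which bytes [0, 1024) are assigned
// to this reader.
let location = Path::from("warehouse/data/part-00000.parquet");
let file_groups = get_file_groups_single_file(&location, 4096, 0, 1024);

assert_eq!(file_groups.len(), 1); // one file group
assert_eq!(file_groups[0].len(), 1); // containing a single file
assert_eq!(file_groups[0][0].object_meta.location, location);
```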