
Conversation

Contributor

@fvaleye fvaleye commented Aug 13, 2025

Which issue does this PR close?

What changes are included in this PR?

Implement a physical execution plan node that projects Iceberg partition columns from source data, supporting nested fields and all Iceberg transforms.
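As a rough sketch of the core step (hedged: the node itself is omitted; Transform::Bucket and the transform call follow iceberg-rust's public API as quoted later in this thread):

use arrow_array::ArrayRef;
use iceberg::spec::Transform;
use iceberg::transform::create_transform_function;

// Derive partition values by applying an Iceberg transform to a source
// column, e.g. a bucket[16] partition hashing the column into 16 buckets.
fn partition_values_for(source: ArrayRef) -> iceberg::Result<ArrayRef> {
    let transform_fn = create_transform_function(&Transform::Bucket(16))?;
    transform_fn.transform(source)
}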

Are these changes tested?

Yes, with unit tests

@fvaleye fvaleye force-pushed the feature/implement-project-node-for-insert-into-datafusion branch from b3a8601 to 40a225a on August 13, 2025 13:17
…umns defined in Iceberg.

Implement physical execution plan node that projects Iceberg partition columns from source
data, supporting nested fields and all Iceberg transforms.
@fvaleye fvaleye force-pushed the feature/implement-project-node-for-insert-into-datafusion branch from 40a225a to 4d59f87 on August 13, 2025 14:50
Comment on lines 148 to 151
let field_path = Self::find_field_path(&self.table_schema, source_field.id)?;
let index_path = Self::resolve_arrow_index_path(batch_schema.as_ref(), &field_path)?;

let source_column = Self::extract_column_by_index_path(batch, &index_path)?;
Contributor

This looks very interesting! I actually came across a similar issue when implementing the sort node, and I was leaning toward implementing a new SchemaWithPartnerVisitor, wdyt?

Contributor Author

Perfect 👌
I was initially thinking this was needed just for this implementation, but it seems the right place would be closer to the Schema definition. Since this is a standard method for accessing column values by index, it makes sense to generalize!

Contributor

I drafted a PartitionValueVisitor here to help extract partition values from a record batch in tree-traversal style

Please let me know what you think!

Contributor

I just saw this implementation to extract partition values, and it actually makes more sense to me since it leverages the existing RecordBatchProjector: #1040

Contributor Author

Good, thanks for sharing. I will use #1040 when merged!

Contributor Author

Hey @CTTY 👋,
I can use it now, but I have one concern about leveraging RecordBatchPartitionSplitter: it relies on PARQUET_FIELD_ID_META_KEY.
Since DataFusion doesn't use this key, do you think we should adapt this method to make it compatible with DataFusion?
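For context, a minimal sketch of the metadata in question (assuming the key's literal value is "PARQUET:field_id"; the helper is hypothetical):

use std::collections::HashMap;
use arrow_schema::{DataType, Field};

// When Iceberg's Parquet reader builds an Arrow schema, each field carries
// its Iceberg field id as metadata, roughly like this:
fn field_with_iceberg_id(name: &str, id: i32) -> Field {
    Field::new(name, DataType::Int32, false)
        .with_metadata(HashMap::from([("PARQUET:field_id".to_string(), id.to_string())]))
}
// A batch produced by an arbitrary DataFusion plan generally carries no such
// metadata, which is what breaks a field-id-based lookup.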

}

/// Find the path to a field by its ID (e.g., ["address", "city"]) in the Iceberg schema
fn find_field_path(table_schema: &Schema, field_id: i32) -> DFResult<Vec<String>> {
Contributor Author

We might need to reconsider this function as well, @CTTY, following our discussion here.
It may not be in the right place at the moment.

@fvaleye fvaleye force-pushed the feature/implement-project-node-for-insert-into-datafusion branch from 57fe2dd to bc805db on September 21, 2025 15:47
@liurenjie1024
Contributor

Hi @fvaleye, is this PR ready for review, or does it still need some more work?

@fvaleye
Contributor Author

fvaleye commented Sep 22, 2025

Hi @fvaleye, is this PR ready for review, or does it still need some more work?

Hi @liurenjie1024 👋,
I removed the DataFusion integration and its extra node, so it's ready for review.
I tried to use RecordBatchProjector (from this), but it doesn't meet our requirements for DataFusion (it uses PARQUET_FIELD_ID_META_KEY).

So, yes, it's ready for review. However, it might require additional refactoring if we want to make these utility functions more general.
Please tell me what you think!

Contributor

@liurenjie1024 liurenjie1024 left a comment

Thanks @fvaleye for this PR! I left some comments to improve it, and I still have other questions:

  1. What's the entry point of this module?
  2. Could the entry point of this module be a function like the one below?
fn project_with_partition(input: Arc<dyn ExecutionPlan>, table: &Table) -> Result<Arc<dyn ExecutionPlan>> {
    // This method extends `input` with an extra `PhysicalExpr`, which calculates the partition value.
    ...
}

/// Extract a column from a record batch by following an index path.
/// The index path specifies the column indices to traverse for nested structures.
#[allow(dead_code)]
fn extract_column_by_index_path(batch: &RecordBatch, index_path: &[usize]) -> DFResult<ArrayRef> {
Contributor

Could we reuse RecordBatchProjector?

Contributor Author

I tried, but I kept this implementation; the main reasons are below:

1. Metadata dependency:
RecordBatchProjector depends on Arrow field metadata containing PARQUET:field_id.
This metadata is added when reading Parquet files through Iceberg's reader.
DataFusion ExecutionPlans might not always have this metadata preserved.

2. Using the Iceberg table's schema directly:
We resolve field paths using field names, not IDs.
This works regardless of whether Arrow metadata is present.

Depending on what you think:

  1. We could keep this implementation working with DataFusion, or
  2. Adapt RecordBatchProjector, though it feels like that's not the same intent.
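For illustration, a self-contained sketch of the index-path walk that extract_column_by_index_path performs (arrow-rs only; Option stands in for the PR's DFResult):

use arrow_array::{Array, ArrayRef, RecordBatch, StructArray};

fn extract_by_index_path(batch: &RecordBatch, index_path: &[usize]) -> Option<ArrayRef> {
    // The first index selects a top-level column; each subsequent index
    // selects a child of a StructArray, descending into nested structs.
    let (first, rest) = index_path.split_first()?;
    let mut current: ArrayRef = batch.column(*first).clone();
    for &idx in rest {
        let struct_array = current.as_any().downcast_ref::<StructArray>()?;
        current = struct_array.column(idx).clone();
    }
    Some(current)
}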

Contributor

I'm not convinced. There are two ways to solve your issue:

  1. Add a constructor to RecordBatchProjector that accepts an Iceberg schema and target field ids.
  2. Convert the Iceberg schema to an Arrow schema; the converter will add field_id metadata.

Personally I prefer approach 1, but I don't have a strong opinion about it. After using RecordBatchProjector, the whole PR could be simplified a lot.
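A quick sketch of the premise behind approach 2 (assuming iceberg-rust's schema_to_arrow_schema converter and the literal key "PARQUET:field_id"):

use iceberg::arrow::schema_to_arrow_schema;
use iceberg::spec::Schema as IcebergSchema;

// The converter is expected to attach PARQUET:field_id metadata to every
// top-level field, which is exactly what the original constructor needs.
fn all_fields_carry_ids(schema: &IcebergSchema) -> iceberg::Result<bool> {
    let arrow_schema = schema_to_arrow_schema(schema)?;
    Ok(arrow_schema
        .fields()
        .iter()
        .all(|f| f.metadata().contains_key("PARQUET:field_id")))
}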

…use PhysicalExpr for partitions values calculation.

Signed-off-by: Florian Valeye <[email protected]>
Comment on lines 190 to 191
let field_path = find_field_path(&self.table_schema, source_field.id)?;
let index_path = resolve_arrow_index_path(batch_schema.as_ref(), &field_path)?;
Contributor

We don't need to do this for every batch.
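A standalone sketch of doing the resolution once (semantics assumed to mirror the PR's find_field_path/resolve_arrow_index_path pair; the result can be computed in the constructor and cached):

use arrow_schema::{DataType, Schema};

// Resolve a name path like ["address", "city"] to column indices once,
// instead of re-walking the schema for every incoming batch.
fn resolve_index_path(schema: &Schema, field_path: &[String]) -> Option<Vec<usize>> {
    let (first, rest) = field_path.split_first()?;
    let mut idx = schema.index_of(first).ok()?;
    let mut indices = vec![idx];
    let mut data_type = schema.field(idx).data_type();
    for name in rest {
        let DataType::Struct(fields) = data_type else { return None };
        idx = fields.iter().position(|f| f.name() == name)?;
        indices.push(idx);
        data_type = fields[idx].data_type();
    }
    Some(indices)
}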

let partition_value = transform_fn
.transform(source_column)
.map_err(to_datafusion_error)?;
let transform_fn = iceberg::transform::create_transform_function(&pf.transform)
Contributor

Ditto, this only needs to be done once.

Contributor

This is not resolved; we could create the transform functions in the constructor.
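A minimal sketch of what "create them in the constructor" could look like (struct and field names are assumptions for illustration):

use iceberg::spec::PartitionSpec;
use iceberg::transform::{create_transform_function, BoxedTransformFunction};

// The transform functions depend only on the partition spec, so they can be
// built once at construction time and reused for every batch; the resolved
// index paths can be cached the same way.
struct PartitionProjection {
    transform_fns: Vec<BoxedTransformFunction>,
}

impl PartitionProjection {
    fn try_new(spec: &PartitionSpec) -> iceberg::Result<Self> {
        let transform_fns = spec
            .fields()
            .iter()
            .map(|pf| create_transform_function(&pf.transform))
            .collect::<iceberg::Result<Vec<_>>>()?;
        Ok(Self { transform_fns })
    }
}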

iceberg_schema: Arc<IcebergSchema>,
target_field_ids: &[i32],
) -> Result<Self> {
let field_id_fetch_func = |field: &Field| -> Result<Option<i64>> {
Contributor

This method is unnecessarily complicated; the following approach could simplify it:

  1. Create an Arrow schema from the Iceberg schema.
  2. Prune the Arrow schema created in step 1 by matching top-level field names.

Then we can pass the pruned Arrow schema to the original constructor and search fields by PARQUET_FIELD_ID_META_KEY. (A sketch follows below.)
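A sketch of the suggested simplification (assuming iceberg-rust's schema_to_arrow_schema converter; the pruning itself is plain arrow-rs):

use std::sync::Arc;
use arrow_schema::{Field, Schema as ArrowSchema};
use iceberg::arrow::schema_to_arrow_schema;
use iceberg::spec::Schema as IcebergSchema;

// Derive an Arrow schema (with PARQUET:field_id metadata) from the Iceberg
// schema, then keep only the top-level fields present in the input schema.
fn pruned_arrow_schema(
    iceberg_schema: &IcebergSchema,
    input_schema: &ArrowSchema,
) -> iceberg::Result<ArrowSchema> {
    let full = schema_to_arrow_schema(iceberg_schema)?;
    let fields: Vec<Arc<Field>> = full
        .fields()
        .iter()
        .filter(|f| input_schema.field_with_name(f.name()).is_ok())
        .cloned()
        .collect();
    Ok(ArrowSchema::new(fields))
}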

@fvaleye fvaleye force-pushed the feature/implement-project-node-for-insert-into-datafusion branch from edb4719 to d4fd336 on October 1, 2025 11:54

Development

Successfully merging this pull request may close these issues.

Implement Project Node: Calculate partition value