
Commit 3e6779e

CTTY and liurenjie1024 authored
feat(datafusion): Implement insert_into for IcebergTableProvider (#1600)
## Which issue does this PR close?

- A part of #1540
- See draft: #1511

## What changes are included in this PR?

- Added an optional `catalog` field to `IcebergTableProvider`
- Added table refresh logic in `IcebergTableProvider::scan`
- Implemented `insert_into` for `IcebergTableProvider` using a write node and a commit node, for non-partitioned tables only

## Are these changes tested?

Added tests.

---------

Co-authored-by: Renjie Liu <[email protected]>
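For context, a minimal end-to-end sketch of what this change enables through DataFusion SQL. It assumes an existing `Arc<dyn Catalog>` that already contains an unpartitioned table; the catalog name `iceberg`, namespace `ns`, and table `t1` are hypothetical, and the exact provider plumbing may differ from the released API:

```rust
use std::sync::Arc;

use datafusion::prelude::SessionContext;
use iceberg::Catalog;
use iceberg_datafusion::IcebergCatalogProvider;

// `catalog` is any Arc<dyn Catalog> (e.g. a REST or in-memory catalog)
// that already contains the unpartitioned table `ns.t1`.
async fn run_insert(catalog: Arc<dyn Catalog>) -> Result<(), Box<dyn std::error::Error>> {
    let ctx = SessionContext::new();

    // Registering the Iceberg catalog yields table providers that carry a
    // catalog handle, which `insert_into` needs in order to commit.
    let provider = Arc::new(IcebergCatalogProvider::try_new(catalog).await?);
    ctx.register_catalog("iceberg", provider);

    // Per this PR, the INSERT is planned as
    // IcebergWriteExec -> CoalescePartitionsExec -> IcebergCommitExec.
    ctx.sql("INSERT INTO iceberg.ns.t1 VALUES (1, 'a'), (2, 'b')")
        .await?
        .collect()
        .await?;
    Ok(())
}
```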
Parent: f7e9308

2 files changed (+428 −3 lines)


crates/integrations/datafusion/src/table/mod.rs

Lines changed: 54 additions & 2 deletions
```diff
@@ -24,17 +24,22 @@ use std::sync::Arc;
 use async_trait::async_trait;
 use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef;
 use datafusion::catalog::Session;
+use datafusion::common::DataFusionError;
 use datafusion::datasource::{TableProvider, TableType};
 use datafusion::error::Result as DFResult;
+use datafusion::logical_expr::dml::InsertOp;
 use datafusion::logical_expr::{Expr, TableProviderFilterPushDown};
 use datafusion::physical_plan::ExecutionPlan;
+use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
 use iceberg::arrow::schema_to_arrow_schema;
 use iceberg::inspect::MetadataTableType;
 use iceberg::table::Table;
 use iceberg::{Catalog, Error, ErrorKind, NamespaceIdent, Result, TableIdent};
 use metadata_table::IcebergMetadataTableProvider;

+use crate::physical_plan::commit::IcebergCommitExec;
 use crate::physical_plan::scan::IcebergTableScan;
+use crate::physical_plan::write::IcebergWriteExec;

 /// Represents a [`TableProvider`] for the Iceberg [`Catalog`],
 /// managing access to a [`Table`].
@@ -46,6 +51,8 @@ pub struct IcebergTableProvider {
     snapshot_id: Option<i64>,
     /// A reference-counted arrow `Schema`.
     schema: ArrowSchemaRef,
+    /// The catalog that the table belongs to.
+    catalog: Option<Arc<dyn Catalog>>,
 }

 impl IcebergTableProvider {
@@ -54,6 +61,7 @@ impl IcebergTableProvider {
             table,
             snapshot_id: None,
             schema,
+            catalog: None,
         }
     }
     /// Asynchronously tries to construct a new [`IcebergTableProvider`]
@@ -73,6 +81,7 @@ impl IcebergTableProvider {
             table,
             snapshot_id: None,
             schema,
+            catalog: Some(client),
         })
     }

@@ -84,6 +93,7 @@ impl IcebergTableProvider {
             table,
             snapshot_id: None,
             schema,
+            catalog: None,
         })
     }

@@ -108,6 +118,7 @@ impl IcebergTableProvider {
             table,
             snapshot_id: Some(snapshot_id),
             schema,
+            catalog: None,
         })
     }

@@ -152,11 +163,52 @@ impl TableProvider for IcebergTableProvider {
     fn supports_filters_pushdown(
         &self,
         filters: &[&Expr],
-    ) -> std::result::Result<Vec<TableProviderFilterPushDown>, datafusion::error::DataFusionError>
-    {
+    ) -> DFResult<Vec<TableProviderFilterPushDown>> {
         // Push down all filters, as a single source of truth, the scanner will drop the filters which couldn't be push down
         Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()])
     }
+
+    async fn insert_into(
+        &self,
+        _state: &dyn Session,
+        input: Arc<dyn ExecutionPlan>,
+        _insert_op: InsertOp,
+    ) -> DFResult<Arc<dyn ExecutionPlan>> {
+        if !self
+            .table
+            .metadata()
+            .default_partition_spec()
+            .is_unpartitioned()
+        {
+            // TODO add insert into support for partitioned tables
+            return Err(DataFusionError::NotImplemented(
+                "IcebergTableProvider::insert_into does not support partitioned tables yet"
+                    .to_string(),
+            ));
+        }
+
+        let Some(catalog) = self.catalog.clone() else {
+            return Err(DataFusionError::Execution(
+                "Catalog cannot be none for insert_into".to_string(),
+            ));
+        };
+
+        let write_plan = Arc::new(IcebergWriteExec::new(
+            self.table.clone(),
+            input,
+            self.schema.clone(),
+        ));
+
+        // Merge the outputs of write_plan into one so we can commit all files together
+        let coalesce_partitions = Arc::new(CoalescePartitionsExec::new(write_plan));
+
+        Ok(Arc::new(IcebergCommitExec::new(
+            self.table.clone(),
+            catalog,
+            coalesce_partitions,
+            self.schema.clone(),
+        )))
+    }
 }

 #[cfg(test)]
```
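Since `insert_into` currently rejects partitioned tables, a caller can check the same condition up front before attempting a write. A minimal sketch, reusing the accessor chain from the diff above (the helper name `supports_insert_into` is hypothetical):

```rust
use iceberg::table::Table;

/// Hypothetical helper: true when the table's default partition spec is
/// unpartitioned, i.e. the only case `insert_into` accepts today.
fn supports_insert_into(table: &Table) -> bool {
    table.metadata().default_partition_spec().is_unpartitioned()
}
```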
