Skip to content

Commit e7a49a6

Browse files
phillipleblancsgrebnov
authored andcommitted
Support retrieving the latest Iceberg table on table scan (#11)
Original PR: #11 Upstream PR: apache#1297
1 parent d144694 commit e7a49a6

File tree

2 files changed

+32
-14
lines changed

2 files changed

+32
-14
lines changed

crates/integrations/datafusion/src/schema.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use datafusion::datasource::TableProvider;
2525
use datafusion::error::{DataFusionError, Result as DFResult};
2626
use futures::future::try_join_all;
2727
use iceberg::inspect::MetadataTableType;
28-
use iceberg::{Catalog, NamespaceIdent, Result};
28+
use iceberg::{Catalog, NamespaceIdent, Result, TableIdent};
2929

3030
use crate::table::IcebergTableProvider;
3131

@@ -65,7 +65,10 @@ impl IcebergSchemaProvider {
6565
let providers = try_join_all(
6666
table_names
6767
.iter()
68-
.map(|name| IcebergTableProvider::try_new(client.clone(), namespace.clone(), name))
68+
.map(|name| {
69+
let table_ident = TableIdent::new(namespace.clone(), name.clone());
70+
IcebergTableProvider::try_new(client.clone(), table_ident)
71+
})
6972
.collect::<Vec<_>>(),
7073
)
7174
.await?;

crates/integrations/datafusion/src/table/mod.rs

Lines changed: 27 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -24,17 +24,16 @@ use std::sync::Arc;
2424
use async_trait::async_trait;
2525
use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef;
2626
use datafusion::catalog::Session;
27-
use datafusion::common::DataFusionError;
2827
use datafusion::datasource::{TableProvider, TableType};
29-
use datafusion::error::Result as DFResult;
28+
use datafusion::error::{DataFusionError, Result as DFResult};
3029
use datafusion::logical_expr::dml::InsertOp;
3130
use datafusion::logical_expr::{Expr, TableProviderFilterPushDown};
3231
use datafusion::physical_plan::ExecutionPlan;
3332
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
3433
use iceberg::arrow::schema_to_arrow_schema;
3534
use iceberg::inspect::MetadataTableType;
3635
use iceberg::table::Table;
37-
use iceberg::{Catalog, Error, ErrorKind, NamespaceIdent, Result, TableIdent};
36+
use iceberg::{Catalog, Error, ErrorKind, Result, TableIdent};
3837
use metadata_table::IcebergMetadataTableProvider;
3938

4039
use crate::physical_plan::commit::IcebergCommitExec;
@@ -43,7 +42,7 @@ use crate::physical_plan::write::IcebergWriteExec;
4342

4443
/// Represents a [`TableProvider`] for the Iceberg [`Catalog`],
4544
/// managing access to a [`Table`].
46-
#[derive(Debug, Clone)]
45+
#[derive(Clone)]
4746
pub struct IcebergTableProvider {
4847
/// A table in the catalog.
4948
table: Table,
@@ -55,6 +54,16 @@ pub struct IcebergTableProvider {
5554
catalog: Option<Arc<dyn Catalog>>,
5655
}
5756

57+
impl std::fmt::Debug for IcebergTableProvider {
58+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
59+
f.debug_struct("IcebergTableProvider")
60+
.field("table", &self.table)
61+
.field("snapshot_id", &self.snapshot_id)
62+
.field("schema", &self.schema)
63+
.finish_non_exhaustive()
64+
}
65+
}
66+
5867
impl IcebergTableProvider {
5968
pub(crate) fn new(table: Table, schema: ArrowSchemaRef) -> Self {
6069
IcebergTableProvider {
@@ -67,13 +76,8 @@ impl IcebergTableProvider {
6776
/// Asynchronously tries to construct a new [`IcebergTableProvider`]
6877
/// using the given client and table name to fetch an actual [`Table`]
6978
/// in the provided namespace.
70-
pub(crate) async fn try_new(
71-
client: Arc<dyn Catalog>,
72-
namespace: NamespaceIdent,
73-
name: impl Into<String>,
74-
) -> Result<Self> {
75-
let ident = TableIdent::new(namespace, name.into());
76-
let table = client.load_table(&ident).await?;
79+
pub async fn try_new(client: Arc<dyn Catalog>, table_name: TableIdent) -> Result<Self> {
80+
let table = client.load_table(&table_name).await?;
7781

7882
let schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);
7983

@@ -151,8 +155,19 @@ impl TableProvider for IcebergTableProvider {
151155
filters: &[Expr],
152156
_limit: Option<usize>,
153157
) -> DFResult<Arc<dyn ExecutionPlan>> {
158+
// Get the latest table metadata from the catalog if it exists
159+
let table = if let Some(catalog) = &self.catalog {
160+
catalog
161+
.load_table(self.table.identifier())
162+
.await
163+
.map_err(|e| {
164+
DataFusionError::Execution(format!("Error getting Iceberg table metadata: {e}"))
165+
})?
166+
} else {
167+
self.table.clone()
168+
};
154169
Ok(Arc::new(IcebergTableScan::new(
155-
self.table.clone(),
170+
table,
156171
self.snapshot_id,
157172
self.schema.clone(),
158173
projection,

0 commit comments

Comments
 (0)