Skip to content

Commit d043d1c

Browse files
committed
map our column reps to deltalake reps
1 parent 09ebef5 commit d043d1c

File tree

1 file changed

+32
-1
lines changed

1 file changed

+32
-1
lines changed

src/database.rs

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use datafusion::{
2525
use datafusion_functions_json;
2626
use delta_kernel::arrow::record_batch::RecordBatch;
2727
use deltalake::datafusion::parquet::file::properties::WriterProperties;
28+
use deltalake::delta_datafusion::DataFusionMixins;
2829
use deltalake::kernel::transaction::CommitProperties;
2930
use deltalake::PartitionFilter;
3031
use deltalake::{DeltaOps, DeltaTable, DeltaTableBuilder};
@@ -1856,14 +1857,44 @@ impl TableProvider for ProjectRoutingTable {
18561857
let delta_table = self.database.resolve_table(&project_id, &self.table_name).instrument(resolve_span).await?;
18571858
let table = delta_table.read().await;
18581859

1860+
// Map projection indices from our schema to the Delta table's schema
1861+
let mapped_projection = if let Some(proj) = projection {
1862+
// Get the actual Delta table arrow schema directly
1863+
let snapshot = table.snapshot().map_err(|e| DataFusionError::External(Box::new(e)))?;
1864+
let delta_arrow_schema = snapshot.arrow_schema()
1865+
.map_err(|e| DataFusionError::External(Box::new(e)))?;
1866+
1867+
// Map projection indices
1868+
let mut mapped_indices = Vec::new();
1869+
for &idx in proj {
1870+
// Get field name from our schema
1871+
if let Some(field) = self.schema.fields().get(idx) {
1872+
let field_name = field.name();
1873+
// Find corresponding index in Delta schema
1874+
if let Ok(delta_idx) = delta_arrow_schema.index_of(field_name) {
1875+
mapped_indices.push(delta_idx);
1876+
} else {
1877+
// Field not found in Delta schema - this shouldn't happen but handle gracefully
1878+
warn!("Field '{}' at index {} not found in Delta table schema", field_name, idx);
1879+
return Err(DataFusionError::Plan(format!("Column '{}' not found in table", field_name)));
1880+
}
1881+
} else {
1882+
return Err(DataFusionError::Plan(format!("Invalid projection index: {}", idx)));
1883+
}
1884+
}
1885+
Some(mapped_indices)
1886+
} else {
1887+
None
1888+
};
1889+
18591890
// Create a span for the table scan that will be the parent for all object store operations
18601891
let scan_span = tracing::trace_span!("delta_table.scan",
18611892
table.name = %self.table_name,
18621893
table.project_id = %project_id,
18631894
partition_filters = ?optimized_filters.iter().filter(|f| matches!(f, Expr::BinaryExpr(_))).count()
18641895
);
18651896

1866-
let plan = table.scan(state, projection, &optimized_filters, limit).instrument(scan_span).await?;
1897+
let plan = table.scan(state, mapped_projection.as_ref(), &optimized_filters, limit).instrument(scan_span).await?;
18671898

18681899
Ok(plan)
18691900
}

0 commit comments

Comments
 (0)