|
1 | 1 | use crate::object_store_cache::{FoyerCacheConfig, FoyerObjectStoreCache, SharedFoyerCache}; |
2 | | -use crate::schema_loader::{get_default_schema, get_schema}; |
| 2 | +use crate::schema_loader::{get_default_schema, get_schema, TableSchema}; |
3 | 3 | use crate::statistics::DeltaStatisticsExtractor; |
4 | 4 | use anyhow::Result; |
5 | 5 | use arrow_schema::SchemaRef; |
6 | 6 | use async_trait::async_trait; |
7 | 7 | use chrono::Utc; |
8 | | -use datafusion::arrow::array::{Array, AsArray}; |
| 8 | +use datafusion::arrow::array::{Array, AsArray, new_null_array}; |
| 9 | +use datafusion::arrow::compute::cast; |
9 | 10 | use datafusion::common::not_impl_err; |
10 | 11 | use datafusion::common::{SchemaExt, Statistics}; |
11 | 12 | use datafusion::datasource::sink::{DataSink, DataSinkExec}; |
@@ -1164,6 +1165,73 @@ impl Database { |
1164 | 1165 | .map_err(|e| anyhow::anyhow!("Failed to load table: {}", e)) |
1165 | 1166 | } |
1166 | 1167 |
|
| 1168 | + /// Maps a RecordBatch to match the expected Delta table schema |
| 1169 | + /// This includes reordering columns and coercing types where necessary |
| 1170 | + async fn map_batch_to_delta_schema(&self, batch: &RecordBatch, delta_table: &DeltaTable, _expected_schema: &TableSchema) -> Result<RecordBatch> { |
| 1171 | + // Get the Delta table's current schema |
| 1172 | + let snapshot = delta_table.snapshot() |
| 1173 | + .map_err(|e| anyhow::anyhow!("Failed to get Delta snapshot: {}", e))?; |
| 1174 | + let delta_arrow_schema = snapshot.arrow_schema() |
| 1175 | + .map_err(|e| anyhow::anyhow!("Failed to get Delta arrow schema: {}", e))?; |
| 1176 | + |
| 1177 | + // Build new columns in the order expected by Delta table |
| 1178 | + let mut new_columns = Vec::new(); |
| 1179 | + |
| 1180 | + for field in delta_arrow_schema.fields() { |
| 1181 | + let field_name = field.name(); |
| 1182 | + |
| 1183 | + // Try to find the column in the incoming batch |
| 1184 | + if let Some((idx, _)) = batch.schema().column_with_name(field_name) { |
| 1185 | + let column = batch.column(idx); |
| 1186 | + |
| 1187 | + // Check if types match, if not try to cast |
| 1188 | + if column.data_type() != field.data_type() { |
| 1189 | + // Attempt to cast the column to the expected type |
| 1190 | + match cast(column, field.data_type()) { |
| 1191 | + Ok(casted_column) => { |
| 1192 | + new_columns.push(casted_column); |
| 1193 | + } |
| 1194 | + Err(e) => { |
| 1195 | + // If cast fails, log warning and return error |
| 1196 | + warn!( |
| 1197 | + "Failed to cast column '{}' from {:?} to {:?}: {}", |
| 1198 | + field_name, |
| 1199 | + column.data_type(), |
| 1200 | + field.data_type(), |
| 1201 | + e |
| 1202 | + ); |
| 1203 | + return Err(anyhow::anyhow!( |
| 1204 | + "Type mismatch for column '{}': cannot cast from {:?} to {:?}", |
| 1205 | + field_name, |
| 1206 | + column.data_type(), |
| 1207 | + field.data_type() |
| 1208 | + )); |
| 1209 | + } |
| 1210 | + } |
| 1211 | + } else { |
| 1212 | + // Types match, use column as-is |
| 1213 | + new_columns.push(column.clone()); |
| 1214 | + } |
| 1215 | + } else { |
| 1216 | + // Column not found in batch |
| 1217 | + // For nullable columns, create a null array |
| 1218 | + if field.is_nullable() { |
| 1219 | + let null_array = new_null_array(field.data_type(), batch.num_rows()); |
| 1220 | + new_columns.push(null_array); |
| 1221 | + } else { |
| 1222 | + return Err(anyhow::anyhow!( |
| 1223 | + "Required column '{}' not found in batch", |
| 1224 | + field_name |
| 1225 | + )); |
| 1226 | + } |
| 1227 | + } |
| 1228 | + } |
| 1229 | + |
| 1230 | + // Create new batch with mapped columns |
| 1231 | + RecordBatch::try_new(delta_arrow_schema.clone(), new_columns) |
| 1232 | + .map_err(|e| anyhow::anyhow!("Failed to create mapped batch: {}", e)) |
| 1233 | + } |
| 1234 | + |
1167 | 1235 | #[instrument( |
1168 | 1236 | name = "delta.insert_batch", |
1169 | 1237 | skip_all, |
@@ -1226,11 +1294,26 @@ impl Database { |
1226 | 1294 | debug!("Failed to update table before write (attempt {}): {}", retry_count + 1, e); |
1227 | 1295 | } |
1228 | 1296 |
|
| 1297 | + // Map batches to match Delta table schema |
| 1298 | + let mapped_batches = { |
| 1299 | + let mut mapped = Vec::new(); |
| 1300 | + for batch in &batches { |
| 1301 | + match self.map_batch_to_delta_schema(batch, &table, &schema).await { |
| 1302 | + Ok(mapped_batch) => mapped.push(mapped_batch), |
| 1303 | + Err(e) => { |
| 1304 | + warn!("Failed to map batch to Delta schema: {}", e); |
| 1305 | + return Err(e); |
| 1306 | + } |
| 1307 | + } |
| 1308 | + } |
| 1309 | + mapped |
| 1310 | + }; |
| 1311 | + |
1229 | 1312 | let write_span = tracing::trace_span!(parent: &span, "delta.write_operation", retry_attempt = retry_count + 1); |
1230 | 1313 | let write_result = async { |
1231 | 1314 | // Schema evolution enabled: new columns will be automatically added to the table |
1232 | 1315 | DeltaOps(table.clone()) |
1233 | | - .write(batches.clone()) |
| 1316 | + .write(mapped_batches) |
1234 | 1317 | .with_partition_columns(schema.partitions.clone()) |
1235 | 1318 | .with_writer_properties(writer_properties.clone()) |
1236 | 1319 | .with_save_mode(deltalake::protocol::SaveMode::Append) |
|
0 commit comments