|
1 | 1 | use crate::object_store_cache::{FoyerCacheConfig, FoyerObjectStoreCache, SharedFoyerCache}; |
2 | | -use crate::schema_loader::{get_default_schema, get_schema, TableSchema}; |
| 2 | +use crate::schema_loader::{get_default_schema, get_schema}; |
3 | 3 | use crate::statistics::DeltaStatisticsExtractor; |
4 | 4 | use anyhow::Result; |
5 | 5 | use arrow_schema::SchemaRef; |
6 | 6 | use async_trait::async_trait; |
7 | 7 | use chrono::Utc; |
8 | | -use datafusion::arrow::array::{Array, AsArray, new_null_array}; |
9 | | -use datafusion::arrow::compute::cast; |
| 8 | +use datafusion::arrow::array::{Array, AsArray}; |
10 | 9 | use datafusion::common::not_impl_err; |
11 | 10 | use datafusion::common::{SchemaExt, Statistics}; |
12 | 11 | use datafusion::datasource::sink::{DataSink, DataSinkExec}; |
@@ -1165,72 +1164,6 @@ impl Database { |
1165 | 1164 | .map_err(|e| anyhow::anyhow!("Failed to load table: {}", e)) |
1166 | 1165 | } |
1167 | 1166 |
|
1168 | | - /// Maps a RecordBatch to match the expected Delta table schema |
1169 | | - /// This includes reordering columns and coercing types where necessary |
1170 | | - async fn map_batch_to_delta_schema(&self, batch: &RecordBatch, delta_table: &DeltaTable, _expected_schema: &TableSchema) -> Result<RecordBatch> { |
1171 | | - // Get the Delta table's current schema |
1172 | | - let snapshot = delta_table.snapshot() |
1173 | | - .map_err(|e| anyhow::anyhow!("Failed to get Delta snapshot: {}", e))?; |
1174 | | - let delta_arrow_schema = snapshot.arrow_schema() |
1175 | | - .map_err(|e| anyhow::anyhow!("Failed to get Delta arrow schema: {}", e))?; |
1176 | | - |
1177 | | - // Build new columns in the order expected by Delta table |
1178 | | - let mut new_columns = Vec::new(); |
1179 | | - |
1180 | | - for field in delta_arrow_schema.fields() { |
1181 | | - let field_name = field.name(); |
1182 | | - |
1183 | | - // Try to find the column in the incoming batch |
1184 | | - if let Some((idx, _)) = batch.schema().column_with_name(field_name) { |
1185 | | - let column = batch.column(idx); |
1186 | | - |
1187 | | - // Check if types match, if not try to cast |
1188 | | - if column.data_type() != field.data_type() { |
1189 | | - // Attempt to cast the column to the expected type |
1190 | | - match cast(column, field.data_type()) { |
1191 | | - Ok(casted_column) => { |
1192 | | - new_columns.push(casted_column); |
1193 | | - } |
1194 | | - Err(e) => { |
1195 | | - // If cast fails, log warning and return error |
1196 | | - warn!( |
1197 | | - "Failed to cast column '{}' from {:?} to {:?}: {}", |
1198 | | - field_name, |
1199 | | - column.data_type(), |
1200 | | - field.data_type(), |
1201 | | - e |
1202 | | - ); |
1203 | | - return Err(anyhow::anyhow!( |
1204 | | - "Type mismatch for column '{}': cannot cast from {:?} to {:?}", |
1205 | | - field_name, |
1206 | | - column.data_type(), |
1207 | | - field.data_type() |
1208 | | - )); |
1209 | | - } |
1210 | | - } |
1211 | | - } else { |
1212 | | - // Types match, use column as-is |
1213 | | - new_columns.push(column.clone()); |
1214 | | - } |
1215 | | - } else { |
1216 | | - // Column not found in batch |
1217 | | - // For nullable columns, create a null array |
1218 | | - if field.is_nullable() { |
1219 | | - let null_array = new_null_array(field.data_type(), batch.num_rows()); |
1220 | | - new_columns.push(null_array); |
1221 | | - } else { |
1222 | | - return Err(anyhow::anyhow!( |
1223 | | - "Required column '{}' not found in batch", |
1224 | | - field_name |
1225 | | - )); |
1226 | | - } |
1227 | | - } |
1228 | | - } |
1229 | | - |
1230 | | - // Create new batch with mapped columns |
1231 | | - RecordBatch::try_new(delta_arrow_schema.clone(), new_columns) |
1232 | | - .map_err(|e| anyhow::anyhow!("Failed to create mapped batch: {}", e)) |
1233 | | - } |
1234 | 1167 |
|
1235 | 1168 | #[instrument( |
1236 | 1169 | name = "delta.insert_batch", |
@@ -1294,26 +1227,11 @@ impl Database { |
1294 | 1227 | debug!("Failed to update table before write (attempt {}): {}", retry_count + 1, e); |
1295 | 1228 | } |
1296 | 1229 |
|
1297 | | - // Map batches to match Delta table schema |
1298 | | - let mapped_batches = { |
1299 | | - let mut mapped = Vec::new(); |
1300 | | - for batch in &batches { |
1301 | | - match self.map_batch_to_delta_schema(batch, &table, &schema).await { |
1302 | | - Ok(mapped_batch) => mapped.push(mapped_batch), |
1303 | | - Err(e) => { |
1304 | | - warn!("Failed to map batch to Delta schema: {}", e); |
1305 | | - return Err(e); |
1306 | | - } |
1307 | | - } |
1308 | | - } |
1309 | | - mapped |
1310 | | - }; |
1311 | | - |
1312 | 1230 | let write_span = tracing::trace_span!(parent: &span, "delta.write_operation", retry_attempt = retry_count + 1); |
1313 | 1231 | let write_result = async { |
1314 | 1232 | // Schema evolution enabled: new columns will be automatically added to the table |
1315 | 1233 | DeltaOps(table.clone()) |
1316 | | - .write(mapped_batches) |
| 1234 | + .write(batches.clone()) |
1317 | 1235 | .with_partition_columns(schema.partitions.clone()) |
1318 | 1236 | .with_writer_properties(writer_properties.clone()) |
1319 | 1237 | .with_save_mode(deltalake::protocol::SaveMode::Append) |
|
0 commit comments