From f94d506a1da8de12b1fbe300bb54d45e6eab07be Mon Sep 17 00:00:00 2001 From: dvlascenco Date: Tue, 7 Apr 2026 12:10:54 +0300 Subject: [PATCH 1/2] [HSTACK] - fix crash on unparseable timestamps in stats JSON --- .../src/kernel/snapshot/iterators/scan_row.rs | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/crates/core/src/kernel/snapshot/iterators/scan_row.rs b/crates/core/src/kernel/snapshot/iterators/scan_row.rs index 1a6536255..ab5629099 100644 --- a/crates/core/src/kernel/snapshot/iterators/scan_row.rs +++ b/crates/core/src/kernel/snapshot/iterators/scan_row.rs @@ -10,6 +10,7 @@ use delta_kernel::engine::arrow_conversion::TryIntoArrow as _; use delta_kernel::engine::arrow_conversion::TryIntoKernel; use delta_kernel::engine::arrow_data::ArrowEngineData; use delta_kernel::engine::parse_json; +use delta_kernel::EngineData; use delta_kernel::expressions::Scalar; use delta_kernel::expressions::UnaryExpressionOp; use delta_kernel::scan::scan_row_schema; @@ -142,7 +143,20 @@ fn parse_stats_column_impl( let stats_batch = batch.project(&[stats_idx])?; let stats_data = Box::new(ArrowEngineData::new(stats_batch)); - let parsed = parse_json(stats_data, stats_schema)?; + let parsed: Box = match parse_json(stats_data, stats_schema.clone()) { + Ok(p) => p, + Err(e) => { + warn!("Failed to parse stats JSON, treating stats as absent: {e}"); + let arrow_schema: Arc = Arc::new(stats_schema.as_ref().try_into_arrow()?); + let null_columns: Vec<_> = arrow_schema + .fields() + .iter() + .map(|f| new_null_array(f.data_type(), batch.num_rows())) + .collect(); + let null_batch = RecordBatch::try_new(arrow_schema, null_columns)?; + Box::new(ArrowEngineData::new(null_batch)) + } + }; let parsed: RecordBatch = ArrowEngineData::try_from_engine_data(parsed)?.into(); let stats_array: Arc = Arc::new(parsed.into()); From d999da1ce09eeb5da0b9805a09abe1a8965c166b Mon Sep 17 00:00:00 2001 From: dvlascenco Date: Tue, 7 Apr 2026 14:25:54 +0300 Subject: [PATCH 2/2] fix: reject update expressions with incompatible column types --- crates/core/src/operations/update.rs | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/crates/core/src/operations/update.rs b/crates/core/src/operations/update.rs index 678276885..75d29226e 100644 --- a/crates/core/src/operations/update.rs +++ b/crates/core/src/operations/update.rs @@ -28,7 +28,8 @@ use datafusion::{ error::DataFusionError, execution::context::SessionState, logical_expr::{ - Extension, LogicalPlan, LogicalPlanBuilder, UserDefinedLogicalNode, case, col, lit, when, + Extension, LogicalPlan, LogicalPlanBuilder, UserDefinedLogicalNode, case, cast, col, lit, + when, }, physical_plan::{ExecutionPlan, metrics::MetricBuilder}, physical_planner::{ExtensionPlanner, PhysicalPlanner}, @@ -262,6 +263,7 @@ impl ExtensionPlanner for UpdateMetricExtensionPlanner { } } + #[allow(clippy::too_many_arguments)] #[tracing::instrument( skip_all, @@ -342,10 +344,18 @@ async fn execute( .into_iter() .map(|field| { let expr = match updates.get(field.name()) { - Some(expr) => case(col(UPDATE_PREDICATE_COLNAME)) - .when(lit(true), expr.to_owned()) - .otherwise(col(Column::from_name(field.name())))? - .alias(field.name()), + Some(expr) => { + // Cast the update expression to the target column type so that + // (a) the CASE branches are always the same type and DataFusion cannot + // silently widen the column (e.g. Int32 → Utf8), and + // (b) incompatible assignments (e.g. a non-numeric string into an Int32 + // column) are caught during plan optimisation via constant folding. + let typed_expr = cast(expr.to_owned(), field.data_type().clone()); + case(col(UPDATE_PREDICATE_COLNAME)) + .when(lit(true), typed_expr) + .otherwise(col(Column::from_name(field.name())))? + .alias(field.name()) + } None => col(Column::from_name(field.name())), }; Ok::<_, DataFusionError>(expr)