|
1 | 1 | use crate::config::{self, AppConfig}; |
2 | 2 | use crate::object_store_cache::{FoyerCacheConfig, FoyerObjectStoreCache, SharedFoyerCache}; |
3 | | -use crate::schema_loader::{get_default_schema, get_schema}; |
| 3 | +use crate::schema_loader::{get_default_schema, get_schema, is_variant_type}; |
4 | 4 | use crate::statistics::DeltaStatisticsExtractor; |
5 | 5 | use anyhow::Result; |
6 | | -use arrow_schema::SchemaRef; |
| 6 | +use arrow_schema::{Schema, SchemaRef}; |
7 | 7 | use async_trait::async_trait; |
8 | 8 | use chrono::Utc; |
9 | 9 | use datafusion::arrow::array::Array; |
@@ -36,10 +36,10 @@ use deltalake::operations::create::CreateBuilder; |
36 | 36 | use deltalake::{DeltaTable, DeltaTableBuilder}; |
37 | 37 | use futures::StreamExt; |
38 | 38 | use instrumented_object_store::instrument_object_store; |
39 | | -use std::sync::Mutex; |
40 | 39 | use serde::{Deserialize, Serialize}; |
41 | 40 | use sqlx::{PgPool, postgres::PgPoolOptions}; |
42 | 41 | use std::fmt; |
| 42 | +use std::sync::Mutex; |
43 | 43 | use std::sync::OnceLock; |
44 | 44 | use std::{any::Any, collections::HashMap, sync::Arc}; |
45 | 45 | use tokio::sync::RwLock; |
@@ -82,6 +82,85 @@ pub fn extract_project_id(batch: &RecordBatch) -> Option<String> { |
82 | 82 | }) |
83 | 83 | } |
84 | 84 |
|
| 85 | +/// Convert string columns to Variant binary format where the target schema expects Variant type. |
| 86 | +/// This enables automatic JSON string → Variant conversion during INSERT. |
| 87 | +pub fn convert_variant_columns(batch: RecordBatch, target_schema: &SchemaRef) -> DFResult<RecordBatch> { |
| 88 | + use datafusion::arrow::array::{ArrayRef, LargeStringArray, StringArray, StringViewArray}; |
| 89 | + use datafusion::arrow::datatypes::{DataType, Field}; |
| 90 | + |
| 91 | + let batch_schema = batch.schema(); |
| 92 | + let mut columns: Vec<ArrayRef> = batch.columns().to_vec(); |
| 93 | + let mut new_fields: Vec<Arc<Field>> = batch_schema.fields().iter().cloned().collect(); |
| 94 | + |
| 95 | + for (idx, target_field) in target_schema.fields().iter().enumerate() { |
| 96 | + if !is_variant_type(target_field.data_type()) { |
| 97 | + continue; |
| 98 | + } |
| 99 | + // Skip columns beyond batch length - this is normal for INSERT with fewer columns than table schema |
| 100 | + // (e.g., columns with defaults or nullable columns omitted from INSERT) |
| 101 | + if idx >= columns.len() { |
| 102 | + continue; |
| 103 | + } |
| 104 | + |
| 105 | + let col = &columns[idx]; |
| 106 | + let col_type = col.data_type(); |
| 107 | + |
| 108 | + // Only convert if source is a string type and target is Variant |
| 109 | + let converted: Option<ArrayRef> = |
| 110 | + match col_type { |
| 111 | + DataType::Utf8View => { |
| 112 | + let arr = col.as_any().downcast_ref::<StringViewArray>().ok_or_else(|| { |
| 113 | + DataFusionError::Execution(format!("Expected StringViewArray for field '{}' but downcast failed", target_field.name())) |
| 114 | + })?; |
| 115 | + Some(Arc::new(json_strings_to_variant(arr.iter())?)) |
| 116 | + } |
| 117 | + DataType::Utf8 => { |
| 118 | + let arr = col |
| 119 | + .as_any() |
| 120 | + .downcast_ref::<StringArray>() |
| 121 | + .ok_or_else(|| DataFusionError::Execution(format!("Expected StringArray for field '{}' but downcast failed", target_field.name())))?; |
| 122 | + Some(Arc::new(json_strings_to_variant(arr.iter())?)) |
| 123 | + } |
| 124 | + DataType::LargeUtf8 => { |
| 125 | + let arr = col.as_any().downcast_ref::<LargeStringArray>().ok_or_else(|| { |
| 126 | + DataFusionError::Execution(format!("Expected LargeStringArray for field '{}' but downcast failed", target_field.name())) |
| 127 | + })?; |
| 128 | + Some(Arc::new(json_strings_to_variant(arr.iter())?)) |
| 129 | + } |
| 130 | + _ => None, // Already Variant or other type, skip |
| 131 | + }; |
| 132 | + |
| 133 | + if let Some(variant_array) = converted { |
| 134 | + columns[idx] = variant_array; |
| 135 | + new_fields[idx] = target_field.clone(); |
| 136 | + } |
| 137 | + } |
| 138 | + |
| 139 | + let new_schema = Arc::new(Schema::new(new_fields)); |
| 140 | + RecordBatch::try_new(new_schema, columns).map_err(|e| DataFusionError::ArrowError(Box::new(e), None)) |
| 141 | +} |
| 142 | + |
| 143 | +/// Convert an iterator of optional JSON strings to a Variant StructArray. |
| 144 | +/// Fails fast on invalid JSON to ensure data integrity. |
| 145 | +fn json_strings_to_variant<'a>(iter: impl Iterator<Item = Option<&'a str>>) -> DFResult<datafusion::arrow::array::StructArray> { |
| 146 | + use parquet_variant_compute::VariantArrayBuilder; |
| 147 | + use parquet_variant_json::JsonToVariant; |
| 148 | + |
| 149 | + let items: Vec<_> = iter.collect(); |
| 150 | + let mut builder = VariantArrayBuilder::new(items.len()); |
| 151 | + |
| 152 | + for (row_idx, item) in items.into_iter().enumerate() { |
| 153 | + match item { |
| 154 | + Some(json_str) => builder |
| 155 | + .append_json(json_str) |
| 156 | + .map_err(|e| DataFusionError::Execution(format!("Invalid JSON at row {}: {} (value: '{}')", row_idx, e, json_str)))?, |
| 157 | + None => builder.append_null(), |
| 158 | + } |
| 159 | + } |
| 160 | + |
| 161 | + Ok(builder.build().into()) |
| 162 | +} |
| 163 | + |
85 | 164 | // Compression level for parquet files - kept for WriterProperties fallback |
86 | 165 | const ZSTD_COMPRESSION_LEVEL: i32 = 3; |
87 | 166 |
|
@@ -712,11 +791,14 @@ impl Database { |
712 | 791 |
|
713 | 792 | self.register_pg_settings_table(ctx)?; |
714 | 793 | self.register_set_config_udf(ctx); |
715 | | - self.register_json_functions(ctx); |
716 | 794 |
|
717 | | - // Register custom PostgreSQL-compatible functions |
| 795 | + // CRITICAL: Register custom functions BEFORE JSON functions to ensure VariantAwareExprPlanner |
| 796 | + // intercepts -> and ->> operators on Variant columns before JsonExprPlanner handles them as strings |
718 | 797 | crate::functions::register_custom_functions(ctx).map_err(|e| DataFusionError::Execution(format!("Failed to register custom functions: {}", e)))?; |
719 | 798 |
|
| 799 | + // JSON functions (JsonExprPlanner for -> and ->> on string columns - must come after Variant handlers) |
| 800 | + self.register_json_functions(ctx); |
| 801 | + |
720 | 802 | Ok(()) |
721 | 803 | } |
722 | 804 |
|
@@ -1205,7 +1287,10 @@ impl Database { |
1205 | 1287 |
|
1206 | 1288 | // Fallback to legacy batch queue if configured |
1207 | 1289 | let enable_queue = self.config.core.enable_batch_queue; |
1208 | | - if !skip_queue && enable_queue && let Some(ref queue) = self.batch_queue { |
| 1290 | + if !skip_queue |
| 1291 | + && enable_queue |
| 1292 | + && let Some(ref queue) = self.batch_queue |
| 1293 | + { |
1209 | 1294 | span.record("use_queue", true); |
1210 | 1295 | for batch in batches { |
1211 | 1296 | if let Err(e) = queue.queue(batch) { |
@@ -1892,14 +1977,18 @@ impl DataSink for ProjectRoutingTable { |
1892 | 1977 | let span = tracing::Span::current(); |
1893 | 1978 | let mut total_row_count = 0; |
1894 | 1979 | let mut project_batches: HashMap<String, Vec<RecordBatch>> = HashMap::new(); |
| 1980 | + let target_schema = self.schema(); |
1895 | 1981 |
|
1896 | | - // Collect and group batches by project_id |
| 1982 | + // Collect and group batches by project_id, converting variant columns |
1897 | 1983 | while let Some(batch) = data.next().await.transpose()? { |
1898 | 1984 | let batch_rows = batch.num_rows(); |
1899 | 1985 | debug!("write_all: received batch with {} rows", batch_rows); |
1900 | 1986 | total_row_count += batch_rows; |
1901 | 1987 | let project_id = extract_project_id(&batch).unwrap_or_else(|| self.default_project.clone()); |
1902 | | - project_batches.entry(project_id).or_default().push(batch); |
| 1988 | + |
| 1989 | + // Convert string columns to Variant where target schema expects Variant |
| 1990 | + let converted_batch = convert_variant_columns(batch, &target_schema)?; |
| 1991 | + project_batches.entry(project_id).or_default().push(converted_batch); |
1903 | 1992 | } |
1904 | 1993 |
|
1905 | 1994 | span.record("rows.count", total_row_count); |
|
0 commit comments