Skip to content

Commit c8b4cb1

Browse files
committed
fix null handling
1 parent c5343f0 commit c8b4cb1

File tree

2 files changed

+47
-13
lines changed

2 files changed

+47
-13
lines changed

native/spark-expr/src/json_funcs/from_json.rs

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,7 @@ impl PhysicalExpr for FromJson {
131131
/// Parse JSON string array into struct array
132132
fn json_string_to_struct(arr: &Arc<dyn Array>, schema: &DataType) -> Result<ArrayRef> {
133133
use arrow::array::StringArray;
134+
use arrow::buffer::NullBuffer;
134135

135136
// Input must be string array
136137
let string_array = arr.as_any().downcast_ref::<StringArray>().ok_or_else(|| {
@@ -150,30 +151,37 @@ fn json_string_to_struct(arr: &Arc<dyn Array>, schema: &DataType) -> Result<Arra
150151
// Create builders for each field
151152
let mut field_builders = create_field_builders(fields, num_rows)?;
152153

154+
// Track which rows should be null at the struct level
155+
let mut struct_nulls = vec![true; num_rows];
156+
153157
// Parse each row
154158
for row_idx in 0..num_rows {
155159
if string_array.is_null(row_idx) {
156-
// Null input -> null output
160+
// Null input -> null struct
161+
struct_nulls[row_idx] = false;
157162
append_null_to_all_builders(&mut field_builders);
158163
} else {
159164
let json_str = string_array.value(row_idx);
160165

161-
// Parse JSON (PERMISSIVE mode: return null on error)
166+
// Parse JSON (PERMISSIVE mode: return null fields on error)
162167
match serde_json::from_str::<serde_json::Value>(json_str) {
163168
Ok(json_value) => {
164169
if let serde_json::Value::Object(obj) = json_value {
165-
// Extract each field
170+
// Struct is not null, extract each field
171+
struct_nulls[row_idx] = true;
166172
for (field, builder) in fields.iter().zip(field_builders.iter_mut()) {
167173
let field_value = obj.get(field.name());
168174
append_field_value(builder, field, field_value)?;
169175
}
170176
} else {
171-
// Not an object -> null
177+
// Not an object -> struct with null fields
178+
struct_nulls[row_idx] = true;
172179
append_null_to_all_builders(&mut field_builders);
173180
}
174181
}
175182
Err(_) => {
176-
// Parse error -> null (PERMISSIVE mode)
183+
// Parse error -> struct with null fields (PERMISSIVE mode)
184+
struct_nulls[row_idx] = true;
177185
append_null_to_all_builders(&mut field_builders);
178186
}
179187
}
@@ -186,10 +194,13 @@ fn json_string_to_struct(arr: &Arc<dyn Array>, schema: &DataType) -> Result<Arra
186194
.map(finish_builder)
187195
.collect::<Result<Vec<_>>>()?;
188196

197+
// Create null buffer from struct_nulls
198+
let null_buffer = NullBuffer::from(struct_nulls);
199+
189200
Ok(Arc::new(StructArray::new(
190201
fields.clone(),
191202
arrays,
192-
None, // No top-level nullability bitmap needed
203+
Some(null_buffer),
193204
)))
194205
}
195206

@@ -381,13 +392,13 @@ mod tests {
381392
assert_eq!(a_array.value(1), 456);
382393
assert!(b_array.is_null(1));
383394

384-
// Third row (parse error -> all nulls)
395+
// Third row (parse error -> struct NOT null, all fields null)
396+
assert!(!struct_array.is_null(2), "Struct should not be null");
385397
assert!(a_array.is_null(2));
386398
assert!(b_array.is_null(2));
387399

388-
// Fourth row (null input -> all nulls)
389-
assert!(a_array.is_null(3));
390-
assert!(b_array.is_null(3));
400+
// Fourth row (null input -> struct IS null)
401+
assert!(struct_array.is_null(3), "Struct itself should be null");
391402

392403
Ok(())
393404
}
@@ -488,8 +499,9 @@ mod tests {
488499
.downcast_ref::<StringArray>()
489500
.unwrap();
490501

491-
// All rows should have null values for all fields
502+
// All rows should have non-null structs with null field values
492503
for i in 0..4 {
504+
assert!(!struct_array.is_null(i), "Row {} struct should not be null", i);
493505
assert!(a_array.is_null(i), "Row {} field a should be null", i);
494506
assert!(b_array.is_null(i), "Row {} field b should be null", i);
495507
}

spark/src/test/scala/org/apache/comet/CometJsonExpressionSuite.scala

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,7 @@ class CometJsonExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelpe
3434
pos: Position): Unit = {
3535
super.test(testName, testTags: _*) {
3636
withSQLConf(
37-
CometConf.getExprAllowIncompatConfigKey(classOf[JsonToStructs]) -> "true",
38-
CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "false") {
37+
CometConf.getExprAllowIncompatConfigKey(classOf[JsonToStructs]) -> "true") {
3938
testFun
4039
}
4140
}
@@ -106,4 +105,27 @@ class CometJsonExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelpe
106105
}
107106
}
108107
}
108+
109+
test("from_json - null input produces null struct") {
110+
assume(!isSpark40Plus)
111+
112+
Seq(true, false).foreach { dictionaryEnabled =>
113+
withParquetTable(
114+
Seq(
115+
(1, """{"a":1,"b":"x"}"""), // Valid JSON to establish column type
116+
(2, null) // Null input
117+
),
118+
"tbl",
119+
withDictionary = dictionaryEnabled) {
120+
121+
val schema = "a INT, b STRING"
122+
// Verify that null input produces a NULL struct (not a struct with null fields)
123+
checkSparkAnswerAndOperator(
124+
s"SELECT _1, from_json(_2, '$schema') IS NULL as struct_is_null FROM tbl WHERE _1 = 2")
125+
// Field access on null struct should return null
126+
checkSparkAnswerAndOperator(
127+
s"SELECT _1, from_json(_2, '$schema').a FROM tbl WHERE _1 = 2")
128+
}
129+
}
130+
}
109131
}

0 commit comments

Comments (0)