Skip to content

Commit f0090de

Browse files
committed
nested struct
1 parent fb07e4d commit f0090de

File tree

2 files changed

+158
-0
lines changed

2 files changed

+158
-0
lines changed

native/spark-expr/src/json_funcs/from_json.rs

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,11 @@ enum FieldBuilder {
212212
Float64(Float64Builder),
213213
Boolean(BooleanBuilder),
214214
String(StringBuilder),
215+
Struct {
216+
fields: arrow::datatypes::Fields,
217+
builders: Vec<FieldBuilder>,
218+
null_buffer: Vec<bool>,
219+
},
215220
}
216221

217222
fn create_field_builders(
@@ -236,6 +241,14 @@ fn create_field_builders(
236241
capacity,
237242
capacity * 16,
238243
))),
244+
DataType::Struct(nested_fields) => {
245+
let nested_builders = create_field_builders(nested_fields, capacity)?;
246+
Ok(FieldBuilder::Struct {
247+
fields: nested_fields.clone(),
248+
builders: nested_builders,
249+
null_buffer: Vec::with_capacity(capacity),
250+
})
251+
}
239252
dt => Err(datafusion::common::DataFusionError::Execution(format!(
240253
"Unsupported field type in from_json: {:?}",
241254
dt
@@ -253,6 +266,15 @@ fn append_null_to_all_builders(builders: &mut [FieldBuilder]) {
253266
FieldBuilder::Float64(b) => b.append_null(),
254267
FieldBuilder::Boolean(b) => b.append_null(),
255268
FieldBuilder::String(b) => b.append_null(),
269+
FieldBuilder::Struct {
270+
builders: nested_builders,
271+
null_buffer,
272+
..
273+
} => {
274+
// Append null to nested struct
275+
null_buffer.push(false);
276+
append_null_to_all_builders(nested_builders);
277+
}
256278
}
257279
}
258280
}
@@ -274,6 +296,14 @@ fn append_field_value(
274296
FieldBuilder::Float64(b) => b.append_null(),
275297
FieldBuilder::Boolean(b) => b.append_null(),
276298
FieldBuilder::String(b) => b.append_null(),
299+
FieldBuilder::Struct {
300+
builders: nested_builders,
301+
null_buffer,
302+
..
303+
} => {
304+
null_buffer.push(false);
305+
append_null_to_all_builders(nested_builders);
306+
}
277307
}
278308
return Ok(());
279309
}
@@ -328,6 +358,30 @@ fn append_field_value(
328358
b.append_value(value.to_string());
329359
}
330360
}
361+
(
362+
FieldBuilder::Struct {
363+
fields: nested_fields,
364+
builders: nested_builders,
365+
null_buffer,
366+
},
367+
DataType::Struct(_),
368+
) => {
369+
// Handle nested struct
370+
if let Some(obj) = value.as_object() {
371+
// Non-null nested struct
372+
null_buffer.push(true);
373+
for (nested_field, nested_builder) in
374+
nested_fields.iter().zip(nested_builders.iter_mut())
375+
{
376+
let nested_value = obj.get(nested_field.name());
377+
append_field_value(nested_builder, nested_field, nested_value)?;
378+
}
379+
} else {
380+
// Not an object -> null nested struct
381+
null_buffer.push(false);
382+
append_null_to_all_builders(nested_builders);
383+
}
384+
}
331385
_ => {
332386
return Err(datafusion::common::DataFusionError::Execution(
333387
"Type mismatch in from_json".to_string(),
@@ -346,6 +400,18 @@ fn finish_builder(builder: FieldBuilder) -> Result<ArrayRef> {
346400
FieldBuilder::Float64(mut b) => Arc::new(b.finish()),
347401
FieldBuilder::Boolean(mut b) => Arc::new(b.finish()),
348402
FieldBuilder::String(mut b) => Arc::new(b.finish()),
403+
FieldBuilder::Struct {
404+
fields,
405+
builders,
406+
null_buffer,
407+
} => {
408+
let nested_arrays: Vec<ArrayRef> = builders
409+
.into_iter()
410+
.map(finish_builder)
411+
.collect::<Result<Vec<_>>>()?;
412+
let null_buf = arrow::buffer::NullBuffer::from(null_buffer);
413+
Arc::new(StructArray::new(fields, nested_arrays, Some(null_buf)))
414+
}
349415
})
350416
}
351417

@@ -512,4 +578,75 @@ mod tests {
512578

513579
Ok(())
514580
}
581+
582+
#[test]
/// Checks that `json_string_to_struct` handles a struct field nested inside the
/// top-level struct schema.
///
/// Four JSON rows are parsed and the test asserts, per row, the validity of the
/// nested `outer` struct, the values of its children where the struct is
/// present, and the value of the sibling `top_level` field.
583+
fn test_nested_struct() -> Result<()> {
// Schema under test: a nullable struct field `outer` (with nullable children
// `inner_a: Int32` and `inner_b: Utf8`) next to a nullable `top_level: Int32`.
584+
let schema = DataType::Struct(Fields::from(vec![
585+
Field::new(
586+
"outer",
587+
DataType::Struct(Fields::from(vec![
588+
Field::new("inner_a", DataType::Int32, true),
589+
Field::new("inner_b", DataType::Utf8, true),
590+
])),
591+
true,
592+
),
593+
Field::new("top_level", DataType::Int32, true),
594+
]));
595+
// One input row per scenario; row 0 is the fully populated baseline.
596+
let input: Arc<dyn Array> = Arc::new(StringArray::from(vec![
597+
Some(r#"{"outer":{"inner_a":123,"inner_b":"hello"},"top_level":999}"#),
598+
Some(r#"{"outer":{"inner_a":456},"top_level":888}"#), // Missing nested field
599+
Some(r#"{"outer":null,"top_level":777}"#), // Null nested struct
600+
Some(r#"{"top_level":666}"#), // Missing nested struct
601+
]));
602+
// Parse all rows against the schema; the unwrap below asserts the result is a StructArray.
603+
let result = json_string_to_struct(&input, &schema)?;
604+
let struct_array = result.as_any().downcast_ref::<StructArray>().unwrap();
605+
// One output row is expected for each of the four input rows.
606+
assert_eq!(struct_array.len(), 4);
607+
608+
// Downcast the two top-level columns: `outer` (column 0) and `top_level` (column 1)
609+
let outer_array = struct_array
610+
.column(0)
611+
.as_any()
612+
.downcast_ref::<StructArray>()
613+
.unwrap();
614+
let top_level_array = struct_array
615+
.column(1)
616+
.as_any()
617+
.downcast_ref::<Int32Array>()
618+
.unwrap();
619+
620+
// Row 0: Valid nested struct
621+
assert!(!outer_array.is_null(0), "Nested struct should not be null");
// The child arrays of `outer` are fetched once here and reused for the row 1 checks below.
622+
let inner_a_array = outer_array
623+
.column(0)
624+
.as_any()
625+
.downcast_ref::<Int32Array>()
626+
.unwrap();
627+
let inner_b_array = outer_array
628+
.column(1)
629+
.as_any()
630+
.downcast_ref::<StringArray>()
631+
.unwrap();
632+
assert_eq!(inner_a_array.value(0), 123);
633+
assert_eq!(inner_b_array.value(0), "hello");
634+
assert_eq!(top_level_array.value(0), 999);
635+
636+
// Row 1: Missing nested field -> the struct itself is valid, only `inner_b` is null
637+
assert!(!outer_array.is_null(1));
638+
assert_eq!(inner_a_array.value(1), 456);
639+
assert!(inner_b_array.is_null(1));
640+
assert_eq!(top_level_array.value(1), 888);
641+
642+
// Row 2: Null nested struct (explicit JSON `null`); the sibling field is still parsed
643+
assert!(outer_array.is_null(2), "Nested struct should be null");
644+
assert_eq!(top_level_array.value(2), 777);
645+
646+
// Row 3: Missing nested struct (key absent) is expected to behave like the explicit null in row 2
647+
assert!(outer_array.is_null(3), "Nested struct should be null");
648+
assert_eq!(top_level_array.value(3), 666);
649+
650+
Ok(())
651+
}
515652
}

spark/src/test/scala/org/apache/comet/CometJsonExpressionSuite.scala

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,4 +127,25 @@ class CometJsonExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelpe
127127
}
128128
}
129129
}
130+
131+
test("from_json - nested struct") {
// Runs from_json over a Parquet-backed table using a schema that contains a
// nested STRUCT column, selecting the whole result, the nested struct, one
// nested leaf and the top-level sibling field.
// NOTE(review): `assume` is presumably ScalaTest's, which cancels the test when
// the condition is false; the reason Spark 4.0+ is excluded is not stated here.
132+
assume(!isSpark40Plus)
133+
// Exercise the same queries with dictionary encoding both enabled and disabled.
134+
Seq(true, false).foreach { dictionaryEnabled =>
135+
withParquetTable(
// 50 rows of (i, json); each JSON document carries a populated nested `outer` object.
136+
(0 until 50).map(i => {
137+
val json = s"""{"outer":{"inner_a":$i,"inner_b":"nested_$i"},"top_level":${i * 10}}"""
138+
(i, json)
139+
}),
140+
"tbl",
141+
withDictionary = dictionaryEnabled) {
142+
// `_2` is presumably the JSON string column (second tuple element) — TODO confirm.
// NOTE(review): checkSparkAnswerAndOperator is defined elsewhere; presumably it
// compares Comet's result with Spark's and checks the operator ran natively — verify.
143+
val schema = "outer STRUCT<inner_a: INT, inner_b: STRING>, top_level INT"
144+
checkSparkAnswerAndOperator(s"SELECT from_json(_2, '$schema') FROM tbl")
145+
checkSparkAnswerAndOperator(s"SELECT from_json(_2, '$schema').outer FROM tbl")
146+
checkSparkAnswerAndOperator(s"SELECT from_json(_2, '$schema').outer.inner_a FROM tbl")
147+
checkSparkAnswerAndOperator(s"SELECT from_json(_2, '$schema').top_level FROM tbl")
148+
}
149+
}
150+
}
130151
}

0 commit comments

Comments
 (0)