Skip to content

Commit 23c5fdf

Browse files
committed
feat: support lookup map by key
1 parent 1f75eda commit 23c5fdf

File tree

3 files changed

+61
-1
lines changed

3 files changed

+61
-1
lines changed

native/core/src/execution/planner.rs

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -555,7 +555,21 @@ impl PhysicalPlanner {
555555
fail_on_error,
556556
)))
557557
}
558-
ExprStruct::ScalarFunc(expr) => self.create_scalar_function_expr(expr, input_schema),
558+
ExprStruct::ScalarFunc(expr) => {
559+
let func = self.create_scalar_function_expr(expr, input_schema)?;
560+
match expr.func.as_ref() {
561+
// DataFusion's map_extract returns an array of struct entries even when looking up by key
562+
// Apache Spark expects a single value, so wrap the result in an additional list extraction
563+
"map_extract" => Ok(Arc::new(ListExtract::new(
564+
func,
565+
Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
566+
None,
567+
true,
568+
false,
569+
))),
570+
_ => Ok(func),
571+
}
572+
}
559573
ExprStruct::EqNullSafe(expr) => {
560574
let left =
561575
self.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?;

spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1973,6 +1973,10 @@ object QueryPlanSerde extends Logging with CometExprShim {
19731973
case mv: MapValues =>
19741974
val childExpr = exprToProtoInternal(mv.child, inputs, binding)
19751975
scalarFunctionExprToProto("map_values", childExpr)
1976+
case gmv: GetMapValue =>
1977+
val mapExpr = exprToProtoInternal(gmv.child, inputs, binding)
1978+
val keyExpr = exprToProtoInternal(gmv.key, inputs, binding)
1979+
scalarFunctionExprToProto("map_extract", mapExpr, keyExpr)
19761980
case _ =>
19771981
withInfo(expr, s"${expr.prettyName} is not supported", expr.children: _*)
19781982
None

spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -394,4 +394,46 @@ class CometNativeReaderSuite extends CometTestBase with AdaptiveSparkPlanHelper
394394
"select * from tbl",
395395
readSchema = Some(readSchema))
396396
}
397+
398+
test("native reader - extract map by key") {
399+
// existing key
400+
testSingleLineQuery(
401+
"""
402+
| select map(str0, str1) c0 from
403+
| (
404+
| select 'key0' str0, named_struct('a', 1, 'b', 'str') str1
405+
| )
406+
|""".stripMargin,
407+
"select c0['key0'] from tbl")
408+
409+
// existing key, existing struct subfield
410+
testSingleLineQuery(
411+
"""
412+
| select map(str0, str1) c0 from
413+
| (
414+
| select 'key0' str0, named_struct('a', 1, 'b', 'str') str1
415+
| )
416+
|""".stripMargin,
417+
"select c0['key0'].b from tbl")
418+
419+
// nonexistent key
420+
testSingleLineQuery(
421+
"""
422+
| select map(str0, str1) c0 from
423+
| (
424+
| select 'key0' str0, named_struct('a', 1, 'b', 'str') str1
425+
| )
426+
|""".stripMargin,
427+
"select c0['key1'] from tbl")
428+
429+
// nonexistent key, existing struct subfield
430+
testSingleLineQuery(
431+
"""
432+
| select map(str0, str1) c0 from
433+
| (
434+
| select 'key0' str0, named_struct('a', 1, 'b', 'str') str1
435+
| )
436+
|""".stripMargin,
437+
"select c0['key1'].b from tbl")
438+
}
397439
}

0 commit comments

Comments
 (0)