Skip to content
This repository was archived by the owner on Jan 9, 2020. It is now read-only.

Commit 970f68c

Browse files
viiryacloud-fan
authored andcommitted
[SPARK-19104][SQL] Lambda variables in ExternalMapToCatalyst should be global
The issue happens in `ExternalMapToCatalyst`. For example, the following codes create `ExternalMapToCatalyst` to convert Scala Map to catalyst map format. val data = Seq.tabulate(10)(i => NestedData(1, Map("key" -> InnerData("name", i + 100)))) val ds = spark.createDataset(data) The `valueConverter` in `ExternalMapToCatalyst` looks like: if (isnull(lambdavariable(ExternalMapToCatalyst_value52, ExternalMapToCatalyst_value_isNull52, ObjectType(class org.apache.spark.sql.InnerData), true))) null else named_struct(name, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(lambdavariable(ExternalMapToCatalyst_value52, ExternalMapToCatalyst_value_isNull52, ObjectType(class org.apache.spark.sql.InnerData), true)).name, true), value, assertnotnull(lambdavariable(ExternalMapToCatalyst_value52, ExternalMapToCatalyst_value_isNull52, ObjectType(class org.apache.spark.sql.InnerData), true)).value) There is a `CreateNamedStruct` expression (`named_struct`) to create a row of `InnerData.name` and `InnerData.value` that are referred by `ExternalMapToCatalyst_value52`. Because `ExternalMapToCatalyst_value52` are local variable, when `CreateNamedStruct` splits expressions to individual functions, the local variable can't be accessed anymore. Jenkins tests. Author: Liang-Chi Hsieh <[email protected]> Closes apache#18418 from viirya/SPARK-19104. (cherry picked from commit fd8c931) Signed-off-by: Wenchen Fan <[email protected]>
1 parent d8e3a4a commit 970f68c

File tree

2 files changed

+20
-6
lines changed

2 files changed

+20
-6
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -729,6 +729,12 @@ case class ExternalMapToCatalyst private(
729729
val entry = ctx.freshName("entry")
730730
val entries = ctx.freshName("entries")
731731

732+
val keyElementJavaType = ctx.javaType(keyType)
733+
val valueElementJavaType = ctx.javaType(valueType)
734+
ctx.addMutableState(keyElementJavaType, key, "")
735+
ctx.addMutableState("boolean", valueIsNull, "")
736+
ctx.addMutableState(valueElementJavaType, value, "")
737+
732738
val (defineEntries, defineKeyValue) = child.dataType match {
733739
case ObjectType(cls) if classOf[java.util.Map[_, _]].isAssignableFrom(cls) =>
734740
val javaIteratorCls = classOf[java.util.Iterator[_]].getName
@@ -740,8 +746,8 @@ case class ExternalMapToCatalyst private(
740746
val defineKeyValue =
741747
s"""
742748
final $javaMapEntryCls $entry = ($javaMapEntryCls) $entries.next();
743-
${ctx.javaType(keyType)} $key = (${ctx.boxedType(keyType)}) $entry.getKey();
744-
${ctx.javaType(valueType)} $value = (${ctx.boxedType(valueType)}) $entry.getValue();
749+
$key = (${ctx.boxedType(keyType)}) $entry.getKey();
750+
$value = (${ctx.boxedType(valueType)}) $entry.getValue();
745751
"""
746752

747753
defineEntries -> defineKeyValue
@@ -755,17 +761,17 @@ case class ExternalMapToCatalyst private(
755761
val defineKeyValue =
756762
s"""
757763
final $scalaMapEntryCls $entry = ($scalaMapEntryCls) $entries.next();
758-
${ctx.javaType(keyType)} $key = (${ctx.boxedType(keyType)}) $entry._1();
759-
${ctx.javaType(valueType)} $value = (${ctx.boxedType(valueType)}) $entry._2();
764+
$key = (${ctx.boxedType(keyType)}) $entry._1();
765+
$value = (${ctx.boxedType(valueType)}) $entry._2();
760766
"""
761767

762768
defineEntries -> defineKeyValue
763769
}
764770

765771
val valueNullCheck = if (ctx.isPrimitiveType(valueType)) {
766-
s"boolean $valueIsNull = false;"
772+
s"$valueIsNull = false;"
767773
} else {
768-
s"boolean $valueIsNull = $value == null;"
774+
s"$valueIsNull = $value == null;"
769775
}
770776

771777
val arrayCls = classOf[GenericArrayData].getName

sql/core/src/test/scala/org/apache/spark/sql/DatasetPrimitiveSuite.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@ case class QueueClass(q: Queue[Int])
3232

3333
case class ComplexClass(seq: SeqClass, list: ListClass, queue: QueueClass)
3434

35+
case class InnerData(name: String, value: Int)
36+
case class NestedData(id: Int, param: Map[String, InnerData])
37+
3538
package object packageobject {
3639
case class PackageClass(value: Int)
3740
}
@@ -268,4 +271,9 @@ class DatasetPrimitiveSuite extends QueryTest with SharedSQLContext {
268271
checkDataset(Seq(PackageClass(1)).toDS(), PackageClass(1))
269272
}
270273

274+
test("SPARK-19104: Lambda variables in ExternalMapToCatalyst should be global") {
275+
val data = Seq.tabulate(10)(i => NestedData(1, Map("key" -> InnerData("name", i + 100))))
276+
val ds = spark.createDataset(data)
277+
checkDataset(ds, data: _*)
278+
}
271279
}

0 commit comments

Comments
 (0)