Skip to content

Commit c957714

Browse files
kiszk and cloud-fan
authored and committed
[SPARK-22508][SQL] Fix 64KB JVM bytecode limit problem with GenerateUnsafeRowJoiner.create()
## What changes were proposed in this pull request? This PR changes the code generation in `GenerateUnsafeRowJoiner.create()` so that the generated statements that operate on the bitmap and the offsets are placed into separate methods when their total size could be large. ## How was this patch tested? Added a new test case to `GenerateUnsafeRowJoinerSuite`. Author: Kazuaki Ishizaki <[email protected]> Closes #19737 from kiszk/SPARK-22508.
1 parent 9d45e67 commit c957714

File tree

2 files changed

+28
-8
lines changed

2 files changed

+28
-8
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoiner.scala

Lines changed: 23 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -17,6 +17,9 @@
1717

1818
package org.apache.spark.sql.catalyst.expressions.codegen
1919

20+
import scala.collection.mutable
21+
import scala.collection.mutable.ArrayBuffer
22+
2023
import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeRow}
2124
import org.apache.spark.sql.types.StructType
2225
import org.apache.spark.unsafe.Platform
@@ -51,6 +54,7 @@ object GenerateUnsafeRowJoiner extends CodeGenerator[(StructType, StructType), U
5154
}
5255

5356
def create(schema1: StructType, schema2: StructType): UnsafeRowJoiner = {
57+
val ctx = new CodegenContext
5458
val offset = Platform.BYTE_ARRAY_OFFSET
5559
val getLong = "Platform.getLong"
5660
val putLong = "Platform.putLong"
@@ -88,8 +92,14 @@ object GenerateUnsafeRowJoiner extends CodeGenerator[(StructType, StructType), U
8892
s"$getLong(obj2, offset2 + ${(i - bitset1Words) * 8})"
8993
}
9094
}
91-
s"$putLong(buf, ${offset + i * 8}, $bits);"
92-
}.mkString("\n")
95+
s"$putLong(buf, ${offset + i * 8}, $bits);\n"
96+
}
97+
98+
val copyBitsets = ctx.splitExpressions(
99+
expressions = copyBitset,
100+
funcName = "copyBitsetFunc",
101+
arguments = ("java.lang.Object", "obj1") :: ("long", "offset1") ::
102+
("java.lang.Object", "obj2") :: ("long", "offset2") :: Nil)
93103

94104
// --------------------- copy fixed length portion from row 1 ----------------------- //
95105
var cursor = offset + outputBitsetWords * 8
@@ -150,11 +160,14 @@ object GenerateUnsafeRowJoiner extends CodeGenerator[(StructType, StructType), U
150160
s"(${(outputBitsetWords - bitset2Words + schema1.size) * 8}L + numBytesVariableRow1)"
151161
}
152162
val cursor = offset + outputBitsetWords * 8 + i * 8
153-
s"""
154-
|$putLong(buf, $cursor, $getLong(buf, $cursor) + ($shift << 32));
155-
""".stripMargin
163+
s"$putLong(buf, $cursor, $getLong(buf, $cursor) + ($shift << 32));\n"
156164
}
157-
}.mkString("\n")
165+
}
166+
167+
val updateOffsets = ctx.splitExpressions(
168+
expressions = updateOffset,
169+
funcName = "copyBitsetFunc",
170+
arguments = ("long", "numBytesVariableRow1") :: Nil)
158171

159172
// ------------------------ Finally, put everything together --------------------------- //
160173
val codeBody = s"""
@@ -166,6 +179,8 @@ object GenerateUnsafeRowJoiner extends CodeGenerator[(StructType, StructType), U
166179
| private byte[] buf = new byte[64];
167180
| private UnsafeRow out = new UnsafeRow(${schema1.size + schema2.size});
168181
|
182+
| ${ctx.declareAddedFunctions()}
183+
|
169184
| public UnsafeRow join(UnsafeRow row1, UnsafeRow row2) {
170185
| // row1: ${schema1.size} fields, $bitset1Words words in bitset
171186
| // row2: ${schema2.size}, $bitset2Words words in bitset
@@ -180,12 +195,12 @@ object GenerateUnsafeRowJoiner extends CodeGenerator[(StructType, StructType), U
180195
| final java.lang.Object obj2 = row2.getBaseObject();
181196
| final long offset2 = row2.getBaseOffset();
182197
|
183-
| $copyBitset
198+
| $copyBitsets
184199
| $copyFixedLengthRow1
185200
| $copyFixedLengthRow2
186201
| $copyVariableLengthRow1
187202
| $copyVariableLengthRow2
188-
| $updateOffset
203+
| $updateOffsets
189204
|
190205
| out.pointTo(buf, sizeInBytes);
191206
|

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoinerSuite.scala

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -66,6 +66,11 @@ class GenerateUnsafeRowJoinerSuite extends SparkFunSuite {
6666
}
6767
}
6868

69+
test("SPARK-22508: GenerateUnsafeRowJoiner.create should not generate codes beyond 64KB") {
70+
val N = 3000
71+
testConcatOnce(N, N, variable)
72+
}
73+
6974
private def testConcat(numFields1: Int, numFields2: Int, candidateTypes: Seq[DataType]): Unit = {
7075
for (i <- 0 until 10) {
7176
testConcatOnce(numFields1, numFields2, candidateTypes)

0 commit comments

Comments
 (0)