Skip to content
This repository was archived by the owner on Jan 9, 2020. It is now read-only.

Commit ee56fc3

Browse files
kiszkcloud-fan
authored andcommitted
[SPARK-18016][SQL] Code Generation: Constant Pool Limit - reduce entries for mutable state
## What changes were proposed in this pull request? This PR is follow-on of apache#19518. This PR tries to reduce the number of constant pool entries used for accessing mutable state. There are two directions: 1. Primitive type variables should be allocated at the outer class due to better performance. Otherwise, this PR allocates an array. 2. The length of allocated array is up to 32768 due to avoiding usage of constant pool entry at access (e.g. `mutableStateArray[32767]`). Here are some discussions to determine these directions. 1. [[1]](apache#19518 (comment)), [[2]](apache#19518 (comment)), [[3]](apache#19518 (comment)), [[4]](apache#19518 (comment)), [[5]](apache#19518 (comment)) 2. [[6]](apache#19518 (comment)), [[7]](apache#19518 (comment)), [[8]](apache#19518 (comment)) This PR modifies `addMutableState` function in the `CodeGenerator` to check if the declared state can be easily initialized compacted into an array. We identify three types of states that cannot compacted: - Primitive type state (ints, booleans, etc) if the number of them does not exceed threshold - Multiple-dimensional array type - `inline = true` When `useFreshName = false`, the given name is used. Many codes were ported from apache#19518. Many efforts were put here. I think this PR should credit to bdrillard With this PR, the following code is generated: ``` /* 005 */ class SpecificMutableProjection extends org.apache.spark.sql.catalyst.expressions.codegen.BaseMutableProjection { /* 006 */ /* 007 */ private Object[] references; /* 008 */ private InternalRow mutableRow; /* 009 */ private boolean isNull_0; /* 010 */ private boolean isNull_1; /* 011 */ private boolean isNull_2; /* 012 */ private int value_2; /* 013 */ private boolean isNull_3; ... /* 10006 */ private int value_4999; /* 10007 */ private boolean isNull_5000; /* 10008 */ private int value_5000; /* 10009 */ private InternalRow[] mutableStateArray = new InternalRow[2]; /* 10010 */ private boolean[] mutableStateArray1 = new boolean[7001]; /* 10011 */ private int[] mutableStateArray2 = new int[1001]; /* 10012 */ private UTF8String[] mutableStateArray3 = new UTF8String[6000]; /* 10013 */ ... /* 107956 */ private void init_176() { /* 107957 */ isNull_4986 = true; /* 107958 */ value_4986 = -1; ... /* 108004 */ } ... ``` ## How was this patch tested? Added a new test case to `GeneratedProjectionSuite` Author: Kazuaki Ishizaki <[email protected]> Closes apache#19811 from kiszk/SPARK-18016.
1 parent b779c93 commit ee56fc3

37 files changed

+404
-304
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -119,8 +119,7 @@ abstract class Expression extends TreeNode[Expression] {
119119
// TODO: support whole stage codegen too
120120
if (eval.code.trim.length > 1024 && ctx.INPUT_ROW != null && ctx.currentVars == null) {
121121
val setIsNull = if (eval.isNull != "false" && eval.isNull != "true") {
122-
val globalIsNull = ctx.freshName("globalIsNull")
123-
ctx.addMutableState(ctx.JAVA_BOOLEAN, globalIsNull)
122+
val globalIsNull = ctx.addMutableState(ctx.JAVA_BOOLEAN, "globalIsNull")
124123
val localIsNull = eval.isNull
125124
eval.isNull = globalIsNull
126125
s"$globalIsNull = $localIsNull;"

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/MonotonicallyIncreasingID.scala

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -65,10 +65,8 @@ case class MonotonicallyIncreasingID() extends LeafExpression with Nondeterminis
6565
}
6666

6767
override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
68-
val countTerm = ctx.freshName("count")
69-
val partitionMaskTerm = ctx.freshName("partitionMask")
70-
ctx.addMutableState(ctx.JAVA_LONG, countTerm)
71-
ctx.addMutableState(ctx.JAVA_LONG, partitionMaskTerm)
68+
val countTerm = ctx.addMutableState(ctx.JAVA_LONG, "count")
69+
val partitionMaskTerm = ctx.addMutableState(ctx.JAVA_LONG, "partitionMask")
7270
ctx.addPartitionInitializationStatement(s"$countTerm = 0L;")
7371
ctx.addPartitionInitializationStatement(s"$partitionMaskTerm = ((long) partitionIndex) << 33;")
7472

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SparkPartitionID.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,7 @@ case class SparkPartitionID() extends LeafExpression with Nondeterministic {
4343
override protected def evalInternal(input: InternalRow): Int = partitionId
4444

4545
override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
46-
val idTerm = ctx.freshName("partitionId")
47-
ctx.addMutableState(ctx.JAVA_INT, idTerm)
46+
val idTerm = ctx.addMutableState(ctx.JAVA_INT, "partitionId")
4847
ctx.addPartitionInitializationStatement(s"$idTerm = partitionIndex;")
4948
ev.copy(code = s"final ${ctx.javaType(dataType)} ${ev.value} = $idTerm;", isNull = "false")
5049
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -602,8 +602,7 @@ case class Least(children: Seq[Expression]) extends Expression {
602602

603603
override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
604604
val evalChildren = children.map(_.genCode(ctx))
605-
val tmpIsNull = ctx.freshName("leastTmpIsNull")
606-
ctx.addMutableState(ctx.JAVA_BOOLEAN, tmpIsNull)
605+
val tmpIsNull = ctx.addMutableState(ctx.JAVA_BOOLEAN, "leastTmpIsNull")
607606
val evals = evalChildren.map(eval =>
608607
s"""
609608
|${eval.code}
@@ -683,8 +682,7 @@ case class Greatest(children: Seq[Expression]) extends Expression {
683682

684683
override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
685684
val evalChildren = children.map(_.genCode(ctx))
686-
val tmpIsNull = ctx.freshName("greatestTmpIsNull")
687-
ctx.addMutableState(ctx.JAVA_BOOLEAN, tmpIsNull)
685+
val tmpIsNull = ctx.addMutableState(ctx.JAVA_BOOLEAN, "greatestTmpIsNull")
688686
val evals = evalChildren.map(eval =>
689687
s"""
690688
|${eval.code}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala

Lines changed: 133 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -137,22 +137,63 @@ class CodegenContext {
137137
var currentVars: Seq[ExprCode] = null
138138

139139
/**
140-
* Holding expressions' mutable states like `MonotonicallyIncreasingID.count` as a
141-
* 3-tuple: java type, variable name, code to init it.
142-
* As an example, ("int", "count", "count = 0;") will produce code:
140+
* Holding expressions' inlined mutable states like `MonotonicallyIncreasingID.count` as a
141+
* 2-tuple: java type, variable name.
142+
* As an example, ("int", "count") will produce code:
143143
* {{{
144144
* private int count;
145145
* }}}
146-
* as a member variable, and add
147-
* {{{
148-
* count = 0;
149-
* }}}
150-
* to the constructor.
146+
* as a member variable
151147
*
152148
* They will be kept as member variables in generated classes like `SpecificProjection`.
153149
*/
154-
val mutableStates: mutable.ArrayBuffer[(String, String, String)] =
155-
mutable.ArrayBuffer.empty[(String, String, String)]
150+
val inlinedMutableStates: mutable.ArrayBuffer[(String, String)] =
151+
mutable.ArrayBuffer.empty[(String, String)]
152+
153+
/**
154+
* The mapping between mutable state types and corrseponding compacted arrays.
155+
* The keys are java type string. The values are [[MutableStateArrays]] which encapsulates
156+
* the compacted arrays for the mutable states with the same java type.
157+
*/
158+
val arrayCompactedMutableStates: mutable.Map[String, MutableStateArrays] =
159+
mutable.Map.empty[String, MutableStateArrays]
160+
161+
// An array holds the code that will initialize each state
162+
val mutableStateInitCode: mutable.ArrayBuffer[String] =
163+
mutable.ArrayBuffer.empty[String]
164+
165+
/**
166+
* This class holds a set of names of mutableStateArrays that is used for compacting mutable
167+
* states for a certain type, and holds the next available slot of the current compacted array.
168+
*/
169+
class MutableStateArrays {
170+
val arrayNames = mutable.ListBuffer.empty[String]
171+
createNewArray()
172+
173+
private[this] var currentIndex = 0
174+
175+
private def createNewArray() = arrayNames.append(freshName("mutableStateArray"))
176+
177+
def getCurrentIndex: Int = currentIndex
178+
179+
/**
180+
* Returns the reference of next available slot in current compacted array. The size of each
181+
* compacted array is controlled by the config `CodeGenerator.MUTABLESTATEARRAY_SIZE_LIMIT`.
182+
* Once reaching the threshold, new compacted array is created.
183+
*/
184+
def getNextSlot(): String = {
185+
if (currentIndex < CodeGenerator.MUTABLESTATEARRAY_SIZE_LIMIT) {
186+
val res = s"${arrayNames.last}[$currentIndex]"
187+
currentIndex += 1
188+
res
189+
} else {
190+
createNewArray()
191+
currentIndex = 1
192+
s"${arrayNames.last}[0]"
193+
}
194+
}
195+
196+
}
156197

157198
/**
158199
* Add a mutable state as a field to the generated class. c.f. the comments above.
@@ -163,11 +204,52 @@ class CodegenContext {
163204
* the list of default imports available.
164205
* Also, generic type arguments are accepted but ignored.
165206
* @param variableName Name of the field.
166-
* @param initCode The statement(s) to put into the init() method to initialize this field.
207+
* @param initFunc Function includes statement(s) to put into the init() method to initialize
208+
* this field. The argument is the name of the mutable state variable.
167209
* If left blank, the field will be default-initialized.
210+
* @param forceInline whether the declaration and initialization code may be inlined rather than
211+
* compacted. Please set `true` into forceInline for one of the followings:
212+
* 1. use the original name of the status
213+
* 2. expect to non-frequently generate the status
214+
* (e.g. not much sort operators in one stage)
215+
* @param useFreshName If this is false and the mutable state ends up inlining in the outer
216+
* class, the name is not changed
217+
* @return the name of the mutable state variable, which is the original name or fresh name if
218+
* the variable is inlined to the outer class, or an array access if the variable is to
219+
* be stored in an array of variables of the same type.
220+
* A variable will be inlined into the outer class when one of the following conditions
221+
* are satisfied:
222+
* 1. forceInline is true
223+
* 2. its type is primitive type and the total number of the inlined mutable variables
224+
* is less than `CodeGenerator.OUTER_CLASS_VARIABLES_THRESHOLD`
225+
* 3. its type is multi-dimensional array
226+
* When a variable is compacted into an array, the max size of the array for compaction
227+
* is given by `CodeGenerator.MUTABLESTATEARRAY_SIZE_LIMIT`.
168228
*/
169-
def addMutableState(javaType: String, variableName: String, initCode: String = ""): Unit = {
170-
mutableStates += ((javaType, variableName, initCode))
229+
def addMutableState(
230+
javaType: String,
231+
variableName: String,
232+
initFunc: String => String = _ => "",
233+
forceInline: Boolean = false,
234+
useFreshName: Boolean = true): String = {
235+
236+
// want to put a primitive type variable at outerClass for performance
237+
val canInlinePrimitive = isPrimitiveType(javaType) &&
238+
(inlinedMutableStates.length < CodeGenerator.OUTER_CLASS_VARIABLES_THRESHOLD)
239+
if (forceInline || canInlinePrimitive || javaType.contains("[][]")) {
240+
val varName = if (useFreshName) freshName(variableName) else variableName
241+
val initCode = initFunc(varName)
242+
inlinedMutableStates += ((javaType, varName))
243+
mutableStateInitCode += initCode
244+
varName
245+
} else {
246+
val arrays = arrayCompactedMutableStates.getOrElseUpdate(javaType, new MutableStateArrays)
247+
val element = arrays.getNextSlot()
248+
249+
val initCode = initFunc(element)
250+
mutableStateInitCode += initCode
251+
element
252+
}
171253
}
172254

173255
/**
@@ -176,8 +258,7 @@ class CodegenContext {
176258
* data types like: UTF8String, ArrayData, MapData & InternalRow.
177259
*/
178260
def addBufferedState(dataType: DataType, variableName: String, initCode: String): ExprCode = {
179-
val value = freshName(variableName)
180-
addMutableState(javaType(dataType), value, "")
261+
val value = addMutableState(javaType(dataType), variableName)
181262
val code = dataType match {
182263
case StringType => s"$value = $initCode.clone();"
183264
case _: StructType | _: ArrayType | _: MapType => s"$value = $initCode.copy();"
@@ -189,15 +270,37 @@ class CodegenContext {
189270
def declareMutableStates(): String = {
190271
// It's possible that we add same mutable state twice, e.g. the `mergeExpressions` in
191272
// `TypedAggregateExpression`, we should call `distinct` here to remove the duplicated ones.
192-
mutableStates.distinct.map { case (javaType, variableName, _) =>
273+
val inlinedStates = inlinedMutableStates.distinct.map { case (javaType, variableName) =>
193274
s"private $javaType $variableName;"
194-
}.mkString("\n")
275+
}
276+
277+
val arrayStates = arrayCompactedMutableStates.flatMap { case (javaType, mutableStateArrays) =>
278+
val numArrays = mutableStateArrays.arrayNames.size
279+
mutableStateArrays.arrayNames.zipWithIndex.map { case (arrayName, index) =>
280+
val length = if (index + 1 == numArrays) {
281+
mutableStateArrays.getCurrentIndex
282+
} else {
283+
CodeGenerator.MUTABLESTATEARRAY_SIZE_LIMIT
284+
}
285+
if (javaType.contains("[]")) {
286+
// initializer had an one-dimensional array variable
287+
val baseType = javaType.substring(0, javaType.length - 2)
288+
s"private $javaType[] $arrayName = new $baseType[$length][];"
289+
} else {
290+
// initializer had a scalar variable
291+
s"private $javaType[] $arrayName = new $javaType[$length];"
292+
}
293+
}
294+
}
295+
296+
(inlinedStates ++ arrayStates).mkString("\n")
195297
}
196298

197299
def initMutableStates(): String = {
198300
// It's possible that we add same mutable state twice, e.g. the `mergeExpressions` in
199301
// `TypedAggregateExpression`, we should call `distinct` here to remove the duplicated ones.
200-
val initCodes = mutableStates.distinct.map(_._3 + "\n")
302+
val initCodes = mutableStateInitCode.distinct
303+
201304
// The generated initialization code may exceed 64kb function size limit in JVM if there are too
202305
// many mutable states, so split it into multiple functions.
203306
splitExpressions(expressions = initCodes, funcName = "init", arguments = Nil)
@@ -1011,9 +1114,9 @@ class CodegenContext {
10111114
val commonExprs = equivalentExpressions.getAllEquivalentExprs.filter(_.size > 1)
10121115
commonExprs.foreach { e =>
10131116
val expr = e.head
1014-
val fnName = freshName("evalExpr")
1015-
val isNull = s"${fnName}IsNull"
1016-
val value = s"${fnName}Value"
1117+
val fnName = freshName("subExpr")
1118+
val isNull = addMutableState(JAVA_BOOLEAN, "subExprIsNull")
1119+
val value = addMutableState(javaType(expr.dataType), "subExprValue")
10171120

10181121
// Generate the code for this expression tree and wrap it in a function.
10191122
val eval = expr.genCode(this)
@@ -1039,9 +1142,6 @@ class CodegenContext {
10391142
// 2. Less code.
10401143
// Currently, we will do this for all non-leaf only expression trees (i.e. expr trees with
10411144
// at least two nodes) as the cost of doing it is expected to be low.
1042-
addMutableState(JAVA_BOOLEAN, isNull, s"$isNull = false;")
1043-
addMutableState(javaType(expr.dataType), value,
1044-
s"$value = ${defaultValue(expr.dataType)};")
10451145

10461146
subexprFunctions += s"${addNewFunction(fnName, fn)}($INPUT_ROW);"
10471147
val state = SubExprEliminationState(isNull, value)
@@ -1165,6 +1265,15 @@ object CodeGenerator extends Logging {
11651265
// class.
11661266
val GENERATED_CLASS_SIZE_THRESHOLD = 1000000
11671267

1268+
// This is the threshold for the number of global variables, whose types are primitive type or
1269+
// complex type (e.g. more than one-dimensional array), that will be placed at the outer class
1270+
val OUTER_CLASS_VARIABLES_THRESHOLD = 10000
1271+
1272+
// This is the maximum number of array elements to keep global variables in one Java array
1273+
// 32767 is the maximum integer value that does not require a constant pool entry in a Java
1274+
// bytecode instruction
1275+
val MUTABLESTATEARRAY_SIZE_LIMIT = 32768
1276+
11681277
/**
11691278
* Compile the Java source code into a Java class, using Janino.
11701279
*

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala

Lines changed: 18 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -57,41 +57,37 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], MutableP
5757
case _ => true
5858
}.unzip
5959
val exprVals = ctx.generateExpressions(validExpr, useSubexprElimination)
60-
val projectionCodes = exprVals.zip(index).map {
60+
61+
// 4-tuples: (code for projection, isNull variable name, value variable name, column index)
62+
val projectionCodes: Seq[(String, String, String, Int)] = exprVals.zip(index).map {
6163
case (ev, i) =>
6264
val e = expressions(i)
65+
val value = ctx.addMutableState(ctx.javaType(e.dataType), "value")
6366
if (e.nullable) {
64-
val isNull = s"isNull_$i"
65-
val value = s"value_$i"
66-
ctx.addMutableState(ctx.JAVA_BOOLEAN, isNull, s"$isNull = true;")
67-
ctx.addMutableState(ctx.javaType(e.dataType), value,
68-
s"$value = ${ctx.defaultValue(e.dataType)};")
69-
s"""
70-
${ev.code}
71-
$isNull = ${ev.isNull};
72-
$value = ${ev.value};
73-
"""
67+
val isNull = ctx.addMutableState(ctx.JAVA_BOOLEAN, "isNull")
68+
(s"""
69+
|${ev.code}
70+
|$isNull = ${ev.isNull};
71+
|$value = ${ev.value};
72+
""".stripMargin, isNull, value, i)
7473
} else {
75-
val value = s"value_$i"
76-
ctx.addMutableState(ctx.javaType(e.dataType), value,
77-
s"$value = ${ctx.defaultValue(e.dataType)};")
78-
s"""
79-
${ev.code}
80-
$value = ${ev.value};
81-
"""
74+
(s"""
75+
|${ev.code}
76+
|$value = ${ev.value};
77+
""".stripMargin, ev.isNull, value, i)
8278
}
8379
}
8480

8581
// Evaluate all the subexpressions.
8682
val evalSubexpr = ctx.subexprFunctions.mkString("\n")
8783

88-
val updates = validExpr.zip(index).map {
89-
case (e, i) =>
90-
val ev = ExprCode("", s"isNull_$i", s"value_$i")
84+
val updates = validExpr.zip(projectionCodes).map {
85+
case (e, (_, isNull, value, i)) =>
86+
val ev = ExprCode("", isNull, value)
9187
ctx.updateColumn("mutableRow", e.dataType, i, ev, e.nullable)
9288
}
9389

94-
val allProjections = ctx.splitExpressionsWithCurrentInputs(projectionCodes)
90+
val allProjections = ctx.splitExpressionsWithCurrentInputs(projectionCodes.map(_._1))
9591
val allUpdates = ctx.splitExpressionsWithCurrentInputs(updates)
9692

9793
val codeBody = s"""

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -73,9 +73,8 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro
7373
bufferHolder: String,
7474
isTopLevel: Boolean = false): String = {
7575
val rowWriterClass = classOf[UnsafeRowWriter].getName
76-
val rowWriter = ctx.freshName("rowWriter")
77-
ctx.addMutableState(rowWriterClass, rowWriter,
78-
s"$rowWriter = new $rowWriterClass($bufferHolder, ${inputs.length});")
76+
val rowWriter = ctx.addMutableState(rowWriterClass, "rowWriter",
77+
v => s"$v = new $rowWriterClass($bufferHolder, ${inputs.length});")
7978

8079
val resetWriter = if (isTopLevel) {
8180
// For top level row writer, it always writes to the beginning of the global buffer holder,
@@ -186,9 +185,8 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro
186185
// Puts `input` in a local variable to avoid to re-evaluate it if it's a statement.
187186
val tmpInput = ctx.freshName("tmpInput")
188187
val arrayWriterClass = classOf[UnsafeArrayWriter].getName
189-
val arrayWriter = ctx.freshName("arrayWriter")
190-
ctx.addMutableState(arrayWriterClass, arrayWriter,
191-
s"$arrayWriter = new $arrayWriterClass();")
188+
val arrayWriter = ctx.addMutableState(arrayWriterClass, "arrayWriter",
189+
v => s"$v = new $arrayWriterClass();")
192190
val numElements = ctx.freshName("numElements")
193191
val index = ctx.freshName("index")
194192

@@ -318,13 +316,12 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro
318316
case _ => true
319317
}
320318

321-
val result = ctx.freshName("result")
322-
ctx.addMutableState("UnsafeRow", result, s"$result = new UnsafeRow(${expressions.length});")
319+
val result = ctx.addMutableState("UnsafeRow", "result",
320+
v => s"$v = new UnsafeRow(${expressions.length});")
323321

324-
val holder = ctx.freshName("holder")
325322
val holderClass = classOf[BufferHolder].getName
326-
ctx.addMutableState(holderClass, holder,
327-
s"$holder = new $holderClass($result, ${numVarLenFields * 32});")
323+
val holder = ctx.addMutableState(holderClass, "holder",
324+
v => s"$v = new $holderClass($result, ${numVarLenFields * 32});")
328325

329326
val resetBufferHolder = if (numVarLenFields == 0) {
330327
""

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -190,8 +190,7 @@ case class CaseWhen(
190190
// It is initialized to `NOT_MATCHED`, and if it's set to `HAS_NULL` or `HAS_NONNULL`,
191191
// We won't go on anymore on the computation.
192192
val resultState = ctx.freshName("caseWhenResultState")
193-
val tmpResult = ctx.freshName("caseWhenTmpResult")
194-
ctx.addMutableState(ctx.javaType(dataType), tmpResult)
193+
val tmpResult = ctx.addMutableState(ctx.javaType(dataType), "caseWhenTmpResult")
195194

196195
// these blocks are meant to be inside a
197196
// do {

0 commit comments

Comments
 (0)