Skip to content

Commit 3ca7a7f

Browse files
andygrove and Claude committed
perf: cache serialized query plans to avoid per-partition serialization
Add caching for serialized protobuf query plans to avoid serializing the same plan repeatedly for every partition in CometExecIterator. Changes: - Add serializeNativePlan() helper method in CometExec - Add getCometIterator() overload accepting pre-serialized bytes - Update CometExecUtils.getNativeLimitRDD to serialize once - Update CometTakeOrderedAndProjectExec to serialize plans once For a query with N partitions, this eliminates N-1 redundant protobuf serializations per affected code path, reducing CPU overhead and GC pressure during query execution. Co-Authored-By: Claude Opus 4.5 <[email protected]>
1 parent 0a6f97d commit 3ca7a7f

File tree

3 files changed

+60
-16
lines changed

3 files changed

+60
-16
lines changed

spark/src/main/scala/org/apache/spark/sql/comet/CometExecUtils.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,11 @@ object CometExecUtils {
5353
limit: Int,
5454
offset: Int = 0): RDD[ColumnarBatch] = {
5555
val numParts = childPlan.getNumPartitions
56+
// Serialize the plan once before mapping to avoid repeated serialization per partition
57+
val limitOp = CometExecUtils.getLimitNativePlan(outputAttribute, limit, offset).get
58+
val serializedPlan = CometExec.serializeNativePlan(limitOp)
5659
childPlan.mapPartitionsWithIndexInternal { case (idx, iter) =>
57-
val limitOp = CometExecUtils.getLimitNativePlan(outputAttribute, limit, offset).get
58-
CometExec.getCometIterator(Seq(iter), outputAttribute.length, limitOp, numParts, idx)
60+
CometExec.getCometIterator(Seq(iter), outputAttribute.length, serializedPlan, numParts, idx)
5961
}
6062
}
6163

spark/src/main/scala/org/apache/spark/sql/comet/CometTakeOrderedAndProjectExec.scala

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -133,12 +133,15 @@ case class CometTakeOrderedAndProjectExec(
133133
CometExecUtils.getNativeLimitRDD(childRDD, child.output, limit)
134134
} else {
135135
val numParts = childRDD.getNumPartitions
136+
// Serialize the plan once before mapping to avoid repeated serialization per partition
137+
val topK =
138+
CometExecUtils
139+
.getTopKNativePlan(child.output, sortOrder, child, limit)
140+
.get
141+
val serializedTopK = CometExec.serializeNativePlan(topK)
142+
val numOutputCols = child.output.length
136143
childRDD.mapPartitionsWithIndexInternal { case (idx, iter) =>
137-
val topK =
138-
CometExecUtils
139-
.getTopKNativePlan(child.output, sortOrder, child, limit)
140-
.get
141-
CometExec.getCometIterator(Seq(iter), child.output.length, topK, numParts, idx)
144+
CometExec.getCometIterator(Seq(iter), numOutputCols, serializedTopK, numParts, idx)
142145
}
143146
}
144147

@@ -154,11 +157,19 @@ case class CometTakeOrderedAndProjectExec(
154157
new CometShuffledBatchRDD(dep, readMetrics)
155158
}
156159

160+
// Serialize the plan once before mapping to avoid repeated serialization per partition
161+
val topKAndProjection = CometExecUtils
162+
.getProjectionNativePlan(projectList, child.output, sortOrder, child, limit, offset)
163+
.get
164+
val serializedTopKAndProjection = CometExec.serializeNativePlan(topKAndProjection)
165+
val finalOutputLength = output.length
157166
singlePartitionRDD.mapPartitionsInternal { iter =>
158-
val topKAndProjection = CometExecUtils
159-
.getProjectionNativePlan(projectList, child.output, sortOrder, child, limit, offset)
160-
.get
161-
val it = CometExec.getCometIterator(Seq(iter), output.length, topKAndProjection, 1, 0)
167+
val it = CometExec.getCometIterator(
168+
Seq(iter),
169+
finalOutputLength,
170+
serializedTopKAndProjection,
171+
1,
172+
0)
162173
setSubqueries(it.id, this)
163174

164175
Option(TaskContext.get()).foreach { context =>

spark/src/main/scala/org/apache/spark/sql/comet/operators.scala

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,19 @@ object CometExec {
113113

114114
def newIterId: Long = curId.getAndIncrement()
115115

116+
/**
 * Serialize a native plan to bytes. Use this method to serialize the plan once before calling
 * getCometIterator for each partition, avoiding repeated serialization.
 *
 * @param nativePlan
 *   the protobuf operator tree describing the native query plan
 * @return
 *   the plan encoded as a protobuf byte array
 */
def serializeNativePlan(nativePlan: Operator): Array[Byte] = {
  // toByteArray performs exactly the steps previously hand-rolled here: allocate a buffer of
  // getSerializedSize bytes, write via a CodedOutputStream, and verify the buffer was filled
  // (checkNoSpaceLeft). Using the generated method keeps this in one well-tested call.
  nativePlan.toByteArray
}
128+
116129
def getCometIterator(
117130
inputs: Seq[Iterator[ColumnarBatch]],
118131
numOutputCols: Int,
@@ -130,6 +143,28 @@ object CometExec {
130143
encryptedFilePaths = Seq.empty)
131144
}
132145

146+
/**
 * Create a [[CometExecIterator]] from an already-serialized native plan.
 *
 * Prefer this overload when the same plan is executed across many partitions: serialize the
 * plan once up front and pass the resulting bytes here for each partition, instead of
 * re-serializing the protobuf per partition.
 *
 * @param inputs
 *   input batch iterators, one per child of the native plan
 * @param numOutputCols
 *   number of columns the plan produces
 * @param serializedPlan
 *   the protobuf-encoded native plan
 * @param numParts
 *   total number of partitions for this plan
 * @param partitionIdx
 *   index of the partition being executed
 */
def getCometIterator(
    inputs: Seq[Iterator[ColumnarBatch]],
    numOutputCols: Int,
    serializedPlan: Array[Byte],
    numParts: Int,
    partitionIdx: Int): CometExecIterator = {
  // No per-operator metrics are collected through this entry point.
  val emptyMetrics = CometMetricNode(Map.empty)
  new CometExecIterator(
    newIterId,
    inputs,
    numOutputCols,
    serializedPlan,
    emptyMetrics,
    numParts,
    partitionIdx,
    broadcastedHadoopConfForEncryption = None,
    encryptedFilePaths = Seq.empty)
}
167+
133168
def getCometIterator(
134169
inputs: Seq[Iterator[ColumnarBatch]],
135170
numOutputCols: Int,
@@ -139,11 +174,7 @@ object CometExec {
139174
partitionIdx: Int,
140175
broadcastedHadoopConfForEncryption: Option[Broadcast[SerializableConfiguration]],
141176
encryptedFilePaths: Seq[String]): CometExecIterator = {
142-
val size = nativePlan.getSerializedSize
143-
val bytes = new Array[Byte](size)
144-
val codedOutput = CodedOutputStream.newInstance(bytes)
145-
nativePlan.writeTo(codedOutput)
146-
codedOutput.checkNoSpaceLeft()
177+
val bytes = serializeNativePlan(nativePlan)
147178
new CometExecIterator(
148179
newIterId,
149180
inputs,

0 commit comments

Comments (0)