Commit 86f6eb6

perf: reduce GC pressure in protobuf serialization
Replace ByteArrayOutputStream with direct CodedOutputStream serialization to eliminate unnecessary allocations during query plan serialization.

This optimization:
- Pre-allocates the exact buffer size using getSerializedSize()
- Eliminates ByteArrayOutputStream's internal buffer resizing
- Removes the defensive array copy made by ByteArrayOutputStream.toByteArray()
- Applies to 5 hot paths called per partition during query execution

For a query with 1000 partitions, this eliminates 5000+ unnecessary allocations and array copies, significantly reducing GC pressure.

Changes:
- operators.scala: getCometIterator() and convertBlock()
- CometNativeWriteExec.scala: serializedPlanOpt() and doExecute()
- ParquetFilters.scala: createNativeFilters()
1 parent 313a9d9 commit 86f6eb6
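The change applies the same five-line replacement at every call site. As a self-contained sketch of the before/after pattern (the object and method names below are illustrative, not part of the commit):

```scala
import java.io.ByteArrayOutputStream
import com.google.protobuf.{CodedOutputStream, MessageLite}

object SerializationSketch {

  // Old path: ByteArrayOutputStream grows its internal buffer as the message
  // is written (copying on every resize), and toByteArray() makes one more
  // full defensive copy at the end.
  def serializeViaStream(msg: MessageLite): Array[Byte] = {
    val out = new ByteArrayOutputStream()
    msg.writeTo(out)
    out.close()
    out.toByteArray
  }

  // New path: ask protobuf for the exact encoded size, allocate the result
  // array once, and let CodedOutputStream write straight into it.
  def serializeDirect(msg: MessageLite): Array[Byte] = {
    val bytes = new Array[Byte](msg.getSerializedSize)
    val codedOutput = CodedOutputStream.newInstance(bytes)
    msg.writeTo(codedOutput)
    // Fails fast if the message did not fill the buffer exactly.
    codedOutput.checkNoSpaceLeft()
    bytes
  }
}
```

Both paths produce the same bytes for a given message; the difference is only in how many intermediate buffers and copies are created along the way.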

3 files changed: +33, −20 lines

spark/src/main/scala/org/apache/comet/parquet/ParquetFilters.scala

Lines changed: 8 additions & 4 deletions

@@ -43,6 +43,8 @@ import org.apache.spark.sql.sources
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.unsafe.types.UTF8String
 
+import com.google.protobuf.CodedOutputStream
+
 import org.apache.comet.parquet.SourceFilterSerde.{createBinaryExpr, createNameExpr, createUnaryExpr, createValueExpr}
 import org.apache.comet.serde.ExprOuterClass
 import org.apache.comet.serde.QueryPlanSerde.scalarFunctionExprToProto
@@ -885,10 +887,12 @@ class ParquetFilters(
 
   def createNativeFilters(predicates: Seq[sources.Filter]): Option[Array[Byte]] = {
     predicates.reduceOption(sources.And).flatMap(createNativeFilter).map { expr =>
-      val outputStream = new ByteArrayOutputStream()
-      expr.writeTo(outputStream)
-      outputStream.close()
-      outputStream.toByteArray
+      val size = expr.getSerializedSize
+      val bytes = new Array[Byte](size)
+      val codedOutput = CodedOutputStream.newInstance(bytes)
+      expr.writeTo(codedOutput)
+      codedOutput.checkNoSpaceLeft()
+      bytes
     }
   }

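The one non-obvious line in the new version is checkNoSpaceLeft(), which verifies that the expression filled the pre-sized buffer exactly. A minimal illustration of what it guards against (a sketch only; the empty Expr built here is purely for demonstration):

```scala
import com.google.protobuf.CodedOutputStream
import org.apache.comet.serde.ExprOuterClass

// An Expr with no fields set encodes to zero bytes, so writing it into a
// deliberately oversized buffer leaves unused space behind.
val expr = ExprOuterClass.Expr.newBuilder().build()
val oversized = new Array[Byte](expr.getSerializedSize + 8)
val codedOutput = CodedOutputStream.newInstance(oversized)
expr.writeTo(codedOutput)
// codedOutput.checkNoSpaceLeft()  // would throw: 8 bytes were never written.
// Sizing the buffer with getSerializedSize, as createNativeFilters now does,
// is what keeps this check quiet.
```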
spark/src/main/scala/org/apache/spark/sql/comet/CometNativeWriteExec.scala

Lines changed: 13 additions & 8 deletions

@@ -34,6 +34,8 @@ import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.vectorized.ColumnarBatch
 import org.apache.spark.util.Utils
 
+import com.google.protobuf.CodedOutputStream
+
 import org.apache.comet.CometExecIterator
 import org.apache.comet.serde.OperatorOuterClass.Operator
 
@@ -75,10 +77,12 @@ case class CometNativeWriteExec(
    sparkContext.collectionAccumulator[FileCommitProtocol.TaskCommitMessage]("taskCommitMessages")
 
  override def serializedPlanOpt: SerializedPlan = {
-    val outputStream = new ByteArrayOutputStream()
-    nativeOp.writeTo(outputStream)
-    outputStream.close()
-    SerializedPlan(Some(outputStream.toByteArray))
+    val size = nativeOp.getSerializedSize
+    val bytes = new Array[Byte](size)
+    val codedOutput = CodedOutputStream.newInstance(bytes)
+    nativeOp.writeTo(codedOutput)
+    codedOutput.checkNoSpaceLeft()
+    SerializedPlan(Some(bytes))
  }
 
  override def withNewChildInternal(newChild: SparkPlan): SparkPlan =
@@ -196,10 +200,11 @@ case class CometNativeWriteExec(
 
    val nativeMetrics = CometMetricNode.fromCometPlan(this)
 
-    val outputStream = new ByteArrayOutputStream()
-    modifiedNativeOp.writeTo(outputStream)
-    outputStream.close()
-    val planBytes = outputStream.toByteArray
+    val size = modifiedNativeOp.getSerializedSize
+    val planBytes = new Array[Byte](size)
+    val codedOutput = CodedOutputStream.newInstance(planBytes)
+    modifiedNativeOp.writeTo(codedOutput)
+    codedOutput.checkNoSpaceLeft()
 
    val execIterator = new CometExecIterator(
      CometExec.newIterId,

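For context on the GC claim: each call on the replaced path allocated a ByteArrayOutputStream whose internal buffer starts at 32 bytes and roughly doubles (copying) as the plan is streamed in, plus one final defensive copy in toByteArray(). A back-of-the-envelope sketch of the reallocation count (the 32-byte start and doubling growth are the JDK defaults; the exact pattern also depends on protobuf's chunked writes):

```scala
// Rough estimate of internal buffer reallocations when an n-byte serialized
// plan is written through a default ByteArrayOutputStream (starts at 32 bytes,
// roughly doubles on each grow). Every reallocation copies the bytes so far.
def approxResizes(n: Int): Int =
  math.max(0, math.ceil(math.log(n / 32.0) / math.log(2.0)).toInt)

approxResizes(64 * 1024) // ≈ 11 reallocations, plus the final toByteArray() copy
```

The new path replaces all of that with a single exact-size array per call.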
spark/src/main/scala/org/apache/spark/sql/comet/operators.scala

Lines changed: 12 additions & 8 deletions

@@ -50,6 +50,7 @@ import org.apache.spark.util.SerializableConfiguration
 import org.apache.spark.util.io.ChunkedByteBuffer
 
 import com.google.common.base.Objects
+import com.google.protobuf.CodedOutputStream
 
 import org.apache.comet.{CometConf, CometExecIterator, CometRuntimeException, ConfigEntry}
 import org.apache.comet.CometSparkSessionExtensions.{isCometShuffleEnabled, withInfo}
@@ -139,10 +140,11 @@ object CometExec {
      partitionIdx: Int,
      broadcastedHadoopConfForEncryption: Option[Broadcast[SerializableConfiguration]],
      encryptedFilePaths: Seq[String]): CometExecIterator = {
-    val outputStream = new ByteArrayOutputStream()
-    nativePlan.writeTo(outputStream)
-    outputStream.close()
-    val bytes = outputStream.toByteArray
+    val size = nativePlan.getSerializedSize
+    val bytes = new Array[Byte](size)
+    val codedOutput = CodedOutputStream.newInstance(bytes)
+    nativePlan.writeTo(codedOutput)
+    codedOutput.checkNoSpaceLeft()
    new CometExecIterator(
      newIterId,
      inputs,
@@ -414,10 +416,12 @@ abstract class CometNativeExec extends CometExec {
  def convertBlock(): CometNativeExec = {
    def transform(arg: Any): AnyRef = arg match {
      case serializedPlan: SerializedPlan if serializedPlan.isEmpty =>
-        val out = new ByteArrayOutputStream()
-        nativeOp.writeTo(out)
-        out.close()
-        SerializedPlan(Some(out.toByteArray))
+        val size = nativeOp.getSerializedSize
+        val bytes = new Array[Byte](size)
+        val codedOutput = CodedOutputStream.newInstance(bytes)
+        nativeOp.writeTo(codedOutput)
+        codedOutput.checkNoSpaceLeft()
+        SerializedPlan(Some(bytes))
      case other: AnyRef => other
      case null => null
    }

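A quick way to sanity-check any of these call sites in a test (a sketch; `plan` stands for whichever Operator the caller holds) is to confirm that the pre-sized buffer round-trips through protobuf's parser:

```scala
import com.google.protobuf.CodedOutputStream
import org.apache.comet.serde.OperatorOuterClass.Operator

// Serialize with the new pattern, parse the bytes back, and compare.
// Generated protobuf messages implement structural equality, so a successful
// round trip confirms the buffer holds a complete encoding of the plan.
def roundTrips(plan: Operator): Boolean = {
  val bytes = new Array[Byte](plan.getSerializedSize)
  val codedOutput = CodedOutputStream.newInstance(bytes)
  plan.writeTo(codedOutput)
  codedOutput.checkNoSpaceLeft()
  Operator.parseFrom(bytes) == plan
}
```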