
Commit 48776fe

perf: reduce GC pressure in protobuf serialization (apache#3242)

1 parent 5e9a1ca

File tree: 3 files changed (+33, -24 lines)
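
Every call site changes in the same way: instead of streaming the protobuf message into a growable java.io.ByteArrayOutputStream (which re-allocates and copies its internal buffer as it grows, then copies the whole thing again in toByteArray), the message is asked for its exact encoded size up front and written straight into a single, exactly-sized byte array via com.google.protobuf.CodedOutputStream. A minimal sketch of the pattern, using a hypothetical serializeExact helper (not part of the commit) and any generated protobuf Message in place of Comet's expression and operator types:

import com.google.protobuf.{CodedOutputStream, Message}

// Hypothetical helper illustrating the pattern applied in this commit:
// serialize a protobuf message with exactly one array allocation.
def serializeExact(msg: Message): Array[Byte] = {
  val bytes = new Array[Byte](msg.getSerializedSize) // exact encoded size, allocated once
  val codedOutput = CodedOutputStream.newInstance(bytes)
  msg.writeTo(codedOutput)        // encode directly into the array
  codedOutput.checkNoSpaceLeft()  // fails fast if the buffer was not filled exactly
  bytes
}

This is essentially the recipe protobuf-java's own toByteArray() follows; spelling it out at each call site keeps serialization to one allocation per plan or expression rather than the repeated buffer growth and final copy of ByteArrayOutputStream.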

spark/src/main/scala/org/apache/comet/parquet/ParquetFilters.scala

Lines changed: 8 additions & 5 deletions
@@ -19,7 +19,6 @@
 
 package org.apache.comet.parquet
 
-import java.io.ByteArrayOutputStream
 import java.lang.{Boolean => JBoolean, Byte => JByte, Double => JDouble, Float => JFloat, Long => JLong, Short => JShort}
 import java.math.{BigDecimal => JBigDecimal}
 import java.sql.{Date, Timestamp}
@@ -43,6 +42,8 @@ import org.apache.spark.sql.sources
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.unsafe.types.UTF8String
 
+import com.google.protobuf.CodedOutputStream
+
 import org.apache.comet.parquet.SourceFilterSerde.{createBinaryExpr, createNameExpr, createUnaryExpr, createValueExpr}
 import org.apache.comet.serde.ExprOuterClass
 import org.apache.comet.serde.QueryPlanSerde.scalarFunctionExprToProto
@@ -885,10 +886,12 @@ class ParquetFilters(
 
   def createNativeFilters(predicates: Seq[sources.Filter]): Option[Array[Byte]] = {
     predicates.reduceOption(sources.And).flatMap(createNativeFilter).map { expr =>
-      val outputStream = new ByteArrayOutputStream()
-      expr.writeTo(outputStream)
-      outputStream.close()
-      outputStream.toByteArray
+      val size = expr.getSerializedSize
+      val bytes = new Array[Byte](size)
+      val codedOutput = CodedOutputStream.newInstance(bytes)
+      expr.writeTo(codedOutput)
+      codedOutput.checkNoSpaceLeft()
+      bytes
     }
   }
 

spark/src/main/scala/org/apache/spark/sql/comet/CometNativeWriteExec.scala

Lines changed: 13 additions & 10 deletions
@@ -19,8 +19,6 @@
 
 package org.apache.spark.sql.comet
 
-import java.io.ByteArrayOutputStream
-
 import scala.jdk.CollectionConverters._
 
 import org.apache.hadoop.fs.Path
@@ -34,6 +32,8 @@ import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.vectorized.ColumnarBatch
 import org.apache.spark.util.Utils
 
+import com.google.protobuf.CodedOutputStream
+
 import org.apache.comet.CometExecIterator
 import org.apache.comet.serde.OperatorOuterClass.Operator
 
@@ -75,10 +75,12 @@ case class CometNativeWriteExec(
     sparkContext.collectionAccumulator[FileCommitProtocol.TaskCommitMessage]("taskCommitMessages")
 
   override def serializedPlanOpt: SerializedPlan = {
-    val outputStream = new ByteArrayOutputStream()
-    nativeOp.writeTo(outputStream)
-    outputStream.close()
-    SerializedPlan(Some(outputStream.toByteArray))
+    val size = nativeOp.getSerializedSize
+    val bytes = new Array[Byte](size)
+    val codedOutput = CodedOutputStream.newInstance(bytes)
+    nativeOp.writeTo(codedOutput)
+    codedOutput.checkNoSpaceLeft()
+    SerializedPlan(Some(bytes))
   }
 
   override def withNewChildInternal(newChild: SparkPlan): SparkPlan =
@@ -196,10 +198,11 @@
 
     val nativeMetrics = CometMetricNode.fromCometPlan(this)
 
-    val outputStream = new ByteArrayOutputStream()
-    modifiedNativeOp.writeTo(outputStream)
-    outputStream.close()
-    val planBytes = outputStream.toByteArray
+    val size = modifiedNativeOp.getSerializedSize
+    val planBytes = new Array[Byte](size)
+    val codedOutput = CodedOutputStream.newInstance(planBytes)
+    modifiedNativeOp.writeTo(codedOutput)
+    codedOutput.checkNoSpaceLeft()
 
     val execIterator = new CometExecIterator(
       CometExec.newIterId,

spark/src/main/scala/org/apache/spark/sql/comet/operators.scala

Lines changed: 12 additions & 9 deletions
@@ -19,7 +19,6 @@
 
 package org.apache.spark.sql.comet
 
-import java.io.ByteArrayOutputStream
 import java.util.Locale
 
 import scala.collection.mutable
@@ -50,6 +49,7 @@ import org.apache.spark.util.SerializableConfiguration
 import org.apache.spark.util.io.ChunkedByteBuffer
 
 import com.google.common.base.Objects
+import com.google.protobuf.CodedOutputStream
 
 import org.apache.comet.{CometConf, CometExecIterator, CometRuntimeException, ConfigEntry}
 import org.apache.comet.CometSparkSessionExtensions.{isCometShuffleEnabled, withInfo}
@@ -139,10 +139,11 @@ object CometExec {
      partitionIdx: Int,
      broadcastedHadoopConfForEncryption: Option[Broadcast[SerializableConfiguration]],
      encryptedFilePaths: Seq[String]): CometExecIterator = {
-    val outputStream = new ByteArrayOutputStream()
-    nativePlan.writeTo(outputStream)
-    outputStream.close()
-    val bytes = outputStream.toByteArray
+    val size = nativePlan.getSerializedSize
+    val bytes = new Array[Byte](size)
+    val codedOutput = CodedOutputStream.newInstance(bytes)
+    nativePlan.writeTo(codedOutput)
+    codedOutput.checkNoSpaceLeft()
    new CometExecIterator(
      newIterId,
      inputs,
@@ -414,10 +415,12 @@ abstract class CometNativeExec extends CometExec {
  def convertBlock(): CometNativeExec = {
    def transform(arg: Any): AnyRef = arg match {
      case serializedPlan: SerializedPlan if serializedPlan.isEmpty =>
-        val out = new ByteArrayOutputStream()
-        nativeOp.writeTo(out)
-        out.close()
-        SerializedPlan(Some(out.toByteArray))
+        val size = nativeOp.getSerializedSize
+        val bytes = new Array[Byte](size)
+        val codedOutput = CodedOutputStream.newInstance(bytes)
+        nativeOp.writeTo(codedOutput)
+        codedOutput.checkNoSpaceLeft()
+        SerializedPlan(Some(bytes))
      case other: AnyRef => other
      case null => null
    }
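
For a quick sanity check of the new path, a hypothetical round-trip helper (not part of the commit) can serialize with the exact-size pattern, parse the bytes back with the message's parser, and verify the result equals the original:

import com.google.protobuf.{CodedOutputStream, Message, Parser}

// Hypothetical round-trip check: encode with the exact-size pattern,
// decode with the message's parser, and compare for equality.
def roundTrips[M <: Message](msg: M, parser: Parser[M]): Boolean = {
  val bytes = new Array[Byte](msg.getSerializedSize)
  val codedOutput = CodedOutputStream.newInstance(bytes)
  msg.writeTo(codedOutput)
  codedOutput.checkNoSpaceLeft()
  parser.parseFrom(bytes) == msg
}

// e.g. roundTrips(expr, ExprOuterClass.Expr.parser()) for a serialized filter expression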
