Skip to content

Commit ea125f5

Browse files
authored
perf: Experimental fix to avoid join strategy regression (#1674)
1 parent e823163 commit ea125f5

File tree

3 files changed

+15
-2
lines changed

3 files changed

+15
-2
lines changed

common/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -414,6 +414,16 @@ object CometConf extends ShimCometConf {
414414
.doubleConf
415415
.createWithDefault(10.0)
416416

417+
val COMET_EXCHANGE_SIZE_MULTIPLIER: ConfigEntry[Double] = conf(
418+
"spark.comet.shuffle.sizeInBytesMultiplier")
419+
.doc(
420+
"Comet reports smaller sizes for shuffle due to using Arrow's columnar memory format " +
421+
"and this can result in Spark choosing a different join strategy due to the estimated " +
422+
"size of the exchange being smaller. Comet will multiple sizeInBytes by this amount to " +
423+
"avoid regressions in join strategy.")
424+
.doubleConf
425+
.createWithDefault(1.0)
426+
417427
val COMET_DPP_FALLBACK_ENABLED: ConfigEntry[Boolean] =
418428
conf("spark.comet.dppFallback.enabled")
419429
.doc("Whether to fall back to Spark for queries that use DPP.")

docs/source/user-guide/configs.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,4 +88,5 @@ Comet provides the following configuration settings.
8888
| spark.comet.scan.preFetch.enabled | Whether to enable pre-fetching feature of CometScan. | false |
8989
| spark.comet.scan.preFetch.threadNum | The number of threads running pre-fetching for CometScan. Effective if spark.comet.scan.preFetch.enabled is enabled. Note that more pre-fetching threads means more memory requirement to store pre-fetched row groups. | 2 |
9090
| spark.comet.shuffle.preferDictionary.ratio | The ratio of total values to distinct values in a string column to decide whether to prefer dictionary encoding when shuffling the column. If the ratio is higher than this config, dictionary encoding will be used on shuffling string column. This config is effective if it is higher than 1.0. Note that this config is only used when `spark.comet.exec.shuffle.mode` is `jvm`. | 10.0 |
91+
| spark.comet.shuffle.sizeInBytesMultiplier | Comet reports smaller sizes for shuffle due to using Arrow's columnar memory format and this can result in Spark choosing a different join strategy due to the estimated size of the exchange being smaller. Comet will multiply sizeInBytes by this amount to avoid regressions in join strategy. | 1.0 |
9192
| spark.comet.sparkToColumnar.supportedOperatorList | A comma-separated list of operators that will be converted to Arrow columnar format when 'spark.comet.sparkToColumnar.enabled' is true | Range,InMemoryTableScan |

spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ import org.apache.spark.util.random.XORShiftRandom
4646

4747
import com.google.common.base.Objects
4848

49+
import org.apache.comet.CometConf
4950
import org.apache.comet.shims.ShimCometShuffleExchangeExec
5051

5152
/**
@@ -113,9 +114,10 @@ case class CometShuffleExchangeExec(
113114
new CometShuffledBatchRDD(shuffleDependency, readMetrics, partitionSpecs)
114115

115116
override def runtimeStatistics: Statistics = {
116-
val dataSize = metrics("dataSize").value
117+
val dataSize =
118+
metrics("dataSize").value * Math.max(CometConf.COMET_EXCHANGE_SIZE_MULTIPLIER.get(conf), 1)
117119
val rowCount = metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_RECORDS_WRITTEN).value
118-
Statistics(dataSize, Some(rowCount))
120+
Statistics(dataSize.toLong, Some(rowCount))
119121
}
120122

121123
// TODO: add `override` keyword after dropping Spark-3.x supports

0 commit comments

Comments
 (0)