-
Notifications
You must be signed in to change notification settings - Fork 559
[GLUTEN-11251] Fix incorrect whole stage id in WholeStageTransformerExec #11252
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
Run Gluten Clickhouse CI on x86 |
d3313ea to
e45c64d
Compare
|
Run Gluten Clickhouse CI on x86 |
1 similar comment
|
Run Gluten Clickhouse CI on x86 |
|
Run Gluten Clickhouse CI on x86 |
2 similar comments
|
Run Gluten Clickhouse CI on x86 |
|
Run Gluten Clickhouse CI on x86 |
|
@zhztheplayer @zhouyuan Could you help to review? Thanks! |
WholeStageTransformerExec75f416e to
b23ff21
Compare
|
Run Gluten Clickhouse CI on x86 |
|
|
||
| case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = false)( | ||
| val transformStageId: Int | ||
| var transformStageId: Int |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why changing to var?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When AQE is on, transformStageId always starts at 0 within each query stage. In this case transformStageId is not globally unique within the whole plan tree. After all query stages have been executed, the RegenerateTransformStageId rule traverses the whole plan tree and updates this value incrementally to a unique id.
| * starts from 1. This rule updates the whole plan tree with incremental and unique transform stage | ||
| * id before the final execution. | ||
| */ | ||
| case class RegenerateTransformStageId() extends Rule[SparkPlan] with AdaptiveSparkPlanHelper { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Have you compared with vanilla Spark's query planner? I assume Spark doesn't need such rule for the correctness of the stage IDs in AQE?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Spark maintains a global counter in rule CollapseCodegenStages inside of the AdaptiveSparkPlanExec to make sure it generates unique id across query stages. https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala#L147-L150
But for Gluten we only can use a new counter for each query stage.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Understood. Though I feel the code of ID generation gets much harder to reason about after the PR's change.
Do you think it's feasible to change ColumnarCollapseTransformStages to leave all stage IDs unassigned (e.g., -1), then rely on RegenerateTransformStageId to assign all IDs, both for AQE and non-AQE cases? So it's clear that only RegenerateTransformStageId manages these IDs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you think it's feasible to change ColumnarCollapseTransformStages to leave all stage IDs unassigned (e.g., -1), then rely on RegenerateTransformStageId to assign all IDs, both for AQE and non-AQE cases? So it's clear that only RegenerateTransformStageId manages these IDs.
Yes make sense.
| case class ColumnarCollapseTransformStages( | ||
| glutenConf: GlutenConfig, | ||
| transformStageCounter: AtomicInteger = ColumnarCollapseTransformStages.transformStageCounter) | ||
| transformStageCounter: AtomicInteger = new AtomicInteger(0)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
| private val transformStageCounter: AtomicInteger = new AtomicInteger(0) | ||
|
|
||
| private val wholeStageTransformerCache = | ||
| new mutable.HashSet[WholeStageTransformer]() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
HashSet[WholeStageTransformer]() is expensive. Can we use IdentityHashMap or so?
|
Run Gluten Clickhouse CI on x86 |
1 similar comment
|
Run Gluten Clickhouse CI on x86 |
4b56312 to
33ed32f
Compare
|
Run Gluten Clickhouse CI on x86 |
|
Run Gluten Clickhouse CI on x86 |
| injector.injectPostTransform( | ||
| c => GlutenFallbackReporter(new GlutenConfig(c.sqlConf), c.session)) | ||
| injector.injectPostTransform(_ => RemoveFallbackTagRule()) | ||
| injector.injectPostTransform(_ => GenerateTransformStageId()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we move the rule right after ColumnarCollapseTransformStages? To make it clear that GenerateTransformStageId doesn't depend on other rules.
zhztheplayer
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks. Only a minor left from my end.
|
Run Gluten Clickhouse CI on x86 |
## Changes | Cause | Type | Category | Description | Affected Files | |-------|------|----------|-------------|----------------| | N/A | Feat | Build | Update build configuration to support Spark 4.1 UT | `.github/workflows/velox_backend_x86.yml`, `gluten-ut/pom.xml`, `gluten-ut/spark41/pom.xml`, `tools/gluten-it/pom.xml` | | [#52165](apache/spark#52165) | Fix | Dependency | Update Parquet dependency version to 1.16.0 to avoid NoSuchMethodError issue | `gluten-ut/spark41/pom.xml` | | [#51477](apache/spark#51477) | Fix | Compatibility | Update imports to reflect streaming runtime package refactoring in Apache Spark | `gluten-ut/spark41/.../GlutenDynamicPartitionPruningSuite.scala`, `gluten-ut/spark41/.../GlutenStreamingQuerySuite.scala` | | [#50674](apache/spark#50674) | Fix | Compatibility | Fix compatibility issue introduced by `TypedConfigBuilder` | `gluten-substrait/.../ExpressionConverter.scala`, `gluten-ut/spark41/.../GlutenCSVSuite.scala`, `gluten-ut/spark41/.../GlutenJsonSuite.scala` | | [#49766](apache/spark#49766) | Fix | Compatibility | Disable V2 bucketing in GlutenDynamicPartitionPruningSuite since spark.sql.sources.v2.bucketing.enabled is now enabled by default | `gluten-ut/spark41/.../GlutenDynamicPartitionPruningSuite.scala` | | [#42414](apache/spark#42414), [#53038](apache/spark#53038) | Fix | Bug Fix | Resolve an issue introduced by SPARK-42414, as identified in SPARK-53038 | `backends-velox/.../VeloxBloomFilterAggregate.scala` | | N/A | Fix | Bug Fix | Enforce row fallback for unsupported cached batches - keep columnar execution only when schema validation succeeds | `backends-velox/.../ColumnarCachedBatchSerializer.scala` | | [SPARK-53132](apache/spark#53132), [SPARK-53142](apache/spark#53142) | 4.1.0 | Test Exclusion | Exclude additional Spark 4.1 KeyGroupedPartitioningSuite tests. Excluded tests: `SPARK-53322*`, `SPARK-54439*` | `gluten-ut/spark41/.../VeloxTestSettings.scala` | | [SPARK-53535](https://issues.apache.org/jira/browse/SPARK-53535), [SPARK-54220](https://issues.apache.org/jira/browse/SPARK-54220) | 4.1.0 | Test Exclusion | Exclude additional Spark 4.1 GlutenParquetIOSuite tests. Excluded tests: `SPARK-53535*`, `vectorized reader: missing all struct fields*`, `SPARK-54220*` | `gluten-ut/spark41/.../VeloxTestSettings.scala` | | [#52645](apache/spark#52645) | 4.1.0 | Test Exclusion | Exclude additional Spark 4.1 GlutenStreamingQuerySuite tests. Excluded tests: `SPARK-53942: changing the number of stateless shuffle partitions via config`, `SPARK-53942: stateful shuffle partitions are retained from old checkpoint` | `gluten-ut/spark41/.../VeloxTestSettings.scala` | | [#47856](apache/spark#47856) | 4.1.0 | Test Exclusion | Exclude additional Spark 4.1 GlutenDataFrameWindowFunctionsSuite and GlutenJoinSuite tests. Excluded tests: `SPARK-49386: Window spill with more than the inMemoryThreshold and spillSizeThreshold`, `SPARK-49386: test SortMergeJoin (with spill by size threshold)` | `gluten-ut/spark41/.../VeloxTestSettings.scala` | | [#52157](apache/spark#52157) | 4.1.0 | Test Exclusion | Exclude additional Spark 4.1 GlutenQueryExecutionSuite tests. Excluded test: `#53413: Cleanup shuffle dependencies for commands` | `gluten-ut/spark41/.../VeloxTestSettings.scala` | | [#48470](apache/spark#48470) | 4.1.0 | Test Exclusion | Exclude split test in GlutenRegexpExpressionsSuite. Excluded test: `GlutenRegexpExpressionsSuite.SPLIT` | `gluten-ut/spark41/.../VeloxTestSettings.scala` | | [#51623](apache/spark#51623) | 4.1.0 | Test Exclusion | Add `spark.sql.unionOutputPartitioning=false` to Maven test args. Excluded tests: `GlutenBroadcastExchangeSuite.SPARK-52962`, `GlutenDataFrameSetOperationsSuite.SPARK-52921*` | `.github/workflows/velox_backend_x86.yml`, `gluten-ut/spark41/.../VeloxTestSettings.scala`, `tools/gluten-it/common/.../Suite.scala` | | N/A | 4.1.0 | Test Exclusion | Excludes failed SQL tests that need to be fixed for Spark 4.1 compatibility. Excluded tests: `decimalArithmeticOperations.sql`, `identifier-clause.sql`, `keywords.sql`, `literals.sql`, `operators.sql`, `exists-orderby-limit.sql`, `postgreSQL/date.sql`, `nonansi/keywords.sql`, `nonansi/literals.sql`, `datetime-legacy.sql`, `datetime-parsing-invalid.sql`, `misc-functions.sql` | `gluten-ut/spark41/.../VeloxSQLQueryTestSettings.scala` | | apache#11252 | 4.1.0 | Test Exclusion | Exclude Gluten test for SPARK-47939: Explain should work with parameterized queries | `gluten-ut/spark41/.../VeloxTestSettings.scala` |
## Changes | Cause | Type | Category | Description | Affected Files | |-------|------|----------|-------------|----------------| | N/A | Feat | Build | Update build configuration to support Spark 4.1 UT | `.github/workflows/velox_backend_x86.yml`, `gluten-ut/pom.xml`, `gluten-ut/spark41/pom.xml`, `tools/gluten-it/pom.xml` | | [#52165](apache/spark#52165) | Fix | Dependency | Update Parquet dependency version to 1.16.0 to avoid NoSuchMethodError issue | `gluten-ut/spark41/pom.xml` | | [#51477](apache/spark#51477) | Fix | Compatibility | Update imports to reflect streaming runtime package refactoring in Apache Spark | `gluten-ut/spark41/.../GlutenDynamicPartitionPruningSuite.scala`, `gluten-ut/spark41/.../GlutenStreamingQuerySuite.scala` | | [#50674](apache/spark#50674) | Fix | Compatibility | Fix compatibility issue introduced by `TypedConfigBuilder` | `gluten-substrait/.../ExpressionConverter.scala`, `gluten-ut/spark41/.../GlutenCSVSuite.scala`, `gluten-ut/spark41/.../GlutenJsonSuite.scala` | | [#49766](apache/spark#49766) | Fix | Compatibility | Disable V2 bucketing in GlutenDynamicPartitionPruningSuite since spark.sql.sources.v2.bucketing.enabled is now enabled by default | `gluten-ut/spark41/.../GlutenDynamicPartitionPruningSuite.scala` | | [#42414](apache/spark#42414), [#53038](apache/spark#53038) | Fix | Bug Fix | Resolve an issue introduced by SPARK-42414, as identified in SPARK-53038 | `backends-velox/.../VeloxBloomFilterAggregate.scala` | | N/A | Fix | Bug Fix | Enforce row fallback for unsupported cached batches - keep columnar execution only when schema validation succeeds | `backends-velox/.../ColumnarCachedBatchSerializer.scala` | | [SPARK-53132](apache/spark#53132), [SPARK-53142](apache/spark#53142) | 4.1.0 | Test Exclusion | Exclude additional Spark 4.1 KeyGroupedPartitioningSuite tests. Excluded tests: `SPARK-53322*`, `SPARK-54439*` | `gluten-ut/spark41/.../VeloxTestSettings.scala` | | [SPARK-53535](https://issues.apache.org/jira/browse/SPARK-53535), [SPARK-54220](https://issues.apache.org/jira/browse/SPARK-54220) | 4.1.0 | Test Exclusion | Exclude additional Spark 4.1 GlutenParquetIOSuite tests. Excluded tests: `SPARK-53535*`, `vectorized reader: missing all struct fields*`, `SPARK-54220*` | `gluten-ut/spark41/.../VeloxTestSettings.scala` | | [#52645](apache/spark#52645) | 4.1.0 | Test Exclusion | Exclude additional Spark 4.1 GlutenStreamingQuerySuite tests. Excluded tests: `SPARK-53942: changing the number of stateless shuffle partitions via config`, `SPARK-53942: stateful shuffle partitions are retained from old checkpoint` | `gluten-ut/spark41/.../VeloxTestSettings.scala` | | [#47856](apache/spark#47856) | 4.1.0 | Test Exclusion | Exclude additional Spark 4.1 GlutenDataFrameWindowFunctionsSuite and GlutenJoinSuite tests. Excluded tests: `SPARK-49386: Window spill with more than the inMemoryThreshold and spillSizeThreshold`, `SPARK-49386: test SortMergeJoin (with spill by size threshold)` | `gluten-ut/spark41/.../VeloxTestSettings.scala` | | [#52157](apache/spark#52157) | 4.1.0 | Test Exclusion | Exclude additional Spark 4.1 GlutenQueryExecutionSuite tests. Excluded test: `#53413: Cleanup shuffle dependencies for commands` | `gluten-ut/spark41/.../VeloxTestSettings.scala` | | [#48470](apache/spark#48470) | 4.1.0 | Test Exclusion | Exclude split test in GlutenRegexpExpressionsSuite. Excluded test: `GlutenRegexpExpressionsSuite.SPLIT` | `gluten-ut/spark41/.../VeloxTestSettings.scala` | | [#51623](apache/spark#51623) | 4.1.0 | Test Exclusion | Add `spark.sql.unionOutputPartitioning=false` to Maven test args. Excluded tests: `GlutenBroadcastExchangeSuite.SPARK-52962`, `GlutenDataFrameSetOperationsSuite.SPARK-52921*` | `.github/workflows/velox_backend_x86.yml`, `gluten-ut/spark41/.../VeloxTestSettings.scala`, `tools/gluten-it/common/.../Suite.scala` | | N/A | 4.1.0 | Test Exclusion | Excludes failed SQL tests that need to be fixed for Spark 4.1 compatibility. Excluded tests: `decimalArithmeticOperations.sql`, `identifier-clause.sql`, `keywords.sql`, `literals.sql`, `operators.sql`, `exists-orderby-limit.sql`, `postgreSQL/date.sql`, `nonansi/keywords.sql`, `nonansi/literals.sql`, `datetime-legacy.sql`, `datetime-parsing-invalid.sql`, `misc-functions.sql` | `gluten-ut/spark41/.../VeloxSQLQueryTestSettings.scala` | | #11252 | 4.1.0 | Test Exclusion | Exclude Gluten test for SPARK-47939: Explain should work with parameterized queries | `gluten-ut/spark41/.../VeloxTestSettings.scala` |
Fix the transformer stage id contiguously increasing across different sql queries.
When AQE is off, the fix can be directly applied.
When AQE is on, vanillas spark set the rule
CollapseCodegenStagesto be stateful:https://github.com/apache/spark/blob/branch-4.0/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala#L153-L156
However in Gluten, when AQE is on, the columnar rules are applied upon each individual query stages, and there's no stateful context shared across query stages that is visible to the columnar rules.
Seems like there's no way to use a stateful counter to generate incremental ids across different query stages. This pr adds a new rule to update the stage id after the physical plan is generated.
Related issue: #11251