
Commit 959765a

Author: jintao shen
[Spark] Support OPTIMIZE tbl FULL for clustered table (delta-io#3793)
#### Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other

## Description

1. Add the new SQL syntax `OPTIMIZE tbl FULL`.
2. Implement `OPTIMIZE tbl FULL` to re-cluster all data in the table.

## How was this patch tested?

New unit tests added.

## Does this PR introduce _any_ user-facing changes?

Yes. Previously, a clustered table would not re-cluster data that had been clustered against different clustering keys. With `OPTIMIZE tbl FULL`, that data is re-clustered against the new keys.
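A minimal usage sketch of the new syntax (illustrative only: `events` is a hypothetical Delta table that uses liquid clustering, and `spark` is assumed to be a SparkSession with the Delta SQL extensions enabled):

```scala
// Hypothetical example; the table name is not from this PR.
// Existing behavior: cluster only un-clustered data and data already
// clustered by the current clustering columns.
spark.sql("OPTIMIZE events")

// New syntax added by this PR: also re-cluster files that were written
// under older clustering columns, so the whole table ends up clustered
// by the current clustering keys.
spark.sql("OPTIMIZE events FULL")
```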
Parent: 0c916e0

File tree

12 files changed (+392, -43 lines)

spark/src/main/antlr4/io/delta/sql/parser/DeltaSqlBase.g4

Lines changed: 3 additions & 2 deletions
@@ -93,7 +93,7 @@ statement
         (clusterBySpec | CLUSTER BY NONE) #alterTableClusterBy
     | ALTER TABLE table=qualifiedName
         (ALTER | CHANGE) COLUMN? column=qualifiedName SYNC IDENTITY #alterTableSyncIdentity
-    | OPTIMIZE (path=STRING | table=qualifiedName)
+    | OPTIMIZE (path=STRING | table=qualifiedName) FULL?
         (WHERE partitionPredicate=predicateToken)?
         (zorderSpec)? #optimizeTable
     | REORG TABLE table=qualifiedName
@@ -237,7 +237,7 @@ nonReserved
     : VACUUM | USING | INVENTORY | RETAIN | HOURS | DRY | RUN
     | CONVERT | TO | DELTA | PARTITIONED | BY
     | DESC | DESCRIBE | LIMIT | DETAIL
-    | GENERATE | FOR | TABLE | CHECK | EXISTS | OPTIMIZE
+    | GENERATE | FOR | TABLE | CHECK | EXISTS | OPTIMIZE | FULL
     | IDENTITY | SYNC | COLUMN | CHANGE
     | REORG | APPLY | PURGE | UPGRADE | UNIFORM | ICEBERG_COMPAT_VERSION
     | RESTORE | AS | OF
@@ -275,6 +275,7 @@ EXISTS: 'EXISTS';
 FALSE: 'FALSE';
 FEATURE: 'FEATURE';
 FOR: 'FOR';
+FULL: 'FULL';
 GENERATE: 'GENERATE';
 HISTORY: 'HISTORY';
 HOURS: 'HOURS';

spark/src/main/resources/error/delta-error-classes.json

Lines changed: 6 additions & 0 deletions
@@ -1963,6 +1963,12 @@
     ],
     "sqlState" : "0AKDC"
   },
+  "DELTA_OPTIMIZE_FULL_NOT_SUPPORTED" : {
+    "message" : [
+      "OPTIMIZE FULL is only supported for clustered tables with non-empty clustering columns."
+    ],
+    "sqlState" : "42601"
+  },
   "DELTA_OVERWRITE_SCHEMA_WITH_DYNAMIC_PARTITION_OVERWRITE" : {
     "message" : [
       "'overwriteSchema' cannot be used in dynamic partition overwrite mode."

spark/src/main/scala/io/delta/sql/parser/DeltaSqlParser.scala

Lines changed: 2 additions & 1 deletion
@@ -368,7 +368,8 @@ class DeltaSqlAstBuilder extends DeltaSqlBaseBaseVisitor[AnyRef] {
     OptimizeTableCommand(
       Option(ctx.path).map(string),
       Option(ctx.table).map(visitTableIdentifier),
-      Option(ctx.partitionPredicate).map(extractRawText(_)).toSeq)(interleaveBy)
+      Option(ctx.partitionPredicate).map(extractRawText(_)).toSeq,
+      DeltaOptimizeContext(isFull = ctx.FULL != null))(interleaveBy)
   }

   /**

spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala

Lines changed: 6 additions & 0 deletions
@@ -3456,6 +3456,12 @@ trait DeltaErrorsBase
       messageParameters = Array(s"${zOrderBy.map(_.name).mkString(", ")}"))
   }

+  def optimizeFullNotSupportedException(): Throwable = {
+    new DeltaUnsupportedOperationException(
+      errorClass = "DELTA_OPTIMIZE_FULL_NOT_SUPPORTED",
+      messageParameters = Array.empty)
+  }
+
   def alterClusterByNotOnDeltaTableException(): Throwable = {
     new DeltaAnalysisException(
       errorClass = "DELTA_ONLY_OPERATION",

spark/src/main/scala/org/apache/spark/sql/delta/commands/OptimizeTableCommand.scala

Lines changed: 13 additions & 3 deletions
@@ -128,8 +128,10 @@ object OptimizeTableCommand {
 /**
  * The `optimize` command implementation for Spark SQL. Example SQL:
  * {{{
- *    OPTIMIZE ('/path/to/dir' | delta.table) [WHERE part = 25];
+ *    OPTIMIZE ('/path/to/dir' | delta.table) [WHERE part = 25] [FULL];
  * }}}
+ *
+ * Note FULL and WHERE clauses are set exclusively.
  */
 case class OptimizeTableCommand(
     override val child: LogicalPlan,
@@ -151,7 +153,8 @@ case class OptimizeTableCommand(
       throw DeltaErrors.notADeltaTableException(table.deltaLog.dataPath.toString)
     }

-    if (ClusteredTableUtils.isSupported(snapshot.protocol)) {
+    val isClusteredTable = ClusteredTableUtils.isSupported(snapshot.protocol)
+    if (isClusteredTable) {
       if (userPartitionPredicates.nonEmpty) {
         throw DeltaErrors.clusteringWithPartitionPredicatesException(userPartitionPredicates)
       }
@@ -160,6 +163,11 @@
       }
     }

+    lazy val clusteringColumns = ClusteringColumnInfo.extractLogicalNames(snapshot)
+    if (optimizeContext.isFull && (!isClusteredTable || clusteringColumns.isEmpty)) {
+      throw DeltaErrors.optimizeFullNotSupportedException()
+    }
+
     val partitionColumns = snapshot.metadata.partitionColumns
     // Parse the predicate expression into Catalyst expression and verify only simple filters
     // on partition columns are present
@@ -199,12 +207,14 @@
  *                            this threshold will be rewritten by the OPTIMIZE command. If not
  *                            specified, [[DeltaSQLConf.DELTA_OPTIMIZE_MAX_DELETED_ROWS_RATIO]]
  *                            will be used. This parameter must be set to `0` when [[reorg]] is set.
+ * @param isFull whether OPTIMIZE FULL is run. This is only for clustered tables.
  */
 case class DeltaOptimizeContext(
     reorg: Option[DeltaReorgOperation] = None,
     minFileSize: Option[Long] = None,
     maxFileSize: Option[Long] = None,
-    maxDeletedRowsRatio: Option[Double] = None) {
+    maxDeletedRowsRatio: Option[Double] = None,
+    isFull: Boolean = false) {
   if (reorg.nonEmpty) {
     require(
       minFileSize.contains(0L) && maxDeletedRowsRatio.contains(0d),
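For illustration, a test-style sketch of the validation added above (not a test from this PR; it assumes ScalaTest's `intercept`, a SparkSession named `spark`, a hypothetical non-clustered table `plain_tbl`, and that the error class is exposed via `getErrorClass`):

```scala
import org.scalatest.Assertions.intercept
import org.apache.spark.sql.delta.DeltaUnsupportedOperationException

// OPTIMIZE FULL is rejected unless the table is clustered with non-empty
// clustering columns; a plain (non-clustered) table should hit the new error.
val e = intercept[DeltaUnsupportedOperationException] {
  spark.sql("OPTIMIZE plain_tbl FULL")
}
assert(e.getErrorClass == "DELTA_OPTIMIZE_FULL_NOT_SUPPORTED")
```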

spark/src/main/scala/org/apache/spark/sql/delta/commands/OptimizeTableStrategy.scala

Lines changed: 38 additions & 18 deletions
@@ -109,7 +109,7 @@
       zOrderBy: Seq[String]): OptimizeTableStrategy = getMode(snapshot, zOrderBy) match {
     case OptimizeTableMode.CLUSTERING =>
       ClusteringStrategy(
-        sparkSession, ClusteringColumnInfo.extractLogicalNames(snapshot))
+        sparkSession, ClusteringColumnInfo.extractLogicalNames(snapshot), optimizeContext)
     case OptimizeTableMode.ZORDER => ZOrderStrategy(sparkSession, zOrderBy)
     case OptimizeTableMode.COMPACTION =>
       CompactionStrategy(sparkSession, optimizeContext)
@@ -188,7 +188,8 @@ case class ZOrderStrategy(
 /** Implements clustering strategy for clustered tables */
 case class ClusteringStrategy(
     override val sparkSession: SparkSession,
-    clusteringColumns: Seq[String]) extends OptimizeTableStrategy {
+    clusteringColumns: Seq[String],
+    optimizeContext: DeltaOptimizeContext) extends OptimizeTableStrategy {

   override val optimizeTableMode: OptimizeTableMode.Value = OptimizeTableMode.CLUSTERING

@@ -237,39 +238,58 @@
    * clustering. The requirements to pick candidate files are:
    *
    * 1. Candidate files are either un-clustered (missing clusteringProvider) or the
-   *    clusteringProvider is "liquid".
-   * 2. Clustered files (clusteringProvider is set) with different clustering columns are skipped.
-   *    When clustering columns are changed, existing clustered data is not re-clustered.
+   *    clusteringProvider is "liquid" when isFull is unset.
+   * 2. Clustered files with different clustering columns are handled differently based
+   *    on isFull setting: If isFull is unset, existing clustered files with different columns are
+   *    skipped. If isFull is set, all files are considered.
    * 3. Files that belong to the partial ZCubes are picked. A ZCube is considered as a partial
    *    ZCube if its size is smaller than [[DELTA_OPTIMIZE_CLUSTERING_MIN_CUBE_SIZE]].
    * 4. If there is only single ZCUBE with all files are clustered and if all clustered files
    *    belong to that ZCube, all files are filtered out.
    */
   private def applyMinZCube(files: Seq[AddFile]): Seq[AddFile] = {
     val targetSize = sparkSession.sessionState.conf.getConf(DELTA_OPTIMIZE_CLUSTERING_MIN_CUBE_SIZE)
-    // Skip files with from different clusteringProviders or files clustered by a different set
-    // of clustering columns.
-    val inputFiles = files.iterator.filter { file =>
-      clusteringStatsCollector.inputStats.updateStats(file)
+    // Keep all files if isFull is set, otherwise skip files with different clusteringProviders
+    // or files clustered by a different set of clustering columns.
+    val (candidateFiles, skippedClusteredFiles) = files.iterator.map { f =>
+      // Note that updateStats is moved out of Iterator.partition lambda since
+      // scala2.13 doesn't call the lambda in the order of files which violates
+      // the updateStats' requirement which requires files are ordered in the
+      // ZCUBE id (files have been ordered before calling applyMinZCube).
+      clusteringStatsCollector.inputStats.updateStats(f)
+      f
+    }.partition { file =>
       val sameOrMissingClusteringProvider =
         file.clusteringProvider.forall(_ == ClusteredTableUtils.clusteringProvider)

       // If clustered before, remove those with different clustering columns.
       val zCubeInfo = ZCubeInfo.getForFile(file)
       val unmatchedClusteringColumns = zCubeInfo.exists(_.zOrderBy != clusteringColumns)
       sameOrMissingClusteringProvider && !unmatchedClusteringColumns
-    }.map(AddFileWithNumRecords.createFromFile)
+    }
     // Skip files that belong to a ZCUBE that is larger than target ZCUBE size.
-    val smallZCubeFiles = ZCube.filterOutLargeZCubes(inputFiles, targetSize)
-
-    // Skip smallZCubeFiles if they all belong to a single ZCUBE.
-    ZCube.filterOutSingleZCubes(smallZCubeFiles).map { file =>
-      clusteringStatsCollector.outputStats.updateStats(file.addFile)
-      file.addFile
-    }.toSeq
+    // Note that ZCube.filterOutLargeZCubes requires clustered files have
+    // the same clustering columns, so skippedClusteredFiles are not included.
+    val smallZCubeFiles = ZCube.filterOutLargeZCubes(
+      candidateFiles.map(AddFileWithNumRecords.createFromFile), targetSize)
+
+    if (optimizeContext.isFull && skippedClusteredFiles.nonEmpty) {
+      // Clustered files with different clustering columns have to be re-clustered.
+      val finalFiles = (smallZCubeFiles.map(_.addFile) ++ skippedClusteredFiles).toSeq
+      finalFiles.map { f =>
+        clusteringStatsCollector.outputStats.updateStats(f)
+        f
+      }
+    } else {
+      // Skip smallZCubeFiles if they all belong to a single ZCUBE.
+      ZCube.filterOutSingleZCubes(smallZCubeFiles).map { file =>
+        clusteringStatsCollector.outputStats.updateStats(file.addFile)
+        file.addFile
+      }.toSeq
+    }
   }

   /** Metrics for clustering when [[isClusteredTable]] is true. */
   private val clusteringStatsCollector: ClusteringStatsCollector =
-    ClusteringStatsCollector(clusteringColumns)
+    ClusteringStatsCollector(clusteringColumns, optimizeContext)
 }
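The candidate-selection change in `applyMinZCube` can be summarized with a standalone sketch. `FileInfo` and `selectForReclustering` below are simplified stand-ins, not the real `AddFile`/`ZCubeInfo` classes, and the ZCube size and single-cube filtering steps are omitted:

```scala
object OptimizeFullSketch {
  // Simplified stand-in for a file's clustering metadata.
  final case class FileInfo(
      path: String,
      clusteringProvider: Option[String],  // e.g. Some("liquid") or None
      clusteredBy: Option[Seq[String]])    // columns the file was clustered by, if any

  // Mirrors the partition step in applyMinZCube: without FULL, files written by
  // another provider or clustered by different columns are skipped; with FULL,
  // those skipped files are added back so they are re-clustered too.
  def selectForReclustering(
      files: Seq[FileInfo],
      currentColumns: Seq[String],
      isFull: Boolean): Seq[FileInfo] = {
    val (matching, skipped) = files.partition { f =>
      val sameOrMissingProvider = f.clusteringProvider.forall(_ == "liquid")
      val sameColumns = f.clusteredBy.forall(_ == currentColumns)
      sameOrMissingProvider && sameColumns
    }
    if (isFull) matching ++ skipped else matching
  }
}
```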

spark/src/main/scala/org/apache/spark/sql/delta/commands/optimize/ZCubeFileStatsCollector.scala

Lines changed: 7 additions & 2 deletions
@@ -26,8 +26,11 @@ import org.apache.spark.sql.delta.zorder.ZCubeInfo.ZCubeID
  * calling updateStats on every new file seen.
  * The number of ZCubes, number of files from matching cubes and number of unoptimized files are
  * captured here.
+ *
+ * @param zOrderBy zOrder or clustering columns.
+ * @param isFull whether OPTIMIZE FULL is run. This is only for clustered tables.
  */
-class ZCubeFileStatsCollector(zOrderBy: Seq[String]) {
+class ZCubeFileStatsCollector(zOrderBy: Seq[String], isFull: Boolean) {

   /** map that holds the file statistics Map("element" -> (number of files, total file size)) */
   private var processedZCube: ZCubeID = _
@@ -47,7 +50,9 @@ class ZCubeFileStatsCollector(zOrderBy: Seq[String]) {
   /** method to update the zCubeFileStats incrementally by file */
   def updateStats(file: AddFile): AddFile = {
     val zCubeInfo = ZCubeInfo.getForFile(file)
-    if (zCubeInfo.isDefined && zCubeInfo.get.zOrderBy == zOrderBy) {
+    // Note that clustered files with different clustering columns are considered candidate
+    // files when isFull is set.
+    if (zCubeInfo.isDefined && (isFull || zCubeInfo.get.zOrderBy == zOrderBy)) {
       if (processedZCube != zCubeInfo.get.zCubeID) {
         processedZCube = zCubeInfo.get.zCubeID
         numZCubes += 1
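In isolation, the updated condition in `updateStats` behaves like the sketch below (simplified signature; the real method works on `AddFile` and `ZCubeInfo`):

```scala
// A file's ZCube contributes to the "matching cube" stats when ZCube info is
// present and either FULL was requested or the file used the current columns.
def countsTowardCubeStats(
    fileZOrderBy: Option[Seq[String]],  // columns recorded in the file's ZCubeInfo, if any
    currentColumns: Seq[String],
    isFull: Boolean): Boolean =
  fileZOrderBy.exists(cols => isFull || cols == currentColumns)
```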

spark/src/main/scala/org/apache/spark/sql/delta/commands/optimize/ZOrderMetrics.scala

Lines changed: 2 additions & 2 deletions
@@ -22,8 +22,8 @@ package org.apache.spark.sql.delta.commands.optimize
 class ZOrderMetrics(zOrderBy: Seq[String]) {

   var strategyName: String = _
-  val inputStats = new ZCubeFileStatsCollector(zOrderBy)
-  val outputStats = new ZCubeFileStatsCollector(zOrderBy)
+  val inputStats = new ZCubeFileStatsCollector(zOrderBy, isFull = false)
+  val outputStats = new ZCubeFileStatsCollector(zOrderBy, isFull = false)
   var numOutputCubes = 0

   def getZOrderStats(): ZOrderStats = {

spark/src/main/scala/org/apache/spark/sql/delta/skipping/clustering/ClusteringStats.scala

Lines changed: 4 additions & 3 deletions
@@ -16,6 +16,7 @@

 package org.apache.spark.sql.delta.skipping.clustering

+import org.apache.spark.sql.delta.commands.DeltaOptimizeContext
 import org.apache.spark.sql.delta.commands.optimize.ZCubeFileStatsCollector

 /**
@@ -54,10 +55,10 @@ case class ClusteringStats(
 /**
  * A class help collecting ClusteringStats.
  */
-case class ClusteringStatsCollector(zOrderBy: Seq[String]) {
+case class ClusteringStatsCollector(zOrderBy: Seq[String], optimizeContext: DeltaOptimizeContext) {

-  val inputStats = new ZCubeFileStatsCollector(zOrderBy)
-  val outputStats = new ZCubeFileStatsCollector(zOrderBy)
+  val inputStats = new ZCubeFileStatsCollector(zOrderBy, optimizeContext.isFull)
+  val outputStats = new ZCubeFileStatsCollector(zOrderBy, optimizeContext.isFull)
   var numOutputZCubes = 0

   def getClusteringStats: ClusteringStats = {

spark/src/test/scala/io/delta/sql/parser/DeltaSqlParserSuite.scala

Lines changed: 33 additions & 5 deletions
@@ -24,7 +24,7 @@ import org.apache.spark.sql.delta.skipping.clustering.temp.ClusterByTransform
 import org.apache.spark.sql.delta.CloneTableSQLTestUtils
 import org.apache.spark.sql.delta.DeltaTestUtils.BOOLEAN_DOMAIN
 import org.apache.spark.sql.delta.{UnresolvedPathBasedDeltaTable, UnresolvedPathBasedTable}
-import org.apache.spark.sql.delta.commands.{DescribeDeltaDetailCommand, DescribeDeltaHistory, OptimizeTableCommand, DeltaReorgTable}
+import org.apache.spark.sql.delta.commands.{DeltaOptimizeContext, DescribeDeltaDetailCommand, DescribeDeltaHistory, OptimizeTableCommand, DeltaReorgTable}
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.{TableIdentifier, TimeTravel}
 import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation, UnresolvedTable}
@@ -148,6 +148,31 @@ class DeltaSqlParserSuite extends SparkFunSuite with SQLHelper {
     assert(parser.parsePlan("OPTIMIZE tbl WHERE part = 1 ZORDER BY (col1, col2.subcol)") ===
       OptimizeTableCommand(None, Some(tblId("tbl")), Seq("part = 1"))(
         Seq(unresolvedAttr("col1"), unresolvedAttr("col2", "subcol"))))
+
+    // Validate OPTIMIZE works correctly with FULL keyword.
+    parsedCmd = parser.parsePlan("OPTIMIZE tbl FULL")
+    assert(parsedCmd ===
+      OptimizeTableCommand(None, Some(tblId("tbl")), Nil, DeltaOptimizeContext(isFull = true))(Nil))
+    assert(parsedCmd.asInstanceOf[OptimizeTableCommand].child ===
+      UnresolvedTable(Seq("tbl"), "OPTIMIZE"))
+
+    parsedCmd = parser.parsePlan("OPTIMIZE catalog_foo.db.tbl FULL")
+    assert(parsedCmd === OptimizeTableCommand(
+      None, Some(tblId("tbl", "db", "catalog_foo")), Nil, DeltaOptimizeContext(isFull = true))(Nil))
+    assert(parsedCmd.asInstanceOf[OptimizeTableCommand].child ===
+      UnresolvedTable(Seq("catalog_foo", "db", "tbl"), "OPTIMIZE"))
+
+    parsedCmd = parser.parsePlan("OPTIMIZE '/path/to/tbl' FULL")
+    assert(parsedCmd === OptimizeTableCommand(
+      Some("/path/to/tbl"), None, Nil, DeltaOptimizeContext(isFull = true))(Nil))
+    assert(parsedCmd.asInstanceOf[OptimizeTableCommand].child ===
+      UnresolvedPathBasedDeltaTable("/path/to/tbl", Map.empty, "OPTIMIZE"))
+
+    parsedCmd = parser.parsePlan("OPTIMIZE delta.`/path/to/tbl` FULL")
+    assert(parsedCmd === OptimizeTableCommand(
+      None, Some(tblId("/path/to/tbl", "delta")), Nil, DeltaOptimizeContext(isFull = true))(Nil))
+    assert(parsedCmd.asInstanceOf[OptimizeTableCommand].child ===
+      UnresolvedTable(Seq("delta", "/path/to/tbl"), "OPTIMIZE"))
   }

   test("OPTIMIZE command new tokens are non-reserved keywords") {
@@ -161,15 +186,18 @@ class DeltaSqlParserSuite extends SparkFunSuite with SQLHelper {
     assert(parser.parsePlan("OPTIMIZE zorder") ===
       OptimizeTableCommand(None, Some(tblId("zorder")), Nil)(Nil))

+    assert(parser.parsePlan("OPTIMIZE full") ===
+      OptimizeTableCommand(None, Some(tblId("full")), Nil)(Nil))
+
     // Use the new keywords in column name
-    assert(parser.parsePlan("OPTIMIZE tbl WHERE zorder = 1 and optimize = 2") ===
+    assert(parser.parsePlan("OPTIMIZE tbl WHERE zorder = 1 and optimize = 2 and full = 3") ===
       OptimizeTableCommand(None,
         Some(tblId("tbl"))
-        , Seq("zorder = 1 and optimize = 2"))(Nil))
+        , Seq("zorder = 1 and optimize = 2 and full = 3"))(Nil))

-    assert(parser.parsePlan("OPTIMIZE tbl ZORDER BY (optimize, zorder)") ===
+    assert(parser.parsePlan("OPTIMIZE tbl ZORDER BY (optimize, zorder, full)") ===
       OptimizeTableCommand(None, Some(tblId("tbl")), Nil)(
-        Seq(unresolvedAttr("optimize"), unresolvedAttr("zorder"))))
+        Seq(unresolvedAttr("optimize"), unresolvedAttr("zorder"), unresolvedAttr("full"))))
   }

   test("DESCRIBE DETAIL command is parsed as expected") {
