Commit 2fa598d

[spark] Delete with deletion vectors should enable AQE (#4171)
1 parent: 57135be
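This commit removes the withSQLConf("spark.sql.adaptive.enabled" -> "false") wrapper around the deletion-vector branch of performNonPrimaryKeyDelete, so a DELETE on a non-primary-key table with deletion vectors enabled now runs with Spark's Adaptive Query Execution left on. It also drops an unused pathFactory local. A minimal usage sketch of the affected path follows; it assumes a configured Paimon catalog registered as "paimon", hypothetical database/table names, and the Paimon table property deletion-vectors.enabled to opt the table into deletion vectors:

    import org.apache.spark.sql.SparkSession

    // Hypothetical session; assumes the Paimon Spark connector is on the
    // classpath and its catalog is registered under the name "paimon".
    val spark = SparkSession.builder().appName("dv-delete-sketch").getOrCreate()

    // Opt the (hypothetical) table into deletion vectors via a table property.
    spark.sql(
      """CREATE TABLE paimon.db.t (id INT, v STRING)
        |TBLPROPERTIES ('deletion-vectors.enabled' = 'true')""".stripMargin)

    // After this commit, the internal jobs behind this DELETE no longer pin
    // spark.sql.adaptive.enabled to false; the session's AQE setting applies.
    spark.sql("DELETE FROM paimon.db.t WHERE id < 10")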

File tree

1 file changed: +10, −13 lines

paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala

Lines changed: 10 additions & 13 deletions
@@ -117,24 +117,21 @@ case class DeleteFromPaimonTableCommand(
   }
 
   def performNonPrimaryKeyDelete(sparkSession: SparkSession): Seq[CommitMessage] = {
-    val pathFactory = fileStore.pathFactory()
     // Step1: the candidate data splits which are filtered by Paimon Predicate.
     val candidateDataSplits = findCandidateDataSplits(condition, relation.output)
     val dataFilePathToMeta = candidateFileMap(candidateDataSplits)
 
     if (deletionVectorsEnabled) {
-      withSQLConf("spark.sql.adaptive.enabled" -> "false") {
-        // Step2: collect all the deletion vectors that marks the deleted rows.
-        val deletionVectors = collectDeletionVectors(
-          candidateDataSplits,
-          dataFilePathToMeta,
-          condition,
-          relation,
-          sparkSession)
-
-        // Step3: update the touched deletion vectors and index files
-        writer.persistDeletionVectors(deletionVectors)
-      }
+      // Step2: collect all the deletion vectors that marks the deleted rows.
+      val deletionVectors = collectDeletionVectors(
+        candidateDataSplits,
+        dataFilePathToMeta,
+        condition,
+        relation,
+        sparkSession)
+
+      // Step3: update the touched deletion vectors and index files
+      writer.persistDeletionVectors(deletionVectors)
     } else {
       // Step2: extract out the exactly files, which must have at least one record to be updated.
       val touchedFilePaths =
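For context, the removed wrapper follows Spark's common withSQLConf pattern: set the given SQL confs, run the block, then restore the previous values. Below is a re-implementation sketch using only the public SparkSession runtime-conf API; the shape is assumed and Paimon's actual helper may differ:

    import org.apache.spark.sql.SparkSession

    // Sketch of a withSQLConf-style helper (shape assumed; not Paimon's exact
    // utility): temporarily override session SQL confs while `body` runs,
    // then restore whatever was set before.
    def withSQLConf[T](spark: SparkSession)(pairs: (String, String)*)(body: => T): T = {
      val saved = pairs.map { case (k, _) => k -> spark.conf.getOption(k) }
      pairs.foreach { case (k, v) => spark.conf.set(k, v) }
      try body
      finally saved.foreach {
        case (k, Some(v)) => spark.conf.set(k, v) // restore the previous value
        case (k, None)    => spark.conf.unset(k)  // the key was unset before
      }
    }

    // The pre-commit behavior, expressed with this helper:
    // withSQLConf(spark)("spark.sql.adaptive.enabled" -> "false") { /* Steps 2-3 */ }

Under the old code, everything inside the block, including the Spark job launched by collectDeletionVectors, executed with AQE disabled; deleting the wrapper simply leaves the session's AQE setting untouched.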

0 commit comments