
Commit 7d86289

[spark] Refactor spark v2 DELETE (#6851)
1 parent fde14ef commit 7d86289

File tree

19 files changed: +406 −518 lines changed

docs/layouts/shortcodes/generated/spark_connector_configuration.html

Lines changed: 1 addition & 1 deletion

@@ -96,7 +96,7 @@
 <td><h5>write.use-v2-write</h5></td>
 <td style="word-wrap: break-word;">false</td>
 <td>Boolean</td>
-<td>If true, v2 write will be used. Currently, only HASH_FIXED and BUCKET_UNAWARE bucket modes are supported. Will fall back to v1 write for other bucket modes. Currently, Spark V2 write does not support TableCapability.STREAMING_WRITE and TableCapability.ACCEPT_ANY_SCHEMA.</td>
+<td>If true, v2 write will be used. Currently, only HASH_FIXED and BUCKET_UNAWARE bucket modes are supported. Will fall back to v1 write for other bucket modes. Currently, Spark V2 write does not support TableCapability.STREAMING_WRITE.</td>
 </tr>
 </tbody>
 </table>
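
Not part of the diff, but useful context: Paimon's Spark options are read from the session conf under the spark.paimon. prefix (the test changes below set exactly this key), so the flag can be flipped per session. A minimal sketch; the table name is illustrative, not from this commit:

  // assumes a SparkSession already wired to a Paimon catalog
  spark.conf.set("spark.paimon.write.use-v2-write", "true")
  spark.sql("DELETE FROM t WHERE id > 5") // eligible for the new v2 DELETE path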

paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala

Lines changed: 8 additions & 0 deletions

@@ -18,4 +18,12 @@
 
 package org.apache.paimon.spark.sql
 
+import org.apache.spark.SparkConf
+
 class DeleteFromTableTest extends DeleteFromTableTestBase {}
+
+class V2DeleteFromTableTest extends DeleteFromTableTestBase {
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf.set("spark.paimon.write.use-v2-write", "true")
+  }
+}
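
Folding the V2 suite in here and overriding sparkConf re-runs every case in DeleteFromTableTestBase with v2 write enabled, which is what makes the standalone V2DeleteFromTableTest file below redundant.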

paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/V2DeleteFromTableTest.scala

Lines changed: 0 additions & 28 deletions
This file was deleted.

paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala

Lines changed: 8 additions & 0 deletions

@@ -18,4 +18,12 @@
 
 package org.apache.paimon.spark.sql
 
+import org.apache.spark.SparkConf
+
 class DeleteFromTableTest extends DeleteFromTableTestBase {}
+
+class V2DeleteFromTableTest extends DeleteFromTableTestBase {
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf.set("spark.paimon.write.use-v2-write", "true")
+  }
+}

paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkConnectorOptions.java

Lines changed: 1 addition & 1 deletion

@@ -51,7 +51,7 @@ public class SparkConnectorOptions {
                     .booleanType()
                     .defaultValue(false)
                     .withDescription(
-                            "If true, v2 write will be used. Currently, only HASH_FIXED and BUCKET_UNAWARE bucket modes are supported. Will fall back to v1 write for other bucket modes. Currently, Spark V2 write does not support TableCapability.STREAMING_WRITE and TableCapability.ACCEPT_ANY_SCHEMA.");
+            "If true, v2 write will be used. Currently, only HASH_FIXED and BUCKET_UNAWARE bucket modes are supported. Will fall back to v1 write for other bucket modes. Currently, Spark V2 write does not support TableCapability.STREAMING_WRITE.");
 
     public static final ConfigOption<Integer> MAX_FILES_PER_TRIGGER =
             key("read.stream.maxFilesPerTrigger")

paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkCopyOnWriteOperation.scala

Lines changed: 0 additions & 105 deletions
This file was deleted.

paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkRowLevelOperationBuilder.scala

Lines changed: 0 additions & 31 deletions
This file was deleted.

paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala

Lines changed: 12 additions & 3 deletions

@@ -18,7 +18,8 @@
 
 package org.apache.paimon.spark
 
-import org.apache.paimon.table.Table
+import org.apache.paimon.spark.rowops.PaimonSparkCopyOnWriteOperation
+import org.apache.paimon.table.{FileStoreTable, Table}
 
 import org.apache.spark.sql.connector.catalog.SupportsRowLevelOperations
 import org.apache.spark.sql.connector.write.{RowLevelOperationBuilder, RowLevelOperationInfo}
@@ -27,9 +28,17 @@ import org.apache.spark.sql.connector.write.{RowLevelOperationBuilder, RowLevelO
 case class SparkTable(override val table: Table)
   extends PaimonSparkTableBase(table)
   with SupportsRowLevelOperations {
+
   override def newRowLevelOperationBuilder(
-      rowLevelOperationInfo: RowLevelOperationInfo): RowLevelOperationBuilder = {
-    new PaimonSparkRowLevelOperationBuilder(table, rowLevelOperationInfo)
+      info: RowLevelOperationInfo): RowLevelOperationBuilder = {
+    table match {
+      case t: FileStoreTable if useV2Write =>
+        () => new PaimonSparkCopyOnWriteOperation(t, info)
+      case _ =>
+        throw new UnsupportedOperationException(
+          s"Write operation is only supported for FileStoreTable with V2 write enabled. " +
+            s"Actual table type: ${table.getClass.getSimpleName}, useV2Write: $useV2Write")
+    }
   }
 }
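
The lambda works because Spark's RowLevelOperationBuilder is a single-abstract-method interface (build(): RowLevelOperation), so () => new PaimonSparkCopyOnWriteOperation(t, info) SAM-converts into a builder and the deleted PaimonSparkRowLevelOperationBuilder class is no longer needed. A standalone sketch of the same conversion, with illustrative wiring:

  import org.apache.spark.sql.connector.write.{RowLevelOperation, RowLevelOperationBuilder}

  // a Scala function literal SAM-converts to the one-method Java interface
  def builderFor(op: RowLevelOperation): RowLevelOperationBuilder = () => op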

paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala

Lines changed: 14 additions & 30 deletions

@@ -22,49 +22,33 @@ import org.apache.paimon.spark.SparkTable
 import org.apache.paimon.spark.commands.DeleteFromPaimonTableCommand
 import org.apache.paimon.table.FileStoreTable
 
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
 import org.apache.spark.sql.catalyst.plans.logical.{DeleteFromTable, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.Rule
 
-import scala.collection.JavaConverters._
-
 object PaimonDeleteTable extends Rule[LogicalPlan] with RowLevelHelper {
 
-  /**
-   * Determines if DataSourceV2 delete is not supported for the given table. DataSourceV2 delete is
-   * not supported in the following scenarios:
-   *   - Spark version is 3.4 or earlier
-   *   - Table does not use V2 write
-   *   - Row tracking is enabled
-   *   - Deletion vectors are enabled
-   *   - Table has primary keys defined
-   *   - Table is not a FileStoreTable
-   *   - Data evolution is enabled
-   */
-  private def shouldFallbackToV1Delete(table: SparkTable): Boolean = {
+  /** Determines if DataSourceV2 delete is not supported for the given table. */
+  private def shouldFallbackToV1Delete(table: SparkTable, condition: Expression): Boolean = {
     val baseTable = table.getTable
-
-    val baseConditions = org.apache.spark.SPARK_VERSION <= "3.4" ||
-      !table.useV2Write ||
-      table.coreOptions.rowTrackingEnabled() ||
-      table.coreOptions.deletionVectorsEnabled() ||
-      !baseTable.primaryKeys().isEmpty
-
-    baseConditions || {
-      baseTable match {
-        case paimonTable: FileStoreTable =>
-          paimonTable.coreOptions().dataEvolutionEnabled()
-        case _ =>
-          true
-      }
-    }
+    org.apache.spark.SPARK_VERSION < "3.5" ||
+    !baseTable.isInstanceOf[FileStoreTable] ||
+    !baseTable.primaryKeys().isEmpty ||
+    !table.useV2Write ||
+    table.coreOptions.deletionVectorsEnabled() ||
+    table.coreOptions.rowTrackingEnabled() ||
+    table.coreOptions.dataEvolutionEnabled() ||
+    // todo: Optimize v2 delete when conditions are all partition filters
+    condition == null || condition == TrueLiteral
   }
 
   override val operation: RowLevelOp = Delete
 
   override def apply(plan: LogicalPlan): LogicalPlan = {
     plan.resolveOperators {
       case d @ DeleteFromTable(PaimonRelation(table), condition)
-          if d.resolved && shouldFallbackToV1Delete(table) =>
+          if d.resolved && shouldFallbackToV1Delete(table, condition) =>
         checkPaimonTable(table.getTable)
 
         table.getTable match {
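
Taken together with the SparkTable change above: on Spark 3.5+, a conditional DELETE on a non-primary-key FileStoreTable with v2 write enabled stays on Spark's row-level-operation path; everything else, including an unconditional delete, falls back to DeleteFromPaimonTableCommand. A hedged sketch of the two cases; the table setup is assumed, not part of this commit:

  // assumes spark.paimon.write.use-v2-write=true and an append table "t"
  // without deletion vectors, row tracking, or data evolution
  spark.sql("DELETE FROM t WHERE dt = '2024-01-01'") // v2 copy-on-write delete
  spark.sql("DELETE FROM t") // condition is TrueLiteral -> v1 fallback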

paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/format/PaimonFormatTable.scala

Lines changed: 6 additions & 18 deletions

@@ -92,28 +92,26 @@ case class PaimonFormatTableWriterBuilder(table: FormatTable, writeSchema: Struc
 
 private case class FormatTableBatchWrite(
     table: FormatTable,
-    overwriteDynamic: Boolean,
+    overwriteDynamic: Option[Boolean],
    overwritePartitions: Option[Map[String, String]],
     writeSchema: StructType)
   extends BatchWrite
   with Logging {
 
-  assert(
-    !(overwriteDynamic && overwritePartitions.exists(_.nonEmpty)),
-    "Cannot overwrite dynamically and by filter both")
-
   private val batchWriteBuilder = {
     val builder = table.newBatchWriteBuilder()
-    if (overwriteDynamic) {
+    // todo: add test for static overwrite the whole table
+    if (overwriteDynamic.contains(true)) {
       builder.withOverwrite()
     } else {
       overwritePartitions.foreach(partitions => builder.withOverwrite(partitions.asJava))
     }
     builder
   }
 
-  override def createBatchWriterFactory(info: PhysicalWriteInfo): DataWriterFactory =
-    FormatTableWriterFactory(batchWriteBuilder, writeSchema)
+  override def createBatchWriterFactory(info: PhysicalWriteInfo): DataWriterFactory = {
+    (_: Int, _: Long) => new FormatTableDataWriter(batchWriteBuilder, writeSchema)
+  }
 
   override def useCommitCoordinator(): Boolean = false
 
@@ -140,16 +138,6 @@ private case class FormatTableBatchWrite(
   }
 }
 
-private case class FormatTableWriterFactory(
-    batchWriteBuilder: BatchWriteBuilder,
-    writeSchema: StructType)
-  extends DataWriterFactory {
-
-  override def createWriter(partitionId: Int, taskId: Long): DataWriter[InternalRow] = {
-    new FormatTableDataWriter(batchWriteBuilder, writeSchema)
-  }
-}
-
 private class FormatTableDataWriter(batchWriteBuilder: BatchWriteBuilder, writeSchema: StructType)
   extends V2DataWrite
   with Logging {
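
Two shape changes here. First, overwriteDynamic: Option[Boolean] distinguishes "not specified" (None) from an explicit false, hence the overwriteDynamic.contains(true) check replacing the plain if. Second, FormatTableWriterFactory was a one-method wrapper, so it collapses into a lambda: DataWriterFactory declares the single abstract method createWriter(partitionId, taskId) and extends Serializable, which Scala's SAM conversion preserves. A minimal sketch of that inlining, under the same assumptions:

  import org.apache.spark.sql.catalyst.InternalRow
  import org.apache.spark.sql.connector.write.{DataWriter, DataWriterFactory}

  // illustrative: the function literal SAM-converts to DataWriterFactory;
  // it stays serializable provided the captured mkWriter is serializable
  def factoryFor(mkWriter: () => DataWriter[InternalRow]): DataWriterFactory =
    (_: Int, _: Long) => mkWriter()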
