2 changes: 2 additions & 0 deletions native/proto/src/proto/operator.proto
@@ -245,6 +245,8 @@ message ParquetWriter {
optional string job_id = 6;
// Task attempt ID for this specific task
optional int32 task_attempt_id = 7;
// set of partition columns
repeated string partition_columns = 8;
}

enum AggregateMode {
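
The new repeated field carries the write's partition column names from the JVM planner to the native ParquetWriter operator. A minimal sketch of how the Scala serde side could populate it, assuming the standard protobuf-java builder generated from this message (the OperatorOuterClass.ParquetWriter.Builder class and addPartitionColumns method follow protobuf codegen conventions and are not shown in this diff):

import org.apache.comet.serde.OperatorOuterClass

// Hypothetical helper: copy the command's partition column names into the
// repeated partition_columns field (tag 8) on the ParquetWriter builder.
def withPartitionColumns(
    builder: OperatorOuterClass.ParquetWriter.Builder,
    partitionColumns: Seq[String]): OperatorOuterClass.ParquetWriter.Builder = {
  // repeated string partition_columns = 8 maps to addPartitionColumns(String)
  partitionColumns.foreach(builder.addPartitionColumns)
  builder
}
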
CometDataWritingCommand.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.execution.command.DataWritingCommandExec
import org.apache.spark.sql.execution.datasources.{InsertIntoHadoopFsRelationCommand, WriteFilesExec}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode

import org.apache.comet.{CometConf, ConfigEntry, DataTypeSupport}
import org.apache.comet.CometSparkSessionExtensions.withInfo
@@ -62,7 +63,7 @@ object CometDataWritingCommand extends CometOperatorSerde[DataWritingCommandExec]
}

if (cmd.partitionColumns.nonEmpty || cmd.staticPartitions.nonEmpty) {
- return Unsupported(Some("Partitioned writes are not supported"))
+ return Incompatible(Some("Partitioned writes are not supported"))
}

if (cmd.query.output.exists(attr => DataTypeSupport.isComplexType(attr.dataType))) {
@@ -167,6 +168,9 @@ object CometDataWritingCommand extends CometOperatorSerde[DataWritingCommandExec]
other
}

val isDynamicOverwriteMode = cmd.partitionColumns.nonEmpty &&
  SQLConf.get.partitionOverwriteMode == PartitionOverwriteMode.DYNAMIC

// Create FileCommitProtocol for atomic writes
val jobId = java.util.UUID.randomUUID().toString
val committer =
@@ -178,11 +182,7 @@ object CometDataWritingCommand extends CometOperatorSerde[DataWritingCommandExec]
committerClass.getConstructor(classOf[String], classOf[String], classOf[Boolean])
Some(
constructor
- .newInstance(
- jobId,
- outputPath,
- java.lang.Boolean.FALSE // dynamicPartitionOverwrite = false for now
- )
+ .newInstance(jobId, outputPath, isDynamicOverwriteMode)
.asInstanceOf[org.apache.spark.internal.io.FileCommitProtocol])
} catch {
case e: Exception =>
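
For context, the new flag mirrors Spark's dynamic partition overwrite behaviour: when spark.sql.sources.partitionOverwriteMode is set to dynamic and the write is partitioned, only the partitions produced by the incoming data are replaced. An illustrative, self-contained sketch of the user-facing write that would now hand isDynamicOverwriteMode = true to the FileCommitProtocol (the output path and schema are invented for the example):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("dynamic-overwrite-demo").master("local[*]").getOrCreate()
// Session-level equivalent of SQLConf.get.partitionOverwriteMode == PartitionOverwriteMode.DYNAMIC
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

val df = spark.range(0, 1000).selectExpr("id", "id % 10 AS part")
// Only the partitions present in df (part = 0..9 here) are rewritten;
// any other partitions already under the output path are left untouched.
df.write.mode("overwrite").partitionBy("part").parquet("/tmp/comet_dynamic_overwrite_demo")
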
CometParquetWriterSuite.scala
@@ -228,4 +228,32 @@ class CometParquetWriterSuite extends CometTestBase {
}
}
}

test("parquet write with mode overwrite") {
withTempPath { dir =>
val outputPath = new File(dir, "output.parquet").getAbsolutePath

withTempPath { inputDir =>
val inputPath = createTestData(inputDir)

withSQLConf(
CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
SQLConf.SESSION_LOCAL_TIMEZONE.key -> "America/Halifax",
CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true",
CometConf.COMET_EXEC_ENABLED.key -> "true") {

val df = spark.read.parquet(inputPath)

// First write
df.repartition(2).write.parquet(outputPath)
// Second write, in overwrite mode with a different row count so the assertion
// can detect whether the first write's data was actually replaced
df.limit(500).repartition(2).write.mode("overwrite").parquet(outputPath)
// Verify the overwrite replaced the original data
val resultDf = spark.read.parquet(outputPath)
assert(resultDf.count() == 500, "Expected 500 rows after overwrite")
}
}
}
}
}
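
A possible follow-up test, sketched here rather than taken from the diff, would exercise dynamic partition overwrite through the same Comet configs used above. It reuses the suite's createTestData helper and assumes the generated input has a column named "id" that can serve as a partition column:

test("parquet partitioned write with dynamic partition overwrite (sketch)") {
  withTempPath { dir =>
    val outputPath = new File(dir, "output.parquet").getAbsolutePath
    withTempPath { inputDir =>
      val inputPath = createTestData(inputDir)
      withSQLConf(
        CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
        CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true",
        CometConf.COMET_EXEC_ENABLED.key -> "true",
        SQLConf.PARTITION_OVERWRITE_MODE.key -> "dynamic") {
        val df = spark.read.parquet(inputPath)
        // Initial partitioned write, then overwrite with a subset of the data.
        df.write.partitionBy("id").parquet(outputPath)
        df.limit(500).write.mode("overwrite").partitionBy("id").parquet(outputPath)
        // With dynamic overwrite only the partitions touched by the second write
        // are replaced, so the exact row count depends on the test data; just
        // check the result is still readable and non-empty.
        assert(spark.read.parquet(outputPath).count() > 0)
      }
    }
  }
}
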