Commit 86ca3c7

wip_implement_dynamic_partition_mode
1 parent 2b7802a commit 86ca3c7

4 files changed (+14, -22 lines)

native/core/src/execution/operators/parquet_writer.rs

Lines changed: 1 addition & 0 deletions
@@ -86,6 +86,7 @@ impl ParquetWriterExec {
         compression: CompressionCodec,
         partition_id: i32,
         column_names: Vec<String>,
+        partition_columns: Vec<String>
     ) -> Result<Self> {
         // Preserve the input's partitioning so each partition writes its own file
         let input_partitioning = input.output_partitioning().clone();

native/core/src/execution/planner.rs

Lines changed: 1 addition & 0 deletions
@@ -1261,6 +1261,7 @@ impl PhysicalPlanner {
             codec,
             self.partition,
             writer.column_names.clone(),
+            writer.partition_columns.clone()
         )?);

         Ok((

native/proto/src/proto/operator.proto

Lines changed: 1 addition & 0 deletions
@@ -245,6 +245,7 @@ message ParquetWriter {
   optional string job_id = 6;
   // Task attempt ID for this specific task
   optional int32 task_attempt_id = 7;
+  repeated string partition_columns = 8; // list of partition columns to support dynamic partitioning mode
 }

 enum AggregateMode {
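
As a reference for how the new repeated partition_columns field is used from the JVM side, here is a minimal sketch assuming the standard Java protobuf codegen for a repeated string field (builder methods addPartitionColumns / addAllPartitionColumns and accessor getPartitionColumnsList); the column names are illustrative, not taken from the commit:

import scala.jdk.CollectionConverters._
import org.apache.comet.serde.OperatorOuterClass

// Illustrative partition columns; in the serde these come from cmd.partitionColumns
val partitionCols = Seq("dt", "country")

// Populate the repeated field element-wise and build the message
val parquetWriter = OperatorOuterClass.ParquetWriter
  .newBuilder()
  .addAllPartitionColumns(partitionCols.asJava)
  .build()

// Reading it back yields one entry per partition column
val cols = parquetWriter.getPartitionColumnsList.asScala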

spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala

Lines changed: 11 additions & 22 deletions
@@ -20,9 +20,7 @@
 package org.apache.comet.serde.operator

 import java.util.Locale
-
 import scala.jdk.CollectionConverters._
-
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.spark.SparkException
@@ -32,12 +30,12 @@ import org.apache.spark.sql.execution.command.DataWritingCommandExec
 import org.apache.spark.sql.execution.datasources.{InsertIntoHadoopFsRelationCommand, WriteFilesExec}
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.internal.SQLConf
-
 import org.apache.comet.{CometConf, ConfigEntry, DataTypeSupport}
 import org.apache.comet.CometSparkSessionExtensions.withInfo
 import org.apache.comet.serde.{CometOperatorSerde, Incompatible, OperatorOuterClass, SupportLevel, Unsupported}
 import org.apache.comet.serde.OperatorOuterClass.Operator
 import org.apache.comet.serde.QueryPlanSerde.serializeDataType
+import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode

 /**
  * CometOperatorSerde implementation for DataWritingCommandExec that converts Parquet write
@@ -64,7 +62,7 @@ object CometDataWritingCommand extends CometOperatorSerde[DataWritingCommandExec
     }

     if (cmd.partitionColumns.nonEmpty || cmd.staticPartitions.nonEmpty) {
-      return Unsupported(Some("Partitioned writes are not supported"))
+      return Incompatible(Some("Partitioned writes are not supported"))
     }

     if (cmd.query.output.exists(attr => DataTypeSupport.isComplexType(attr.dataType))) {
@@ -135,6 +133,7 @@ object CometDataWritingCommand extends CometOperatorSerde[DataWritingCommandExec
       .addAllColumnNames(cmd.query.output.map(_.name).asJava)
       // Note: work_dir, job_id, and task_attempt_id will be set at execution time
       // in CometNativeWriteExec, as they depend on the Spark task context
+      .addPartitionColumns(cmd.partitionColumns.map(_.toString()).mkString(","))
       .build()

     val writerOperator = Operator
@@ -159,16 +158,6 @@ object CometDataWritingCommand extends CometOperatorSerde[DataWritingCommandExec
     val cmd = op.cmd.asInstanceOf[InsertIntoHadoopFsRelationCommand]
     val outputPath = cmd.outputPath.toString

-    // SaveMode.Overwrite - delete existing output in the driver itself
-    if (cmd.mode == SaveMode.Overwrite) {
-      val outputPathObj = new Path(outputPath)
-      val fs = outputPathObj.getFileSystem(new Configuration())
-
-      if (fs.exists(outputPathObj)) {
-        fs.delete(outputPathObj, true)
-      }
-    }
-
     // Get the child plan from the WriteFilesExec or use the child directly
     val childPlan = op.child match {
       case writeFiles: WriteFilesExec =>
@@ -188,14 +177,14 @@ object CometDataWritingCommand extends CometOperatorSerde[DataWritingCommandExec
           classOf[org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol]
         val constructor =
           committerClass.getConstructor(classOf[String], classOf[String], classOf[Boolean])
-        Some(
-          constructor
-            .newInstance(
-              jobId,
-              outputPath,
-              java.lang.Boolean.FALSE // dynamicPartitionOverwrite = false for now
-            )
-            .asInstanceOf[org.apache.spark.internal.io.FileCommitProtocol])
+
+        val isDynamicOverWriteModeEnabled = cmd.partitionColumns.nonEmpty &&
+          SQLConf.get.partitionOverwriteMode == PartitionOverwriteMode.DYNAMIC
+
+        Some(
+          constructor
+            .newInstance(jobId, outputPath, isDynamicOverWriteModeEnabled)
+            .asInstanceOf[org.apache.spark.internal.io.FileCommitProtocol])
       } catch {
         case e: Exception =>
           throw new SparkException(s"Could not instantiate FileCommitProtocol: ${e.getMessage}")
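
For context on the isDynamicOverWriteModeEnabled flag above: it keys off the same SQLConf setting that Spark uses to decide on dynamic partition overwrite. A minimal usage sketch of the Spark-side configuration that exercises this path (the output path and the dt partition column are illustrative, not from the commit):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("dynamic-partition-overwrite-demo").getOrCreate()

// In dynamic mode, an overwrite of a partitioned table only replaces the
// partitions present in the incoming data; other partitions are left untouched.
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

spark
  .range(10)
  .selectExpr("id", "id % 2 AS dt") // "dt" becomes the partition column
  .write
  .mode("overwrite")
  .partitionBy("dt")
  .parquet("/tmp/comet_dynamic_partition_demo")

In stock Spark this setting causes the file committer to be instantiated with dynamicPartitionOverwrite = true for partitioned overwrites; the boolean passed to the SQLHadoopMapReduceCommitProtocol constructor above carries the equivalent value.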
