
Commit 4589172

feat: add complex type support to native Parquet writer (#3214)
1 parent 68f127b · commit 4589172

4 files changed: +402 -95 lines changed

benchmarks/pyspark/run_all_benchmarks.sh

Lines changed: 9 additions & 7 deletions
@@ -25,7 +25,7 @@ set -e
 
 SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
 DATA_PATH="${1:-/tmp/shuffle-benchmark-data}"
-COMET_JAR="${COMET_JAR:-$SCRIPT_DIR/../spark/target/comet-spark-spark3.5_2.12-0.13.0-SNAPSHOT.jar}"
+COMET_JAR="${COMET_JAR:-$SCRIPT_DIR/../../spark/target/comet-spark-spark3.5_2.12-0.13.0-SNAPSHOT.jar}"
 SPARK_MASTER="${SPARK_MASTER:-local[*]}"
 EXECUTOR_MEMORY="${EXECUTOR_MEMORY:-16g}"
 EVENT_LOG_DIR="${EVENT_LOG_DIR:-/tmp/spark-events}"

@@ -71,9 +71,10 @@ $SPARK_HOME/bin/spark-submit \
   --conf spark.memory.offHeap.enabled=true \
   --conf spark.memory.offHeap.size=16g \
   --conf spark.comet.enabled=true \
-  --conf spark.comet.exec.enabled=true \
-  --conf spark.comet.exec.all.enabled=true \
-  --conf spark.comet.exec.shuffle.enabled=true \
+  --conf spark.comet.operator.DataWritingCommandExec.allowIncompatible=true \
+  --conf spark.comet.parquet.write.enabled=true \
+  --conf spark.comet.logFallbackReasons.enabled=true \
+  --conf spark.comet.explainFallback.enabled=true \
   --conf spark.comet.shuffle.mode=jvm \
   --conf spark.comet.exec.shuffle.mode=jvm \
   --conf spark.comet.exec.replaceSortMergeJoin=true \

@@ -98,9 +99,10 @@ $SPARK_HOME/bin/spark-submit \
   --conf spark.memory.offHeap.enabled=true \
   --conf spark.memory.offHeap.size=16g \
   --conf spark.comet.enabled=true \
-  --conf spark.comet.exec.enabled=true \
-  --conf spark.comet.exec.all.enabled=true \
-  --conf spark.comet.exec.shuffle.enabled=true \
+  --conf spark.comet.operator.DataWritingCommandExec.allowIncompatible=true \
+  --conf spark.comet.parquet.write.enabled=true \
+  --conf spark.comet.logFallbackReasons.enabled=true \
+  --conf spark.comet.explainFallback.enabled=true \
   --conf spark.comet.exec.shuffle.mode=native \
   --conf spark.comet.exec.replaceSortMergeJoin=true \
   --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \

native/core/src/execution/operators/parquet_writer.rs

Lines changed: 6 additions & 2 deletions
@@ -535,8 +535,12 @@ impl ExecutionPlan for ParquetWriterExec {
             DataFusionError::Execution(format!("Failed to close writer: {}", e))
         })?;
 
-        // Get file size
-        let file_size = std::fs::metadata(&part_file)
+        // Get file size - strip file:// prefix if present for local filesystem access
+        let local_path = part_file
+            .strip_prefix("file://")
+            .or_else(|| part_file.strip_prefix("file:"))
+            .unwrap_or(&part_file);
+        let file_size = std::fs::metadata(local_path)
             .map(|m| m.len() as i64)
             .unwrap_or(0);
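
For context, part_file can arrive either as a bare local path or as a file: URI, which std::fs::metadata cannot open directly. A standalone sketch of just the stripping behavior above (the to_local_path helper and the example paths are hypothetical, for illustration):

/// Strip a file URI scheme so std::fs can stat the path.
/// Hypothetical helper mirroring the diff above, not Comet's actual API.
fn to_local_path(part_file: &str) -> &str {
    part_file
        .strip_prefix("file://")
        .or_else(|| part_file.strip_prefix("file:"))
        .unwrap_or(part_file)
}

fn main() {
    // "file://" is removed, leaving the third slash as the path root.
    assert_eq!(to_local_path("file:///tmp/part-0.parquet"), "/tmp/part-0.parquet");
    // The single-slash scheme form falls through to the "file:" branch.
    assert_eq!(to_local_path("file:/tmp/part-0.parquet"), "/tmp/part-0.parquet");
    // A bare local path passes through unchanged via unwrap_or.
    assert_eq!(to_local_path("/tmp/part-0.parquet"), "/tmp/part-0.parquet");
}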

spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala

Lines changed: 1 addition & 5 deletions
@@ -31,7 +31,7 @@ import org.apache.spark.sql.execution.datasources.{InsertIntoHadoopFsRelationCom
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.internal.SQLConf
 
-import org.apache.comet.{CometConf, ConfigEntry, DataTypeSupport}
+import org.apache.comet.{CometConf, ConfigEntry}
 import org.apache.comet.CometSparkSessionExtensions.withInfo
 import org.apache.comet.objectstore.NativeConfig
 import org.apache.comet.serde.{CometOperatorSerde, Incompatible, OperatorOuterClass, SupportLevel, Unsupported}

@@ -67,10 +67,6 @@ object CometDataWritingCommand extends CometOperatorSerde[DataWritingCommandExec
       return Unsupported(Some("Partitioned writes are not supported"))
     }
 
-    if (cmd.query.output.exists(attr => DataTypeSupport.isComplexType(attr.dataType))) {
-      return Unsupported(Some("Complex types are not supported"))
-    }
-
     val codec = parseCompressionCodec(cmd)
     if (!supportedCompressionCodes.contains(codec)) {
       return Unsupported(Some(s"Unsupported compression codec: $codec"))
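
Removing this guard means DataWritingCommandExec no longer falls back to Spark when the output schema contains complex types (structs, arrays, maps); the native writer path now accepts them. A minimal sketch of writing such columns natively, using the arrow and parquet crates directly (a simplified stand-in under those assumptions, not Comet's actual writer internals):

use std::fs::File;
use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array, ListArray, StructArray};
use arrow::datatypes::{DataType, Field, Int32Type, Schema};
use arrow::record_batch::RecordBatch;
use parquet::arrow::ArrowWriter;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // A struct<x: int> column with three rows.
    let x: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
    let s: ArrayRef = Arc::new(StructArray::from(vec![(
        Arc::new(Field::new("x", DataType::Int32, false)),
        x,
    )]));

    // An array<int> column with a null entry in the middle row.
    let l: ArrayRef = Arc::new(ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
        Some(vec![Some(1)]),
        None,
        Some(vec![Some(2), Some(3)]),
    ]));

    let schema = Arc::new(Schema::new(vec![
        Field::new("s", s.data_type().clone(), true),
        Field::new("l", l.data_type().clone(), true),
    ]));
    let batch = RecordBatch::try_new(schema.clone(), vec![s, l])?;

    // ArrowWriter computes the nested repetition/definition levels itself.
    let mut writer = ArrowWriter::try_new(File::create("/tmp/complex.parquet")?, schema, None)?;
    writer.write(&batch)?;
    writer.close()?;
    Ok(())
}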
