diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
index 061337b56faa..95be75c19aea 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
@@ -399,7 +399,31 @@ case class PaimonSparkWriter(table: FileStoreTable) {
   }
 
+  /**
+   * Repartitions `df` by the table's partition columns followed by the bucket column.
+   *
+   * Each partition key is resolved to its position in the table schema, and the input column
+   * found at that same position in `df` is used, so columns are matched by position rather
+   * than by name.
+   */
   private def repartitionByPartitionsAndBucket(df: DataFrame): DataFrame = {
-    val partitionCols = tableSchema.partitionKeys().asScala.map(col).toSeq
+    // Look up both name lists once, outside the per-key mapping.
+    val tableFieldNames = tableSchema.fieldNames()
+    val inputFieldNames = df.schema.fieldNames
+    val partitionCols = tableSchema
+      .partitionKeys()
+      .asScala
+      .map {
+        partitionKey =>
+          val position = tableFieldNames.indexOf(partitionKey)
+          // Fail with a descriptive message instead of an out-of-bounds array access.
+          require(
+            position >= 0 && position < inputFieldNames.length,
+            s"Partition key '$partitionKey' (table position $position) has no matching " +
+              s"input column in [${inputFieldNames.mkString(", ")}]"
+          )
+          col(inputFieldNames(position))
+      }
+      .toSeq
     df.repartition(partitionCols ++ Seq(col(BUCKET_COL)): _*)
   }
 
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTest.scala
index 77c5180e71ea..3a0273d72859 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTest.scala
@@ -61,6 +61,28 @@ class DataFrameWriteTest extends PaimonSparkTestBase {
     Assertions.assertFalse(paimonTable.options().containsKey("write.merge-schema.explicit-cast"))
   }
 
+  test("Paimon: DataFrameWrite partition table") {
+    withTable("t") {
+      spark.sql(s"""
+                   |CREATE TABLE t (a INT, b STRING, dt STRING) PARTITIONED BY(dt)
+                   |TBLPROPERTIES ('file.format' = 'avro', 'bucket' = 2, 'bucket-key' = 'b')
+                   |""".stripMargin)
+
+      val table = loadTable("t")
+      val location = table.location().toString
+
+      // The DataFrame's third column is named "c", not "dt": the rows are expected to land in
+      // the table by position, with "c" supplying the partition column's values.
+      Seq((1, "x1", "a"), (2, "x2", "b"))
+        .toDF("a", "b", "c")
+        .write
+        .format("paimon")
+        .mode("append")
+        .save(location)
+      checkAnswer(sql("SELECT * FROM t"), Row(1, "x1", "a") :: Row(2, "x2", "b") :: Nil)
+    }
+  }
+
   fileFormats.foreach {
     fileFormat =>
       test(s"Paimon: DataFrameWrite.saveAsTable in ByName mode, file.format: $fileFormat") {