Skip to content

Commit 39b502a

Browse files
SpaceRangerWes authored and dongjoon-hyun committed
[SPARK-29778][SQL] pass writer options to saveAsTable in append mode
### What changes were proposed in this pull request?

`saveAsTable` had an oversight where write options were not considered in the append save mode.

### Why are the changes needed?

Address the bug so that write options can be considered during appends.

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

Unit test added that looks in the logical plan of `AppendData` for the existing write options.

Closes apache#26474 from SpaceRangerWes/master.

Authored-by: Wesley Hoffman <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 833a9f1 commit 39b502a

File tree

2 files changed

+39
-1
lines changed

2 files changed

+39
-1
lines changed

sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -527,7 +527,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
527527
return saveAsTable(TableIdentifier(ident.name(), ident.namespace().headOption))
528528

529529
case (SaveMode.Append, Some(table)) =>
530-
AppendData.byName(DataSourceV2Relation.create(table), df.logicalPlan)
530+
AppendData.byName(DataSourceV2Relation.create(table), df.logicalPlan, extraOptions.toMap)
531531

532532
case (SaveMode.Overwrite, _) =>
533533
ReplaceTableAsSelect(

sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@ package org.apache.spark.sql.connector
1919

2020
import org.apache.spark.sql.{DataFrame, Row, SaveMode}
2121
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
22+
import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan}
23+
import org.apache.spark.sql.execution.QueryExecution
24+
import org.apache.spark.sql.util.QueryExecutionListener
2225

2326
class DataSourceV2DataFrameSuite
2427
extends InsertIntoTests(supportsDynamicOverwrite = true, includeSQLOnlyTests = false) {
@@ -125,4 +128,39 @@ class DataSourceV2DataFrameSuite
125128
checkAnswer(spark.table(t1), Seq(Row("c", "d")))
126129
}
127130
}
131+
132+
testQuietly("SPARK-29778: saveAsTable: append mode takes write options") {
133+
134+
var plan: LogicalPlan = null
135+
val listener = new QueryExecutionListener {
136+
override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
137+
plan = qe.analyzed
138+
}
139+
override def onFailure(funcName: String, qe: QueryExecution, error: Throwable): Unit = {}
140+
}
141+
142+
try {
143+
spark.listenerManager.register(listener)
144+
145+
val t1 = "testcat.ns1.ns2.tbl"
146+
147+
sql(s"CREATE TABLE $t1 (id bigint, data string) USING foo")
148+
149+
val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data")
150+
df.write.option("other", "20").mode("append").saveAsTable(t1)
151+
152+
sparkContext.listenerBus.waitUntilEmpty()
153+
plan match {
154+
case p: AppendData =>
155+
assert(p.writeOptions == Map("other" -> "20"))
156+
case other =>
157+
fail(s"Expected to parse ${classOf[AppendData].getName} from query," +
158+
s"got ${other.getClass.getName}: $plan")
159+
}
160+
161+
checkAnswer(spark.table(t1), df)
162+
} finally {
163+
spark.listenerManager.unregister(listener)
164+
}
165+
}
128166
}

0 commit comments

Comments (0)