
Commit 52fb182

HyukjinKwon authored and Ben Duffield committed
[SPARK-25700][SQL] Creates ReadSupport in only Append Mode in Data Source V2 write path
## What changes were proposed in this pull request?

This PR proposes to avoid creating a ReadSupport, and reading its schema, when writing in save modes other than Append.

apache@5fef6e3 happened to create a ReadSupport in the write path, which ended up reading the schema from that ReadSupport during writes. This breaks the `spark.range(1).write.format("source").save("non-existent-path")` case, since there is no way to read a schema from "non-existent-path".

See also apache#22009 (comment)
See also apache#22697
See also http://apache-spark-developers-list.1001551.n3.nabble.com/Possible-bug-in-DatasourceV2-td25343.html

## How was this patch tested?

Unit test and manual tests.

Closes apache#22688 from HyukjinKwon/append-revert-2.

Authored-by: hyukjinkwon <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
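To make the regression concrete, the failing scenario looks roughly like this (a sketch: `org.example.SimpleSource` is a hypothetical placeholder for any write-capable DataSourceV2 implementation, not a class from this commit):

```scala
// Sketch of the regression; "org.example.SimpleSource" is a hypothetical
// placeholder for a write-capable DataSourceV2 implementation.
// Before this fix, save() eagerly called DataSourceV2Relation.create(),
// which built a ReadSupport and asked it for a schema -- and that fails
// when the target path does not exist yet.
spark.range(1)
  .write
  .format("org.example.SimpleSource")
  .save("non-existent-path")
```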
1 parent 8882973 · commit 52fb182

3 files changed: +32 additions, −4 deletions

sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala

Lines changed: 1 addition & 1 deletion
```diff
@@ -246,8 +246,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
         df.sparkSession.sessionState.conf)
       val options = sessionOptions ++ extraOptions

-      val relation = DataSourceV2Relation.create(source, options)
       if (mode == SaveMode.Append) {
+        val relation = DataSourceV2Relation.create(source, options)
         runCommand(df.sparkSession, "save") {
           AppendData.byName(relation, df.logicalPlan)
         }
```
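For context the one-line diff does not show, the surrounding logic has roughly the following shape (a hedged sketch: the `else` branch is paraphrased, and `writeSupport` abbreviates the value obtained from the provider's `createBatchWriteSupport`; consult the actual file for the exact code):

```scala
// Paraphrased excerpt, not the complete method body.
if (mode == SaveMode.Append) {
  // Append resolves the incoming columns by name against the existing
  // data, so only this branch builds a relation -- and relation creation
  // is what instantiates a ReadSupport and asks the source for a schema.
  val relation = DataSourceV2Relation.create(source, options)
  runCommand(df.sparkSession, "save") {
    AppendData.byName(relation, df.logicalPlan)
  }
} else {
  // Every other SaveMode goes straight to the provider's write support;
  // no ReadSupport is created, so nothing tries to read a schema from a
  // target path that may not exist yet.
  runCommand(df.sparkSession, "save") {
    WriteToDataSourceV2(writeSupport, df.logicalPlan)
  }
}
```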

sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala

Lines changed: 29 additions & 0 deletions
```diff
@@ -351,6 +351,24 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext {
       }
     }
   }
+
+  test("SPARK-25700: do not read schema when writing in other modes except append mode") {
+    withTempPath { file =>
+      val cls = classOf[SimpleWriteOnlyDataSource]
+      val path = file.getCanonicalPath
+      val df = spark.range(5).select('id as 'i, -'id as 'j)
+      try {
+        df.write.format(cls.getName).option("path", path).mode("error").save()
+        df.write.format(cls.getName).option("path", path).mode("overwrite").save()
+        df.write.format(cls.getName).option("path", path).mode("ignore").save()
+      } catch {
+        case e: SchemaReadAttemptException => fail("Schema read was attempted.", e)
+      }
+      intercept[SchemaReadAttemptException] {
+        df.write.format(cls.getName).option("path", path).mode("append").save()
+      }
+    }
+  }
 }
```
```diff
@@ -640,3 +658,14 @@ object SpecificReaderFactory extends PartitionReaderFactory {
     }
   }
 }
+
+class SchemaReadAttemptException(m: String) extends RuntimeException(m)
+
+class SimpleWriteOnlyDataSource extends SimpleWritableDataSource {
+  override def fullSchema(): StructType = {
+    // This is a bit hacky, since this source implements read support but
+    // throws during schema retrieval. It might be worth rewriting, but it
+    // is kept this way to minimise the changes.
+    throw new SchemaReadAttemptException("read is not supported")
+  }
+}
```

sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala

Lines changed: 2 additions & 3 deletions
```diff
@@ -43,13 +43,13 @@ class SimpleWritableDataSource extends DataSourceV2
   with BatchWriteSupportProvider
   with SessionConfigSupport {

-  private val schema = new StructType().add("i", "long").add("j", "long")
+  protected def fullSchema(): StructType = new StructType().add("i", "long").add("j", "long")

   override def keyPrefix: String = "simpleWritableDataSource"

   class ReadSupport(path: String, conf: Configuration) extends SimpleReadSupport {

-    override def fullSchema(): StructType = schema
+    override def fullSchema(): StructType = SimpleWritableDataSource.this.fullSchema()

     override def planInputPartitions(config: ScanConfig): Array[InputPartition] = {
       val dataPath = new Path(path)
```

```diff
@@ -116,7 +116,6 @@ class SimpleWritableDataSource extends DataSourceV2
       schema: StructType,
       mode: SaveMode,
       options: DataSourceOptions): Optional[BatchWriteSupport] = {
-    assert(DataType.equalsStructurally(schema.asNullable, this.schema.asNullable))
     assert(!SparkContext.getActive.get.conf.getBoolean("spark.speculation", false))

     val path = new Path(options.get("path").get())
```
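A note on the removed assertion: once `fullSchema()` is overridable, a write-only source may legitimately refuse all schema access, so `createBatchWriteSupport` must not consult it. The test helper added in this commit relies on exactly that (condensed below from the test file above):

```scala
// Condensed from the SimpleWriteOnlyDataSource added in this commit:
// overriding fullSchema() to throw means any schema read in the write
// path fails loudly, which is why createBatchWriteSupport can no longer
// assert against this.schema.
class SimpleWriteOnlyDataSource extends SimpleWritableDataSource {
  override def fullSchema(): StructType =
    throw new SchemaReadAttemptException("read is not supported")
}
```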
