
Commit 8b98c58

Merge pull request apache-spark-on-k8s#426 from palantir/bd/SPARK-25700
[SPARK-25700][SQL] Creates ReadSupport in only Append Mode in Data So…
2 parents: 8882973 + 52fb182

3 files changed (+32 / -4 lines)

sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala

Lines changed: 1 addition & 1 deletion
@@ -246,8 +246,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
         df.sparkSession.sessionState.conf)
       val options = sessionOptions ++ extraOptions
 
-      val relation = DataSourceV2Relation.create(source, options)
       if (mode == SaveMode.Append) {
+        val relation = DataSourceV2Relation.create(source, options)
         runCommand(df.sparkSession, "save") {
           AppendData.byName(relation, df.logicalPlan)
         }
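Creating the DataSourceV2Relation is what triggers a schema read on the source, so hoisting it into the SaveMode.Append branch keeps the other save modes off the read path entirely. A minimal sketch of the user-visible effect, assuming a hypothetical write-only source registered as com.example.WriteOnlySource whose schema lookup throws:

val df = spark.range(5).select('id as 'i, -'id as 'j)

// Non-append modes no longer construct a DataSourceV2Relation,
// so the source's schema is never read (hypothetical source name):
df.write.format("com.example.WriteOnlySource").mode("overwrite").save()

// Append mode still has to resolve the existing schema for
// AppendData.byName, so it creates the relation as before:
df.write.format("com.example.WriteOnlySource").mode("append").save()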

sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala

Lines changed: 29 additions & 0 deletions
@@ -351,6 +351,24 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext {
       }
     }
   }
+
+  test("SPARK-25700: do not read schema when writing in other modes except append mode") {
+    withTempPath { file =>
+      val cls = classOf[SimpleWriteOnlyDataSource]
+      val path = file.getCanonicalPath
+      val df = spark.range(5).select('id as 'i, -'id as 'j)
+      try {
+        df.write.format(cls.getName).option("path", path).mode("error").save()
+        df.write.format(cls.getName).option("path", path).mode("overwrite").save()
+        df.write.format(cls.getName).option("path", path).mode("ignore").save()
+      } catch {
+        case e: SchemaReadAttemptException => fail("Schema read was attempted.", e)
+      }
+      intercept[SchemaReadAttemptException] {
+        df.write.format(cls.getName).option("path", path).mode("append").save()
+      }
+    }
+  }
 }
@@ -640,3 +658,14 @@ object SpecificReaderFactory extends PartitionReaderFactory {
     }
   }
 }
+
+class SchemaReadAttemptException(m: String) extends RuntimeException(m)
+
+class SimpleWriteOnlyDataSource extends SimpleWritableDataSource {
+  override def fullSchema(): StructType = {
+    // This is a bit hacky since this source implements read support but throws
+    // during schema retrieval. It is written this way to keep the change minimal.
+    throw new SchemaReadAttemptException("read is not supported")
+  }
+}
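Because SimpleWriteOnlyDataSource throws from fullSchema(), any code path that resolves the source's schema surfaces a SchemaReadAttemptException, which is what the test asserts on for append mode. A hedged sketch of the same probe firing on the read side (session and path setup assumed, not part of the patch):

// Loading this source for a read also resolves fullSchema(), so the
// sentinel exception should fire there as well:
intercept[SchemaReadAttemptException] {
  spark.read.format(classOf[SimpleWriteOnlyDataSource].getName).load(path)
}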

sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala

Lines changed: 2 additions & 3 deletions
@@ -43,13 +43,13 @@ class SimpleWritableDataSource extends DataSourceV2
   with BatchWriteSupportProvider
   with SessionConfigSupport {
 
-  private val schema = new StructType().add("i", "long").add("j", "long")
+  protected def fullSchema(): StructType = new StructType().add("i", "long").add("j", "long")
 
   override def keyPrefix: String = "simpleWritableDataSource"
 
   class ReadSupport(path: String, conf: Configuration) extends SimpleReadSupport {
 
-    override def fullSchema(): StructType = schema
+    override def fullSchema(): StructType = SimpleWritableDataSource.this.fullSchema()
 
     override def planInputPartitions(config: ScanConfig): Array[InputPartition] = {
       val dataPath = new Path(path)
@@ -116,7 +116,6 @@ class SimpleWritableDataSource extends DataSourceV2
       schema: StructType,
       mode: SaveMode,
       options: DataSourceOptions): Optional[BatchWriteSupport] = {
-    assert(DataType.equalsStructurally(schema.asNullable, this.schema.asNullable))
     assert(!SparkContext.getActive.get.conf.getBoolean("spark.speculation", false))
 
     val path = new Path(options.get("path").get())
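Two things happen here: the schema becomes an overridable hook (a protected def instead of a private val), and the structural-equality assert in the write path is dropped, since this.schema no longer exists and calling fullSchema() would throw in the write-only test subclass. A minimal sketch of the hook pattern in isolation, with illustrative class names that are not part of the patch:

import org.apache.spark.sql.types.StructType

// Illustrative only: the base source exposes its schema through an
// overridable hook...
class FixedSchemaSource {
  protected def fullSchema(): StructType =
    new StructType().add("i", "long").add("j", "long")
}

// ...so a subclass can replace or instrument the schema without touching
// any of the read or write logic.
class ThrowingSchemaSource extends FixedSchemaSource {
  override def fullSchema(): StructType =
    throw new IllegalStateException("schema should not be read")
}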
