Skip to content

Commit 2e090ba

Browse files
maryannxue authored and HyukjinKwon committed
[SPARK-27223][SQL] Remove private methods that skip conversion when passing user schemas for constructing a DataFrame
## What changes were proposed in this pull request? When passing in a user schema to create a DataFrame, there might be mismatched nullability between the user schema and the the actual data. All related public interfaces now perform catalyst conversion using the user provided schema, which catches such mismatches to avoid runtime errors later on. However, there're private methods which allow this conversion to be skipped, so we need to remove these private methods which may lead to confusion and potential issues. ## How was this patch tested? Passed existing tests. No new tests were added since this PR removed the private interfaces that would potentially cause null problems and other interfaces are covered already by existing tests. Closes apache#24162 from maryannxue/spark-27223. Authored-by: maryannxue <[email protected]> Signed-off-by: Hyukjin Kwon <[email protected]>
1 parent d6ee2f3 commit 2e090ba

File tree

3 files changed

+8
-32
lines changed

3 files changed

+8
-32
lines changed

sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -324,15 +324,6 @@ class SQLContext private[sql](val sparkSession: SparkSession)
324324
sparkSession.createDataFrame(rowRDD, schema)
325325
}
326326

327-
/**
328-
* Creates a DataFrame from an RDD[Row]. User can specify whether the input rows should be
329-
* converted to Catalyst rows.
330-
*/
331-
private[sql]
332-
def createDataFrame(rowRDD: RDD[Row], schema: StructType, needsConversion: Boolean) = {
333-
sparkSession.createDataFrame(rowRDD, schema, needsConversion)
334-
}
335-
336327
/**
337328
* :: Experimental ::
338329
* Creates a [[Dataset]] from a local Seq of data of a given type. This method requires an

sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala

Lines changed: 5 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -361,7 +361,11 @@ class SparkSession private(
361361
@DeveloperApi
362362
@Evolving
363363
def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame = {
364-
createDataFrame(rowRDD, schema, needsConversion = true)
364+
// TODO: use MutableProjection when rowRDD is another DataFrame and the applied
365+
// schema differs from the existing schema on any field data type.
366+
val encoder = RowEncoder(schema)
367+
val catalystRows = rowRDD.map(encoder.toRow)
368+
internalCreateDataFrame(catalystRows.setName(rowRDD.name), schema)
365369
}
366370

367371
/**
@@ -590,25 +594,6 @@ class SparkSession private(
590594
Dataset.ofRows(self, logicalPlan)
591595
}
592596

593-
/**
594-
* Creates a `DataFrame` from an `RDD[Row]`.
595-
* User can specify whether the input rows should be converted to Catalyst rows.
596-
*/
597-
private[sql] def createDataFrame(
598-
rowRDD: RDD[Row],
599-
schema: StructType,
600-
needsConversion: Boolean) = {
601-
// TODO: use MutableProjection when rowRDD is another DataFrame and the applied
602-
// schema differs from the existing schema on any field data type.
603-
val catalystRows = if (needsConversion) {
604-
val encoder = RowEncoder(schema)
605-
rowRDD.map(encoder.toRow)
606-
} else {
607-
rowRDD.map { r: Row => InternalRow.fromSeq(r.toSeq) }
608-
}
609-
internalCreateDataFrame(catalystRows.setName(rowRDD.name), schema)
610-
}
611-
612597

613598
/* ------------------------- *
614599
| Catalog-related methods |

sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1572,8 +1572,8 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
15721572
val rdd = sparkContext.makeRDD(Seq(Row(1, 3), Row(2, 1)))
15731573
val df = spark.createDataFrame(
15741574
rdd,
1575-
new StructType().add("f1", IntegerType).add("f2", IntegerType),
1576-
needsConversion = false).select($"F1", $"f2".as("f2"))
1575+
new StructType().add("f1", IntegerType).add("f2", IntegerType))
1576+
.select($"F1", $"f2".as("f2"))
15771577
val df1 = df.as("a")
15781578
val df2 = df.as("b")
15791579
checkAnswer(df1.join(df2, $"a.f2" === $"b.f2"), Row(1, 3, 1, 3) :: Row(2, 1, 2, 1) :: Nil)
@@ -1774,7 +1774,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
17741774
val size = 201L
17751775
val rdd = sparkContext.makeRDD(Seq(Row.fromSeq(Seq.range(0, size))))
17761776
val schemas = List.range(0, size).map(a => StructField("name" + a, LongType, true))
1777-
val df = spark.createDataFrame(rdd, StructType(schemas), false)
1777+
val df = spark.createDataFrame(rdd, StructType(schemas))
17781778
assert(df.persist.take(1).apply(0).toSeq(100).asInstanceOf[Long] == 100)
17791779
}
17801780

0 commit comments

Comments
 (0)