Skip to content

Commit a78a904

Browse files
icexelloss authored and gatorsmile committed
[SPARK-24521][SQL][TEST] Fix ineffective test in CachedTableSuite
## What changes were proposed in this pull request? test("withColumn doesn't invalidate cached dataframe") in CachedTableSuite doesn't not work because: The UDF is executed and test count incremented when "df.cache()" is called and the subsequent "df.collect()" has no effect on the test result. This PR fixed this test and add another test for caching UDF. ## How was this patch tested? Add new tests. Author: Li Jin <[email protected]> Closes apache#21531 from icexelloss/fix-cache-test.
1 parent 9a75c18 commit a78a904

File tree

2 files changed

+37
-20
lines changed

2 files changed

+37
-20
lines changed

sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -83,25 +83,6 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
8383
}.sum
8484
}
8585

86-
test("withColumn doesn't invalidate cached dataframe") {
87-
var evalCount = 0
88-
val myUDF = udf((x: String) => { evalCount += 1; "result" })
89-
val df = Seq(("test", 1)).toDF("s", "i").select(myUDF($"s"))
90-
df.cache()
91-
92-
df.collect()
93-
assert(evalCount === 1)
94-
95-
df.collect()
96-
assert(evalCount === 1)
97-
98-
val df2 = df.withColumn("newColumn", lit(1))
99-
df2.collect()
100-
101-
// We should not reevaluate the cached dataframe
102-
assert(evalCount === 1)
103-
}
104-
10586
test("cache temp table") {
10687
withTempView("tempTable") {
10788
testData.select('key).createOrReplaceTempView("tempTable")

sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,15 @@
1717

1818
package org.apache.spark.sql
1919

20+
import org.scalatest.concurrent.TimeLimits
21+
import org.scalatest.time.SpanSugar._
22+
2023
import org.apache.spark.sql.functions._
2124
import org.apache.spark.sql.test.SharedSQLContext
2225
import org.apache.spark.storage.StorageLevel
2326

2427

25-
class DatasetCacheSuite extends QueryTest with SharedSQLContext {
28+
class DatasetCacheSuite extends QueryTest with SharedSQLContext with TimeLimits {
2629
import testImplicits._
2730

2831
test("get storage level") {
@@ -96,4 +99,37 @@ class DatasetCacheSuite extends QueryTest with SharedSQLContext {
9699
agged.unpersist()
97100
assert(agged.storageLevel == StorageLevel.NONE, "The Dataset agged should not be cached.")
98101
}
102+
103+
test("persist and then withColumn") {
104+
val df = Seq(("test", 1)).toDF("s", "i")
105+
val df2 = df.withColumn("newColumn", lit(1))
106+
107+
df.cache()
108+
assertCached(df)
109+
assertCached(df2)
110+
111+
df.count()
112+
assertCached(df2)
113+
114+
df.unpersist()
115+
assert(df.storageLevel == StorageLevel.NONE)
116+
}
117+
118+
test("cache UDF result correctly") {
119+
val expensiveUDF = udf({x: Int => Thread.sleep(10000); x})
120+
val df = spark.range(0, 10).toDF("a").withColumn("b", expensiveUDF($"a"))
121+
val df2 = df.agg(sum(df("b")))
122+
123+
df.cache()
124+
df.count()
125+
assertCached(df2)
126+
127+
// udf has been evaluated during caching, and thus should not be re-evaluated here
128+
failAfter(5 seconds) {
129+
df2.collect()
130+
}
131+
132+
df.unpersist()
133+
assert(df.storageLevel == StorageLevel.NONE)
134+
}
99135
}

0 commit comments

Comments (0)