Skip to content

Commit e915312

Browse files
dillitzzhengruifeng
authored andcommitted
[SPARK-53192][CONNECT] Always cache a DataSource in the Spark Connect Plan Cache
### What changes were proposed in this pull request? I believe we can dramatically improve the performance of the `SparkConnectPlanner` for plans using the same `Read.DataSource` (`spark.read`) multiple times (within the same session) by actively caching them in the [Spark Connect Plan Cache](a1fc6d5). At the moment, every occurrence of a `Read.DataSource` issues a separate analysis of the `DataSource`, which leads to us kicking off a new Spark Job per analysis, if no explicit schema is provided. This leads to very slow plan translation, because we need to fetch the (meta)data every time. For example, the following code, unionizing the same CSV file N times, kicks off N+1 Spark Jobs for the analysis of the final DataFrame in Spark Connect (compared to exactly 1 for Spark Classic): ```scala val df = spark.read.csv("abc.csv") (0 until N).foldLeft(df)((a, _) => a.union(df)).schema ``` I propose to adjust the Spark Connect Plan Cache to always cache a `Read.DataSource`, even when it is not the root of a relation. This always reduces the required Spark Jobs for analysis to at most 1 per **unique** `DataSource`. This has the same effect as when one explicitly analyzes the base DataSource today to populate the Spark Connect Plan Cache with its base plan, greatly improving the performance for subsequent queries using this `DataSource`: ```scala val df = spark.read.csv("abc.csv") df.schema (0 until N).foldLeft(df)((a, _) => a.union(df)).schema ``` To compensate for the increase in cached plans, I increased the Plan Cache size from 16 to 32. The DataSource plans are always leaf nodes, so I do not think they will add much memory pressure - this increases the cache size for all plans, though, so it is up for debate if we really want to do this. I also added a flag that lets one turn off this aggressive caching: `spark.connect.session.planCache.alwaysCacheDataSourceReadsEnabled`. Side note: This fix works so well because the `SparkConnectPlanner` [actively analyses](https://github.com/apache/spark/blob/8b80ea04d0b14d19b819cd4648b5ddd3e1c42650/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala#L1518) the `LogicalPlan` when translating the `Read.DataSource`. I am wondering whether this makes sense, conceptually, and why we chose to use `queryExecution.analyzed` instead of `queryExecution.logical` here. ### Why are the changes needed? Translating `DataSource`s today using the `SparkConnectPlanner` is very expensive/ineffective. There are a ton of Spark Connect Plan Cache misses because we only cache top-level plans. The improvement greatly improves the performance of Spark Connect plan translation involving `Read.DataSource`s, which are very common. ### Does this PR introduce _any_ user-facing change? We now always cache the analyzed `Read.DataSource` in the Spark Connect Plan Cache instead of only a top-level plan containing it. This is equivalent to accessing the `DataSource`'s schema after its creation, so I would argue it makes the whole caching experience only more consistent, while greatly improving the performance. ### How was this patch tested? Added a test case to `SparkConnectSessionHolderSuite` that checks the improved caching and also verifies that it can be turned off with the newly added flag. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #51921 from dillitz/datasource-caching. Authored-by: Robert Dillitz <[email protected]> Signed-off-by: Ruifeng Zheng <[email protected]>
1 parent 6b71b22 commit e915312

File tree

3 files changed

+68
-12
lines changed

3 files changed

+68
-12
lines changed

sql/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -304,7 +304,7 @@ object Connect {
304304
.version("4.0.0")
305305
.internal()
306306
.intConf
307-
.createWithDefault(16)
307+
.createWithDefault(32)
308308

309309
val CONNECT_SESSION_PLAN_CACHE_ENABLED =
310310
buildConf("spark.connect.session.planCache.enabled")
@@ -317,6 +317,17 @@ object Connect {
317317
.booleanConf
318318
.createWithDefault(true)
319319

320+
val CONNECT_ALWAYS_CACHE_DATA_SOURCE_READS_ENABLED =
321+
buildConf("spark.connect.session.planCache.alwaysCacheDataSourceReadsEnabled")
322+
.doc("When true, always cache the translation of Read.DataSource plans" +
323+
" in the plan cache. This massively improves the performance of queries that reuse the" +
324+
" same Read.DataSource within the same session, since these translations/analyses" +
325+
" are usually quite costly.")
326+
.version("4.1.0")
327+
.internal()
328+
.booleanConf
329+
.createWithDefault(true)
330+
320331
val CONNECT_AUTHENTICATE_TOKEN =
321332
buildStaticConf("spark.connect.authenticate.token")
322333
.doc("A pre-shared token that will be used to authenticate clients. This secret must be" +

sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -521,6 +521,11 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
521521
// We only cache plans that have a plan ID.
522522
val hasPlanId = rel.hasCommon && rel.getCommon.hasPlanId
523523

524+
// Always cache a `Read.DataSource` to avoid re-analyzing the same `DataSource` twice.
525+
lazy val alwaysCacheDataSourceReadsEnabled = Option(session)
526+
.forall(_.conf.get(Connect.CONNECT_ALWAYS_CACHE_DATA_SOURCE_READS_ENABLED, true))
527+
lazy val isDataSourceRead = rel.hasRead && rel.getRead.hasDataSource
528+
524529
def getPlanCache(rel: proto.Relation): Option[LogicalPlan] =
525530
planCache match {
526531
case Some(cache) if planCacheEnabled && hasPlanId =>
@@ -542,7 +547,7 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
542547
getPlanCache(rel)
543548
.getOrElse({
544549
val plan = transform(rel)
545-
if (cachePlan) {
550+
if (cachePlan || (alwaysCacheDataSourceReadsEnabled && isDataSourceRead)) {
546551
putPlanCache(rel, plan)
547552
}
548553
plan

sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectSessionHolderSuite.scala

Lines changed: 50 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -401,19 +401,23 @@ class SparkConnectSessionHolderSuite extends SharedSparkSession {
401401
test("Test session plan cache - disabled") {
402402
val sessionHolder = SparkConnectTestUtils.createDummySessionHolder(spark)
403403
// Disable plan cache of the session
404-
sessionHolder.session.conf.set(Connect.CONNECT_SESSION_PLAN_CACHE_ENABLED.key, false)
405-
val planner = new SparkConnectPlanner(sessionHolder)
404+
try {
405+
sessionHolder.session.conf.set(Connect.CONNECT_SESSION_PLAN_CACHE_ENABLED.key, false)
406+
val planner = new SparkConnectPlanner(sessionHolder)
406407

407-
val query = buildRelation("select 1")
408+
val query = buildRelation("select 1")
408409

409-
// If cachePlan is false, the cache is still empty.
410-
// Although the cache is created as cache size is greater than zero, it won't be used.
411-
planner.transformRelation(query, cachePlan = false)
412-
assertPlanCache(sessionHolder, Some(Set()))
410+
// If cachePlan is false, the cache is still empty.
411+
// Although the cache is created as cache size is greater than zero, it won't be used.
412+
planner.transformRelation(query, cachePlan = false)
413+
assertPlanCache(sessionHolder, Some(Set()))
413414

414-
// Even if we specify "cachePlan = true", the cache is still empty.
415-
planner.transformRelation(query, cachePlan = true)
416-
assertPlanCache(sessionHolder, Some(Set()))
415+
// Even if we specify "cachePlan = true", the cache is still empty.
416+
planner.transformRelation(query, cachePlan = true)
417+
assertPlanCache(sessionHolder, Some(Set()))
418+
} finally {
419+
sessionHolder.session.conf.set(Connect.CONNECT_SESSION_PLAN_CACHE_ENABLED, true)
420+
}
417421
}
418422

419423
test("Test duplicate operation IDs") {
@@ -440,4 +444,40 @@ class SparkConnectSessionHolderSuite extends SharedSparkSession {
440444
sessionHolder.getPipelineExecution(graphId).isEmpty,
441445
"pipeline execution was not removed")
442446
}
447+
448+
gridTest("Actively cache data source reads")(Seq(true, false)) { enabled =>
449+
val sessionHolder = SparkConnectTestUtils.createDummySessionHolder(spark)
450+
val planner = new SparkConnectPlanner(sessionHolder)
451+
452+
val dataSourceRead = proto.Relation
453+
.newBuilder()
454+
.setRead(
455+
proto.Read
456+
.newBuilder()
457+
.setDataSource(proto.Read.DataSource
458+
.newBuilder()
459+
.setSchema("col int")))
460+
.setCommon(proto.RelationCommon.newBuilder().setPlanId(Random.nextLong()).build())
461+
.build()
462+
val dataSourceReadJoin = proto.Relation
463+
.newBuilder()
464+
.setJoin(
465+
proto.Join
466+
.newBuilder()
467+
.setLeft(dataSourceRead)
468+
.setRight(dataSourceRead)
469+
.setJoinType(proto.Join.JoinType.JOIN_TYPE_CROSS))
470+
.setCommon(proto.RelationCommon.newBuilder().setPlanId(Random.nextLong()).build())
471+
.build()
472+
473+
sessionHolder.session.conf
474+
.set(Connect.CONNECT_ALWAYS_CACHE_DATA_SOURCE_READS_ENABLED, enabled)
475+
planner.transformRelation(dataSourceReadJoin, cachePlan = true)
476+
val expected = if (enabled) {
477+
Set(dataSourceReadJoin, dataSourceRead)
478+
} else {
479+
Set(dataSourceReadJoin)
480+
}
481+
assertPlanCache(sessionHolder, Some(expected))
482+
}
443483
}

0 commit comments

Comments
 (0)