Skip to content

Commit 363af16

Browse files
committed
[SPARK-29568][SS] Stop existing running streams when a new stream is launched
### What changes were proposed in this pull request?

This PR adds a SQL Conf: `spark.sql.streaming.stopActiveRunOnRestart`. When this conf is `true` (the default), an already running stream will be stopped if a new copy gets launched on the same checkpoint location.

### Why are the changes needed?

In multi-tenant environments where you have multiple SparkSessions, you can accidentally start multiple copies of the same stream (i.e. streams using the same checkpoint location). This will cause all new instantiations of the stream to fail. However, sometimes you may want to turn off the old stream, as the old stream may have turned into a zombie (you no longer have access to the query handle or SparkSession). It would be nice to have a SQL flag that allows the stopping of the old stream for such zombie cases.

### Does this PR introduce any user-facing change?

Yes. Now by default, if you launch a new copy of an already running stream on a multi-tenant cluster, the existing stream will be stopped.

### How was this patch tested?

Unit tests in StreamingQueryManagerSuite.

Closes apache#26225 from brkyvz/stopStream.

Lead-authored-by: Burak Yavuz <[email protected]>
Co-authored-by: Burak Yavuz <[email protected]>
Signed-off-by: Burak Yavuz <[email protected]>
1 parent 1f4075d commit 363af16

File tree

5 files changed

+184
-59
lines changed

5 files changed

+184
-59
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1093,6 +1093,15 @@ object SQLConf {
10931093
.checkValue(v => Set(1, 2).contains(v), "Valid versions are 1 and 2")
10941094
.createWithDefault(2)
10951095

1096+
val STREAMING_STOP_ACTIVE_RUN_ON_RESTART =
1097+
buildConf("spark.sql.streaming.stopActiveRunOnRestart")
1098+
.doc("Running multiple runs of the same streaming query concurrently is not supported. " +
1099+
"If we find a concurrent active run for a streaming query (in the same or different " +
1100+
"SparkSessions on the same cluster) and this flag is true, we will stop the old streaming " +
1101+
"query run to start the new one.")
1102+
.booleanConf
1103+
.createWithDefault(true)
1104+
10961105
val STREAMING_JOIN_STATE_FORMAT_VERSION =
10971106
buildConf("spark.sql.streaming.join.stateFormatVersion")
10981107
.internal()

sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ package org.apache.spark.sql.internal
2020
import java.net.URL
2121
import java.util.{Locale, UUID}
2222
import java.util.concurrent.ConcurrentHashMap
23+
import javax.annotation.concurrent.GuardedBy
2324

2425
import scala.reflect.ClassTag
2526
import scala.util.control.NonFatal
@@ -32,9 +33,10 @@ import org.apache.spark.internal.Logging
3233
import org.apache.spark.sql.SQLContext
3334
import org.apache.spark.sql.catalyst.catalog._
3435
import org.apache.spark.sql.execution.CacheManager
36+
import org.apache.spark.sql.execution.streaming.StreamExecution
3537
import org.apache.spark.sql.execution.ui.{SQLAppStatusListener, SQLAppStatusStore, SQLTab}
3638
import org.apache.spark.sql.internal.StaticSQLConf._
37-
import org.apache.spark.sql.streaming.StreamingQueryManager
39+
import org.apache.spark.sql.streaming.StreamingQuery
3840
import org.apache.spark.status.ElementTrackingStore
3941
import org.apache.spark.util.Utils
4042

@@ -112,11 +114,15 @@ private[sql] class SharedState(
112114
*/
113115
val cacheManager: CacheManager = new CacheManager
114116

117+
/** A global lock for all streaming query lifecycle tracking and management. */
118+
private[sql] val activeQueriesLock = new Object
119+
115120
/**
116121
* A map of active streaming queries to the session specific StreamingQueryManager that manages
117122
* the lifecycle of that stream.
118123
*/
119-
private[sql] val activeStreamingQueries = new ConcurrentHashMap[UUID, StreamingQueryManager]()
124+
@GuardedBy("activeQueriesLock")
125+
private[sql] val activeStreamingQueries = new ConcurrentHashMap[UUID, StreamExecution]()
120126

121127
/**
122128
* A status store to query SQL status/metrics of this Spark application, based on SQL-specific

sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala

Lines changed: 59 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
package org.apache.spark.sql.streaming
1919

20-
import java.util.UUID
20+
import java.util.{ConcurrentModificationException, UUID}
2121
import java.util.concurrent.TimeUnit
2222
import javax.annotation.concurrent.GuardedBy
2323

@@ -37,7 +37,7 @@ import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
3737
import org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef
3838
import org.apache.spark.sql.internal.SQLConf
3939
import org.apache.spark.sql.internal.StaticSQLConf.STREAMING_QUERY_LISTENERS
40-
import org.apache.spark.util.{Clock, SystemClock, Utils}
40+
import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
4141

4242
/**
4343
* A class to manage all the [[StreamingQuery]] active in a `SparkSession`.
@@ -51,9 +51,10 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
5151
StateStoreCoordinatorRef.forDriver(sparkSession.sparkContext.env)
5252
private val listenerBus = new StreamingQueryListenerBus(sparkSession.sparkContext.listenerBus)
5353

54-
@GuardedBy("activeQueriesLock")
54+
@GuardedBy("activeQueriesSharedLock")
5555
private val activeQueries = new mutable.HashMap[UUID, StreamingQuery]
56-
private val activeQueriesLock = new Object
56+
// A global lock to keep track of active streaming queries across Spark sessions
57+
private val activeQueriesSharedLock = sparkSession.sharedState.activeQueriesLock
5758
private val awaitTerminationLock = new Object
5859

5960
@GuardedBy("awaitTerminationLock")
@@ -77,7 +78,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
7778
*
7879
* @since 2.0.0
7980
*/
80-
def active: Array[StreamingQuery] = activeQueriesLock.synchronized {
81+
def active: Array[StreamingQuery] = activeQueriesSharedLock.synchronized {
8182
activeQueries.values.toArray
8283
}
8384

@@ -86,7 +87,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
8687
*
8788
* @since 2.1.0
8889
*/
89-
def get(id: UUID): StreamingQuery = activeQueriesLock.synchronized {
90+
def get(id: UUID): StreamingQuery = activeQueriesSharedLock.synchronized {
9091
activeQueries.get(id).orNull
9192
}
9293

@@ -343,27 +344,61 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
343344
trigger,
344345
triggerClock)
345346

346-
activeQueriesLock.synchronized {
347+
// The following code block checks if a stream with the same name or id is running. Then it
348+
// returns an Option of an already active stream to stop outside of the lock
349+
// to avoid a deadlock.
350+
val activeRunOpt = activeQueriesSharedLock.synchronized {
347351
// Make sure no other query with same name is active
348352
userSpecifiedName.foreach { name =>
349353
if (activeQueries.values.exists(_.name == name)) {
350-
throw new IllegalArgumentException(
351-
s"Cannot start query with name $name as a query with that name is already active")
354+
throw new IllegalArgumentException(s"Cannot start query with name $name as a query " +
355+
s"with that name is already active in this SparkSession")
352356
}
353357
}
354358

355359
// Make sure no other query with same id is active across all sessions
356-
val activeOption =
357-
Option(sparkSession.sharedState.activeStreamingQueries.putIfAbsent(query.id, this))
358-
if (activeOption.isDefined || activeQueries.values.exists(_.id == query.id)) {
359-
throw new IllegalStateException(
360-
s"Cannot start query with id ${query.id} as another query with same id is " +
361-
s"already active. Perhaps you are attempting to restart a query from checkpoint " +
362-
s"that is already active.")
360+
val activeOption = Option(sparkSession.sharedState.activeStreamingQueries.get(query.id))
361+
.orElse(activeQueries.get(query.id)) // shouldn't be needed but paranoia ...
362+
363+
val shouldStopActiveRun =
364+
sparkSession.sessionState.conf.getConf(SQLConf.STREAMING_STOP_ACTIVE_RUN_ON_RESTART)
365+
if (activeOption.isDefined) {
366+
if (shouldStopActiveRun) {
367+
val oldQuery = activeOption.get
368+
logWarning(s"Stopping existing streaming query [id=${query.id}, " +
369+
s"runId=${oldQuery.runId}], as a new run is being started.")
370+
Some(oldQuery)
371+
} else {
372+
throw new IllegalStateException(
373+
s"Cannot start query with id ${query.id} as another query with same id is " +
374+
s"already active. Perhaps you are attempting to restart a query from checkpoint " +
375+
s"that is already active. You may stop the old query by setting the SQL " +
376+
"configuration: " +
377+
s"""spark.conf.set("${SQLConf.STREAMING_STOP_ACTIVE_RUN_ON_RESTART.key}", true) """ +
378+
"and retry.")
379+
}
380+
} else {
381+
// nothing to stop, so this is a no-op
382+
None
363383
}
384+
}
364385

386+
// stop() will clear the queryId from activeStreamingQueries as well as activeQueries
387+
activeRunOpt.foreach(_.stop())
388+
389+
activeQueriesSharedLock.synchronized {
390+
// We can still have a race condition when two concurrent instances try to start the same
391+
// stream, while a third one was already active and stopped above. In this case, we throw a
392+
// ConcurrentModificationException.
393+
val oldActiveQuery = sparkSession.sharedState.activeStreamingQueries.put(
394+
query.id, query.streamingQuery) // we need to put the StreamExecution, not the wrapper
395+
if (oldActiveQuery != null) {
396+
throw new ConcurrentModificationException(
397+
"Another instance of this query was just started by a concurrent session.")
398+
}
365399
activeQueries.put(query.id, query)
366400
}
401+
367402
try {
368403
// When starting a query, it will call `StreamingQueryListener.onQueryStarted` synchronously.
369404
// As it's provided by the user and can run arbitrary codes, we must not hold any lock here.
@@ -372,15 +407,15 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
372407
query.streamingQuery.start()
373408
} catch {
374409
case e: Throwable =>
375-
unregisterTerminatedStream(query.id)
410+
unregisterTerminatedStream(query)
376411
throw e
377412
}
378413
query
379414
}
380415

381416
/** Notify (by the StreamingQuery) that the query has been terminated */
382417
private[sql] def notifyQueryTermination(terminatedQuery: StreamingQuery): Unit = {
383-
unregisterTerminatedStream(terminatedQuery.id)
418+
unregisterTerminatedStream(terminatedQuery)
384419
awaitTerminationLock.synchronized {
385420
if (lastTerminatedQuery == null || terminatedQuery.exception.nonEmpty) {
386421
lastTerminatedQuery = terminatedQuery
@@ -390,11 +425,12 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
390425
stateStoreCoordinator.deactivateInstances(terminatedQuery.runId)
391426
}
392427

393-
private def unregisterTerminatedStream(terminatedQueryId: UUID): Unit = {
394-
activeQueriesLock.synchronized {
395-
// remove from shared state only if the streaming query manager also matches
396-
sparkSession.sharedState.activeStreamingQueries.remove(terminatedQueryId, this)
397-
activeQueries -= terminatedQueryId
428+
private def unregisterTerminatedStream(terminatedQuery: StreamingQuery): Unit = {
429+
activeQueriesSharedLock.synchronized {
430+
// remove from shared state only if the streaming execution also matches
431+
sparkSession.sharedState.activeStreamingQueries.remove(
432+
terminatedQuery.id, terminatedQuery)
433+
activeQueries -= terminatedQuery.id
398434
}
399435
}
400436
}

sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala

Lines changed: 103 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import org.apache.spark.SparkException
3232
import org.apache.spark.sql.{Dataset, Encoders}
3333
import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
3434
import org.apache.spark.sql.execution.streaming._
35+
import org.apache.spark.sql.internal.SQLConf
3536
import org.apache.spark.sql.streaming.util.BlockingSource
3637
import org.apache.spark.util.Utils
3738

@@ -274,48 +275,119 @@ class StreamingQueryManagerSuite extends StreamTest {
274275
}
275276

276277
testQuietly("can't start multiple instances of the same streaming query in the same session") {
277-
withTempDir { dir =>
278-
val (ms1, ds1) = makeDataset
279-
val (ms2, ds2) = makeDataset
280-
val chkLocation = new File(dir, "_checkpoint").getCanonicalPath
281-
val dataLocation = new File(dir, "data").getCanonicalPath
282-
283-
val query1 = ds1.writeStream.format("parquet")
284-
.option("checkpointLocation", chkLocation).start(dataLocation)
285-
ms1.addData(1, 2, 3)
286-
try {
287-
val e = intercept[IllegalStateException] {
288-
ds2.writeStream.format("parquet")
278+
withSQLConf(SQLConf.STREAMING_STOP_ACTIVE_RUN_ON_RESTART.key -> "false") {
279+
withTempDir { dir =>
280+
val (ms1, ds1) = makeDataset
281+
val (ms2, ds2) = makeDataset
282+
val chkLocation = new File(dir, "_checkpoint").getCanonicalPath
283+
val dataLocation = new File(dir, "data").getCanonicalPath
284+
285+
val query1 = ds1.writeStream.format("parquet")
286+
.option("checkpointLocation", chkLocation).start(dataLocation)
287+
ms1.addData(1, 2, 3)
288+
try {
289+
val e = intercept[IllegalStateException] {
290+
ds2.writeStream.format("parquet")
291+
.option("checkpointLocation", chkLocation).start(dataLocation)
292+
}
293+
assert(e.getMessage.contains("same id"))
294+
} finally {
295+
spark.streams.active.foreach(_.stop())
296+
}
297+
}
298+
}
299+
}
300+
301+
testQuietly("new instance of the same streaming query stops old query in the same session") {
302+
failAfter(90 seconds) {
303+
withSQLConf(SQLConf.STREAMING_STOP_ACTIVE_RUN_ON_RESTART.key -> "true") {
304+
withTempDir { dir =>
305+
val (ms1, ds1) = makeDataset
306+
val (ms2, ds2) = makeDataset
307+
val chkLocation = new File(dir, "_checkpoint").getCanonicalPath
308+
val dataLocation = new File(dir, "data").getCanonicalPath
309+
310+
val query1 = ds1.writeStream.format("parquet")
311+
.option("checkpointLocation", chkLocation).start(dataLocation)
312+
ms1.addData(1, 2, 3)
313+
val query2 = ds2.writeStream.format("parquet")
289314
.option("checkpointLocation", chkLocation).start(dataLocation)
315+
try {
316+
ms2.addData(1, 2, 3)
317+
query2.processAllAvailable()
318+
assert(spark.sharedState.activeStreamingQueries.get(query2.id) ===
319+
query2.asInstanceOf[StreamingQueryWrapper].streamingQuery,
320+
"The correct streaming query is not being tracked in global state")
321+
322+
assert(!query1.isActive,
323+
"First query should have stopped before starting the second query")
324+
} finally {
325+
spark.streams.active.foreach(_.stop())
326+
}
290327
}
291-
assert(e.getMessage.contains("same id"))
292-
} finally {
293-
query1.stop()
294328
}
295329
}
296330
}
297331

298332
testQuietly(
299333
"can't start multiple instances of the same streaming query in the different sessions") {
300-
withTempDir { dir =>
301-
val session2 = spark.cloneSession()
302-
303-
val ms1 = MemoryStream(Encoders.INT, spark.sqlContext)
304-
val ds2 = MemoryStream(Encoders.INT, session2.sqlContext).toDS()
305-
val chkLocation = new File(dir, "_checkpoint").getCanonicalPath
306-
val dataLocation = new File(dir, "data").getCanonicalPath
334+
withSQLConf(SQLConf.STREAMING_STOP_ACTIVE_RUN_ON_RESTART.key -> "false") {
335+
withTempDir { dir =>
336+
val session2 = spark.cloneSession()
337+
338+
val ms1 = MemoryStream(Encoders.INT, spark.sqlContext)
339+
val ds2 = MemoryStream(Encoders.INT, session2.sqlContext).toDS()
340+
val chkLocation = new File(dir, "_checkpoint").getCanonicalPath
341+
val dataLocation = new File(dir, "data").getCanonicalPath
342+
343+
val query1 = ms1.toDS().writeStream.format("parquet")
344+
.option("checkpointLocation", chkLocation).start(dataLocation)
345+
ms1.addData(1, 2, 3)
346+
try {
347+
val e = intercept[IllegalStateException] {
348+
ds2.writeStream.format("parquet")
349+
.option("checkpointLocation", chkLocation).start(dataLocation)
350+
}
351+
assert(e.getMessage.contains("same id"))
352+
} finally {
353+
spark.streams.active.foreach(_.stop())
354+
session2.streams.active.foreach(_.stop())
355+
}
356+
}
357+
}
358+
}
307359

308-
val query1 = ms1.toDS().writeStream.format("parquet")
309-
.option("checkpointLocation", chkLocation).start(dataLocation)
310-
ms1.addData(1, 2, 3)
311-
try {
312-
val e = intercept[IllegalStateException] {
313-
ds2.writeStream.format("parquet")
360+
testQuietly(
361+
"new instance of the same streaming query stops old query in a different session") {
362+
failAfter(90 seconds) {
363+
withSQLConf(SQLConf.STREAMING_STOP_ACTIVE_RUN_ON_RESTART.key -> "true") {
364+
withTempDir { dir =>
365+
val session2 = spark.cloneSession()
366+
367+
val ms1 = MemoryStream(Encoders.INT, spark.sqlContext)
368+
val ds2 = MemoryStream(Encoders.INT, session2.sqlContext).toDS()
369+
val chkLocation = new File(dir, "_checkpoint").getCanonicalPath
370+
val dataLocation = new File(dir, "data").getCanonicalPath
371+
372+
val query1 = ms1.toDS().writeStream.format("parquet")
373+
.option("checkpointLocation", chkLocation).start(dataLocation)
374+
ms1.addData(1, 2, 3)
375+
val query2 = ds2.writeStream.format("parquet")
314376
.option("checkpointLocation", chkLocation).start(dataLocation)
377+
try {
378+
ms1.addData(1, 2, 3)
379+
query2.processAllAvailable()
380+
assert(spark.sharedState.activeStreamingQueries.get(query2.id) ===
381+
query2.asInstanceOf[StreamingQueryWrapper].streamingQuery,
382+
"The correct streaming execution is not being tracked in global state")
383+
384+
assert(!query1.isActive,
385+
"First query should have stopped before starting the second query")
386+
} finally {
387+
spark.streams.active.foreach(_.stop())
388+
session2.streams.active.foreach(_.stop())
389+
}
315390
}
316-
assert(e.getMessage.contains("same id"))
317-
} finally {
318-
query1.stop()
319391
}
320392
}
321393
}

sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -123,9 +123,11 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
123123
assert(q3.runId !== q4.runId)
124124

125125
// Only one query with same id can be active
126-
val q5 = startQuery(restart = false)
127-
val e = intercept[IllegalStateException] {
128-
startQuery(restart = true)
126+
withSQLConf(SQLConf.STREAMING_STOP_ACTIVE_RUN_ON_RESTART.key -> "false") {
127+
val q5 = startQuery(restart = false)
128+
val e = intercept[IllegalStateException] {
129+
startQuery(restart = true)
130+
}
129131
}
130132
}
131133
}

0 commit comments

Comments
 (0)