18
18
package org .apache .spark .sql .streaming
19
19
20
20
import java .util .UUID
21
- import java . util .concurrent .atomic . AtomicLong
21
+ import javax . annotation .concurrent .GuardedBy
22
22
23
23
import scala .collection .mutable
24
24
@@ -44,10 +44,13 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) {
44
44
// Coordinator reference obtained for the driver from the SparkContext's environment.
private[sql] val stateStoreCoordinator =
  StateStoreCoordinatorRef.forDriver(sparkSession.sparkContext.env)

// Listener bus for streaming query events, layered over the SparkContext's listener bus.
private val listenerBus = new StreamingQueryListenerBus(sparkSession.sparkContext.listenerBus)

// Queries currently registered as active, keyed by query id.
// Access only while holding `activeQueriesLock`.
@GuardedBy("activeQueriesLock")
private val activeQueries = new mutable.HashMap[UUID, StreamingQuery]

// Monitor protecting `activeQueries`.
private val activeQueriesLock = new Object

// Monitor protecting `lastTerminatedQuery`.
private val awaitTerminationLock = new Object

// Starts out as null; access only while holding `awaitTerminationLock`.
@GuardedBy("awaitTerminationLock")
private var lastTerminatedQuery: StreamingQuery = null
52
55
53
56
/**
@@ -181,8 +184,65 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) {
181
184
listenerBus.post(event)
182
185
}
183
186
187
/**
 * Build (but do not start) a [[StreamExecution]] for a streaming DataFrame.
 *
 * The checkpoint location is resolved in this order:
 *  1. the user-specified location, if any;
 *  2. the session-level checkpoint location joined with the query name (or a random UUID
 *     when the query has no name);
 *  3. a temporary directory, but only when `useTempCheckpointLocation` is true.
 *
 * @param userSpecifiedName Query name optionally specified by the user; passed on as `null`
 *                          to the [[StreamExecution]] when absent.
 * @param userSpecifiedCheckpointLocation Checkpoint location optionally specified by the user.
 * @param df Streaming DataFrame whose analyzed plan is executed.
 * @param sink Sink handed to the [[StreamExecution]].
 * @param outputMode Output mode handed to the [[StreamExecution]] and, when the check is
 *                   enabled, to `UnsupportedOperationChecker.checkForStreaming`.
 * @param useTempCheckpointLocation Whether a temporary directory may be used when no checkpoint
 *                                  location is specified or configured.
 * @param recoverFromCheckpointLocation When false, an already existing `offsets` path under the
 *                                      checkpoint location is rejected.
 * @param trigger Trigger handed to the [[StreamExecution]].
 * @param triggerClock Clock handed to the [[StreamExecution]].
 * @return the newly constructed, not yet started, [[StreamExecution]].
 * @throws AnalysisException if no checkpoint location can be determined, or if recovery is
 *                           disallowed but the `offsets` path already exists.
 */
private def createQuery(
    userSpecifiedName: Option[String],
    userSpecifiedCheckpointLocation: Option[String],
    df: DataFrame,
    sink: Sink,
    outputMode: OutputMode,
    useTempCheckpointLocation: Boolean,
    recoverFromCheckpointLocation: Boolean,
    trigger: Trigger,
    triggerClock: Clock): StreamExecution = {
  val checkpointLocation = userSpecifiedCheckpointLocation.map { userSpecified =>
    new Path(userSpecified).toUri.toString
  }.orElse {
    df.sparkSession.sessionState.conf.checkpointLocation.map { location =>
      // Unnamed queries get a random UUID as their sub-directory under the session location.
      new Path(location, userSpecifiedName.getOrElse(UUID.randomUUID().toString)).toUri.toString
    }
  }.getOrElse {
    if (useTempCheckpointLocation) {
      Utils.createTempDir(namePrefix = "temporary").getCanonicalPath
    } else {
      throw new AnalysisException(
        "checkpointLocation must be specified either " +
          """through option("checkpointLocation", ...) or """ +
          s"""SparkSession.conf.set("${SQLConf.CHECKPOINT_LOCATION.key}", ...)""")
    }
  }

  // If offsets have already been created, we are trying to resume a query.
  if (!recoverFromCheckpointLocation) {
    val checkpointPath = new Path(checkpointLocation, "offsets")
    val fs = checkpointPath.getFileSystem(df.sparkSession.sessionState.newHadoopConf())
    if (fs.exists(checkpointPath)) {
      throw new AnalysisException(
        s"This query does not support recovering from checkpoint location. " +
          s"Delete $checkpointPath to start over.")
    }
  }

  val analyzedPlan = df.queryExecution.analyzed
  df.queryExecution.assertAnalyzed()

  // The streaming-specific operation check is optional and controlled by the session conf.
  if (sparkSession.sessionState.conf.isUnsupportedOperationCheckEnabled) {
    UnsupportedOperationChecker.checkForStreaming(analyzedPlan, outputMode)
  }

  new StreamExecution(
    sparkSession,
    userSpecifiedName.orNull,
    checkpointLocation,
    analyzedPlan,
    sink,
    trigger,
    triggerClock,
    outputMode)
}
242
+
184
243
/**
185
244
* Start a [[StreamingQuery ]].
245
+ *
186
246
* @param userSpecifiedName Query name optionally specified by the user.
187
247
* @param userSpecifiedCheckpointLocation Checkpoint location optionally specified by the user.
188
248
* @param df Streaming DataFrame.
@@ -206,72 +266,50 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) {
206
266
recoverFromCheckpointLocation : Boolean = true ,
207
267
trigger : Trigger = ProcessingTime (0 ),
208
268
triggerClock : Clock = new SystemClock ()): StreamingQuery = {
209
- activeQueriesLock.synchronized {
210
- val name = userSpecifiedName match {
211
- case Some (n) =>
212
- if (activeQueries.values.exists(_.name == userSpecifiedName.get)) {
213
- throw new IllegalArgumentException (
214
- s " Cannot start query with name $n as a query with that name is already active " )
215
- }
216
- n
217
- case None => null
218
- }
219
- val checkpointLocation = userSpecifiedCheckpointLocation.map { userSpecified =>
220
- new Path (userSpecified).toUri.toString
221
- }.orElse {
222
- df.sparkSession.sessionState.conf.checkpointLocation.map { location =>
223
- new Path (location, name).toUri.toString
224
- }
225
- }.getOrElse {
226
- if (useTempCheckpointLocation) {
227
- Utils .createTempDir(namePrefix = s " temporary " ).getCanonicalPath
228
- } else {
229
- throw new AnalysisException (
230
- " checkpointLocation must be specified either " +
231
- """ through option("checkpointLocation", ...) or """ +
232
- s """ SparkSession.conf.set(" ${SQLConf .CHECKPOINT_LOCATION .key}", ...) """ )
233
- }
234
- }
269
+ val query = createQuery(
270
+ userSpecifiedName,
271
+ userSpecifiedCheckpointLocation,
272
+ df,
273
+ sink,
274
+ outputMode,
275
+ useTempCheckpointLocation,
276
+ recoverFromCheckpointLocation,
277
+ trigger,
278
+ triggerClock)
235
279
236
- // If offsets have already been created, we trying to resume a query.
237
- if (! recoverFromCheckpointLocation) {
238
- val checkpointPath = new Path (checkpointLocation, " offsets" )
239
- val fs = checkpointPath.getFileSystem(df.sparkSession.sessionState.newHadoopConf())
240
- if (fs.exists(checkpointPath)) {
241
- throw new AnalysisException (
242
- s " This query does not support recovering from checkpoint location. " +
243
- s " Delete $checkpointPath to start over. " )
280
+ activeQueriesLock.synchronized {
281
+ // Make sure no other query with same name is active
282
+ userSpecifiedName.foreach { name =>
283
+ if (activeQueries.values.exists(_.name == name)) {
284
+ throw new IllegalArgumentException (
285
+ s " Cannot start query with name $name as a query with that name is already active " )
244
286
}
245
287
}
246
288
247
- val analyzedPlan = df.queryExecution.analyzed
248
- df.queryExecution.assertAnalyzed()
249
-
250
- if (sparkSession.sessionState.conf.isUnsupportedOperationCheckEnabled) {
251
- UnsupportedOperationChecker .checkForStreaming(analyzedPlan, outputMode)
252
- }
253
-
254
- val query = new StreamExecution (
255
- sparkSession,
256
- name,
257
- checkpointLocation,
258
- analyzedPlan,
259
- sink,
260
- trigger,
261
- triggerClock,
262
- outputMode)
263
-
289
+ // Make sure no other query with same id is active
264
290
if (activeQueries.values.exists(_.id == query.id)) {
265
291
throw new IllegalStateException (
266
292
s " Cannot start query with id ${query.id} as another query with same id is " +
267
- s " already active. Perhaps you are attempting to restart a query from checkpoint " +
293
+ s " already active. Perhaps you are attempting to restart a query from checkpoint " +
268
294
s " that is already active. " )
269
295
}
270
296
271
- query.start()
272
297
activeQueries.put(query.id, query)
273
- query
274
298
}
299
+ try {
300
+ // When starting a query, it will call `StreamingQueryListener.onQueryStarted` synchronously.
301
+ // As it's provided by the user and can run arbitrary code, we must not hold any lock here.
302
+ // Otherwise, it's easy to cause a deadlock, or block too long if the user code takes a long
303
+ // time to finish.
304
+ query.start()
305
+ } catch {
306
+ case e : Throwable =>
307
+ activeQueriesLock.synchronized {
308
+ activeQueries -= query.id
309
+ }
310
+ throw e
311
+ }
312
+ query
275
313
}
276
314
277
315
/** Notify (by the StreamingQuery) that the query has been terminated */
0 commit comments