Skip to content

Commit d3abb36

Browse files
zsxwing authored and tdas committed
[SPARK-21788][SS] Handle more exceptions when stopping a streaming query
## What changes were proposed in this pull request? Add more cases we should view as a normal query stop rather than a failure. ## How was this patch tested? The new unit tests. Author: Shixiong Zhu <[email protected]> Closes apache#18997 from zsxwing/SPARK-21788.
1 parent 2dd37d8 commit d3abb36

File tree

2 files changed

+89
-5
lines changed

2 files changed

+89
-5
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala

Lines changed: 31 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -17,16 +17,18 @@
1717

1818
package org.apache.spark.sql.execution.streaming
1919

20-
import java.io.{InterruptedIOException, IOException}
20+
import java.io.{InterruptedIOException, IOException, UncheckedIOException}
21+
import java.nio.channels.ClosedByInterruptException
2122
import java.util.UUID
22-
import java.util.concurrent.{CountDownLatch, TimeUnit}
23+
import java.util.concurrent.{CountDownLatch, ExecutionException, TimeUnit}
2324
import java.util.concurrent.atomic.AtomicReference
2425
import java.util.concurrent.locks.ReentrantLock
2526

2627
import scala.collection.mutable.{Map => MutableMap}
2728
import scala.collection.mutable.ArrayBuffer
2829
import scala.util.control.NonFatal
2930

31+
import com.google.common.util.concurrent.UncheckedExecutionException
3032
import org.apache.hadoop.fs.Path
3133

3234
import org.apache.spark.internal.Logging
@@ -335,7 +337,7 @@ class StreamExecution(
335337
// `stop()` is already called. Let `finally` finish the cleanup.
336338
}
337339
} catch {
338-
case _: InterruptedException | _: InterruptedIOException if state.get == TERMINATED =>
340+
case e if isInterruptedByStop(e) =>
339341
// interrupted by stop()
340342
updateStatusMessage("Stopped")
341343
case e: IOException if e.getMessage != null
@@ -407,6 +409,32 @@ class StreamExecution(
407409
}
408410
}
409411

412+
private def isInterruptedByStop(e: Throwable): Boolean = {
413+
if (state.get == TERMINATED) {
414+
e match {
415+
// InterruptedIOException - thrown when an I/O operation is interrupted
416+
// ClosedByInterruptException - thrown when an I/O operation upon a channel is interrupted
417+
case _: InterruptedException | _: InterruptedIOException | _: ClosedByInterruptException =>
418+
true
419+
// The cause of the following exceptions may be one of the above exceptions:
420+
//
421+
// UncheckedIOException - thrown by codes that cannot throw a checked IOException, such as
422+
// BiFunction.apply
423+
// ExecutionException - thrown by codes running in a thread pool and these codes throw an
424+
// exception
425+
// UncheckedExecutionException - thrown by codes that cannot throw a checked
426+
// ExecutionException, such as BiFunction.apply
427+
case e2 @ (_: UncheckedIOException | _: ExecutionException | _: UncheckedExecutionException)
428+
if e2.getCause != null =>
429+
isInterruptedByStop(e2.getCause)
430+
case _ =>
431+
false
432+
}
433+
} else {
434+
false
435+
}
436+
}
437+
410438
/**
411439
* Populate the start offsets to start the execution at the current offsets stored in the sink
412440
* (i.e. avoid reprocessing data that we have already processed). This function must be called

sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala

Lines changed: 58 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -17,12 +17,14 @@
1717

1818
package org.apache.spark.sql.streaming
1919

20-
import java.io.{File, InterruptedIOException, IOException}
21-
import java.util.concurrent.{CountDownLatch, TimeoutException, TimeUnit}
20+
import java.io.{File, InterruptedIOException, IOException, UncheckedIOException}
21+
import java.nio.channels.ClosedByInterruptException
22+
import java.util.concurrent.{CountDownLatch, ExecutionException, TimeoutException, TimeUnit}
2223

2324
import scala.reflect.ClassTag
2425
import scala.util.control.ControlThrowable
2526

27+
import com.google.common.util.concurrent.UncheckedExecutionException
2628
import org.apache.commons.io.FileUtils
2729
import org.apache.hadoop.conf.Configuration
2830

@@ -691,6 +693,31 @@ class StreamSuite extends StreamTest {
691693
}
692694
}
693695
}
696+
697+
for (e <- Seq(
698+
new InterruptedException,
699+
new InterruptedIOException,
700+
new ClosedByInterruptException,
701+
new UncheckedIOException("test", new ClosedByInterruptException),
702+
new ExecutionException("test", new InterruptedException),
703+
new UncheckedExecutionException("test", new InterruptedException))) {
704+
test(s"view ${e.getClass.getSimpleName} as a normal query stop") {
705+
ThrowingExceptionInCreateSource.createSourceLatch = new CountDownLatch(1)
706+
ThrowingExceptionInCreateSource.exception = e
707+
val query = spark
708+
.readStream
709+
.format(classOf[ThrowingExceptionInCreateSource].getName)
710+
.load()
711+
.writeStream
712+
.format("console")
713+
.start()
714+
assert(ThrowingExceptionInCreateSource.createSourceLatch
715+
.await(streamingTimeout.toMillis, TimeUnit.MILLISECONDS),
716+
"ThrowingExceptionInCreateSource.createSource wasn't called before timeout")
717+
query.stop()
718+
assert(query.exception.isEmpty)
719+
}
720+
}
694721
}
695722

696723
abstract class FakeSource extends StreamSourceProvider {
@@ -824,3 +851,32 @@ class TestStateStoreProvider extends StateStoreProvider {
824851

825852
override def getStore(version: Long): StateStore = null
826853
}
854+
855+
/** A fake source that throws `ThrowingExceptionInCreateSource.exception` in `createSource` */
856+
class ThrowingExceptionInCreateSource extends FakeSource {
857+
858+
override def createSource(
859+
spark: SQLContext,
860+
metadataPath: String,
861+
schema: Option[StructType],
862+
providerName: String,
863+
parameters: Map[String, String]): Source = {
864+
ThrowingExceptionInCreateSource.createSourceLatch.countDown()
865+
try {
866+
Thread.sleep(30000)
867+
throw new TimeoutException("sleep was not interrupted in 30 seconds")
868+
} catch {
869+
case _: InterruptedException =>
870+
throw ThrowingExceptionInCreateSource.exception
871+
}
872+
}
873+
}
874+
875+
object ThrowingExceptionInCreateSource {
876+
/**
877+
* A latch to allow the user to wait until `ThrowingExceptionInCreateSource.createSource` is
878+
* called.
879+
*/
880+
@volatile var createSourceLatch: CountDownLatch = null
881+
@volatile var exception: Exception = null
882+
}

0 commit comments

Comments (0)