@@ -287,8 +287,10 @@ class JobManagerActor(contextConfig: Config) extends InstrumentedActor {
         statusActor ! JobFinished(jobId, DateTime.now())
         resultActor ! JobResult(jobId, result)
       case Failure(error: Throwable) =>
+        // Wrap the error in a RuntimeException to handle the case where a custom exception is thrown.
+        val wrappedError = wrapInRuntimeException(error)
         // If and only if job validation fails, the JobErroredOut message is dropped silently in JobStatusActor.
-        statusActor ! JobErroredOut(jobId, DateTime.now(), error)
+        statusActor ! JobErroredOut(jobId, DateTime.now(), wrappedError)
         logger.warn("Exception from job " + jobId + ": ", error)
     }(executionContext).andThen {
       case _ =>
@@ -301,6 +303,28 @@ class JobManagerActor(contextConfig: Config) extends InstrumentedActor {
     }(executionContext)
   }

+  // Wraps a Throwable in a RuntimeException. This is useful when a custom
+  // exception is thrown: currently, throwing a custom exception doesn't work,
+  // so as a workaround we wrap it in a standard exception.
+  protected def wrapInRuntimeException(t: Throwable): RuntimeException = {
+    val cause: Throwable = getRootCause(t)
+    val e: RuntimeException = new RuntimeException("%s: %s"
+      .format(cause.getClass().getName(), cause.getMessage))
+    e.setStackTrace(cause.getStackTrace())
+    e
+  }
+
+  // Gets the very first exception that caused the current exception to be thrown.
+  protected def getRootCause(t: Throwable): Throwable = {
+    var result: Throwable = t
+    var cause: Throwable = result.getCause()
+    while (cause != null && (result != cause)) {
+      result = cause
+      cause = result.getCause()
+    }
+    result
+  }
+
   // Use our classloader and a factory to create the SparkContext. This ensures the SparkContext will use
   // our class loader when it spins off threads, and ensures SparkContext can find the job and dependent jars
   // when doing serialization, for example.
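To make the effect of the two new helpers concrete, here is a minimal, self-contained sketch (not part of the patch) that reproduces the same root-cause traversal and wrapping; CustomJobException and WrapDemo are hypothetical names used only for this demo:

    // Hypothetical custom exception type, standing in for whatever a job might throw.
    class CustomJobException(msg: String) extends Exception(msg)

    object WrapDemo extends App {
      // Same traversal as getRootCause in the patch: follow getCause until it
      // returns null or points back at the current throwable.
      def getRootCause(t: Throwable): Throwable = {
        var result: Throwable = t
        var cause: Throwable = result.getCause()
        while (cause != null && (result != cause)) {
          result = cause
          cause = result.getCause()
        }
        result
      }

      // Same idea as wrapInRuntimeException in the patch: flatten the root cause
      // into a plain RuntimeException, preserving its message and stack trace.
      def wrapInRuntimeException(t: Throwable): RuntimeException = {
        val cause = getRootCause(t)
        val e = new RuntimeException("%s: %s".format(cause.getClass().getName(), cause.getMessage))
        e.setStackTrace(cause.getStackTrace())
        e
      }

      // A custom exception buried under a standard one.
      val nested = new RuntimeException("outer", new CustomJobException("boom"))
      val wrapped = wrapInRuntimeException(nested)
      println(wrapped.getMessage) // prints: CustomJobException: boom
    }

Per the patch's own comments, custom exceptions currently don't survive being sent onward, so the helpers trade the original exception type for a plain RuntimeException that any receiver can handle, while the copied stack trace keeps the failure debuggable.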