@@ -269,40 +269,51 @@ class JobManagerActor(contextConfig: Config) extends InstrumentedActor {
269
269
Future {
270
270
org.slf4j.MDC .put(" jobId" , jobId)
271
271
logger.info(" Starting job future thread" )
272
-
273
- // Need to re-set the SparkEnv because it's thread-local and the Future runs on a diff thread
274
- SparkEnv .set(sparkEnv)
275
-
276
- // Use the Spark driver's class loader as it knows about all our jars already
277
- // NOTE: This may not even be necessary if we set the driver ActorSystem classloader correctly
278
- Thread .currentThread.setContextClassLoader(jarLoader)
279
- val job = constructor()
280
- if (job.isInstanceOf [NamedObjectSupport ]) {
281
- val namedObjects = job.asInstanceOf [NamedObjectSupport ].namedObjectsPrivate
282
- if (namedObjects.get() == null ) {
283
- namedObjects.compareAndSet(null , jobServerNamedObjects)
284
- }
285
- }
286
-
287
272
try {
288
- statusActor ! JobStatusActor .JobInit (jobInfo)
289
-
290
- val jobC = jobContext.asInstanceOf [job.C ]
291
- job.validate(jobC, jobConfig) match {
292
- case SparkJobInvalid (reason) => {
293
- val err = new Throwable (reason)
294
- statusActor ! JobValidationFailed (jobId, DateTime .now(), err)
295
- throw err
273
+ // Need to re-set the SparkEnv because it's thread-local and the Future runs on a diff thread
274
+ SparkEnv .set(sparkEnv)
275
+
276
+ // Use the Spark driver's class loader as it knows about all our jars already
277
+ // NOTE: This may not even be necessary if we set the driver ActorSystem classloader correctly
278
+ Thread .currentThread.setContextClassLoader(jarLoader)
279
+ val job = constructor()
280
+ if (job.isInstanceOf [NamedObjectSupport ]) {
281
+ val namedObjects = job.asInstanceOf [NamedObjectSupport ].namedObjectsPrivate
282
+ if (namedObjects.get() == null ) {
283
+ namedObjects.compareAndSet(null , jobServerNamedObjects)
296
284
}
297
- case SparkJobValid => {
298
- statusActor ! JobStarted (jobId : String , contextName, jobInfo.startTime)
299
- val sc = jobContext.sparkContext
300
- sc.setJobGroup(jobId, s " Job group for $jobId and spark context ${sc.applicationId}" , true )
301
- job.runJob(jobC, jobConfig)
285
+ }
286
+
287
+ try {
288
+ statusActor ! JobStatusActor .JobInit (jobInfo)
289
+
290
+ val jobC = jobContext.asInstanceOf [job.C ]
291
+ job.validate(jobC, jobConfig) match {
292
+ case SparkJobInvalid (reason) => {
293
+ val err = new Throwable (reason)
294
+ statusActor ! JobValidationFailed (jobId, DateTime .now(), err)
295
+ throw err
296
+ }
297
+ case SparkJobValid => {
298
+ statusActor ! JobStarted (jobId : String , contextName, jobInfo.startTime)
299
+ val sc = jobContext.sparkContext
300
+ sc.setJobGroup(jobId, s " Job group for $jobId and spark context ${sc.applicationId}" , true )
301
+ job.runJob(jobC, jobConfig)
302
+ }
302
303
}
304
+ } finally {
305
+ org.slf4j.MDC .remove(" jobId" )
306
+ }
307
+ } catch {
308
+ case e : java.lang.AbstractMethodError => {
309
+ logger.error(" Oops, there's an AbstractMethodError... maybe you compiled " +
310
+ " your code with an older version of SJS? here's the exception:" , e)
311
+ throw e
303
312
}
304
- } finally {
305
- org.slf4j.MDC .remove(" jobId" )
313
+ case e : Throwable => {
314
+ logger.error(" Got Throwable" , e)
315
+ throw e
316
+ };
306
317
}
307
318
}(executionContext).andThen {
308
319
case Success (result : Any ) =>
@@ -322,7 +333,7 @@ class JobManagerActor(contextConfig: Config) extends InstrumentedActor {
322
333
val wrappedError = wrapInRuntimeException(error)
323
334
// If and only if job validation fails, JobErroredOut message is dropped silently in JobStatusActor.
324
335
statusActor ! JobErroredOut (jobId, DateTime .now(), wrappedError)
325
- logger.warn (" Exception from job " + jobId + " : " , error)
336
+ logger.error (" Exception from job " + jobId + " : " , error)
326
337
}(executionContext).andThen {
327
338
case _ =>
328
339
// Make sure to decrement the count of running jobs when a job finishes, in both success and failure
0 commit comments