@@ -284,27 +284,27 @@ class JobManagerActor(contextConfig: Config) extends InstrumentedActor {
284
284
}
285
285
}
286
286
287
- try {
288
- statusActor ! JobStatusActor .JobInit (jobInfo)
289
-
290
- val jobC = jobContext.asInstanceOf [job.C ]
291
- job.validate(jobC, jobConfig) match {
292
- case SparkJobInvalid (reason) => {
293
- val err = new Throwable (reason)
294
- statusActor ! JobValidationFailed (jobId, DateTime .now(), err)
295
- throw err
296
- }
297
- case SparkJobValid => {
298
- statusActor ! JobStarted (jobId : String , contextName, jobInfo.startTime)
299
- val sc = jobContext.sparkContext
300
- sc.setJobGroup(jobId, s " Job group for $jobId and spark context ${sc.applicationId}" , true )
301
- job.runJob(jobC, jobConfig)
287
+ try {
288
+ statusActor ! JobStatusActor .JobInit (jobInfo)
289
+
290
+ val jobC = jobContext.asInstanceOf [job.C ]
291
+ job.validate(jobC, jobConfig) match {
292
+ case SparkJobInvalid (reason) => {
293
+ val err = new Throwable (reason)
294
+ statusActor ! JobValidationFailed (jobId, DateTime .now(), err)
295
+ throw err
296
+ }
297
+ case SparkJobValid => {
298
+ statusActor ! JobStarted (jobId : String , contextName, jobInfo.startTime)
299
+ val sc = jobContext.sparkContext
300
+ sc.setJobGroup(jobId, s " Job group for $jobId and spark context ${sc.applicationId}" , true )
301
+ job.runJob(jobC, jobConfig)
302
+ }
302
303
}
304
+ } finally {
305
+ org.slf4j.MDC .remove(" jobId" )
303
306
}
304
- } finally {
305
- org.slf4j.MDC .remove(" jobId" )
306
- }
307
- } catch {
307
+ } catch {
308
308
case e : java.lang.AbstractMethodError => {
309
309
logger.error(" Oops, there's an AbstractMethodError... maybe you compiled " +
310
310
" your code with an older version of SJS? here's the exception:" , e)
0 commit comments