@@ -74,7 +74,7 @@ object JobManagerActor {
74
74
* }}}
75
75
*/
76
76
class JobManagerActor (contextConfig : Config , daoActor : ActorRef )
77
- extends InstrumentedActor with SparkListener {
77
+ extends InstrumentedActor {
78
78
79
79
import CommonMessages ._
80
80
import JobManagerActor ._
@@ -125,9 +125,13 @@ class JobManagerActor(contextConfig: Config, daoActor: ActorRef)
125
125
}
126
126
127
127
// Handle external kill events (e.g. killed via YARN): when Spark reports that
// the application has ended, this actor shuts itself down.
private def sparkListener = new SparkListener() {
  // Sending PoisonPill makes the actor process its remaining mailbox and stop.
  override def onApplicationEnd(event: SparkListenerApplicationEnd) {
    logger.info("Got Spark Application end event, stopping job manager.")
    self ! PoisonPill
  }
}
132
136
133
137
def wrappedReceive : Receive = {
@@ -142,7 +146,7 @@ class JobManagerActor(contextConfig: Config, daoActor: ActorRef)
142
146
}
143
147
factory = getContextFactory()
144
148
jobContext = factory.makeContext(config, contextConfig, contextName)
145
- jobContext.sparkContext.addSparkListener(this )
149
+ jobContext.sparkContext.addSparkListener(sparkListener )
146
150
sparkEnv = SparkEnv .get
147
151
jobCache = new JobCacheImpl (jobCacheSize, daoActor, jobContext.sparkContext, jarLoader)
148
152
getSideJars(contextConfig).foreach { jarUri => jobContext.sparkContext.addJar(jarUri) }
0 commit comments