diff --git a/kernel/src/main/scala/org/apache/toree/boot/KernelBootstrap.scala b/kernel/src/main/scala/org/apache/toree/boot/KernelBootstrap.scala index cea008000..79dd4b0eb 100644 --- a/kernel/src/main/scala/org/apache/toree/boot/KernelBootstrap.scala +++ b/kernel/src/main/scala/org/apache/toree/boot/KernelBootstrap.scala @@ -65,6 +65,8 @@ class KernelBootstrap(config: Config) extends LogLike { // customPrintStream as their initial Console.out value // + val startNanos: Double = System.nanoTime() + // ENSURE THAT WE SET THE RIGHT SPARK PROPERTIES val execUri = System.getenv("SPARK_EXECUTOR_URI") System.setProperty("spark.repl.class.outputDir", outputDir.getAbsolutePath) @@ -78,6 +80,9 @@ class KernelBootstrap(config: Config) extends LogLike { // is ready initializeShutdownHook() + //handle scala interpreter separately because is slow to startup + val (interpreterManager, maybeEventualPartialScalaInterpreter) = initializeSlowComponents(config) + // Initialize the bare minimum to report a starting message val (actorSystem, actorLoader, kernelMessageRelayActor, statusDispatch) = initializeBare( @@ -96,10 +101,12 @@ class KernelBootstrap(config: Config) extends LogLike { // Initialize components needed elsewhere val (commStorage, commRegistrar, commManager, interpreter, kernel, dependencyDownloader, - magicManager, pluginManager, responseMap) = + magicManager, pluginManager, responseMap, maybeEventualReadyScalaInterpreter) = initializeComponents( config = config, - actorLoader = actorLoader + actorLoader = actorLoader, + interpreterManager = interpreterManager, + maybeEventualScalaInterp = maybeEventualPartialScalaInterpreter ) this.interpreters ++= Seq(interpreter) @@ -126,12 +133,19 @@ class KernelBootstrap(config: Config) extends LogLike { logger.debug("Initializing security manager") System.setSecurityManager(new KernelSecurityManager) + //Wait for scala interpreter to finish initializing if it is present + maybeEventualReadyScalaInterpreter.map { eventualScalaInterpreter => + Await.ready(eventualScalaInterpreter, Duration.Inf) + } + logger.debug("Running postInit for interpreters") interpreters foreach {_.postInit()} logger.info("Marking relay as ready for receiving messages") kernelMessageRelayActor ! true + val endNanos: Double = System.nanoTime() + logger.trace(s"Kernel bootstrap took ${(endNanos - startNanos) / 1000000000}s") this } diff --git a/kernel/src/main/scala/org/apache/toree/boot/layer/ComponentInitialization.scala b/kernel/src/main/scala/org/apache/toree/boot/layer/ComponentInitialization.scala index 42999d72e..bf8453d6e 100644 --- a/kernel/src/main/scala/org/apache/toree/boot/layer/ComponentInitialization.scala +++ b/kernel/src/main/scala/org/apache/toree/boot/layer/ComponentInitialization.scala @@ -28,6 +28,7 @@ import org.apache.toree.comm.{CommManager, CommRegistrar, CommStorage, KernelCom import org.apache.toree.dependencies.{CoursierDependencyDownloader, Credentials, DependencyDownloader} import org.apache.toree.interpreter._ import org.apache.toree.kernel.api.Kernel +import org.apache.toree.kernel.interpreter.scala.ScalaInterpreter import org.apache.toree.kernel.protocol.v5.KMBuilder import org.apache.toree.kernel.protocol.v5.kernel.ActorLoader import org.apache.toree.magic.MagicManager @@ -36,22 +37,39 @@ import org.apache.toree.utils.{LogLike, FileUtils} import scala.collection.JavaConverters._ import org.apache.toree.plugins.AllInterpretersReady +import scala.concurrent.Future +import scala.concurrent.ExecutionContext.Implicits.global + /** * Represents the component initialization. All component-related pieces of the * kernel (non-actors) should be created here. Limited items should be exposed. */ trait ComponentInitialization { + + /** + * Start initializing slow components, specifically the scala interpreter. These are broken out so we + * can start them as early as possible. + * + * @param config the config used for initialization + */ + def initializeSlowComponents(config: Config): (InterpreterManager, Option[Future[ScalaInterpreter]]) + /** * Initializes and registers all components (not needed by bare init). * * @param config The config used for initialization * @param actorLoader The actor loader to use for some initialization + * @param interpreterManager the interpreterManager + * @param maybeEventualScalaInterp a future that will hold the Scala Interpreter when its ready */ def initializeComponents( - config: Config, actorLoader: ActorLoader + config: Config, + actorLoader: ActorLoader, + interpreterManager: InterpreterManager, + maybeEventualScalaInterp: Option[Future[ScalaInterpreter]] ): (CommStorage, CommRegistrar, CommManager, Interpreter, Kernel, DependencyDownloader, MagicManager, PluginManager, - collection.mutable.Map[String, ActorRef]) + collection.mutable.Map[String, ActorRef], Option[Future[ScalaInterpreter]]) } /** @@ -60,29 +78,54 @@ trait ComponentInitialization { trait StandardComponentInitialization extends ComponentInitialization { this: LogLike => + /** + * Start initializing slow components, specifically the scala interpreter. These are broken out so we + * can start them as early as possible. + * + * @param config the config used for initialization + */ + def initializeSlowComponents(config: Config): (InterpreterManager, Option[Future[ScalaInterpreter]]) = { + val interpreterManager= InterpreterManager(config) + val optionScalaInterp = interpreterManager.scalaInterpreter + val futureScalaInterp = optionScalaInterp.map { scalaInterp => + Future(scalaInterp.startInit()) + } + + (interpreterManager, futureScalaInterp) + } + + /** * Initializes and registers all components (not needed by bare init). * * @param config The config used for initialization * @param actorLoader The actor loader to use for some initialization + * @param interpreterManager the interpreterManager + * @param maybeEventualScalaInterp a future that will hold the Scala Interpreter when its ready */ def initializeComponents( - config: Config, actorLoader: ActorLoader + config: Config, + actorLoader: ActorLoader, + interpreterManager: InterpreterManager, + maybeEventualScalaInterp: Option[Future[ScalaInterpreter]] ) = { val (commStorage, commRegistrar, commManager) = initializeCommObjects(actorLoader) - val interpreterManager = InterpreterManager(config) - interpreterManager.interpreters foreach(println) - val dependencyDownloader = initializeDependencyDownloader(config) val pluginManager = createPluginManager(config, interpreterManager, dependencyDownloader) val kernel = initializeKernel(config, actorLoader, interpreterManager, commManager, pluginManager) + val updatedScalaInterp: Option[Future[ScalaInterpreter]] = maybeEventualScalaInterp.map { futureScalaInterp => + futureScalaInterp.map { scalaInterp => + scalaInterp.finishInit(kernel) + } + } + initializePlugins(config, pluginManager) - interpreterManager.initializeInterpreters(kernel) + interpreterManager.initializeRegularInterpreters(kernel) pluginManager.fireEvent(AllInterpretersReady) @@ -90,7 +133,7 @@ trait StandardComponentInitialization extends ComponentInitialization { (commStorage, commRegistrar, commManager, interpreterManager.defaultInterpreter.get, kernel, - dependencyDownloader, kernel.magics, pluginManager, responseMap) + dependencyDownloader, kernel.magics, pluginManager, responseMap, updatedScalaInterp) } diff --git a/kernel/src/main/scala/org/apache/toree/boot/layer/InterpreterManager.scala b/kernel/src/main/scala/org/apache/toree/boot/layer/InterpreterManager.scala index 94b8f82c9..9f107fd1d 100644 --- a/kernel/src/main/scala/org/apache/toree/boot/layer/InterpreterManager.scala +++ b/kernel/src/main/scala/org/apache/toree/boot/layer/InterpreterManager.scala @@ -20,8 +20,9 @@ package org.apache.toree.boot.layer import org.apache.toree.kernel.api.KernelLike import com.typesafe.config.Config import org.apache.toree.interpreter._ -import scala.collection.JavaConverters._ +import org.apache.toree.kernel.interpreter.scala.ScalaInterpreter +import scala.collection.JavaConverters._ import org.slf4j.LoggerFactory case class InterpreterManager( @@ -29,6 +30,10 @@ case class InterpreterManager( interpreters: Map[String, Interpreter] = Map[String, Interpreter]() ) { + //Scala Interpreter is handled separately + def initializeRegularInterpreters(kernel: KernelLike): Unit = interpreters + .filterNot { case (name, interp) => name == "Scala" && interp.isInstanceOf[ScalaInterpreter] } + .foreach { case (_, interpreter) => interpreter.init(kernel) } def initializeInterpreters(kernel: KernelLike): Unit = { interpreters.values.foreach(interpreter => @@ -46,8 +51,15 @@ case class InterpreterManager( def defaultInterpreter: Option[Interpreter] = { interpreters.get(default) } + + /** + * returns builtin toree scala interpreter if present. + * @return an option containg the scala interpreter if present + */ + def scalaInterpreter: Option[ScalaInterpreter] = interpreters.get("Scala").collect { case s: ScalaInterpreter => s} } + object InterpreterManager { protected val logger = LoggerFactory.getLogger(this.getClass.getName) diff --git a/scala-interpreter/src/main/scala-2.11/org/apache/toree/kernel/interpreter/scala/ScalaInterpreterSpecific.scala b/scala-interpreter/src/main/scala-2.11/org/apache/toree/kernel/interpreter/scala/ScalaInterpreterSpecific.scala index fa667b9f4..73b5daf58 100644 --- a/scala-interpreter/src/main/scala-2.11/org/apache/toree/kernel/interpreter/scala/ScalaInterpreterSpecific.scala +++ b/scala-interpreter/src/main/scala-2.11/org/apache/toree/kernel/interpreter/scala/ScalaInterpreterSpecific.scala @@ -258,14 +258,13 @@ trait ScalaInterpreterSpecific extends SettingsProducerLike { this: ScalaInterpr * Starts the interpreter, initializing any internal state. * @return A reference to the interpreter */ - override def start(): Interpreter = { + override def start(): ScalaInterpreter = { require(iMain == null && taskManager == null) taskManager = newTaskManager() logger.debug("Initializing task manager") taskManager.start() - iMain = newIMain(settings, new JPrintWriter(lastResultOut, true)) //logger.debug("Initializing interpreter") diff --git a/scala-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/scala/ScalaInterpreter.scala b/scala-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/scala/ScalaInterpreter.scala index 2a4fb8865..1af488ebe 100644 --- a/scala-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/scala/ScalaInterpreter.scala +++ b/scala-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/scala/ScalaInterpreter.scala @@ -18,7 +18,8 @@ package org.apache.toree.kernel.interpreter.scala import java.io.ByteArrayOutputStream -import java.util.concurrent.{ExecutionException, TimeoutException, TimeUnit} +import java.util.concurrent.{ExecutionException, TimeUnit, TimeoutException} + import com.typesafe.config.{Config, ConfigFactory} import jupyter.Displayers import org.apache.spark.SparkContext @@ -30,6 +31,7 @@ import org.apache.toree.utils.TaskManager import org.slf4j.LoggerFactory import org.apache.toree.kernel.BuildInfo import org.apache.toree.kernel.protocol.v5.MIMEType + import scala.annotation.tailrec import scala.collection.JavaConverters._ import scala.concurrent.{Await, Future} @@ -66,9 +68,7 @@ class ScalaInterpreter(private val config:Config = ConfigFactory.load) extends I settings } - protected var settings: Settings = newSettings(List()) - settings = appendClassPath(settings) - + protected var settings: Settings = _ private val maxInterpreterThreads: Int = { if(config.hasPath("max_interpreter_threads")) @@ -99,6 +99,33 @@ class ScalaInterpreter(private val config:Config = ConfigFactory.load) extends I this } + /** + * Start initialization with only the given config + * + * @return this partially initialized scalaInterpreter + */ + def startInit(): ScalaInterpreter = { + import scala.collection.JavaConverters._ + val args = config.getStringList("interpreter_args").asScala.toList + settings = newSettings(args) + settings = appendClassPath(settings) + + start() + } + + + /** + * Finish initializing this interpreter with the kernel. + * + * @param kernel the kernel + * @return this fully initialized scalaInterpreter + */ + def finishInit(kernel: KernelLike): ScalaInterpreter = { + this._kernel = kernel + bindVariables() + this + } + protected def bindVariables(): Unit = { bindKernelVariable(kernel) bindSparkSession()