---
layout: global
title: Implementation of Submitting Applications to Kubernetes
---

As with YARN and Standalone mode, Spark applications are commonly deployed on Kubernetes through the
`spark-submit` process. Applications are deployed on Kubernetes by sending YAML files to the Kubernetes API server.
These YAML files declare the structure and behavior of the processes that will be run. Such a declarative
approach to application deployment differs considerably from how Spark applications are deployed via the `spark-submit`
API. `spark-submit` provides contracts that should work on Kubernetes consistently with the other
cluster managers that `spark-submit` can deploy on.

This document outlines the design of the **Kubernetes submission client**, which effectively serves as a *translation
of the options provided to spark-submit into a specification of one or more Kubernetes API resources that represent the
Spark driver*.

# Entry Point

As with the other cluster managers, the user's invocation of `spark-submit` eventually delegates to
`org.apache.spark.deploy.SparkSubmit#submit`. This method calls a main method that handles the submission logic
for a specific type of cluster manager. The top-level entry point for the Kubernetes submission logic is
`org.apache.spark.deploy.kubernetes.submit.Client#main()`.

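For orientation, the entry point has roughly the following shape. This is a minimal sketch under assumed names; the
argument model and the `run` body here are placeholders rather than the actual code:

```scala
private[spark] object Client {

  // Placeholder argument model; the real client parses a richer set of options.
  private case class ClientArguments(mainAppResource: String, otherArgs: Seq[String])

  def main(args: Array[String]): Unit = {
    // SparkSubmit forwards the user's submission parameters to this method.
    val parsedArguments = ClientArguments(mainAppResource = args.head, otherArgs = args.tail.toSeq)
    run(parsedArguments)
  }

  private def run(arguments: ClientArguments): Unit = {
    // Build the driver specification and send it to the Kubernetes API server;
    // see "Driver Configuration Steps" below.
  }
}
```
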
# Driver Configuration Steps

To render submission parameters into the final Kubernetes driver pod specification in a scalable
manner, the submission client breaks pod construction down into a
series of configuration steps, each of which is responsible for one specific aspect of configuring the driver.
A top-level component then iterates through all of the steps to produce a final set of Kubernetes resources that are
then deployed on the cluster.

## Interface Definitions

More formally, a configuration step must implement the following trait:

```scala
package org.apache.spark.deploy.kubernetes.submit.submitsteps

/**
 * Represents a step in preparing the Kubernetes driver.
 */
private[spark] trait DriverConfigurationStep {

  /**
   * Apply some transformation to the previous state of the driver to add a new feature to it.
   */
  def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec
}
```

A `DriverConfigurationStep` is thus a function that transforms one `KubernetesDriverSpec` into another,
taking the original specification and making additions to it in accordance with
the specific feature that step is responsible for. A `KubernetesDriverSpec` is a data structure with the following
properties:

```scala
private[spark] case class KubernetesDriverSpec(
    driverPod: Pod,
    driverContainer: Container,
    otherKubernetesResources: Seq[HasMetadata],
    driverSparkConf: SparkConf)
```

The `Pod` and `Container` classes are Java representations of Kubernetes pods and containers respectively, and the
`HasMetadata` type corresponds to an arbitrary Kubernetes API resource such as a `Secret` or a `ConfigMap`. Kubernetes
primitives are represented using an [open-source Java Kubernetes client](https://github.com/fabric8io/kubernetes-client).
The `otherKubernetesResources` field represents Kubernetes resources that are required by the Spark application. For
example, the driver may require a `ConfigMap` or `Secret` resource to be created that will be mounted into the driver
container.

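As an illustration of how these types fit together, the "empty" specification handed to the first configuration step
can be thought of as a bare pod and container with no extra resources. `createEmptyDriverSpec` here is a hypothetical
helper, not the actual code:

```scala
import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder}

import org.apache.spark.SparkConf

private[spark] object DriverSpecs {

  // Hypothetical helper: a bare pod, a bare container, no extra resources, and
  // a copy of the submission-time SparkConf for the steps to add properties to.
  def createEmptyDriverSpec(sparkConf: SparkConf): KubernetesDriverSpec = {
    KubernetesDriverSpec(
      driverPod = new PodBuilder().build(),
      driverContainer = new ContainerBuilder().build(),
      otherKubernetesResources = Seq.empty,
      driverSparkConf = sparkConf.clone())
  }
}
```
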
## Requirements for Configuration Steps

Configuration steps must be *independent*. A given configuration step should not be opinionated about the other
configuration steps that are executed before or after it. By extension, configuration steps should be *strictly
additive*: a given configuration step should not attempt to mutate or remove fields that are already set in the
input driver specification, as in the hypothetical sketch below.

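For example, a step that attaches a label to the driver pod copies the incoming specification and only adds to it.
This step is illustrative and does not exist in the codebase:

```scala
import io.fabric8.kubernetes.api.model.PodBuilder

// Illustrative only: adds a single label to the driver pod while leaving every
// other part of the incoming specification untouched.
private[spark] class DriverLabelConfigurationStep(key: String, value: String)
    extends DriverConfigurationStep {

  override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
    val labeledPod = new PodBuilder(driverSpec.driverPod)
      .editOrNewMetadata()
        .addToLabels(key, value)
        .endMetadata()
      .build()
    // Strictly additive: copy the spec, replacing only the pod.
    driverSpec.copy(driverPod = labeledPod)
  }
}
```
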
## Composition of Configuration Steps

Finally, configuration steps are wired together by an **orchestrator**. The orchestrator effectively translates the
parameters sent to `spark-submit` into the set of steps required to configure the final `KubernetesDriverSpec`. The
top-level submission client takes the final `KubernetesDriverSpec` object and builds the requests to the
Kubernetes API server that deploy the Kubernetes resources comprising the Spark driver. The top-level submission
process can thus be expressed as follows in pseudo-code with roughly Scala syntax:

```scala
def runApplication(sparkSubmitArguments: SparkSubmitArguments): Unit = {
  val orchestrator = new DriverConfigurationStepsOrchestrator(sparkSubmitArguments)
  val steps = orchestrator.getSubmissionSteps()
  // Iteratively apply the configuration steps to build up the driver spec.
  var currentSpec = createEmptyDriverSpec()
  for (step <- steps) {
    currentSpec = step.configureDriver(currentSpec)
  }
  // Put the container in the pod spec.
  val resolvedPod = attachContainer(currentSpec.driverPod, currentSpec.driverContainer)
  kubernetes.create(Seq(resolvedPod) ++ currentSpec.otherKubernetesResources)
}
```

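The orchestrator's step selection might look roughly like the following; the step class names are illustrative, and
the real orchestrator inspects many more submission parameters:

```scala
private[spark] class DriverConfigurationStepsOrchestrator(
    sparkSubmitArguments: SparkSubmitArguments) {

  def getSubmissionSteps(): Seq[DriverConfigurationStep] = {
    // Steps that apply to every submission.
    val baseSteps = Seq(
      new BasePodConfigurationStep(sparkSubmitArguments),
      new DriverCredentialsConfigurationStep(sparkSubmitArguments))
    // Steps that only apply when the submission uses certain features.
    val dependencySteps =
      if (Option(sparkSubmitArguments.jars).exists(_.nonEmpty) ||
          Option(sparkSubmitArguments.files).exists(_.nonEmpty)) {
        Seq(new DependencyResolutionStep(sparkSubmitArguments))
      } else {
        Nil
      }
    baseSteps ++ dependencySteps
  }
}
```
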
## Writing a New Configuration Step

All configuration steps should be placed in the `org.apache.spark.deploy.kubernetes.submit.submitsteps` package,
where examples of existing configuration steps can also be found. Ensure that the new configuration step is
returned by `org.apache.spark.deploy.kubernetes.submit.DriverConfigurationStepsOrchestrator#getAllConfigurationSteps()`.

# Dependency Management

Spark applications typically depend on binaries and configuration files hosted in a variety of locations.
Kubernetes applications typically bundle binary dependencies such as jars inside Docker images, but Spark's API
fundamentally allows dependencies to be provided from many other locations, including the submitter's local disk.
These dependencies have to be deployed into the driver and executor containers before they run. This is challenging
because, unlike Hadoop YARN, which requires co-deployment with an HDFS cluster, Kubernetes does not guarantee a
large-scale persistent storage layer that is available on every cluster.

## Resource Staging Server

The *resource staging server* is a lightweight daemon that serves as a file store for application dependencies. It has
two endpoints, which effectively correspond to putting files into the server and getting files out of the server. When
files are put into the server, the server returns a unique identifier and a secret token in the response to the client.
This identifier and secret token must be provided when a client makes a request to retrieve the files that were uploaded
to the server.

### Resource Staging Server API Definition

The resource staging server has the following Scala API, which is translated into HTTP endpoints via Jetty and
JAX-RS. The structures passed as input and output are defined alongside it:

```scala
import java.io.InputStream
import javax.ws.rs.core.StreamingOutput

private[spark] trait ResourceStagingService {

  def uploadResources(
      resources: InputStream,
      resourcesOwner: StagedResourcesOwner): SubmittedResourceIdAndSecret

  def downloadResources(resourceId: String, resourceSecret: String): StreamingOutput
}

case class StagedResourcesOwner(
    ownerNamespace: String,
    ownerLabels: Map[String, String],
    ownerType: StagedResourcesOwnerType.OwnerType)

// Owner types; only Pod is supported today (see "Cleaning Up Stale Resources").
object StagedResourcesOwnerType extends Enumeration {
  type OwnerType = Value
  val Pod = Value
}

case class SubmittedResourceIdAndSecret(resourceId: String, resourceSecret: String)
```

Clients that send resources to the server do so in a streaming manner, so that neither the server nor the client
needs to hold the entire resource bundle in memory. Aside from the `StagedResourcesOwner`, which is provided
on uploads but not on downloads, uploading is symmetrical to downloading. The significance of the
`StagedResourcesOwner` is discussed below.

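To illustrate the streaming contract on the download side, the server can return a JAX-RS `StreamingOutput` that
copies the stored bytes through a fixed-size buffer. This is a sketch assuming the staged bundle is available as an
`InputStream`:

```scala
import java.io.{InputStream, OutputStream}
import javax.ws.rs.core.StreamingOutput

// Sketch: stream a staged resource bundle back to the caller without ever
// buffering the whole bundle in memory.
def downloadAsStream(openStagedResource: () => InputStream): StreamingOutput = {
  new StreamingOutput {
    override def write(output: OutputStream): Unit = {
      val input = openStagedResource()
      try {
        val buffer = new Array[Byte](8192)
        var bytesRead = input.read(buffer)
        while (bytesRead != -1) {
          output.write(buffer, 0, bytesRead)
          bytesRead = input.read(buffer)
        }
      } finally {
        input.close()
      }
    }
  }
}
```
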
### Cleaning Up Stale Resources

The resource staging server is built to provide resources for the pods and containers in a Kubernetes cluster. These
pods are ephemeral, so at some point there will be no need for the resources that were sent for a specific application.
Clients indicate which Kubernetes objects will be using a given resource bundle by providing a description of the
bundle's "owner". The `StagedResourcesOwner` is this description, defining the owner as a Kubernetes API object in
a given namespace with a specific set of labels.

The resource staging server keeps track of the resources that were sent to it. When a resource is first uploaded, it
is marked as "unused"; if it remains unused for a period of time, it is cleaned up. A resource is marked as
"used" when a request is made to download it. After that, the server periodically checks the API server to see if any
Kubernetes API objects exist that match the description of the owner. If no such objects exist, the resource staging
server cleans up the uploaded resource. See `org.apache.spark.deploy.rest.kubernetes.StagedResourcesCleaner` for the
code that manages the resource's lifecycle.

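The owner-existence check can be sketched as follows with the fabric8 client; the class and method names here are
illustrative rather than the actual ones in `StagedResourcesCleaner`:

```scala
import scala.collection.JavaConverters._

import io.fabric8.kubernetes.client.KubernetesClient

// Illustrative check: a "used" resource is retained as long as at least one
// pod matching the owner's namespace and labels still exists.
private[spark] class StagedResourceOwnerChecker(kubernetesClient: KubernetesClient) {

  def ownerStillExists(owner: StagedResourcesOwner): Boolean = {
    val matchingPods = kubernetesClient.pods()
      .inNamespace(owner.ownerNamespace)
      .withLabels(owner.ownerLabels.asJava)
      .list()
      .getItems
    !matchingPods.isEmpty
  }
}
```
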
A resource owner can currently only be a pod, but hypothetically one could want to tie the lifetime of a resource to the
lifetimes of many pods under a higher-level Kubernetes object like a Deployment or a StatefulSet, all of which depend on
the uploaded resource. The resource staging server's API can be extended to tie ownership of a resource to any
Kubernetes API object type, as long as the `StagedResourcesOwnerType` enumeration is updated accordingly.

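Supporting those owner types would largely amount to growing the enumeration (and teaching the cleaner to query the
corresponding API objects). A hypothetical extension:

```scala
// Hypothetical extension of the owner types; only Pod is supported today.
object StagedResourcesOwnerType extends Enumeration {
  type OwnerType = Value
  val Pod, Deployment, StatefulSet = Value
}
```
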
### Usage in Spark

Jars and files can be added to a Spark application by passing `--jars` and `--files` to `spark-submit`, or by setting
the Spark configuration properties `spark.jars` and `spark.files`. The submission client
determines the list of jars and files that the application needs, and whether any of them are files being sent
from the submitter's local machine. If any files are being sent from the local machine, the user must have specified a
URL of a resource staging server to send the files to.

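A minimal sketch of how a submitted URI might be classified, assuming that a URI with no scheme or with the `file`
scheme refers to the submitter's local disk:

```scala
import java.net.URI

// Sketch: URIs with no scheme or the file:// scheme are treated as living on
// the submitter's local machine and therefore need to be staged.
def isSubmitterLocal(uri: String): Boolean = {
  Option(URI.create(uri).getScheme) match {
    case None | Some("file") => true
    case _ => false
  }
}
```
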
Local jars and files are compacted into a tarball, which is then uploaded to the resource staging server. The submission
client then knows the secret token that the driver and executors must use to download the files again. These secrets
are mounted into an [init-container](https://kubernetes.io/docs/concepts/workloads/pods/init-containers/)
that runs before the driver and executor processes, and the init-container
downloads the uploaded resources from the resource staging server.

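Shipping the token to the init-container could be done with a Kubernetes `Secret` along the following lines; the
secret name and data key are illustrative, not the ones Spark actually uses:

```scala
import java.nio.charset.StandardCharsets
import java.util.Base64

import io.fabric8.kubernetes.api.model.{Secret, SecretBuilder}

// Sketch: wrap the download token in a Secret for the driver and executor pods
// to mount; Kubernetes expects Secret data values to be base64-encoded.
def buildDownloadCredentialsSecret(appId: String, resourceSecret: String): Secret = {
  val encodedToken = Base64.getEncoder.encodeToString(
    resourceSecret.getBytes(StandardCharsets.UTF_8))
  new SecretBuilder()
    .withNewMetadata()
      .withName(s"$appId-staging-server-credentials")
      .endMetadata()
    .addToData("downloadSecret", encodedToken)
    .build()
}
```
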
### Other Considered Alternatives

The resource staging server was considered the best option among the alternative solutions to this problem.

A first implementation effectively included the resource staging server in the driver container itself. The driver
container ran a custom command that opened an HTTP endpoint and waited for the submission client to send resources to
it. The server would then run the driver application after it had received the resources from the user's local
machine. The problem with this approach is that the submission client needs to deploy the driver in such a way that the
driver itself is reachable from outside of the cluster, but it is difficult for an automated framework that is
not aware of the cluster's configuration to expose an arbitrary pod in a generic way. The standalone resource staging
server, by contrast, allows a cluster administrator to expose it in a manner that makes sense for their cluster, such as
with an Ingress or a NodePort service.

It is also impractical to use Kubernetes API objects like Secrets or ConfigMaps to store application binaries, because
these objects require their contents to be small (on the order of a megabyte) so that they can fit in etcd.

Finally, as mentioned before, the submission client should not be opinionated about storing dependencies in a
distributed storage system like HDFS, because not all Kubernetes clusters have the same types of persistent storage
layers. Spark does support fetching jars directly from distributed storage, though, so users are free to manually
push their dependencies to the appropriate systems and refer to them by their remote URIs in the submission request.

## Init-Containers

The driver pod and the executor pods both use [init-containers](https://kubernetes.io/docs/concepts/workloads/pods/init-containers/)
to localize resources before the driver and
executor processes launch. As mentioned before, the init-container fetches dependencies from the resource staging
server. However, even when the resource staging server is not being used, files may still need to be localized from
remote locations such as HDFS clusters or HTTP file servers. The init-container fetches these dependencies as
well.

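Attaching such an init-container with the fabric8 builder API might look like the following. The container name,
image, and environment variable are assumptions for illustration, and this assumes a fabric8 version whose pod model
exposes `initContainers`:

```scala
import io.fabric8.kubernetes.api.model.{ContainerBuilder, Pod, PodBuilder}

// Sketch: add an init-container that downloads dependencies before the main
// driver or executor container starts.
def withDependencyFetcher(pod: Pod, stagingServerUri: String): Pod = {
  val fetcherContainer = new ContainerBuilder()
    .withName("spark-init") // assumed name
    .withImage("spark-init:latest") // assumed image
    .addNewEnv()
      .withName("STAGING_SERVER_URI")
      .withValue(stagingServerUri)
      .endEnv()
    .build()
  new PodBuilder(pod)
    .editOrNewSpec()
      .addToInitContainers(fetcherContainer)
      .endSpec()
    .build()
}
```
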
Init-containers were preferred over fetching the dependencies in the main container primarily because they allow the
main container's runtime command to be simplified. Using init-containers to fetch remote dependencies lets the
main image command simply be an invocation of Java that runs the user's main class directly. The
file-localizer process can also be shared by the driver and executor images without being copied
into both image commands. Finally, it becomes easier to debug localization failures, because they surface as
failures in the pod's initialization lifecycle phase.

# Future Work

* The driver's pod specification should be highly customizable, to the point where users may want to specify a template
pod spec in a YAML file: https://github.com/apache-spark-on-k8s/spark/issues/38.
* The resource staging server can be backed by a distributed file store like HDFS to improve robustness and scalability.
* Additional driver bootstrap steps need to be added to support communication with Kerberized HDFS clusters:
  * https://github.com/apache-spark-on-k8s/spark/pull/391