Spark Job Submit Tool (SJST) is a tool to submit Spark jobs on Aurora. SJST enables OAP MLlib, which takes advantage of the Intel® oneAPI Data Analytics Library (oneDAL) and the Intel® oneAPI Collective Communications Library (oneCCL) to implement highly optimized machine learning algorithms. It gets the most out of CPU and GPU capabilities and uses efficient communication patterns in multi-node, multi-GPU clusters.
The following is a quick start for SJST, with an example of how to run a K-Means program on GPUs and a detailed description of the configuration.
SJST is available at /lus/flare/projects/Aurora_deployment/spark/spark-job. The components of SJST are shown below.
spark-job
├── bin/ // The scripts that setup spark cluster and submit jobs
├── conf-to-your-submit-dir/ // Configurations about workers and executors
│ // You'll mainly work with this directory.
├── example/ // Example for testing and reference
├── jars/ // Files for loading OAP MLlib
├── LICENSE
├── README.docx
└── README.txt
The following sections give a detailed example of how to submit a job.
The spark-job/conf-to-your-submit-dir/ directory contains the configuration files you need. Make a copy of them in your working directory.
For example:
$ mkdir ~/spark_work_home
$ cp /lus/flare/projects/Aurora_deployment/spark/spark-job/conf-to-your-submit-dir/* ~/spark_work_home
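To confirm the copy, you can list your working directory. The exact file set may vary between SJST releases, but it should include the environment and GPU resource files referenced later in this guide (for example env_aurora.sh, env_local.sh, and gpuResourceFile_aurora.json).
$ ls ~/spark_work_home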
After setting up the configuration, you can use the scripts in spark-job/bin/ to submit jobs from your working directory. Please make sure your configuration files are placed properly.
Here are the steps to submit the dense K-Means job with 2 nodes to the queue "lustre_scaling" from an Aurora login node. The data paths given below point to a 192G CSV dataset, read from DAOS in the first example and from the Lustre filesystem in the second.
$ cd ~/spark_work_home
# Submit a Spark job with 2 nodes to read data from DAOS and run K-Means example.
$ /lus/flare/projects/Aurora_deployment/spark/spark-job/bin/submit-spark.sh -A Aurora_deployment -l walltime=2:00:00 -l select=2 -l filesystems=flare:daos_user -q lustre_scaling /lus/flare/projects/Aurora_deployment/spark/spark-job/example/kmeans-pyspark.py daos://Intel2/hadoop_fs/HiBench/Kmeans/Input/36000000
# Submit a Spark job with 2 nodes to read data from Lustre Filesystem and run K-Means example.
$ /lus/flare/projects/Aurora_deployment/spark/spark-job/bin/submit-spark.sh -A Aurora_deployment -l walltime=2:00:00 -l select=2 -l filesystems=flare:daos_user -q lustre_scaling /lus/flare/projects/Aurora_deployment/spark/spark-job/example/kmeans-pyspark.py file:///lus/flare/projects/Aurora_deployment/spark/DataRoot/HiBench/Kmeans/Input/36000000
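Any arguments after the job file are passed straight through to the script (see the "JOBFILE [arguments ...]" usage shown later by -h). For example, the tool's built-in examples pass a trailing 10 to kmeans-pyspark.py, presumably the number of clusters; a hedged variant of the submission above would be:
$ /lus/flare/projects/Aurora_deployment/spark/spark-job/bin/submit-spark.sh -A Aurora_deployment -l walltime=2:00:00 -l select=2 -l filesystems=flare:daos_user -q lustre_scaling /lus/flare/projects/Aurora_deployment/spark/spark-job/example/kmeans-pyspark.py daos://Intel2/hadoop_fs/HiBench/Kmeans/Input/36000000 10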
After a successful submission, you will get the job ID generated by Aurora; it appears on the SPARKJOB_JOBID line of the output (1286330.aurora-pbs-0001.hostmgmt.cm.aurora.alcf.anl.gov in this example). The following shows a successful submission:
# Submitting job: /lus/flare/projects/Aurora_deployment/spark/spark-job/example/kmeans-pyspark.py daos://Intel2/hadoop_fs/HiBench/Kmeans/Input/36000000
debug 1286330.aurora-pbs-0001.hostmgmt.cm.aurora.alcf.anl.gov
debug -A Aurora_deployment -l select=2 -l walltime=2:00:00 -l filesystems=flare:daos_user -q lustre_scaling -v SPARKJOB_SCRIPTS_DIR=/lus/flare/projects/Aurora_deployment/spark/spark-job/bin,SPARKJOB_CONFIG_DIR=/home/damon/spark_work_home,SPARKJOB_INTERACTIVE=0,SPARKJOB_SCRIPTMODE=0,SPARKJOB_OUTPUT_DIR=/home/damon/spark_work_home,SPARKJOB_SEPARATE_MASTER=0,SPARKJOB_OAPML=1,SPARKJOB_DAOS=1,SPARKJOB_ARG=/lus/flare/projects/Aurora_deployment/spark/spark-job/example/kmeans-pyspark.py^daos://Intel2/hadoop_fs/HiBench/Kmeans/Input/36000000 -o /home/damon/spark_work_home -e /home/damon/spark_work_home /lus/flare/projects/Aurora_deployment/spark/spark-job/bin/start-spark.sh
# Submitted
SPARKJOB_JOBID=1286330.aurora-pbs-0001.hostmgmt.cm.aurora.alcf.anl.gov
If the submission fails, the output will look like the following:
# Submitting job: /lus/flare/projects/Aurora_deployment/spark/spark-job/example/kmeans-pyspark.py daos://Intel2/hadoop_fs/HiBench/Kmeans/Input/36000000
qsub: would exceed queue generic's per-user limit of jobs in 'Q' state
debug
debug -A Aurora_deployment -l select=2 -l walltime=2:00:00 -l filesystems=flare:daos_user -q lustre_scaling -v SPARKJOB_SCRIPTS_DIR=/lus/flare/projects/Aurora_deployment/spark/spark-job/bin,SPARKJOB_CONFIG_DIR=/home/damon/spark_work_home,SPARKJOB_INTERACTIVE=0,SPARKJOB_SCRIPTMODE=0,SPARKJOB_OUTPUT_DIR=/home/damon/spark_work_home,SPARKJOB_SEPARATE_MASTER=0,SPARKJOB_OAPML=1,SPARKJOB_DAOS=1,SPARKJOB_ARG=/lus/flare/projects/Aurora_deployment/spark/spark-job/example/kmeans-pyspark.py^daos://Intel2/hadoop_fs/HiBench/Kmeans/Input/36000000 -o /home/damon/spark_work_home -e /home/damon/spark_work_home /lus/flare/projects/Aurora_deployment/spark/spark-job/bin/start-spark.sh
# Submitting failed.
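In the failure above, PBS rejected the submission because too many of your jobs were already queued. Before resubmitting, you can list your own jobs to see what is still in the 'Q' state (standard PBS, not specific to SJST):
$ qstat -u $USER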
Here are the steps to start an interactive session with 2 nodes on the queue "lustre_scaling" from an Aurora login node.
$ cd ~/spark_work_home
$ /lus/flare/projects/Aurora_deployment/spark/spark-job/bin/submit-spark.sh -A Aurora_deployment -l walltime=2:00:00 -l select=2 -l filesystems=flare:daos_user -q lustre_scaling -I
After a successful submission, you will get the job ID generated by Aurora; it is 67690.amn-0001 in this example. The following shows a successful submission:
Submitting an interactive job and wait for at most 1800 sec.
debug 67690.amn-0001
debug -A Aurora_deployment -l select=2 -l walltime=2:00:00 -l filesystems=flare:daos_user -q lustre_scaling -v SPARKJOB_SCRIPTS_DIR=/lus/flare/projects/Aurora_deployment/spark/spark-job/bin,SPARKJOB_CONFIG_DIR=/home/damon/spark_work_home,SPARKJOB_INTERACTIVE=1,SPARKJOB_SCRIPTMODE=0,SPARKJOB_OUTPUT_DIR=/home/damon/spark_work_home,SPARKJOB_SEPARATE_MASTER=0,SPARKJOB_OAPML=1,SPARKJOB_DAOS=1,SPARKJOB_ARG= -o /home/damon/spark_work_home -e /home/damon/spark_work_home /lus/flare/projects/Aurora_deployment/spark/spark-job/bin/start-spark.sh
# Submitted
SPARKJOB_JOBID=67690.amn-0001
need spark job host: 0
Waiting for Spark to launch by checking /home/damon/spark_work_home/67690.amn-0001
# Spark is now running (SPARKJOB_JOBID=67690.amn-0001) on:
# x1921c1s2b0n0.hostmgmt2000.cm.americas.sgi.com
# x1921c1s5b0n0.hostmgmt2000.cm.americas.sgi.com
declare -x SPARK_MASTER="spark://x1921c1s2b0n0.hostmgmt.cm.americas.sgi.com:7077"
# Spawning bash on host: x1921c1s2b0n0
Adding oap mllib to additional class path
need spark job host: 1
sourced /lus/flare/projects/Aurora_deployment/spark/spark-job/bin/env_gpu.sh
need spark job host: 1
loading DAOS module
sourced /home/damon/spark_work_home/env_aurora.sh
sourced /lus/flare/projects/Aurora_deployment/spark/spark-job/bin/env_spark_daos.sh
GPU Options: --conf spark.oap.mllib.device=GPU
sourced /home/damon/spark_work_home/env_local.sh
After entering the interactive session, you can run jobs from the command line.
$ spark-submit /lus/flare/projects/Aurora_deployment/spark/spark-job/example/kmeans-pyspark.py daos://Intel2/hadoop_fs/HiBench/Kmeans/Input/36000000
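You can submit further jobs from the same session; for example, the same script can read the Lustre copy of the input data used in the batch example above:
$ spark-submit /lus/flare/projects/Aurora_deployment/spark/spark-job/example/kmeans-pyspark.py file:///lus/flare/projects/Aurora_deployment/spark/DataRoot/HiBench/Kmeans/Input/36000000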
After submitting jobs, Aurora will allocate resources and schedule them. You can check a job's status with the qstat command. For example, for a job with ID 27643.amn-0001 the output will look like this:
// The job is waiting for resources.
$ qstat 27643.amn-0001
Job id Name User Time Use S Queue
---------------- ---------------- ---------------- -------- - -----
27643.amn-0001 start-spark.sh damon 0 Q lustre_scaling
// The job is running.
$ qstat 27643.amn-0001
Job id Name User Time Use S Queue
---------------- ---------------- ---------------- -------- - -----
27643.amn-0001 start-spark.sh damon 0 R lustre_scaling
// The job has ended.
$ qstat 27643.amn-0001
Job id Name User Time Use S Queue
---------------- ---------------- ---------------- -------- - -----
27643.amn-0001 start-spark.sh damon 0 E lustre_scaling
To check your job status periodically, you can use the watch command. After the job ends, press Ctrl-C to stop the watch process.
$ watch -n 2 qstat 27643.amn-0001
Job id Name User Time Use S Queue
---------------- ---------------- ---------------- -------- - -----
27643.amn-0001 start-spark.sh damon 0 Q lustre_scaling
The output of your job is stored in your working directory; here is an example of its layout.
spark_work_home
├── 27643.amn-0001/
│   ├── conf/ // Spark conf such as spark-env.sh, spark-defaults.conf and slaves
│   ├── logs/ // Spark master and worker logs
│   └── app-20230301080732-0000/ // Named after the Spark application ID; contains the stderr and stdout logs for each executor.
├── 27643.amn-0001.ER // All stderr output for your job.
└── 27643.amn-0001.OU // All stdout output for your job.
The results of the dense K-Means example are at the end of 27643.amn-0001.OU.
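Since the results are appended to the standard output file, one way to inspect them once the job has ended is, for example:
$ tail -n 50 ~/spark_work_home/27643.amn-0001.OU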
submit-spark.sh provides a number of options. You can get detailed information about the options, along with examples, by running the command below.
$ /lus/flare/projects/Aurora_deployment/spark/spark-job/bin/submit-spark.sh -h
Spark Job Submit Tool v1.0.3 for Aurora
Usage:
submit-spark.sh [options] JOBFILE [arguments ...]
JOBFILE can be:
script.py pyspark scripts
run-example examplename run Spark examples, like "SparkPi"
--class classname example.jar run Spark job "classname" wrapped in example.jar
shell-script.sh when option "-s" is specified
Required options:
-l walltime=<v> Max run time, e.g., -l walltime=30:00 for running 30 minutes at most
-l select=<v> Job node count, e.g., -l select=2 for requesting 2 nodes
-l filesystems=<v> Filesystem type, e.g., -l filesystems=flare:daos_user for requesting flare and daos filesystems
-q QUEUE Queue name
Optional options:
-A PROJECT Allocation name
-o OUTPUTDIR Directory for job output files (default: current dir)
-s Enable shell script mode
-y Spark master uses a separate node
-I Start an interactive ssh session
-b Prefer Spark built-in mllib to default OAP mllib
-n Run without DAOS loaded
-h Print this help messages
Example:
submit-spark.sh -A Aurora_deployment -l walltime=60 -l select=2 -l filesystems=flare:daos_user -q workq kmeans-pyspark.py daos://pool0/cont1/kmeans/input/libsvm 10
submit-spark.sh -A Aurora_deployment -l walltime=30 -l select=1 -l filesystems=flare:daos_user -q workq -o output-dir run-example SparkPi
submit-spark.sh -A Aurora_deployment -I -l walltime=30 -l select=2 -l filesystems=flare:daos_user -q workq --class com.intel.jlse.ml.KMeansExample example/jlse-ml-1.0-SNAPSHOT.jar daos://pool0/cont1/kmeans/input/csv csv 10
submit-spark.sh -A Aurora_deployment -l walltime=30 -l select=1 -l filesystems=flare:daos_user -q workq -s example/test_script.sh job.log
For example, test_script.sh can be run in two ways: as a batch job in shell script mode, or manually inside an interactive session.
# Submit test_script.sh as a batch job in shell script mode (-s).
$ /lus/flare/projects/Aurora_deployment/spark/spark-job/bin/submit-spark.sh -A Aurora_deployment -l walltime=2:00:00 -l select=1 -l filesystems=flare:daos_user -q lustre_scaling -s test_script.sh
# Alternatively, start an interactive session and run the script by hand.
$ /lus/flare/projects/Aurora_deployment/spark/spark-job/bin/submit-spark.sh -A Aurora_deployment -l walltime=2:00:00 -l select=1 -l filesystems=flare:daos_user -q lustre_scaling -I
$ ./test_script.sh
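The bundled example/test_script.sh is not reproduced here. In shell script mode the tool simply runs your script inside the allocated Spark cluster, so a script of your own might look like the following minimal sketch; the spark-submit call and the log-file handling are illustrative assumptions, not part of SJST:
#!/bin/bash
# Hypothetical shell script for -s mode: run a Spark job and capture its output.
# $1 is an optional log file name (the tool's help example passes "job.log").
spark-submit /lus/flare/projects/Aurora_deployment/spark/spark-job/example/kmeans-pyspark.py \
    file:///lus/flare/projects/Aurora_deployment/spark/DataRoot/HiBench/Kmeans/Input/36000000 \
    > "${1:-job.log}" 2>&1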
This section introduces how to configure SJST so that Spark works the way you want. By default, SJST provides env_aurora.sh and env_local.sh, which already configure resources properly for Aurora. If you want to customize the resources, the following information may help.
Here you can change the Spark worker's CPU, memory, and GPU resources.
Configuration Option | Description |
---|---|
SPARK_WORKER_CORES | Maximum number of cores for each worker |
SPARK_WORKER_MEMORY | Maximum memory for each worker |
GPU_RESOURCE_FILE | Path to the resources file used to discover resources when the worker starts up |
GPU_WORKER_AMOUNT | Number of GPUs for each worker to use |
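GPU_RESOURCE_FILE points to a JSON resources file in the standard Spark resource-allocation format (a componentName, a resourceName, and a list of device addresses). The bundled gpuResourceFile_aurora.json is not reproduced here; a minimal sketch of that format, assuming 12 GPU devices per node, would look like:
[{"id": {"componentName": "spark.worker", "resourceName": "gpu"},
  "addresses": ["0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11"]}]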
Here you can change Spark and OAP MLlib configurations. For more details about Spark configuration, refer to the Spark configuration docs.
Configuration Option | Description |
---|---|
spark.oap.mllib.device | Compute device to use, CPU or GPU; the default is GPU. (Currently the OAP MLlib jar only supports GPU; CPU support will be added in future releases.) |
spark.executor.cores | The number of cores to use on each executor. |
spark.driver.memory | Amount of memory to use for the driver process. |
spark.executor.memory | Amount of memory to use per executor process. |
spark.executor.instances | The number of executors to launch for the job. |
spark.worker.resourcesFile | Path to the resources file used to discover resources when the worker starts up. Default: $GPU_RESOURCE_FILE |
spark.worker.resource.gpu.amount | Number of GPUs to use on the worker. Default: $GPU_WORKER_AMOUNT |
spark.executor.resource.gpu.amount | Number of GPUs for each executor |
spark.task.resource.gpu.amount | Number of GPUs for each task (may be a fraction of a GPU) |
spark.driver.extraJavaOptions | A string of extra JVM options to pass to the driver. |
spark.executor.extraJavaOptions | A string of extra JVM options to pass to executors. |
spark.executor.extraClassPath | Extra classpath entries to prepend to the classpath of executors. OAP MLlib Jar path is added here. |
spark.driver.extraClassPath | Extra classpath entries to prepend to the classpath of the driver. OAP MLlib Jar path is added here. |
spark.eventLog.enabled | Whether to log Spark events. |
spark.eventLog.dir | Base directory in which Spark events are logged. |
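These settings normally live in the configuration files you copied, but individual values can also be overridden per run. For example, in an interactive session you can pass them to spark-submit with --conf; the GPU device option below is the one SJST already adds by default (see the "GPU Options" line in the startup log above), and the memory value is illustrative:
$ spark-submit --conf spark.oap.mllib.device=GPU --conf spark.executor.memory=70g /lus/flare/projects/Aurora_deployment/spark/spark-job/example/kmeans-pyspark.py file:///lus/flare/projects/Aurora_deployment/spark/DataRoot/HiBench/Kmeans/Input/36000000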
Here is an example configuration that prioritizes running the dense K-Means example efficiently.
Suppose that we have a cluster as below.
Cluster | Parameters |
---|---|
Nodes | 3 |
Memory per node | 1024G |
Physical cores per node | 104 |
Logical cores per node | 208 |
GPUs per node | 12 |
We want each Spark executor to correspond to one GPU, which is the most efficient mapping. Therefore, we launch 12 executors per node and give each executor enough resources to run the dense K-Means example. We set 96 physical cores and 840G of memory for the worker on each node (leaving some cores and memory for the OS), and give each executor 8 (96/12) cores and 70G (840G/12) of memory. This leads to the configuration files below.
# set Spark Worker resources
export SPARK_WORKER_CORES=96 # Maximum number of cores that each worker can get
export SPARK_WORKER_MEMORY=840G # Maximum memory that each worker can get
# set GPU options
export GPU_RESOURCE_FILE=$SPARKJOB_CONFIG_DIR/gpuResourceFile_aurora.json # Path to the resources file used to discover resources when the worker starts up
export GPU_WORKER_AMOUNT=12
spark.driver.memory 20g # each driver has 20g memory
spark.executor.cores 8 # each executor has 8 (96/12) cores
spark.executor.memory 70g # each executor has 70g (840G/12) memory
spark.worker.resource.gpu.amount 12 # each node has 1 worker and each worker has 12 GPUs
spark.executor.resource.gpu.amount 1 # each executor has 1 GPU
spark.executor.instances 36 # 36 (12 x 3) executors in total
spark.task.resource.gpu.amount 0.125 # each task uses 1/8 of a GPU
The value of spark.task.resource.gpu.amount should be chosen based on your target task concurrency. For example, if each node has 12 GPUs and you want 96 concurrent GPU tasks per node, then spark.task.resource.gpu.amount = 12 / 96 = 0.125; in that case 8 tasks share each GPU, which allows better parallelism for lightweight stages such as preprocessing.