
Commit 791d562

[DOP-30579] Allow using SyncMaster worker image as spark.kubernetes.container.image
1 parent f0d42ad commit 791d562

File tree: 5 files changed (+119, -8 lines changed)

docker/Dockerfile.worker

Lines changed: 1 addition & 0 deletions
@@ -2,6 +2,7 @@ ARG PYTHON_VERSION=3.13
 FROM python:$PYTHON_VERSION-slim-bookworm AS base

 RUN apt-get update && apt-get install -y --no-install-recommends \
+    tini \
     openjdk-17-jdk-headless \
     # required for HDFS/Hive with Kerberos enabled
     krb5-user \
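
The tini installed here is what the reworked entrypoint below uses as PID 1. As a quick sanity check that the binary lands in the image, one could build it and invoke tini directly; the build command, build context and image tag are illustrative assumptions, not part of this commit:

# Build the worker image from the repository root and run a trivial command under tini
# (the Debian package installs the binary at /usr/bin/tini, the same path the entrypoint uses):
docker build -f docker/Dockerfile.worker -t syncmaster-worker:dev .
docker run --rm --entrypoint /usr/bin/tini syncmaster-worker:dev -s -- echo "tini works"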

docker/entrypoint_worker.sh

Lines changed: 98 additions & 5 deletions
@@ -1,8 +1,101 @@
 #!/usr/bin/env bash
-set -e

-# https://docs.celeryq.dev/en/stable/userguide/workers.html#max-tasks-per-child-setting
-# Required to start each Celery task in separated process, avoiding issues with global Spark session object
+# Based on https://github.com/apache/spark/blob/v3.5.7/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh

-# exec is required to forward all signals to the main process
-exec python -m celery -A syncmaster.worker.celery worker --max-tasks-per-child=1 "$@"
+set -ex
+
+if [ -z "$JAVA_HOME" ]; then
+  JAVA_HOME=$(java -XshowSettings:properties -version 2>&1 > /dev/null | grep 'java.home' | awk '{print $3}')
+fi
+if [ -z "$SPARK_HOME" ]; then
+  SPARK_HOME=$(python -c 'import pyspark; import pathlib; print(pathlib.Path(pyspark.__file__).parent.resolve())')
+fi
+
+SPARK_CLASSPATH="$SPARK_CLASSPATH:${SPARK_HOME}/jars/*"
+
+env | grep SPARK_JAVA_OPT_ | sort -t_ -k4 -n | sed 's/[^=]*=\(.*\)/\1/g' > java_opts.txt
+if [ "$(command -v readarray)" ]; then
+  readarray -t SPARK_EXECUTOR_JAVA_OPTS < java_opts.txt
+else
+  SPARK_EXECUTOR_JAVA_OPTS=("${(@f)$(< java_opts.txt)}")
+fi
+
+if [ -n "$SPARK_EXTRA_CLASSPATH" ]; then
+  SPARK_CLASSPATH="$SPARK_CLASSPATH:$SPARK_EXTRA_CLASSPATH"
+fi
+
+if ! [ -z ${PYSPARK_PYTHON+x} ]; then
+  export PYSPARK_PYTHON
+fi
+if ! [ -z ${PYSPARK_DRIVER_PYTHON+x} ]; then
+  export PYSPARK_DRIVER_PYTHON
+fi
+
+# If HADOOP_HOME is set and SPARK_DIST_CLASSPATH is not set, set it here so Hadoop jars are available to the executor.
+# It does not set SPARK_DIST_CLASSPATH if already set, to avoid overriding customizations of this value from elsewhere e.g. Docker/K8s.
+if [ -n "${HADOOP_HOME}" ] && [ -z "${SPARK_DIST_CLASSPATH}" ]; then
+  export SPARK_DIST_CLASSPATH="$($HADOOP_HOME/bin/hadoop classpath)"
+fi
+
+if ! [ -z ${HADOOP_CONF_DIR+x} ]; then
+  SPARK_CLASSPATH="$HADOOP_CONF_DIR:$SPARK_CLASSPATH";
+fi
+
+if ! [ -z ${SPARK_CONF_DIR+x} ]; then
+  SPARK_CLASSPATH="$SPARK_CONF_DIR:$SPARK_CLASSPATH";
+elif ! [ -z ${SPARK_HOME+x} ]; then
+  SPARK_CLASSPATH="$SPARK_HOME/conf:$SPARK_CLASSPATH";
+fi
+
+# Place IVY2 jars at the end of classpath to avoid conflicts with Spark and Hadoop jars
+IVY2_HOME=$(realpath ~/.ivy2)
+SPARK_CLASSPATH="$SPARK_CLASSPATH:${IVY2_HOME}/jars/*"
+
+# SPARK-43540: add current working directory into executor classpath
+SPARK_CLASSPATH="$SPARK_CLASSPATH:$PWD"
+
+case "$1" in
+  driver)
+    shift 1
+    CMD=(
+      "$SPARK_HOME/bin/spark-submit"
+      --conf "spark.driver.bindAddress=$SPARK_DRIVER_BIND_ADDRESS"
+      --conf "spark.executorEnv.SPARK_DRIVER_POD_IP=$SPARK_DRIVER_BIND_ADDRESS"
+      --deploy-mode client
+      "$@"
+    )
+    ;;
+  executor)
+    shift 1
+    CMD=(
+      ${JAVA_HOME}/bin/java
+      "${SPARK_EXECUTOR_JAVA_OPTS[@]}"
+      -Xms$SPARK_EXECUTOR_MEMORY
+      -Xmx$SPARK_EXECUTOR_MEMORY
+      -cp "$SPARK_CLASSPATH:$SPARK_DIST_CLASSPATH"
+      org.apache.spark.scheduler.cluster.k8s.KubernetesExecutorBackend
+      --driver-url $SPARK_DRIVER_URL
+      --executor-id $SPARK_EXECUTOR_ID
+      --cores $SPARK_EXECUTOR_CORES
+      --app-id $SPARK_APPLICATION_ID
+      --hostname $SPARK_EXECUTOR_POD_IP
+      --resourceProfileId $SPARK_RESOURCE_PROFILE_ID
+      --podName $SPARK_EXECUTOR_POD_NAME
+    )
+    ;;
+
+  *)
+    # https://docs.celeryq.dev/en/stable/userguide/workers.html#max-tasks-per-child-setting
+    # Required to start each Celery task in separated process, avoiding issues with global Spark session object
+    CMD=(
+      python -m celery
+      -A syncmaster.worker.celery
+      worker
+      --max-tasks-per-child=1
+      "$@"
+    )
+    ;;
+esac
+
+# Execute the container CMD under tini for better hygiene
+exec /usr/bin/tini -s -- "${CMD[@]}"
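
For context on how the new driver/executor branches get exercised: Spark on Kubernetes creates executor pods whose container runs this image with the single argument "executor" and with the SPARK_* environment variables consumed above (SPARK_DRIVER_URL, SPARK_EXECUTOR_ID, SPARK_EXECUTOR_MEMORY, SPARK_JAVA_OPT_*, and so on) injected by the driver. A hedged sketch of how one could confirm this on a running job; the namespace and pod name below are placeholders:

# Inspect a running Spark executor pod created from the SyncMaster worker image
# (substitute your own namespace and pod name):
kubectl -n syncmaster get pod <executor-pod-name> -o jsonpath='{.spec.containers[0].args}'
# the args should contain "executor", which routes the entrypoint into the KubernetesExecutorBackend branch
kubectl -n syncmaster get pod <executor-pod-name> -o jsonpath='{.spec.containers[0].env[*].name}' | tr ' ' '\n' | grep '^SPARK_'
# Any other invocation (the image default) still falls through to the Celery worker branch.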
Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
+Allow using SyncMaster worker image as ``spark.kubernetes.container.image``.

docs/reference/worker/create_spark_session.rst

Lines changed: 14 additions & 0 deletions
@@ -23,6 +23,20 @@ It is possible to alter default `Spark Session configuration <https://spark.apac
         spark.sql.pyspark.jvmStacktrace.enabled: true
         spark.ui.enabled: false

+For example, to use SyncMaster on Spark + Kubernetes, you can use the worker image for Spark executor containers:
+
+.. code-block:: yaml
+    :caption: config.yml
+
+    worker:
+      spark_session_default_config:
+        spark.master: k8s://https://kubernetes.default.svc
+        spark.driver.bindAddress: 0.0.0.0
+        spark.kubernetes.authenticate.driver.serviceAccountName: spark
+        spark.sql.pyspark.jvmStacktrace.enabled: true
+        spark.kubernetes.container.image: mtsrus/syncmaster-worker:{TAG}
+
+
 Custom Spark session factory
 ----------------------------

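A quick way to verify the setting once a transfer is running is to list the executor pods together with the image they use. This is a hedged sketch: it assumes the executors run in a namespace you can read and relies on the spark-role=executor label that Spark on Kubernetes applies to executor pods by default:

# List Spark executor pods and their container image; the image column should show the
# SyncMaster worker image configured above (add -n <namespace> if needed):
kubectl get pods -l spark-role=executor \
  -o jsonpath='{range .items[*]}{.metadata.name}{"\t"}{.spec.containers[0].image}{"\n"}{end}'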

syncmaster/worker/spark.py

Lines changed: 5 additions & 3 deletions
@@ -131,12 +131,14 @@ def get_spark_session_conf(
     if spark_master and spark_master.startswith("local"):
         config["spark.master"] = f"local[{tasks}]"
         config["spark.driver.memory"] = f"{memory_mb}M"
-        config["spark.default.parallelism"] = tasks * cores_per_task
     else:
         config["spark.executor.memory"] = f"{memory_mb}M"
-        config["spark.executor.cores"] = cores_per_task
         config["spark.executor.instances"] = tasks
-        config["spark.dynamicAllocation.maxExecutors"] = tasks
+
+    config["spark.executor.cores"] = cores_per_task
+    config["spark.default.parallelism"] = tasks * cores_per_task
+    config["spark.dynamicAllocation.maxExecutors"] = tasks  # yarn
+    config["spark.kubernetes.executor.limit.cores"] = cores_per_task  # k8s

     if maven_packages:
         log.debug("Include Maven packages: %s", maven_packages)
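
To make the resulting configuration concrete, here is a hedged illustration of the keys the function now emits for a non-local master, written as spark-submit flags with made-up numbers (tasks=2, cores_per_task=4, memory_mb=2048) and a hypothetical job script; SyncMaster sets these values programmatically, so this only shows the arithmetic and which scheduler honours which key:

# spark.default.parallelism = tasks * cores_per_task = 8;
# maxExecutors is only honoured by YARN, executor.limit.cores only by Kubernetes.
spark-submit \
  --master k8s://https://kubernetes.default.svc \
  --conf spark.executor.memory=2048M \
  --conf spark.executor.instances=2 \
  --conf spark.executor.cores=4 \
  --conf spark.default.parallelism=8 \
  --conf spark.dynamicAllocation.maxExecutors=2 \
  --conf spark.kubernetes.executor.limit.cores=4 \
  job.py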
