
Commit 4822372

[DOP-29496] Set Spark session resources based on master
1 parent 4f14fde

1 file changed: syncmaster/worker/spark.py (19 additions, 7 deletions)
@@ -34,8 +34,9 @@ def get_worker_spark_session(
     name = run.transfer.group.name + "_" + run.transfer.name  # noqa: WPS336
     spark_builder = SparkSession.builder.appName(f"SyncMaster__{name}")
 
+    master = settings.spark_session_default_config.get("spark.master")
     spark_session_config = settings.spark_session_default_config.copy()
-    spark_session_config.update(get_spark_session_conf(source, target, run.transfer.resources))
+    spark_session_config.update(get_spark_session_conf(master, source, target, run.transfer.resources))
 
     for k, v in spark_session_config.items():
         spark_builder = spark_builder.config(k, v)
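
The master URL is now read from the default session config before it is copied, so get_spark_session_conf can branch on it. A minimal sketch of that lookup, using a hypothetical defaults dict in place of the real settings object:

# Hypothetical stand-in for settings.spark_session_default_config.
spark_session_default_config = {"spark.master": "local[*]"}

# dict.get returns None when "spark.master" is not configured,
# which matches the spark_master: str | None parameter added below.
master = spark_session_default_config.get("spark.master")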
@@ -109,23 +110,34 @@ def get_excluded_packages() -> list[str]:
 
 
 def get_spark_session_conf(
+    spark_master: str | None,
     source: ConnectionDTO,
     target: ConnectionDTO,
     resources: dict,
 ) -> dict:
     maven_packages: list[str] = get_packages(connection_types={source.type, target.type})
     excluded_packages: list[str] = get_excluded_packages()
 
-    memory_mb = math.ceil(resources["ram_bytes_per_task"] / 1024 / 1024)
-    config = {
+    tasks: int = resources["max_parallel_tasks"]
+    cores_per_task: int = resources["cpu_cores_per_task"]
+    # Spark expects memory to be in MB
+    memory_mb: int = math.ceil(resources["ram_bytes_per_task"] / 1024 / 1024)
+
+    config: dict[str, str | int] = {
         "spark.sql.pyspark.jvmStacktrace.enabled": "true",
         "spark.hadoop.mapreduce.fileoutputcommitter.marksuccessfuljobs": "false",
-        "spark.executor.cores": resources["cpu_cores_per_task"],
-        # Spark expects memory to be in MB
-        "spark.executor.memory": f"{memory_mb}M",
-        "spark.executor.instances": resources["max_parallel_tasks"],
     }
 
+    if spark_master and spark_master.startswith("local"):
+        config["spark.master"] = f"local[{tasks}]"
+        config["spark.driver.memory"] = f"{memory_mb}M"
+        config["spark.default.parallelism"] = tasks * cores_per_task
+    else:
+        config["spark.executor.memory"] = f"{memory_mb}M"
+        config["spark.executor.cores"] = cores_per_task
+        config["spark.executor.instances"] = tasks
+        config["spark.dynamicAllocation.maxExecutors"] = tasks
+
     if maven_packages:
         log.debug("Include Maven packages: %s", maven_packages)
         config["spark.jars.packages"] = ",".join(maven_packages)
