Commit 2fc01f5

[DOP-29498] Allow overriding Spark session config
Parent: 4bf877d

6 files changed: +64 -11 lines

docker/download_maven_packages.py

Lines changed: 1 addition & 1 deletion

@@ -41,7 +41,7 @@ def get_worker_spark_session_for_docker(connection_types: set[str]) -> SparkSession
     """
     from pyspark.sql import SparkSession

-    spark_builder = SparkSession.builder.appName("syncmaster_jar_downloader").master("local")
+    spark_builder = SparkSession.builder.appName("syncmaster_jar_downloader").master("local[1]")

     for k, v in get_spark_session_conf_for_docker_image(connection_types).items():
         spark_builder = spark_builder.config(k, v)

Lines changed: 12 additions & 0 deletions

@@ -0,0 +1,12 @@
+Allow passing default Spark session config via worker settings:
+
+.. code-block:: yaml
+    :caption: config.yml
+
+    worker:
+      spark_session_default_config:
+        spark.master: local
+        spark.driver.host: 127.0.0.1
+        spark.driver.bindAddress: 0.0.0.0
+        spark.sql.pyspark.jvmStacktrace.enabled: true
+        spark.ui.enabled: false
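
For orientation, a minimal sketch (assuming PySpark is installed; the app name is hypothetical) of what this config amounts to — the worker applies each key/value pair to the session builder, so the YAML above is roughly equivalent to:

    from pyspark.sql import SparkSession

    # Each entry of spark_session_default_config becomes a .config() call:
    spark = (
        SparkSession.builder
        .appName("example")  # hypothetical; the worker derives its own app name
        .config("spark.master", "local")
        .config("spark.driver.host", "127.0.0.1")
        .config("spark.driver.bindAddress", "0.0.0.0")
        .config("spark.sql.pyspark.jvmStacktrace.enabled", "true")
        .config("spark.ui.enabled", "false")
        .getOrCreate()
    )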

docs/reference/worker/create_spark_session.rst

Lines changed: 27 additions & 5 deletions
@@ -1,12 +1,32 @@
 .. _worker-create-spark-session:

-Altering Spark session settings
-===============================
+Configuring Spark session
+=========================

 SyncMaster Worker creates `SparkSession <https://spark.apache.org/docs/latest/sql-getting-started.html#starting-point-sparksession>`_ for each Run.
-By default, SparkSession is created with ``master=local``, all required .jar packages for specific DB/FileSystem types, and limiter by transfer resources.

-It is possible to alter SparkSession config by providing custom function:
+By default, SparkSession is created with ``master=local``, including all required .jar packages for DB/FileSystem types, and limited by transfer resources.
+
+Custom Spark session configuration
+----------------------------------
+
+It is possible to alter the default `Spark Session configuration <https://spark.apache.org/docs/latest/configuration.html>`_ via worker settings:
+
+.. code-block:: yaml
+    :caption: config.yml
+
+    worker:
+      spark_session_default_config:
+        spark.master: local
+        spark.driver.host: 127.0.0.1
+        spark.driver.bindAddress: 0.0.0.0
+        spark.sql.pyspark.jvmStacktrace.enabled: true
+        spark.ui.enabled: false
+
+Custom Spark session factory
+----------------------------
+
+It is also possible to use a custom function which returns a ``SparkSession`` object:

 .. code-block:: yaml
     :caption: config.yml

@@ -21,17 +41,19 @@ Here is a function example:

     from syncmaster.db.models import Run
     from syncmaster.dto.connections import ConnectionDTO
+    from syncmaster.worker.settings import WorkerSettings
     from pyspark.sql import SparkSession

     def create_custom_spark_session(
         run: Run,
         source: ConnectionDTO,
         target: ConnectionDTO,
+        settings: WorkerSettings,
     ) -> SparkSession:
         # any custom code returning SparkSession object
         return SparkSession.builder.config(...).getOrCreate()

-Module with custom function should be placed in the same Docker image or Python virtual environment used by SyncMaster worker.
+Module with custom function should be placed into the same Docker image or Python virtual environment used by SyncMaster worker.

 .. note::
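
Building on the docs above, a complete custom factory module might look like the following — a minimal sketch, assuming it is importable as custom_syncmaster.spark (the module path used in the settings example further down); the body is illustrative, any code returning a SparkSession works:

    # custom_syncmaster/spark.py (hypothetical module)
    from pyspark.sql import SparkSession

    from syncmaster.db.models import Run
    from syncmaster.dto.connections import ConnectionDTO
    from syncmaster.worker.settings import WorkerSettings


    def get_worker_spark_session(
        run: Run,
        source: ConnectionDTO,
        target: ConnectionDTO,
        settings: WorkerSettings,
    ) -> SparkSession:
        # Start from the defaults configured in worker settings,
        # then build the session; customize as needed.
        builder = SparkSession.builder.appName(f"custom_{run.transfer.name}")
        for key, value in settings.spark_session_default_config.items():
            builder = builder.config(key, value)
        return builder.getOrCreate()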

syncmaster/worker/controller.py

Lines changed: 2 additions & 1 deletion
@@ -203,10 +203,11 @@ def __init__(
     @slot
     def perform_transfer(self) -> None:
         try:
-            spark = self.settings.worker.CREATE_SPARK_SESSION_FUNCTION(
+            spark = self.settings.worker.create_spark_session_function(
                 run=self.run,
                 source=self.source_handler.connection_dto,
                 target=self.target_handler.connection_dto,
+                settings=self.settings.worker,
             )

             with spark:

syncmaster/worker/settings/__init__.py

Lines changed: 15 additions & 2 deletions
@@ -1,5 +1,7 @@
 # SPDX-FileCopyrightText: 2023-2024 MTS PJSC
 # SPDX-License-Identifier: Apache-2.0
+from typing import Any
+
 from pydantic import BaseModel, Field
 from pydantic.types import ImportString

@@ -24,14 +26,25 @@ class WorkerSettings(BaseModel):
         :caption: config.yml

         worker:
-            log_url_template: https://logs.location.example.com/syncmaster-worker?correlation_id={{ correlation_id }}&run_id={{ run.id }}
+            log_url_template: "https://logs.location.example.com/syncmaster-worker?correlation_id={{ correlation_id }}&run_id={{ run.id }}"
+
             create_spark_session_function: custom_syncmaster.spark.get_worker_spark_session
+            spark_session_default_config:
+                spark.master: local
+                spark.driver.host: 127.0.0.1
+                spark.driver.bindAddress: 0.0.0.0
+                spark.sql.pyspark.jvmStacktrace.enabled: true
+                spark.ui.enabled: false
     """

-    CREATE_SPARK_SESSION_FUNCTION: ImportString = Field(
+    create_spark_session_function: ImportString = Field(
         "syncmaster.worker.spark.get_worker_spark_session",
         description="Function to create Spark session for worker",
     )
+    spark_session_default_config: dict[str, Any] = Field(
+        default_factory=dict,
+        description="Default Spark session configuration",
+    )
     log_url_template: str = Field(
         "",
         description=":ref:`URL template to access worker logs <worker-log-url>`",

syncmaster/worker/spark.py

Lines changed: 7 additions & 2 deletions
@@ -14,6 +14,7 @@
     HDFSConnectionDTO,
     HiveConnectionDTO,
 )
+from syncmaster.worker.settings import WorkerSettings

 if TYPE_CHECKING:
     from pyspark.sql import SparkSession

@@ -25,14 +26,18 @@ def get_worker_spark_session(
     run: Run,
     source: ConnectionDTO,
     target: ConnectionDTO,
+    settings: WorkerSettings,
 ) -> SparkSession:
     """Construct Spark Session using run parameters and application settings"""
     from pyspark.sql import SparkSession

     name = run.transfer.group.name + "_" + run.transfer.name  # noqa: WPS336
-    spark_builder = SparkSession.builder.appName(f"syncmaster_{name}")
+    spark_builder = SparkSession.builder.appName(f"SyncMaster__{name}")

-    for k, v in get_spark_session_conf(source, target, run.transfer.resources).items():
+    spark_session_config = settings.spark_session_default_config.copy()
+    spark_session_config.update(get_spark_session_conf(source, target, run.transfer.resources))
+
+    for k, v in spark_session_config.items():
         spark_builder = spark_builder.config(k, v)

     for entity in source, target:
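
A small sketch of the merge precedence implemented above: values derived from the connections and transfer resources by get_spark_session_conf() override user-provided defaults on key conflicts (the literal values below are illustrative only):

    defaults = {"spark.master": "local", "spark.ui.enabled": "false"}
    derived = {"spark.master": "local[2]"}  # stands in for get_spark_session_conf(...)

    config = defaults.copy()
    config.update(derived)  # derived values win on key conflicts
    assert config == {"spark.master": "local[2]", "spark.ui.enabled": "false"}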
