Skip to content

Commit c3473e0

Browse files
committed
[DOP-22424] Use SparkS3.get_exclude_packages
1 parent fbd4b8a commit c3473e0

File tree

2 files changed

+15
-26
lines changed

2 files changed

+15
-26
lines changed

syncmaster/worker/spark.py

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ def get_worker_spark_session(
2222
"""Construct Spark Session using run parameters and application settings"""
2323
from pyspark.sql import SparkSession
2424

25-
name = run.transfer.group.name + "_" + run.transfer.name
25+
name = run.transfer.group.name + "_" + run.transfer.name # noqa: WPS336
2626
spark_builder = SparkSession.builder.appName(f"syncmaster_{name}")
2727

2828
for k, v in get_spark_session_conf(source, target).items():
@@ -35,15 +35,16 @@ def get_worker_spark_session(
3535
return spark_builder.getOrCreate()
3636

3737

38-
def get_packages(connection_type: str) -> list[str]:
38+
def get_packages(connection_type: str) -> list[str]: # noqa: WPS212
3939
import pyspark
4040
from onetl.connection import MSSQL, Clickhouse, MySQL, Oracle, Postgres, SparkS3
4141
from onetl.file.format import XML, Excel
4242

4343
# excel version is hardcoded due to https://github.com/nightscape/spark-excel/issues/902
44-
file_formats_spark_packages: list[str] = XML.get_packages(spark_version=pyspark.__version__) + Excel.get_packages(
45-
spark_version="3.5.1",
46-
)
44+
file_formats_spark_packages: list[str] = [
45+
*XML.get_packages(spark_version=pyspark.__version__),
46+
*Excel.get_packages(spark_version="3.5.1"),
47+
]
4748

4849
if connection_type == "postgres":
4950
return Postgres.get_packages()
@@ -71,15 +72,10 @@ def get_packages(connection_type: str) -> list[str]:
7172
return []
7273

7374

74-
def get_excluded_packages(db_type: str):
75-
if db_type == "s3":
76-
return [
77-
"com.google.cloud.bigdataoss:gcs-connector",
78-
"org.apache.hadoop:hadoop-aliyun",
79-
"org.apache.hadoop:hadoop-azure-datalake",
80-
"org.apache.hadoop:hadoop-azure",
81-
]
82-
return []
75+
def get_excluded_packages(db_type: str) -> list[str]:
76+
from onetl.connection import SparkS3
77+
78+
return SparkS3.get_exclude_packages()
8379

8480

8581
def get_spark_session_conf(

tests/test_integration/test_run_transfer/connection_fixtures/spark_fixtures.py

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ def spark(settings: Settings, request: FixtureRequest) -> SparkSession:
2222
markers.update(marker.name for marker in func.iter_markers())
2323

2424
maven_packages: list[str] = []
25-
excluded_packages: list[str] = []
25+
excluded_packages: list[str] = SparkS3.get_exclude_packages()
2626

2727
spark = (
2828
SparkSession.builder.appName("celery_worker")
@@ -50,14 +50,6 @@ def spark(settings: Settings, request: FixtureRequest) -> SparkSession:
5050

5151
if "s3" in markers:
5252
maven_packages.extend(SparkS3.get_packages(spark_version=pyspark.__version__))
53-
excluded_packages.extend(
54-
[
55-
"com.google.cloud.bigdataoss:gcs-connector",
56-
"org.apache.hadoop:hadoop-aliyun",
57-
"org.apache.hadoop:hadoop-azure-datalake",
58-
"org.apache.hadoop:hadoop-azure",
59-
],
60-
)
6153
spark = (
6254
spark.config("spark.hadoop.fs.s3a.committer.magic.enabled", "true")
6355
.config("spark.hadoop.fs.s3a.committer.name", "magic")
@@ -77,9 +69,10 @@ def spark(settings: Settings, request: FixtureRequest) -> SparkSession:
7769

7870
if set(markers).intersection({"hdfs", "s3", "sftp", "ftp", "ftps", "samba", "webdav"}):
7971
# excel version is hardcoded due to https://github.com/nightscape/spark-excel/issues/902
80-
file_formats_spark_packages: list[str] = XML.get_packages(
81-
spark_version=pyspark.__version__,
82-
) + Excel.get_packages(spark_version="3.5.1")
72+
file_formats_spark_packages: list[str] = [
73+
*XML.get_packages(spark_version=pyspark.__version__),
74+
*Excel.get_packages(spark_version="3.5.1"),
75+
]
8376
maven_packages.extend(file_formats_spark_packages)
8477

8578
if maven_packages:

0 commit comments

Comments
 (0)