Skip to content

Commit 10122be

Browse files
author
maxim-lixakov
committed
[DOP-21445] remove hardcoding spark-version for receiving packages
1 parent 708c3e9 commit 10122be

File tree

5 files changed

+33
-27
lines changed

5 files changed

+33
-27
lines changed

poetry.lock

Lines changed: 12 additions & 13 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 7 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -59,7 +59,9 @@ python-jose = { version = "^3.3.0", extras = ["cryptography"], optional = true }
5959
jinja2 = { version = "^3.1.4", optional = true }
6060
python-multipart = { version = ">=0.0.9,<0.0.20", optional = true }
6161
celery = { version = "^5.4.0", optional = true }
62-
onetl = { version = "^0.12.0", extras = ["spark"], optional = true }
62+
# spark-excel and spark-xml don't support Spark 3.5.2 and 3.5.3 yet
63+
pyspark = { version = "3.5.1", optional = true }
64+
onetl = { version = "^0.12.0", optional = true }
6365
pyyaml = {version = "*", optional = true}
6466
# due to not supporting MacOS 14.x https://www.psycopg.org/psycopg3/docs/news.html#psycopg-3-1-20
6567
psycopg = { version = ">=3.1.0,<3.2.4", extras = ["binary"], optional = true }
@@ -103,6 +105,7 @@ worker = [
103105
"sqlalchemy",
104106
"sqlalchemy-utils",
105107
"celery",
108+
"pyspark",
106109
"onetl",
107110
"asgi-correlation-id",
108111
"jinja2",
@@ -135,7 +138,9 @@ pytest-asyncio = "^0.24.0"
135138
pytest-randomly = "^3.15.0"
136139
pytest-deadfixtures = "^2.2.1"
137140
pytest-mock = "^3.14.0"
138-
onetl = {extras = ["spark", "s3", "hdfs"], version = "^0.12.0"}
141+
# spark-excel and spark-xml don't support Spark 3.5.2 and 3.5.3 yet
142+
pyspark = { version = "3.5.1", optional = true }
143+
onetl = {extras = ["s3", "hdfs"], version = "^0.12.0"}
139144
faker = ">=28.4.1,<34.0.0"
140145
coverage = "^7.6.1"
141146
gevent = "^24.2.1"

syncmaster/worker/spark.py

Lines changed: 5 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -32,16 +32,19 @@ def get_worker_spark_session(
3232
log.debug("Enabling Hive support")
3333
spark_builder = spark_builder.enableHiveSupport()
3434

35+
spark_builder.sparkContext.setLogLevel("DEBUG")
3536
return spark_builder.getOrCreate()
3637

3738

3839
def get_packages(db_type: str) -> list[str]:
40+
import pyspark
3941
from onetl.connection import MSSQL, Clickhouse, MySQL, Oracle, Postgres, SparkS3
4042
from onetl.file.format import XML, Excel
4143

44+
spark_version = pyspark.__version__
4245
# see supported versions from https://mvnrepository.com/artifact/com.crealytics/spark-excel
43-
file_formats_spark_packages: list[str] = XML.get_packages(spark_version="3.5.1") + Excel.get_packages(
44-
spark_version="3.5.1",
46+
file_formats_spark_packages: list[str] = XML.get_packages(spark_version=spark_version) + Excel.get_packages(
47+
spark_version=spark_version,
4548
)
4649

4750
if db_type == "postgres":

tests/test_integration/test_run_transfer/conftest.py

Lines changed: 4 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -66,13 +66,11 @@ def spark(settings: Settings, request: FixtureRequest) -> SparkSession:
6666

6767
spark = (
6868
SparkSession.builder.appName("celery_worker")
69+
.enableHiveSupport()
6970
.config("spark.sql.pyspark.jvmStacktrace.enabled", "true")
7071
.config("spark.driver.host", "localhost")
7172
)
7273

73-
if "hive" in markers:
74-
spark = spark.enableHiveSupport()
75-
7674
if "postgres" in markers:
7775
maven_packages.extend(Postgres.get_packages())
7876

@@ -116,9 +114,10 @@ def spark(settings: Settings, request: FixtureRequest) -> SparkSession:
116114
)
117115

118116
if "hdfs" in markers or "s3" in markers:
117+
spark_version = pyspark.__version__
119118
# see supported versions from https://mvnrepository.com/artifact/com.crealytics/spark-excel
120-
file_formats_spark_packages: list[str] = XML.get_packages(spark_version="3.5.1") + Excel.get_packages(
121-
spark_version="3.5.1",
119+
file_formats_spark_packages: list[str] = XML.get_packages(spark_version=spark_version) + Excel.get_packages(
120+
spark_version=spark_version,
122121
)
123122
maven_packages.extend(file_formats_spark_packages)
124123

tests/test_integration/test_run_transfer/test_s3.py

Lines changed: 5 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -193,6 +193,11 @@ async def test_run_transfer_s3_to_postgres(
193193
@pytest.mark.parametrize(
194194
"target_file_format, file_format_flavor",
195195
[
196+
pytest.param(
197+
("xml", {}),
198+
"without_compression",
199+
id="xml",
200+
),
196201
pytest.param(
197202
("csv", {}),
198203
"with_header",
@@ -208,11 +213,6 @@ async def test_run_transfer_s3_to_postgres(
208213
"with_header",
209214
id="excel",
210215
),
211-
pytest.param(
212-
("xml", {}),
213-
"without_compression",
214-
id="xml",
215-
),
216216
],
217217
indirect=["target_file_format", "file_format_flavor"],
218218
)

0 commit comments

Comments
 (0)