Skip to content

Commit 36d4c7f

Browse files
Author: maxim-lixakov
Committed: [DOP-21445] Add XML integration tests
1 parent: 2d7c931 — commit: 36d4c7f

File tree

5 files changed

+57
-12
lines changed

5 files changed

+57
-12
lines changed

syncmaster/dto/transfers.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from dataclasses import dataclass
55
from typing import ClassVar
66

7-
from onetl.file.format import CSV, JSON, ORC, Excel, JSONLine, Parquet
7+
from onetl.file.format import CSV, JSON, ORC, XML, Excel, JSONLine, Parquet
88

99

1010
@dataclass
@@ -20,7 +20,7 @@ class DBTransferDTO(TransferDTO):
2020
@dataclass
2121
class FileTransferDTO(TransferDTO):
2222
directory_path: str
23-
file_format: CSV | JSONLine | JSON | Excel | ORC | Parquet
23+
file_format: CSV | JSONLine | JSON | Excel | XML | ORC | Parquet
2424
options: dict
2525
df_schema: dict | None = None
2626

@@ -31,6 +31,7 @@ class FileTransferDTO(TransferDTO):
3131
"excel": Excel,
3232
"orc": ORC,
3333
"parquet": Parquet,
34+
"xml": XML,
3435
}
3536

3637
def __post_init__(self):

syncmaster/worker/spark.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,14 @@ def get_worker_spark_session(
3636

3737

3838
def get_packages(db_type: str) -> list[str]:
39+
import pyspark
3940
from onetl.connection import MSSQL, Clickhouse, MySQL, Oracle, Postgres, SparkS3
40-
from onetl.file.format import Excel
41+
from onetl.file.format import XML, Excel
42+
43+
# see supported versions from https://mvnrepository.com/artifact/com.crealytics/spark-excel
44+
file_formats_spark_packages: list[str] = XML.get_packages(spark_version=pyspark.__version__) + Excel.get_packages(
45+
spark_version="3.5.1",
46+
)
4147

4248
if db_type == "postgres":
4349
return Postgres.get_packages()
@@ -55,10 +61,11 @@ def get_packages(db_type: str) -> list[str]:
5561

5662
spark_version = pyspark.__version__
5763
# see supported versions from https://mvnrepository.com/artifact/com.crealytics/spark-excel
58-
return SparkS3.get_packages(spark_version=spark_version) + Excel.get_packages(spark_version="3.5.1")
64+
65+
return SparkS3.get_packages(spark_version=spark_version) + file_formats_spark_packages
5966
if db_type == "hdfs":
6067
# see supported versions from https://mvnrepository.com/artifact/com.crealytics/spark-excel
61-
return Excel.get_packages(spark_version="3.5.1")
68+
return file_formats_spark_packages
6269

6370
# If the database type does not require downloading .jar packages
6471
return []

tests/test_integration/test_run_transfer/conftest.py

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
from onetl.connection import MSSQL, Clickhouse, Hive, MySQL, Oracle, Postgres, SparkS3
1212
from onetl.connection.file_connection.s3 import S3
1313
from onetl.db import DBWriter
14-
from onetl.file.format import CSV, JSON, ORC, Excel, JSONLine, Parquet
14+
from onetl.file.format import CSV, JSON, ORC, XML, Excel, JSONLine, Parquet
1515
from pyspark.sql import DataFrame, SparkSession
1616
from pyspark.sql.types import (
1717
DateType,
@@ -115,7 +115,10 @@ def spark(settings: Settings, request: FixtureRequest) -> SparkSession:
115115

116116
if "hdfs" in markers or "s3" in markers:
117117
# see supported versions from https://mvnrepository.com/artifact/com.crealytics/spark-excel
118-
maven_packages.extend(Excel.get_packages(spark_version="3.5.1"))
118+
file_formats_spark_packages: list[str] = XML.get_packages(
119+
spark_version=pyspark.__version__,
120+
) + Excel.get_packages(spark_version="3.5.1")
121+
maven_packages.extend(file_formats_spark_packages)
119122

120123
if maven_packages:
121124
spark = spark.config("spark.jars.packages", ",".join(maven_packages))
@@ -804,7 +807,9 @@ def fill_with_data(df: DataFrame):
804807
pass
805808

806809

807-
@pytest.fixture()
810+
@pytest.fixture(
811+
params=[("csv", {}), ("jsonline", {}), ("json", {}), ("excel", {}), ("xml", {}), ("orc", {}), ("parquet", {})],
812+
)
808813
def source_file_format(request: FixtureRequest):
809814
name, params = request.param
810815
if name == "csv":
@@ -845,6 +850,12 @@ def source_file_format(request: FixtureRequest):
845850
**params,
846851
)
847852

853+
if name == "xml":
854+
return "xml", XML(
855+
row_tag="item",
856+
**params,
857+
)
858+
848859
raise ValueError(f"Unsupported file format: {name}")
849860

850861

@@ -883,6 +894,12 @@ def target_file_format(request: FixtureRequest):
883894
**params,
884895
)
885896

897+
if name == "xml":
898+
return "xml", XML(
899+
row_tag="item",
900+
**params,
901+
)
902+
886903
raise ValueError(f"Unsupported file format: {name}")
887904

888905

tests/test_integration/test_run_transfer/test_hdfs.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,11 @@ async def postgres_to_hdfs(
139139
"without_compression",
140140
id="parquet",
141141
),
142+
pytest.param(
143+
("xml", {}),
144+
"without_compression",
145+
id="xml",
146+
),
142147
],
143148
indirect=["source_file_format", "file_format_flavor"],
144149
)
@@ -223,6 +228,11 @@ async def test_run_transfer_hdfs_to_postgres(
223228
"with_compression",
224229
id="parquet",
225230
),
231+
pytest.param(
232+
("xml", {}),
233+
"without_compression",
234+
id="xml",
235+
),
226236
],
227237
indirect=["target_file_format", "file_format_flavor"],
228238
)

tests/test_integration/test_run_transfer/test_s3.py

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -130,15 +130,20 @@ async def postgres_to_s3(
130130
id="excel",
131131
),
132132
pytest.param(
133-
("orc", {}),
134-
"without_compression",
133+
("orc", {"compression": "none"}),
134+
"with_compression",
135135
id="orc",
136136
),
137137
pytest.param(
138-
("parquet", {}),
139-
"without_compression",
138+
("parquet", {"compression": "gzip"}),
139+
"with_compression",
140140
id="parquet",
141141
),
142+
pytest.param(
143+
("xml", {}),
144+
"without_compression",
145+
id="xml",
146+
),
142147
],
143148
indirect=["source_file_format", "file_format_flavor"],
144149
)
@@ -224,6 +229,11 @@ async def test_run_transfer_s3_to_postgres(
224229
"with_compression",
225230
id="parquet",
226231
),
232+
pytest.param(
233+
("xml", {}),
234+
"without_compression",
235+
id="xml",
236+
),
227237
],
228238
indirect=["target_file_format", "file_format_flavor"],
229239
)

0 commit comments

Comments (0)