Skip to content

Commit e5f3f8a

Browse files
author
maxim-lixakov
committed
[DOP-21445] Add XML integration tests
1 parent 7a6b1a1 commit e5f3f8a

File tree

5 files changed

+51
-10
lines changed

5 files changed

+51
-10
lines changed

syncmaster/dto/transfers.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from dataclasses import dataclass
55
from typing import ClassVar
66

7-
from onetl.file.format import CSV, JSON, Excel, JSONLine
7+
from onetl.file.format import CSV, JSON, XML, Excel, JSONLine
88

99

1010
@dataclass
@@ -20,7 +20,7 @@ class DBTransferDTO(TransferDTO):
2020
@dataclass
2121
class FileTransferDTO(TransferDTO):
2222
directory_path: str
23-
file_format: CSV | JSONLine | JSON | Excel
23+
file_format: CSV | JSONLine | JSON | Excel | XML
2424
options: dict
2525
df_schema: dict | None = None
2626

@@ -29,6 +29,7 @@ class FileTransferDTO(TransferDTO):
2929
"jsonline": JSONLine,
3030
"json": JSON,
3131
"excel": Excel,
32+
"xml": XML,
3233
}
3334

3435
def __post_init__(self):

syncmaster/worker/spark.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,12 @@ def get_worker_spark_session(
3737

3838
def get_packages(db_type: str) -> list[str]:
3939
from onetl.connection import MSSQL, Clickhouse, MySQL, Oracle, Postgres, SparkS3
40-
from onetl.file.format import Excel
40+
from onetl.file.format import XML, Excel
41+
42+
# see supported versions: https://mvnrepository.com/artifact/com.crealytics/spark-excel and https://mvnrepository.com/artifact/com.databricks/spark-xml
43+
file_formats_spark_packages: list[str] = XML.get_packages(spark_version="3.5.1") + Excel.get_packages(
44+
spark_version="3.5.1",
45+
)
4146

4247
if db_type == "postgres":
4348
return Postgres.get_packages()
@@ -55,10 +60,11 @@ def get_packages(db_type: str) -> list[str]:
5560

5661
spark_version = pyspark.__version__
5762
# see supported file-format package versions: https://mvnrepository.com/artifact/com.crealytics/spark-excel and https://mvnrepository.com/artifact/com.databricks/spark-xml
58-
return SparkS3.get_packages(spark_version=spark_version) + Excel.get_packages(spark_version="3.5.1")
63+
64+
return SparkS3.get_packages(spark_version=spark_version) + file_formats_spark_packages
5965
if db_type == "hdfs":
6066
# see supported file-format package versions: https://mvnrepository.com/artifact/com.crealytics/spark-excel and https://mvnrepository.com/artifact/com.databricks/spark-xml
61-
return Excel.get_packages(spark_version="3.5.1")
67+
return file_formats_spark_packages
6268

6369
# If the database type does not require downloading .jar packages
6470
return []

tests/test_integration/test_run_transfer/conftest.py

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
from onetl.connection import MSSQL, Clickhouse, Hive, MySQL, Oracle, Postgres, SparkS3
1212
from onetl.connection.file_connection.s3 import S3
1313
from onetl.db import DBWriter
14-
from onetl.file.format import CSV, JSON, Excel, JSONLine
14+
from onetl.file.format import CSV, JSON, XML, Excel, JSONLine
1515
from pyspark.sql import DataFrame, SparkSession
1616
from pyspark.sql.types import (
1717
DateType,
@@ -115,7 +115,10 @@ def spark(settings: Settings, request: FixtureRequest) -> SparkSession:
115115

116116
if "hdfs" in markers or "s3" in markers:
117117
# see supported versions: https://mvnrepository.com/artifact/com.crealytics/spark-excel and https://mvnrepository.com/artifact/com.databricks/spark-xml
118-
maven_packages.extend(Excel.get_packages(spark_version="3.5.1"))
118+
file_formats_spark_packages: list[str] = XML.get_packages(spark_version="3.5.1") + Excel.get_packages(
119+
spark_version="3.5.1",
120+
)
121+
maven_packages.extend(file_formats_spark_packages)
119122

120123
if maven_packages:
121124
spark = spark.config("spark.jars.packages", ",".join(maven_packages))
@@ -804,7 +807,7 @@ def fill_with_data(df: DataFrame):
804807
pass
805808

806809

807-
@pytest.fixture(params=[("csv", {}), ("jsonline", {}), ("json", {}), ("excel", {})])
810+
@pytest.fixture(params=[("csv", {}), ("jsonline", {}), ("json", {}), ("excel", {}), ("xml", {})])
808811
def source_file_format(request: FixtureRequest):
809812
name, params = request.param
810813
if name == "csv":
@@ -835,10 +838,16 @@ def source_file_format(request: FixtureRequest):
835838
**params,
836839
)
837840

841+
if name == "xml":
842+
return "xml", XML(
843+
row_tag="item",
844+
**params,
845+
)
846+
838847
raise ValueError(f"Unsupported file format: {name}")
839848

840849

841-
@pytest.fixture(params=[("csv", {}), ("jsonline", {}), ("excel", {})])
850+
@pytest.fixture(params=[("csv", {}), ("jsonline", {}), ("excel", {}), ("xml", {})])
842851
def target_file_format(request: FixtureRequest):
843852
name, params = request.param
844853
if name == "csv":
@@ -863,6 +872,12 @@ def target_file_format(request: FixtureRequest):
863872
**params,
864873
)
865874

875+
if name == "xml":
876+
return "xml", XML(
877+
row_tag="item",
878+
**params,
879+
)
880+
866881
raise ValueError(f"Unsupported file format: {name}")
867882

868883

tests/test_integration/test_run_transfer/test_hdfs.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,11 @@ async def postgres_to_hdfs(
128128
"with_header",
129129
id="excel",
130130
),
131+
pytest.param(
132+
("xml", {}),
133+
"without_compression",
134+
id="xml",
135+
),
131136
],
132137
indirect=["source_file_format", "file_format_flavor"],
133138
)
@@ -202,6 +207,11 @@ async def test_run_transfer_hdfs_to_postgres(
202207
"with_header",
203208
id="excel",
204209
),
210+
pytest.param(
211+
("xml", {}),
212+
"without_compression",
213+
id="xml",
214+
),
205215
],
206216
indirect=["target_file_format", "file_format_flavor"],
207217
)

tests/test_integration/test_run_transfer/test_s3.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,11 @@ async def postgres_to_s3(
128128
"with_header",
129129
id="excel",
130130
),
131+
pytest.param(
132+
("xml", {}),
133+
"without_compression",
134+
id="xml",
135+
),
131136
],
132137
indirect=["source_file_format", "file_format_flavor"],
133138
)
@@ -178,7 +183,6 @@ async def test_run_transfer_s3_to_postgres(
178183
if file_format == "excel":
179184
df = df.withColumn("REGISTERED_AT", date_trunc("second", col("REGISTERED_AT")))
180185
init_df = init_df.withColumn("REGISTERED_AT", date_trunc("second", col("REGISTERED_AT")))
181-
182186
for field in init_df.schema:
183187
df = df.withColumn(field.name, df[field.name].cast(field.dataType))
184188

@@ -203,6 +207,11 @@ async def test_run_transfer_s3_to_postgres(
203207
"with_header",
204208
id="excel",
205209
),
210+
pytest.param(
211+
("xml", {}),
212+
"without_compression",
213+
id="xml",
214+
),
206215
],
207216
indirect=["target_file_format", "file_format_flavor"],
208217
)

0 commit comments

Comments
 (0)