Commit 66ea992

[DOP-21445] Add XML integration tests (#158)
1 parent 2d7c931 commit 66ea992

7 files changed: +55 / -13 lines

docker-compose.test.yml (3 additions, 2 deletions)

@@ -196,7 +196,7 @@ services:
       interval: 30s
       timeout: 5s
       retries: 3
-    profiles: [hive, hdfs, all]
+    profiles: [hive, hdfs, s3, all]
 
   keycloak:
     image: quay.io/keycloak/keycloak:latest
@@ -234,7 +234,8 @@ services:
       HIVE_METASTORE_DB_DRIVER: org.postgresql.Driver
       HIVE_METASTORE_DB_USER: test_hive
       HIVE_METASTORE_DB_PASSWORD: test_hive
-    profiles: [hive, hdfs, all]
+    # writing spark dataframe to s3 xml file fails without running hive metastore server
+    profiles: [hive, hdfs, s3, all]
 
 volumes:
   postgres_test_data:
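
Per the in-diff comment, the Hive metastore container now also starts under the s3 profile because the XML write path fails without a reachable metastore. A minimal sketch of a Spark session wired to such a metastore, assuming the test session is configured this way; the hostname and port are hypothetical stand-ins for whatever docker-compose.test.yml exposes:

from pyspark.sql import SparkSession

# Hypothetical host/port; the real values come from the compose stack.
spark = (
    SparkSession.builder
    .appName("syncmaster-xml-test")
    .config("hive.metastore.uris", "thrift://hivemetastore:9083")
    .enableHiveSupport()
    .getOrCreate()
)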

syncmaster/dto/transfers.py (3 additions, 2 deletions)

@@ -4,7 +4,7 @@
 from dataclasses import dataclass
 from typing import ClassVar
 
-from onetl.file.format import CSV, JSON, ORC, Excel, JSONLine, Parquet
+from onetl.file.format import CSV, JSON, ORC, XML, Excel, JSONLine, Parquet
 
 
 @dataclass
@@ -20,7 +20,7 @@ class DBTransferDTO(TransferDTO):
 @dataclass
 class FileTransferDTO(TransferDTO):
     directory_path: str
-    file_format: CSV | JSONLine | JSON | Excel | ORC | Parquet
+    file_format: CSV | JSONLine | JSON | Excel | XML | ORC | Parquet
     options: dict
     df_schema: dict | None = None
 
@@ -31,6 +31,7 @@ class FileTransferDTO(TransferDTO):
         "excel": Excel,
         "orc": ORC,
         "parquet": Parquet,
+        "xml": XML,
     }
 
     def __post_init__(self):
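
With "xml" registered in the format map, a transfer whose format is declared by name can be resolved to an onetl XML instance through the same lookup the other formats use. A minimal sketch of that lookup; the dict name below is a stand-in, since the hunk only shows the entries, and row_tag mirrors the test fixtures:

from onetl.file.format import CSV, XML

# Stand-in for the DTO's parser mapping; only its entries appear in the diff.
format_parsers = {"csv": CSV, "xml": XML}

file_format = format_parsers["xml"](row_tag="item")
print(type(file_format).__name__)  # "XML", the class name the S3 handler checks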

syncmaster/worker/handlers/file/s3.py (1 addition, 1 deletion)

@@ -35,7 +35,7 @@ def read(self) -> DataFrame:
         from pyspark.sql.types import StructType
 
         options = {}
-        if self.transfer_dto.file_format.__class__.__name__ == "Excel":
+        if self.transfer_dto.file_format.__class__.__name__ in ("Excel", "XML"):
            options = {"inferSchema": True}
 
        reader = FileDFReader(
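
Like Excel, XML files carry no embedded schema, so the handler asks Spark to infer one. A rough equivalent in plain PySpark, assuming the spark-xml package is on the classpath; the bucket and path are illustrative:

df = (
    spark.read.format("xml")
    .option("rowTag", "item")      # same role as onetl's XML(row_tag="item")
    .option("inferSchema", True)   # what the handler's options dict enables
    .load("s3a://test-bucket/source/")
)
df.printSchema()  # column types guessed from the data, as with the Excel reader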

syncmaster/worker/spark.py (10 additions, 5 deletions)

@@ -36,8 +36,14 @@ def get_worker_spark_session(
 
 
 def get_packages(db_type: str) -> list[str]:
+    import pyspark
     from onetl.connection import MSSQL, Clickhouse, MySQL, Oracle, Postgres, SparkS3
-    from onetl.file.format import Excel
+    from onetl.file.format import XML, Excel
+
+    # excel version is hardcoded due to https://github.com/nightscape/spark-excel/issues/902
+    file_formats_spark_packages: list[str] = XML.get_packages(spark_version=pyspark.__version__) + Excel.get_packages(
+        spark_version="3.5.1",
+    )
 
     if db_type == "postgres":
         return Postgres.get_packages()
@@ -54,11 +60,10 @@ def get_packages(db_type: str) -> list[str]:
         import pyspark
 
         spark_version = pyspark.__version__
-        # see supported versions from https://mvnrepository.com/artifact/com.crealytics/spark-excel
-        return SparkS3.get_packages(spark_version=spark_version) + Excel.get_packages(spark_version="3.5.1")
+        return SparkS3.get_packages(spark_version=spark_version) + file_formats_spark_packages
+
     if db_type == "hdfs":
-        # see supported versions from https://mvnrepository.com/artifact/com.crealytics/spark-excel
-        return Excel.get_packages(spark_version="3.5.1")
+        return file_formats_spark_packages
 
     # If the database type does not require downloading .jar packages
     return []
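
The s3 and hdfs branches now share one package list computed once at the top of get_packages. A sketch of what a caller presumably does with these Maven coordinates; the exact versions returned depend on the installed onetl and Spark releases, so the commented output is only indicative:

import pyspark
from pyspark.sql import SparkSession
from onetl.file.format import XML, Excel

packages = XML.get_packages(spark_version=pyspark.__version__) + Excel.get_packages(spark_version="3.5.1")
# roughly: ["com.databricks:spark-xml_2.12:<ver>", "com.crealytics:spark-excel_2.12:3.5.1_<ver>"]

spark = (
    SparkSession.builder
    .config("spark.jars.packages", ",".join(packages))  # Spark fetches the jars at startup
    .getOrCreate()
)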

tests/test_integration/test_run_transfer/conftest.py (18 additions, 3 deletions)

@@ -11,7 +11,7 @@
 from onetl.connection import MSSQL, Clickhouse, Hive, MySQL, Oracle, Postgres, SparkS3
 from onetl.connection.file_connection.s3 import S3
 from onetl.db import DBWriter
-from onetl.file.format import CSV, JSON, ORC, Excel, JSONLine, Parquet
+from onetl.file.format import CSV, JSON, ORC, XML, Excel, JSONLine, Parquet
 from pyspark.sql import DataFrame, SparkSession
 from pyspark.sql.types import (
     DateType,
@@ -114,8 +114,11 @@ def spark(settings: Settings, request: FixtureRequest) -> SparkSession:
     )
 
     if "hdfs" in markers or "s3" in markers:
-        # see supported versions from https://mvnrepository.com/artifact/com.crealytics/spark-excel
-        maven_packages.extend(Excel.get_packages(spark_version="3.5.1"))
+        # excel version is hardcoded due to https://github.com/nightscape/spark-excel/issues/902
+        file_formats_spark_packages: list[str] = XML.get_packages(
+            spark_version=pyspark.__version__,
+        ) + Excel.get_packages(spark_version="3.5.1")
+        maven_packages.extend(file_formats_spark_packages)
 
     if maven_packages:
         spark = spark.config("spark.jars.packages", ",".join(maven_packages))
@@ -845,6 +848,12 @@ def source_file_format(request: FixtureRequest):
         **params,
     )
 
+    if name == "xml":
+        return "xml", XML(
+            row_tag="item",
+            **params,
+        )
+
     raise ValueError(f"Unsupported file format: {name}")
 
 
@@ -883,6 +892,12 @@ def target_file_format(request: FixtureRequest):
         **params,
     )
 
+    if name == "xml":
+        return "xml", XML(
+            row_tag="item",
+            **params,
+        )
+
     raise ValueError(f"Unsupported file format: {name}")
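
On the writing side of a transfer, the XML(row_tag="item") instance returned by these fixtures is what drives the file output. A hedged sketch of that write path with onetl, assuming a file DataFrame connection (e.g. SparkS3 or SparkHDFS) already exists; the connection, path, and df names are illustrative:

from onetl.file import FileDFWriter
from onetl.file.format import XML

writer = FileDFWriter(
    connection=file_df_connection,  # e.g. a SparkS3/SparkHDFS connection from other fixtures
    format=XML(row_tag="item"),     # the object the fixture returns for "xml"
    target_path="/target/directory",
    options=FileDFWriter.Options(if_exists="replace_entire_directory"),
)
writer.run(df)                      # df: any Spark DataFrame prepared by the test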

tests/test_integration/test_run_transfer/test_hdfs.py (10 additions, 0 deletions)

@@ -139,6 +139,11 @@ async def postgres_to_hdfs(
             "without_compression",
             id="parquet",
         ),
+        pytest.param(
+            ("xml", {}),
+            "without_compression",
+            id="xml",
+        ),
     ],
     indirect=["source_file_format", "file_format_flavor"],
 )
@@ -223,6 +228,11 @@ async def test_run_transfer_hdfs_to_postgres(
             "with_compression",
             id="parquet",
         ),
+        pytest.param(
+            ("xml", {}),
+            "without_compression",
+            id="xml",
+        ),
     ],
     indirect=["target_file_format", "file_format_flavor"],
 )
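
The xml case serializes each DataFrame row as one <item> element, which is what row_tag="item" selects. A small plain-PySpark illustration of the shape such files take; the column names, values, and output path are made up, and rootTag/rowTag are set explicitly for clarity:

df = spark.createDataFrame([(1, "alice"), (2, "bob")], ["id", "name"])
(
    df.write.format("xml")
    .option("rootTag", "items")
    .option("rowTag", "item")
    .save("/tmp/xml-sample")
)
# Each part file then looks roughly like:
# <items>
#     <item><id>1</id><name>alice</name></item>
#     <item><id>2</id><name>bob</name></item>
# </items>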

tests/test_integration/test_run_transfer/test_s3.py (10 additions, 0 deletions)

@@ -139,6 +139,11 @@ async def postgres_to_s3(
             "without_compression",
             id="parquet",
         ),
+        pytest.param(
+            ("xml", {}),
+            "without_compression",
+            id="xml",
+        ),
     ],
     indirect=["source_file_format", "file_format_flavor"],
 )
@@ -224,6 +229,11 @@ async def test_run_transfer_s3_to_postgres(
             "with_compression",
             id="parquet",
         ),
+        pytest.param(
+            ("xml", {}),
+            "without_compression",
+            id="xml",
+        ),
     ],
     indirect=["target_file_format", "file_format_flavor"],
 )
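
As in the HDFS module, the parameters reach the fixtures indirectly: pytest routes the ("xml", {}) tuple into source_file_format/target_file_format and the flavor string into file_format_flavor before the test body runs. A compact sketch of that mechanism, assuming the conftest fixtures above are in scope and that file_format_flavor simply echoes its parameter:

import pytest

@pytest.mark.parametrize(
    "source_file_format, file_format_flavor",
    [pytest.param(("xml", {}), "without_compression", id="xml")],
    indirect=["source_file_format", "file_format_flavor"],
)
def test_xml_param_routing(source_file_format, file_format_flavor):
    name, file_format = source_file_format  # resolved by the conftest fixture
    assert name == "xml"
    assert file_format_flavor == "without_compression"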
