Skip to content

Commit 2c4b740

Browse files
author
maxim-lixakov
committed
[DOP-21445] Add XML integration tests
1 parent 2d7c931 commit 2c4b740

File tree

6 files changed

+52
-9
lines changed

6 files changed

+52
-9
lines changed

docker-compose.test.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ services:
126126
interval: 30s
127127
timeout: 5s
128128
retries: 3
129-
profiles: [s3, all]
129+
profiles: [s3, hive, all]
130130

131131
test-oracle:
132132
image: gvenzl/oracle-xe:slim-faststart
@@ -234,7 +234,7 @@ services:
234234
HIVE_METASTORE_DB_DRIVER: org.postgresql.Driver
235235
HIVE_METASTORE_DB_USER: test_hive
236236
HIVE_METASTORE_DB_PASSWORD: test_hive
237-
profiles: [hive, hdfs, all]
237+
profiles: [hive, hdfs, s3, all]
238238

239239
volumes:
240240
postgres_test_data:

syncmaster/dto/transfers.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from dataclasses import dataclass
55
from typing import ClassVar
66

7-
from onetl.file.format import CSV, JSON, ORC, Excel, JSONLine, Parquet
7+
from onetl.file.format import CSV, JSON, ORC, XML, Excel, JSONLine, Parquet
88

99

1010
@dataclass
@@ -20,7 +20,7 @@ class DBTransferDTO(TransferDTO):
2020
@dataclass
2121
class FileTransferDTO(TransferDTO):
2222
directory_path: str
23-
file_format: CSV | JSONLine | JSON | Excel | ORC | Parquet
23+
file_format: CSV | JSONLine | JSON | Excel | XML | ORC | Parquet
2424
options: dict
2525
df_schema: dict | None = None
2626

@@ -31,6 +31,7 @@ class FileTransferDTO(TransferDTO):
3131
"excel": Excel,
3232
"orc": ORC,
3333
"parquet": Parquet,
34+
"xml": XML,
3435
}
3536

3637
def __post_init__(self):

syncmaster/worker/spark.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,14 @@ def get_worker_spark_session(
3636

3737

3838
def get_packages(db_type: str) -> list[str]:
39+
import pyspark
3940
from onetl.connection import MSSQL, Clickhouse, MySQL, Oracle, Postgres, SparkS3
40-
from onetl.file.format import Excel
41+
from onetl.file.format import XML, Excel
42+
43+
# see supported versions from https://mvnrepository.com/artifact/com.crealytics/spark-excel
44+
file_formats_spark_packages: list[str] = XML.get_packages(spark_version=pyspark.__version__) + Excel.get_packages(
45+
spark_version="3.5.1",
46+
)
4147

4248
if db_type == "postgres":
4349
return Postgres.get_packages()
@@ -55,10 +61,11 @@ def get_packages(db_type: str) -> list[str]:
5561

5662
spark_version = pyspark.__version__
5763
# see supported versions from https://mvnrepository.com/artifact/com.crealytics/spark-excel
58-
return SparkS3.get_packages(spark_version=spark_version) + Excel.get_packages(spark_version="3.5.1")
64+
65+
return SparkS3.get_packages(spark_version=spark_version) + file_formats_spark_packages
5966
if db_type == "hdfs":
6067
# see supported versions from https://mvnrepository.com/artifact/com.crealytics/spark-excel
61-
return Excel.get_packages(spark_version="3.5.1")
68+
return file_formats_spark_packages
6269

6370
# If the database type does not require downloading .jar packages
6471
return []

tests/test_integration/test_run_transfer/conftest.py

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
from onetl.connection import MSSQL, Clickhouse, Hive, MySQL, Oracle, Postgres, SparkS3
1212
from onetl.connection.file_connection.s3 import S3
1313
from onetl.db import DBWriter
14-
from onetl.file.format import CSV, JSON, ORC, Excel, JSONLine, Parquet
14+
from onetl.file.format import CSV, JSON, ORC, XML, Excel, JSONLine, Parquet
1515
from pyspark.sql import DataFrame, SparkSession
1616
from pyspark.sql.types import (
1717
DateType,
@@ -115,7 +115,10 @@ def spark(settings: Settings, request: FixtureRequest) -> SparkSession:
115115

116116
if "hdfs" in markers or "s3" in markers:
117117
# see supported versions from https://mvnrepository.com/artifact/com.crealytics/spark-excel
118-
maven_packages.extend(Excel.get_packages(spark_version="3.5.1"))
118+
file_formats_spark_packages: list[str] = XML.get_packages(
119+
spark_version=pyspark.__version__,
120+
) + Excel.get_packages(spark_version="3.5.1")
121+
maven_packages.extend(file_formats_spark_packages)
119122

120123
if maven_packages:
121124
spark = spark.config("spark.jars.packages", ",".join(maven_packages))
@@ -845,6 +848,12 @@ def source_file_format(request: FixtureRequest):
845848
**params,
846849
)
847850

851+
if name == "xml":
852+
return "xml", XML(
853+
row_tag="item",
854+
**params,
855+
)
856+
848857
raise ValueError(f"Unsupported file format: {name}")
849858

850859

@@ -883,6 +892,12 @@ def target_file_format(request: FixtureRequest):
883892
**params,
884893
)
885894

895+
if name == "xml":
896+
return "xml", XML(
897+
row_tag="item",
898+
**params,
899+
)
900+
886901
raise ValueError(f"Unsupported file format: {name}")
887902

888903

tests/test_integration/test_run_transfer/test_hdfs.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,11 @@ async def postgres_to_hdfs(
139139
"without_compression",
140140
id="parquet",
141141
),
142+
pytest.param(
143+
("xml", {}),
144+
"without_compression",
145+
id="xml",
146+
),
142147
],
143148
indirect=["source_file_format", "file_format_flavor"],
144149
)
@@ -223,6 +228,11 @@ async def test_run_transfer_hdfs_to_postgres(
223228
"with_compression",
224229
id="parquet",
225230
),
231+
pytest.param(
232+
("xml", {}),
233+
"without_compression",
234+
id="xml",
235+
),
226236
],
227237
indirect=["target_file_format", "file_format_flavor"],
228238
)

tests/test_integration/test_run_transfer/test_s3.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,11 @@ async def postgres_to_s3(
139139
"without_compression",
140140
id="parquet",
141141
),
142+
pytest.param(
143+
("xml", {}),
144+
"without_compression",
145+
id="xml",
146+
),
142147
],
143148
indirect=["source_file_format", "file_format_flavor"],
144149
)
@@ -224,6 +229,11 @@ async def test_run_transfer_s3_to_postgres(
224229
"with_compression",
225230
id="parquet",
226231
),
232+
pytest.param(
233+
("xml", {}),
234+
"without_compression",
235+
id="xml",
236+
),
227237
],
228238
indirect=["target_file_format", "file_format_flavor"],
229239
)

0 commit comments

Comments (0)