Skip to content

Commit 08adb90

Browse files
author
maxim-lixakov
committed
[DOP-21445] add ORC, Parquet tests
1 parent 10122be commit 08adb90

File tree

5 files changed

+88
-14
lines changed

5 files changed

+88
-14
lines changed

syncmaster/dto/transfers.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from dataclasses import dataclass
55
from typing import ClassVar
66

7-
from onetl.file.format import CSV, JSON, XML, Excel, JSONLine
7+
from onetl.file.format import CSV, JSON, ORC, XML, Excel, JSONLine, Parquet
88

99

1010
@dataclass
@@ -30,6 +30,8 @@ class FileTransferDTO(TransferDTO):
3030
"json": JSON,
3131
"excel": Excel,
3232
"xml": XML,
33+
"orc": ORC,
34+
"parquet": Parquet,
3335
}
3436

3537
def __post_init__(self):

syncmaster/worker/spark.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,12 +39,17 @@ def get_worker_spark_session(
3939
def get_packages(db_type: str) -> list[str]:
4040
import pyspark
4141
from onetl.connection import MSSQL, Clickhouse, MySQL, Oracle, Postgres, SparkS3
42-
from onetl.file.format import XML, Excel
42+
from onetl.file.format import ORC, XML, Excel, Parquet
4343

4444
spark_version = pyspark.__version__
4545
# see supported versions from https://mvnrepository.com/artifact/com.crealytics/spark-excel
46-
file_formats_spark_packages: list[str] = XML.get_packages(spark_version=spark_version) + Excel.get_packages(
47-
spark_version=spark_version,
46+
file_formats_spark_packages: list[str] = (
47+
XML.get_packages(spark_version=spark_version)
48+
+ Excel.get_packages(
49+
spark_version=spark_version,
50+
)
51+
+ ORC.get_packages(spark_version=spark_version)
52+
+ Parquet.get_packages(spark_version=spark_version)
4853
)
4954

5055
if db_type == "postgres":

tests/test_integration/test_run_transfer/conftest.py

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
from onetl.connection import MSSQL, Clickhouse, Hive, MySQL, Oracle, Postgres, SparkS3
1212
from onetl.connection.file_connection.s3 import S3
1313
from onetl.db import DBWriter
14-
from onetl.file.format import CSV, JSON, XML, Excel, JSONLine
14+
from onetl.file.format import CSV, JSON, ORC, XML, Excel, JSONLine, Parquet
1515
from pyspark.sql import DataFrame, SparkSession
1616
from pyspark.sql.types import (
1717
DateType,
@@ -116,8 +116,13 @@ def spark(settings: Settings, request: FixtureRequest) -> SparkSession:
116116
if "hdfs" in markers or "s3" in markers:
117117
spark_version = pyspark.__version__
118118
# see supported versions from https://mvnrepository.com/artifact/com.crealytics/spark-excel
119-
file_formats_spark_packages: list[str] = XML.get_packages(spark_version=spark_version) + Excel.get_packages(
120-
spark_version=spark_version,
119+
file_formats_spark_packages: list[str] = (
120+
XML.get_packages(spark_version=spark_version)
121+
+ Excel.get_packages(
122+
spark_version=spark_version,
123+
)
124+
+ ORC.get_packages(spark_version=spark_version)
125+
+ Parquet.get_packages(spark_version=spark_version)
121126
)
122127
maven_packages.extend(file_formats_spark_packages)
123128

@@ -808,7 +813,9 @@ def fill_with_data(df: DataFrame):
808813
pass
809814

810815

811-
@pytest.fixture(params=[("csv", {}), ("jsonline", {}), ("json", {}), ("excel", {}), ("xml", {})])
816+
@pytest.fixture(
817+
params=[("csv", {}), ("jsonline", {}), ("json", {}), ("excel", {}), ("xml", {}), ("orc", {}), ("parquet", {})],
818+
)
812819
def source_file_format(request: FixtureRequest):
813820
name, params = request.param
814821
if name == "csv":
@@ -845,10 +852,20 @@ def source_file_format(request: FixtureRequest):
845852
**params,
846853
)
847854

855+
if name == "orc":
856+
return "orc", ORC(
857+
**params,
858+
)
859+
860+
if name == "parquet":
861+
return "parquet", Parquet(
862+
**params,
863+
)
864+
848865
raise ValueError(f"Unsupported file format: {name}")
849866

850867

851-
@pytest.fixture(params=[("csv", {}), ("jsonline", {}), ("excel", {}), ("xml", {})])
868+
@pytest.fixture(params=[("csv", {}), ("jsonline", {}), ("excel", {}), ("xml", {}), ("orc", {}), ("parquet", {})])
852869
def target_file_format(request: FixtureRequest):
853870
name, params = request.param
854871
if name == "csv":
@@ -879,6 +896,16 @@ def target_file_format(request: FixtureRequest):
879896
**params,
880897
)
881898

899+
if name == "orc":
900+
return "orc", ORC(
901+
**params,
902+
)
903+
904+
if name == "parquet":
905+
return "parquet", Parquet(
906+
**params,
907+
)
908+
882909
raise ValueError(f"Unsupported file format: {name}")
883910

884911

tests/test_integration/test_run_transfer/test_hdfs.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,16 @@ async def postgres_to_hdfs(
133133
"without_compression",
134134
id="xml",
135135
),
136+
pytest.param(
137+
("orc", {}),
138+
"without_compression",
139+
id="orc",
140+
),
141+
pytest.param(
142+
("parquet", {}),
143+
"without_compression",
144+
id="parquet",
145+
),
136146
],
137147
indirect=["source_file_format", "file_format_flavor"],
138148
)
@@ -212,6 +222,16 @@ async def test_run_transfer_hdfs_to_postgres(
212222
"without_compression",
213223
id="xml",
214224
),
225+
pytest.param(
226+
("orc", {}),
227+
"without_compression",
228+
id="orc",
229+
),
230+
pytest.param(
231+
("parquet", {}),
232+
"without_compression",
233+
id="parquet",
234+
),
215235
],
216236
indirect=["target_file_format", "file_format_flavor"],
217237
)

tests/test_integration/test_run_transfer/test_s3.py

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,16 @@ async def postgres_to_s3(
133133
"without_compression",
134134
id="xml",
135135
),
136+
pytest.param(
137+
("orc", {}),
138+
"without_compression",
139+
id="orc",
140+
),
141+
pytest.param(
142+
("parquet", {}),
143+
"without_compression",
144+
id="parquet",
145+
),
136146
],
137147
indirect=["source_file_format", "file_format_flavor"],
138148
)
@@ -193,11 +203,6 @@ async def test_run_transfer_s3_to_postgres(
193203
@pytest.mark.parametrize(
194204
"target_file_format, file_format_flavor",
195205
[
196-
pytest.param(
197-
("xml", {}),
198-
"without_compression",
199-
id="xml",
200-
),
201206
pytest.param(
202207
("csv", {}),
203208
"with_header",
@@ -213,6 +218,21 @@ async def test_run_transfer_s3_to_postgres(
213218
"with_header",
214219
id="excel",
215220
),
221+
pytest.param(
222+
("xml", {}),
223+
"without_compression",
224+
id="xml",
225+
),
226+
pytest.param(
227+
("orc", {}),
228+
"without_compression",
229+
id="orc",
230+
),
231+
pytest.param(
232+
("parquet", {}),
233+
"without_compression",
234+
id="parquet",
235+
),
216236
],
217237
indirect=["target_file_format", "file_format_flavor"],
218238
)

0 commit comments

Comments
 (0)