
Commit 5467e23

Author: maxim-lixakov

[DOP-21445] add orc, parquet tests

Parent: 10122be

File tree: 5 files changed, +82 -18 lines changed


syncmaster/dto/transfers.py

Lines changed: 3 additions & 1 deletion

@@ -4,7 +4,7 @@
 from dataclasses import dataclass
 from typing import ClassVar

-from onetl.file.format import CSV, JSON, XML, Excel, JSONLine
+from onetl.file.format import CSV, JSON, ORC, XML, Excel, JSONLine, Parquet


 @dataclass
@@ -30,6 +30,8 @@ class FileTransferDTO(TransferDTO):
         "json": JSON,
         "excel": Excel,
         "xml": XML,
+        "orc": ORC,
+        "parquet": Parquet,
     }

     def __post_init__(self):
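The registry extended here maps a format name string to its onetl format class, so a stored transfer definition can be turned into a format object by name. A minimal sketch of how such a lookup presumably works (the `build_file_format` helper and `_format_registry` name are illustrative, not the project's actual API):

    from onetl.file.format import CSV, JSON, ORC, XML, Excel, JSONLine, Parquet

    # Hypothetical registry mirroring FileTransferDTO's mapping.
    _format_registry = {
        "csv": CSV,
        "jsonline": JSONLine,
        "json": JSON,
        "excel": Excel,
        "xml": XML,
        "orc": ORC,
        "parquet": Parquet,
    }

    def build_file_format(name: str, options: dict):
        # Resolve the class by name and instantiate it with user-supplied options.
        try:
            format_cls = _format_registry[name]
        except KeyError:
            raise ValueError(f"Unsupported file format: {name}")
        return format_cls(**options)

    # e.g. build_file_format("parquet", {}) -> Parquet()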

syncmaster/worker/spark.py

Lines changed: 0 additions & 1 deletion

@@ -32,7 +32,6 @@ def get_worker_spark_session(
         log.debug("Enabling Hive support")
         spark_builder = spark_builder.enableHiveSupport()

-    spark_builder.sparkContext.setLogLevel("DEBUG")
     return spark_builder.getOrCreate()
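Dropping this line removes debug noise, and the line was also broken as written: in PySpark, `SparkSession.Builder` has no `sparkContext` attribute, so the log level can only be set on a session that already exists. A minimal sketch of the usual pattern (the app name is illustrative):

    from pyspark.sql import SparkSession

    # Create the session first; only a live session exposes a SparkContext.
    spark = SparkSession.builder.appName("syncmaster-worker").getOrCreate()

    # Adjust verbosity on the running context, not on the builder.
    spark.sparkContext.setLogLevel("WARN")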

tests/test_integration/test_run_transfer/conftest.py

Lines changed: 26 additions & 3 deletions

@@ -11,7 +11,7 @@
 from onetl.connection import MSSQL, Clickhouse, Hive, MySQL, Oracle, Postgres, SparkS3
 from onetl.connection.file_connection.s3 import S3
 from onetl.db import DBWriter
-from onetl.file.format import CSV, JSON, XML, Excel, JSONLine
+from onetl.file.format import CSV, JSON, ORC, XML, Excel, JSONLine, Parquet
 from pyspark.sql import DataFrame, SparkSession
 from pyspark.sql.types import (
     DateType,
@@ -119,6 +119,7 @@ def spark(settings: Settings, request: FixtureRequest) -> SparkSession:
     file_formats_spark_packages: list[str] = XML.get_packages(spark_version=spark_version) + Excel.get_packages(
         spark_version=spark_version,
     )
+
     maven_packages.extend(file_formats_spark_packages)

     if maven_packages:
@@ -808,7 +809,9 @@ def fill_with_data(df: DataFrame):
     pass


-@pytest.fixture(params=[("csv", {}), ("jsonline", {}), ("json", {}), ("excel", {}), ("xml", {})])
+@pytest.fixture(
+    params=[("csv", {}), ("jsonline", {}), ("json", {}), ("excel", {}), ("xml", {}), ("orc", {}), ("parquet", {})],
+)
 def source_file_format(request: FixtureRequest):
     name, params = request.param
     if name == "csv":
@@ -845,10 +848,20 @@ def source_file_format(request: FixtureRequest):
             **params,
         )

+    if name == "orc":
+        return "orc", ORC(
+            **params,
+        )
+
+    if name == "parquet":
+        return "parquet", Parquet(
+            **params,
+        )
+
     raise ValueError(f"Unsupported file format: {name}")


-@pytest.fixture(params=[("csv", {}), ("jsonline", {}), ("excel", {}), ("xml", {})])
+@pytest.fixture(params=[("csv", {}), ("jsonline", {}), ("excel", {}), ("xml", {}), ("orc", {}), ("parquet", {})])
 def target_file_format(request: FixtureRequest):
     name, params = request.param
     if name == "csv":
@@ -879,6 +892,16 @@ def target_file_format(request: FixtureRequest):
             **params,
         )

+    if name == "orc":
+        return "orc", ORC(
+            **params,
+        )
+
+    if name == "parquet":
+        return "parquet", Parquet(
+            **params,
+        )
+
     raise ValueError(f"Unsupported file format: {name}")
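Both fixtures share the `(name, params)` convention and are fed through pytest's indirect parametrization, which routes each parameter tuple into the fixture via `request.param` before the test body runs. A minimal self-contained sketch of that mechanism (fixture and test names here are illustrative, and a plain dict stands in for the onetl format object):

    import pytest

    @pytest.fixture
    def file_format(request: pytest.FixtureRequest):
        # request.param carries the (name, params) tuple passed via indirect=...
        name, params = request.param
        if name in ("orc", "parquet"):
            return name, {"format": name, **params}
        raise ValueError(f"Unsupported file format: {name}")

    @pytest.mark.parametrize("file_format", [("orc", {}), ("parquet", {})], indirect=True)
    def test_format_resolution(file_format):
        name, fmt = file_format
        assert fmt["format"] == name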

tests/test_integration/test_run_transfer/test_hdfs.py

Lines changed: 24 additions & 4 deletions

@@ -133,6 +133,16 @@ async def postgres_to_hdfs(
             "without_compression",
             id="xml",
         ),
+        pytest.param(
+            ("orc", {}),
+            "without_compression",
+            id="orc",
+        ),
+        pytest.param(
+            ("parquet", {}),
+            "without_compression",
+            id="parquet",
+        ),
     ],
     indirect=["source_file_format", "file_format_flavor"],
 )
@@ -178,8 +188,8 @@ async def test_run_transfer_hdfs_to_postgres(
     )
     df = reader.run()

-    # as Excel does not support datetime values with precision greater than milliseconds
-    if file_format == "excel":
+    # as Excel and Parquet do not support datetime values with precision greater than milliseconds
+    if file_format in ("excel", "parquet"):
         df = df.withColumn("REGISTERED_AT", date_trunc("second", col("REGISTERED_AT")))
         init_df = init_df.withColumn("REGISTERED_AT", date_trunc("second", col("REGISTERED_AT")))

@@ -212,6 +222,16 @@ async def test_run_transfer_hdfs_to_postgres(
             "without_compression",
             id="xml",
         ),
+        pytest.param(
+            ("orc", {}),
+            "without_compression",
+            id="orc",
+        ),
+        pytest.param(
+            ("parquet", {}),
+            "without_compression",
+            id="parquet",
+        ),
     ],
     indirect=["target_file_format", "file_format_flavor"],
 )
@@ -264,8 +284,8 @@ async def test_run_transfer_postgres_to_hdfs(
     )
     df = reader.run()

-    # as Excel does not support datetime values with precision greater than milliseconds
-    if format_name == "excel":
+    # as Excel and Parquet do not support datetime values with precision greater than milliseconds
+    if format_name in ("excel", "parquet"):
         init_df = init_df.withColumn(
             "REGISTERED_AT",
             to_timestamp(date_format(col("REGISTERED_AT"), "yyyy-MM-dd HH:mm:ss.SSS")),
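The precision workaround above truncates both sides of the comparison, so formats that cannot round-trip sub-second timestamps still compare equal to the source frame. A standalone illustration of what date_trunc("second", ...) does to such a column (the session setup is plain boilerplate):

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, date_trunc

    spark = SparkSession.builder.master("local[1]").getOrCreate()

    df = spark.createDataFrame(
        [("2024-01-01 12:34:56.789123",)], ["REGISTERED_AT"],
    ).selectExpr("CAST(REGISTERED_AT AS TIMESTAMP) AS REGISTERED_AT")

    # Drop sub-second precision so an Excel/Parquet round-trip compares equal.
    truncated = df.withColumn("REGISTERED_AT", date_trunc("second", col("REGISTERED_AT")))
    truncated.show(truncate=False)  # 2024-01-01 12:34:56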

tests/test_integration/test_run_transfer/test_s3.py

Lines changed: 29 additions & 9 deletions

@@ -133,6 +133,16 @@ async def postgres_to_s3(
             "without_compression",
             id="xml",
         ),
+        pytest.param(
+            ("orc", {}),
+            "without_compression",
+            id="orc",
+        ),
+        pytest.param(
+            ("parquet", {}),
+            "without_compression",
+            id="parquet",
+        ),
     ],
     indirect=["source_file_format", "file_format_flavor"],
 )
@@ -179,8 +189,8 @@ async def test_run_transfer_s3_to_postgres(
     )
     df = reader.run()

-    # as Excel does not support datetime values with precision greater than milliseconds
-    if file_format == "excel":
+    # as Excel and Parquet do not support datetime values with precision greater than milliseconds
+    if file_format in ("excel", "parquet"):
         df = df.withColumn("REGISTERED_AT", date_trunc("second", col("REGISTERED_AT")))
         init_df = init_df.withColumn("REGISTERED_AT", date_trunc("second", col("REGISTERED_AT")))

@@ -193,11 +203,6 @@ async def test_run_transfer_s3_to_postgres(
 @pytest.mark.parametrize(
     "target_file_format, file_format_flavor",
     [
-        pytest.param(
-            ("xml", {}),
-            "without_compression",
-            id="xml",
-        ),
         pytest.param(
             ("csv", {}),
             "with_header",
@@ -213,6 +218,21 @@ async def test_run_transfer_s3_to_postgres(
             "with_header",
             id="excel",
         ),
+        pytest.param(
+            ("xml", {}),
+            "without_compression",
+            id="xml",
+        ),
+        pytest.param(
+            ("orc", {}),
+            "without_compression",
+            id="orc",
+        ),
+        pytest.param(
+            ("parquet", {}),
+            "without_compression",
+            id="parquet",
+        ),
     ],
     indirect=["target_file_format", "file_format_flavor"],
 )
@@ -265,8 +285,8 @@ async def test_run_transfer_postgres_to_s3(
     )
     df = reader.run()

-    # as Excel does not support datetime values with precision greater than milliseconds
-    if format_name == "excel":
+    # as Excel and Parquet do not support datetime values with precision greater than milliseconds
+    if format_name in ("excel", "parquet"):
         init_df = init_df.withColumn(
             "REGISTERED_AT",
             to_timestamp(date_format(col("REGISTERED_AT"), "yyyy-MM-dd HH:mm:ss.SSS")),
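In the write direction the tests trim the source frame instead: the timestamp is formatted down to millisecond precision and parsed back, discarding the microsecond digits before comparison. A standalone sketch of that round-trip (session setup is boilerplate):

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, date_format, to_timestamp

    spark = SparkSession.builder.master("local[1]").getOrCreate()

    df = spark.createDataFrame(
        [("2024-01-01 12:34:56.789123",)], ["REGISTERED_AT"],
    ).selectExpr("CAST(REGISTERED_AT AS TIMESTAMP) AS REGISTERED_AT")

    # Format to milliseconds and parse back, dropping microsecond digits.
    trimmed = df.withColumn(
        "REGISTERED_AT",
        to_timestamp(date_format(col("REGISTERED_AT"), "yyyy-MM-dd HH:mm:ss.SSS")),
    )
    trimmed.show(truncate=False)  # 2024-01-01 12:34:56.789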
