Commit 2d7c931

[DOP-21450] Add compression options for file formats (#159)
1 parent 7a6b1a1 commit 2d7c931

File tree: 15 files changed, +135 −14 lines

Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
+Add compression options to file formats CSV, JSON, JSONLine, Excel, ORC, Parquet
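
Note: concretely, the new option surfaces as an extra "compression" key on the file format payloads validated by the schemas changed below. The snippets here are illustrative only; field names mirror the pydantic models added in syncmaster/schemas/v1/transfers/file_format.py, and the surrounding transfer payload is not part of this diff, so only the file_format fields themselves are shown.

# Illustrative file_format payloads with the new "compression" key.
# Defaults from the diff apply when "compression" is omitted.
csv_format = {"type": "csv", "delimiter": ",", "compression": "gzip"}
orc_format = {"type": "orc", "compression": "zlib"}
parquet_format = {"type": "parquet", "compression": "snappy"}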

syncmaster/dto/transfers.py

Lines changed: 4 additions & 2 deletions
@@ -4,7 +4,7 @@
 from dataclasses import dataclass
 from typing import ClassVar

-from onetl.file.format import CSV, JSON, Excel, JSONLine
+from onetl.file.format import CSV, JSON, ORC, Excel, JSONLine, Parquet


 @dataclass
@@ -20,7 +20,7 @@ class DBTransferDTO(TransferDTO):
 @dataclass
 class FileTransferDTO(TransferDTO):
     directory_path: str
-    file_format: CSV | JSONLine | JSON | Excel
+    file_format: CSV | JSONLine | JSON | Excel | ORC | Parquet
     options: dict
     df_schema: dict | None = None

@@ -29,6 +29,8 @@ class FileTransferDTO(TransferDTO):
         "jsonline": JSONLine,
         "json": JSON,
         "excel": Excel,
+        "orc": ORC,
+        "parquet": Parquet,
     }

     def __post_init__(self):
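
Note: the _format_mapping table is presumably what turns the string format name from a transfer definition into the matching onetl format class. The body of __post_init__ is not shown in this diff, so the lookup below is a hedged sketch of that pattern, not the repository's actual implementation.

# Hypothetical sketch of resolving a format name via a mapping like
# FileTransferDTO._format_mapping; the real __post_init__ is not in this diff.
from onetl.file.format import CSV, JSON, ORC, Excel, JSONLine, Parquet

_format_mapping = {
    "csv": CSV,
    "jsonline": JSONLine,
    "json": JSON,
    "excel": Excel,
    "orc": ORC,
    "parquet": Parquet,
}

def resolve_format(name: str, options: dict):
    # Unknown names fail loudly instead of silently producing the wrong format.
    try:
        format_class = _format_mapping[name]
    except KeyError:
        raise ValueError(f"Unsupported file format: {name}") from None
    # e.g. ORC(compression="snappy"), as the test fixtures below do with **params
    return format_class(**options)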

syncmaster/schemas/v1/transfers/file_format.py

Lines changed: 39 additions & 0 deletions
@@ -2,6 +2,8 @@
 # SPDX-License-Identifier: Apache-2.0
 from __future__ import annotations

+from enum import Enum
+
 from pydantic import BaseModel

 from syncmaster.schemas.v1.file_formats import (
@@ -15,6 +17,38 @@
 )


+class ORCCompression(str, Enum):
+    NONE = "none"
+    SNAPPY = "snappy"
+    ZLIB = "zlib"
+    LZ4 = "lz4"
+
+
+class ParquetCompression(str, Enum):
+    NONE = "none"
+    SNAPPY = "snappy"
+    GZIP = "gzip"
+    LZ4 = "lz4"
+
+
+class JSONCompression(str, Enum):
+    NONE = "none"
+    BZIP2 = "bzip2"
+    GZIP = "gzip"
+    LZ4 = "lz4"
+    SNAPPY = "snappy"
+    DEFLATE = "deflate"
+
+
+class CSVCompression(str, Enum):
+    NONE = "none"
+    BZIP2 = "bzip2"
+    GZIP = "gzip"
+    LZ4 = "lz4"
+    SNAPPY = "snappy"
+    DEFLATE = "deflate"
+
+
 class CSV(BaseModel):
     type: CSV_FORMAT
     delimiter: str = ","
@@ -23,18 +57,21 @@ class CSV(BaseModel):
     escape: str = "\\"
     include_header: bool = False
     line_sep: str = "\n"
+    compression: CSVCompression = CSVCompression.GZIP


 class JSONLine(BaseModel):
     type: JSONLINE_FORMAT
     encoding: str = "utf-8"
     line_sep: str = "\n"
+    compression: JSONCompression = JSONCompression.GZIP


 class JSON(BaseModel):
     type: JSON_FORMAT
     encoding: str = "utf-8"
     line_sep: str = "\n"
+    compression: JSONCompression = JSONCompression.GZIP


 class Excel(BaseModel):
@@ -51,7 +88,9 @@ class XML(BaseModel):

 class ORC(BaseModel):
     type: ORC_FORMAT
+    compression: ORCCompression = ORCCompression.ZLIB


 class Parquet(BaseModel):
     type: PARQUET_FORMAT
+    compression: ParquetCompression = ParquetCompression.SNAPPY
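
Note: a quick way to see what the new enums buy is to validate a payload against one of these models. The sketch below re-declares a trimmed-down CSV model locally (with Literal["csv"] standing in for the imported CSV_FORMAT constant) so it runs standalone; it only illustrates the enum validation behaviour added above.

# Standalone sketch mirroring the models above; CSV_FORMAT is replaced with a
# plain Literal so the example runs without the syncmaster package installed.
from enum import Enum
from typing import Literal

from pydantic import BaseModel, ValidationError


class CSVCompression(str, Enum):
    NONE = "none"
    BZIP2 = "bzip2"
    GZIP = "gzip"
    LZ4 = "lz4"
    SNAPPY = "snappy"
    DEFLATE = "deflate"


class CSV(BaseModel):
    type: Literal["csv"]
    delimiter: str = ","
    compression: CSVCompression = CSVCompression.GZIP


print(CSV(type="csv").compression)                      # CSVCompression.GZIP (default)
print(CSV(type="csv", compression="lz4").compression)   # accepted, coerced to the enum

try:
    CSV(type="csv", compression="zstd")                  # not in the enum -> rejected
except ValidationError as exc:
    print("rejected:", exc.errors()[0]["loc"])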

tests/resources/file_df_connection/generate_files.py

Lines changed: 1 addition & 1 deletion
@@ -64,7 +64,7 @@ def get_pyarrow_schema() -> ArrowSchema:
         pa.field("REGION", pa.string()),
         pa.field("NUMBER", pa.int32()),
         pa.field("BIRTH_DATE", pa.date32()),
-        pa.field("REGISTERED_AT", pa.timestamp("ms")),
+        pa.field("REGISTERED_AT", pa.timestamp("us")),
         pa.field("ACCOUNT_BALANCE", pa.float64()),
     ],
 )
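
Note: the "ms" to "us" change matters because the two units are distinct Arrow/Parquet types, and Spark's TimestampType is microsecond-precision; presumably the generated fixtures are meant to match that. A small standalone pyarrow check:

# Millisecond and microsecond timestamp columns are different Arrow types,
# so fixtures generated with "ms" would not match a microsecond schema.
import pyarrow as pa

ms_field = pa.field("REGISTERED_AT", pa.timestamp("ms"))
us_field = pa.field("REGISTERED_AT", pa.timestamp("us"))

print(ms_field.type)                    # timestamp[ms]
print(us_field.type)                    # timestamp[us]
print(ms_field.type == us_field.type)   # False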
Binary files not shown (4 files).

tests/test_integration/test_run_transfer/conftest.py

Lines changed: 23 additions & 3 deletions
@@ -11,7 +11,7 @@
 from onetl.connection import MSSQL, Clickhouse, Hive, MySQL, Oracle, Postgres, SparkS3
 from onetl.connection.file_connection.s3 import S3
 from onetl.db import DBWriter
-from onetl.file.format import CSV, JSON, Excel, JSONLine
+from onetl.file.format import CSV, JSON, ORC, Excel, JSONLine, Parquet
 from pyspark.sql import DataFrame, SparkSession
 from pyspark.sql.types import (
     DateType,
@@ -804,7 +804,7 @@ def fill_with_data(df: DataFrame):
     pass


-@pytest.fixture(params=[("csv", {}), ("jsonline", {}), ("json", {}), ("excel", {})])
+@pytest.fixture()
 def source_file_format(request: FixtureRequest):
     name, params = request.param
     if name == "csv":
@@ -835,10 +835,20 @@ def source_file_format(request: FixtureRequest):
             **params,
         )

+    if name == "orc":
+        return "orc", ORC(
+            **params,
+        )
+
+    if name == "parquet":
+        return "parquet", Parquet(
+            **params,
+        )
+
     raise ValueError(f"Unsupported file format: {name}")


-@pytest.fixture(params=[("csv", {}), ("jsonline", {}), ("excel", {})])
+@pytest.fixture()
 def target_file_format(request: FixtureRequest):
     name, params = request.param
     if name == "csv":
@@ -863,6 +873,16 @@ def target_file_format(request: FixtureRequest):
             **params,
         )

+    if name == "orc":
+        return "orc", ORC(
+            **params,
+        )
+
+    if name == "parquet":
+        return "parquet", Parquet(
+            **params,
+        )
+
     raise ValueError(f"Unsupported file format: {name}")

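Note: dropping the fixture-level params and using a bare @pytest.fixture() means each test now supplies its formats through indirect parametrization, as the HDFS tests below do. A minimal self-contained sketch of that mechanism (fixture and test names here are invented for illustration, not taken from the repo):

# Indirect parametrization: the parameter tuple reaches the fixture via
# request.param instead of being hard-coded in @pytest.fixture(params=[...]).
import pytest


@pytest.fixture()
def source_file_format(request):
    name, params = request.param
    return name, params


@pytest.mark.parametrize(
    "source_file_format",
    [("orc", {}), ("parquet", {"compression": "lz4"})],
    indirect=True,
    ids=["orc", "parquet"],
)
def test_format_is_delivered(source_file_format):
    name, params = source_file_format
    assert name in {"orc", "parquet"}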

tests/test_integration/test_run_transfer/test_hdfs.py

Lines changed: 25 additions & 4 deletions
@@ -13,6 +13,7 @@
 from sqlalchemy.ext.asyncio import AsyncSession

 from syncmaster.db.models import Connection, Group, Queue, Status
+from syncmaster.db.models.transfer import Transfer
 from tests.mocks import MockUser
 from tests.test_unit.utils import create_transfer
 from tests.utils import get_run_on_end
@@ -128,6 +129,16 @@ async def postgres_to_hdfs(
             "with_header",
             id="excel",
         ),
+        pytest.param(
+            ("orc", {}),
+            "without_compression",
+            id="orc",
+        ),
+        pytest.param(
+            ("parquet", {}),
+            "without_compression",
+            id="parquet",
+        ),
     ],
     indirect=["source_file_format", "file_format_flavor"],
 )
@@ -136,7 +147,7 @@ async def test_run_transfer_hdfs_to_postgres(
     group_owner: MockUser,
     init_df: DataFrame,
     client: AsyncClient,
-    hdfs_to_postgres: Connection,
+    hdfs_to_postgres: Transfer,
     source_file_format,
     file_format_flavor,
 ):
@@ -188,8 +199,8 @@ async def test_run_transfer_hdfs_to_postgres(
     "target_file_format, file_format_flavor",
     [
         pytest.param(
-            ("csv", {}),
-            "with_header",
+            ("csv", {"compression": "lz4"}),
+            "with_compression",
             id="csv",
         ),
         pytest.param(
@@ -202,6 +213,16 @@ async def test_run_transfer_hdfs_to_postgres(
             "with_header",
             id="excel",
         ),
+        pytest.param(
+            ("orc", {"compression": "snappy"}),
+            "with_compression",
+            id="orc",
+        ),
+        pytest.param(
+            ("parquet", {"compression": "lz4"}),
+            "with_compression",
+            id="parquet",
+        ),
     ],
     indirect=["target_file_format", "file_format_flavor"],
 )
@@ -211,7 +232,7 @@ async def test_run_transfer_postgres_to_hdfs(
     client: AsyncClient,
     prepare_postgres,
     hdfs_file_df_connection: SparkHDFS,
-    postgres_to_hdfs: Connection,
+    postgres_to_hdfs: Transfer,
     hdfs_connection: SparkHDFS,
     target_file_format,
     file_format_flavor: str,
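
Note: the compression values exercised here ("lz4" for CSV and Parquet, "snappy" for ORC) correspond to Spark writer options. Below is a standalone PySpark sketch of the equivalent low-level calls; the session setup and output paths are illustrative and not taken from these tests.

# Illustrative only: what the compression settings map to at the Spark level.
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").appName("compression-demo").getOrCreate()
df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "value"])

# Same codec names as the enums added in this commit.
df.write.mode("overwrite").option("compression", "lz4").csv("/tmp/demo_csv")
df.write.mode("overwrite").option("compression", "snappy").orc("/tmp/demo_orc")
df.write.mode("overwrite").option("compression", "lz4").parquet("/tmp/demo_parquet")

spark.stop()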
