Commit e94cddb

Author: Ilyas Gasanov (committed)

[DOP-21450] Add compressions for file formats

1 parent 7a6b1a1 · commit e94cddb

File tree: 8 files changed, +157 -17 lines

syncmaster/dto/transfers.py

Lines changed: 4 additions & 2 deletions

@@ -4,7 +4,7 @@
 from dataclasses import dataclass
 from typing import ClassVar

-from onetl.file.format import CSV, JSON, Excel, JSONLine
+from onetl.file.format import CSV, JSON, ORC, Excel, JSONLine, Parquet


 @dataclass
@@ -20,7 +20,7 @@ class DBTransferDTO(TransferDTO):
 @dataclass
 class FileTransferDTO(TransferDTO):
     directory_path: str
-    file_format: CSV | JSONLine | JSON | Excel
+    file_format: CSV | JSONLine | JSON | Excel | ORC | Parquet
     options: dict
     df_schema: dict | None = None

@@ -29,6 +29,8 @@ class FileTransferDTO(TransferDTO):
         "jsonline": JSONLine,
         "json": JSON,
         "excel": Excel,
+        "orc": ORC,
+        "parquet": Parquet,
     }

     def __post_init__(self):
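The two new dict entries extend the name-to-class mapping that the DTO uses to materialize a file format from its serialized form. A minimal sketch of that resolution step, assuming a hypothetical `FORMAT_CLASSES` map and `resolve_format` helper (the diff shows only the added entries, not the map's name or the resolution code):

# Hypothetical sketch, not part of the commit: resolving a format name plus
# params through such a mapping. FORMAT_CLASSES and resolve_format are
# assumed names.
from onetl.file.format import CSV, JSON, ORC, Excel, JSONLine, Parquet

FORMAT_CLASSES = {
    "csv": CSV,
    "jsonline": JSONLine,
    "json": JSON,
    "excel": Excel,
    "orc": ORC,
    "parquet": Parquet,
}

def resolve_format(spec: dict):
    # e.g. {"type": "parquet", "compression": "snappy"} -> Parquet(compression="snappy")
    params = dict(spec)
    return FORMAT_CLASSES[params.pop("type")](**params)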

syncmaster/schemas/v1/transfers/file_format.py

Lines changed: 54 additions & 0 deletions

@@ -2,6 +2,8 @@
 # SPDX-License-Identifier: Apache-2.0
 from __future__ import annotations

+from enum import Enum
+
 from pydantic import BaseModel

 from syncmaster.schemas.v1.file_formats import (
@@ -15,6 +17,52 @@
 )


+class ORCCompression(str, Enum):
+    UNCOMPRESSED = "uncompressed"
+    NONE = "none"
+    SNAPPY = "snappy"
+    ZLIB = "zlib"
+    LZO = "lzo"
+    ZSTD = "zstd"
+    LZ4 = "lz4"
+
+
+class ParquetCompression(str, Enum):
+    UNCOMPRESSED = "uncompressed"
+    NONE = "none"
+    SNAPPY = "snappy"
+    GZIP = "gzip"
+    LZO = "lzo"
+    ZSTD = "zstd"
+    LZ4 = "lz4"
+    BROTLI = "brotli"
+
+
+class JSONCompression(str, Enum):
+    NONE = "none"
+    BZIP2 = "bzip2"
+    GZIP = "gzip"
+    LZ4 = "lz4"
+    SNAPPY = "snappy"
+    DEFLATE = "deflate"
+
+
+class CSVCompression(str, Enum):
+    NONE = "none"
+    BZIP2 = "bzip2"
+    GZIP = "gzip"
+    LZ4 = "lz4"
+    SNAPPY = "snappy"
+    DEFLATE = "deflate"
+
+
+class XMLCompression(str, Enum):
+    BZIP2 = "bzip2"
+    GZIP = "gzip"
+    LZ4 = "lz4"
+    SNAPPY = "snappy"
+
+
 class CSV(BaseModel):
     type: CSV_FORMAT
     delimiter: str = ","
@@ -23,18 +71,21 @@ class CSV(BaseModel):
     escape: str = "\\"
     include_header: bool = False
     line_sep: str = "\n"
+    compression: CSVCompression = CSVCompression.NONE


 class JSONLine(BaseModel):
     type: JSONLINE_FORMAT
     encoding: str = "utf-8"
     line_sep: str = "\n"
+    compression: JSONCompression = JSONCompression.NONE


 class JSON(BaseModel):
     type: JSON_FORMAT
     encoding: str = "utf-8"
     line_sep: str = "\n"
+    compression: JSONCompression = JSONCompression.NONE


 class Excel(BaseModel):
@@ -47,11 +98,14 @@ class XML(BaseModel):
     type: XML_FORMAT
     root_tag: str
     row_tag: str
+    compression: XMLCompression | None = None


 class ORC(BaseModel):
     type: ORC_FORMAT
+    compression: ORCCompression = ORCCompression.NONE


 class Parquet(BaseModel):
     type: PARQUET_FORMAT
+    compression: ParquetCompression = ParquetCompression.NONE
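For orientation, a minimal sketch (not part of the diff) of how the new fields behave at validation time, assuming `CSV_FORMAT` and friends are `Literal`-style aliases such as `"csv"`:

# Hypothetical usage of the schemas above; the "csv"/"orc"/"parquet" type
# values are assumptions about the *_FORMAT aliases.
CSV(type="csv", compression="gzip")         # str value coerced to CSVCompression.GZIP
ORC(type="orc")                             # compression defaults to ORCCompression.NONE
Parquet(type="parquet", compression="rar")  # rejected: pydantic raises ValidationError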

tests/test_integration/test_run_transfer/conftest.py

Lines changed: 23 additions & 3 deletions

@@ -11,7 +11,7 @@
 from onetl.connection import MSSQL, Clickhouse, Hive, MySQL, Oracle, Postgres, SparkS3
 from onetl.connection.file_connection.s3 import S3
 from onetl.db import DBWriter
-from onetl.file.format import CSV, JSON, Excel, JSONLine
+from onetl.file.format import CSV, JSON, ORC, Excel, JSONLine, Parquet
 from pyspark.sql import DataFrame, SparkSession
 from pyspark.sql.types import (
     DateType,
@@ -804,7 +804,7 @@ def fill_with_data(df: DataFrame):
     pass


-@pytest.fixture(params=[("csv", {}), ("jsonline", {}), ("json", {}), ("excel", {})])
+@pytest.fixture()
 def source_file_format(request: FixtureRequest):
     name, params = request.param
     if name == "csv":
@@ -835,10 +835,20 @@ def source_file_format(request: FixtureRequest):
             **params,
         )

+    if name == "orc":
+        return "orc", ORC(
+            **params,
+        )
+
+    if name == "parquet":
+        return "parquet", Parquet(
+            **params,
+        )
+
     raise ValueError(f"Unsupported file format: {name}")


-@pytest.fixture(params=[("csv", {}), ("jsonline", {}), ("excel", {})])
+@pytest.fixture()
 def target_file_format(request: FixtureRequest):
     name, params = request.param
     if name == "csv":
@@ -863,6 +873,16 @@ def target_file_format(request: FixtureRequest):
             **params,
         )

+    if name == "orc":
+        return "orc", ORC(
+            **params,
+        )
+
+    if name == "parquet":
+        return "parquet", Parquet(
+            **params,
+        )
+
     raise ValueError(f"Unsupported file format: {name}")
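With the inline `params` lists dropped from the decorators, these fixtures now only receive values through indirect parametrization, which is how the test modules below drive them. A minimal sketch of the pattern (the test body itself is hypothetical):

import pytest

# Hypothetical test: indirect=True routes the tuple into the fixture's
# request.param instead of passing it straight to the test function.
@pytest.mark.parametrize(
    "source_file_format",
    [("parquet", {"compression": "snappy"})],
    indirect=True,
)
def test_format_fixture(source_file_format):
    name, file_format = source_file_format
    assert name == "parquet"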

tests/test_integration/test_run_transfer/test_hdfs.py

Lines changed: 27 additions & 6 deletions

@@ -13,6 +13,7 @@
 from sqlalchemy.ext.asyncio import AsyncSession

 from syncmaster.db.models import Connection, Group, Queue, Status
+from syncmaster.db.models.transfer import Transfer
 from tests.mocks import MockUser
 from tests.test_unit.utils import create_transfer
 from tests.utils import get_run_on_end
@@ -114,8 +115,8 @@ async def postgres_to_hdfs(
             id="csv",
         ),
         pytest.param(
-            ("json", {}),
-            "without_compression",
+            ("json", {"compression": "gzip"}),
+            "with_compression",
             id="json",
         ),
         pytest.param(
@@ -128,6 +129,16 @@ async def postgres_to_hdfs(
             "with_header",
             id="excel",
         ),
+        pytest.param(
+            ("orc", {"compression": "snappy"}),
+            "with_compression",
+            id="orc",
+        ),
+        pytest.param(
+            ("parquet", {"compression": "snappy"}),
+            "with_compression",
+            id="parquet",
+        ),
     ],
     indirect=["source_file_format", "file_format_flavor"],
 )
@@ -136,7 +147,7 @@ async def test_run_transfer_hdfs_to_postgres(
     group_owner: MockUser,
     init_df: DataFrame,
     client: AsyncClient,
-    hdfs_to_postgres: Connection,
+    hdfs_to_postgres: Transfer,
     source_file_format,
     file_format_flavor,
 ):
@@ -188,8 +199,8 @@ async def test_run_transfer_hdfs_to_postgres(
     "target_file_format, file_format_flavor",
     [
         pytest.param(
-            ("csv", {}),
-            "with_header",
+            ("csv", {"compression": "lz4"}),
+            "with_compression",
             id="csv",
         ),
         pytest.param(
@@ -202,6 +213,16 @@ async def test_run_transfer_hdfs_to_postgres(
             "with_header",
             id="excel",
         ),
+        pytest.param(
+            ("orc", {"compression": "lzo"}),
+            "with_compression",
+            id="orc",
+        ),
+        pytest.param(
+            ("parquet", {"compression": "brotli"}),
+            "with_compression",
+            id="parquet",
+        ),
     ],
     indirect=["target_file_format", "file_format_flavor"],
 )
@@ -211,7 +232,7 @@ async def test_run_transfer_postgres_to_hdfs(
     client: AsyncClient,
     prepare_postgres,
     hdfs_file_df_connection: SparkHDFS,
-    postgres_to_hdfs: Connection,
+    postgres_to_hdfs: Transfer,
     hdfs_connection: SparkHDFS,
     target_file_format,
     file_format_flavor: str,

tests/test_integration/test_run_transfer/test_s3.py

Lines changed: 27 additions & 6 deletions

@@ -13,6 +13,7 @@
 from sqlalchemy.ext.asyncio import AsyncSession

 from syncmaster.db.models import Connection, Group, Queue, Status
+from syncmaster.db.models.transfer import Transfer
 from tests.mocks import MockUser
 from tests.test_unit.utils import create_transfer
 from tests.utils import get_run_on_end
@@ -114,8 +115,8 @@ async def postgres_to_s3(
             id="csv",
         ),
         pytest.param(
-            ("json", {}),
-            "without_compression",
+            ("json", {"compression": "gzip"}),
+            "with_compression",
             id="json",
         ),
         pytest.param(
@@ -128,6 +129,16 @@ async def postgres_to_s3(
             "with_header",
             id="excel",
         ),
+        pytest.param(
+            ("orc", {"compression": "snappy"}),
+            "with_compression",
+            id="orc",
+        ),
+        pytest.param(
+            ("parquet", {"compression": "snappy"}),
+            "with_compression",
+            id="parquet",
+        ),
     ],
     indirect=["source_file_format", "file_format_flavor"],
 )
@@ -136,7 +147,7 @@ async def test_run_transfer_s3_to_postgres(
     group_owner: MockUser,
     init_df: DataFrame,
     client: AsyncClient,
-    s3_to_postgres: Connection,
+    s3_to_postgres: Transfer,
     source_file_format,
     file_format_flavor,
 ):
@@ -189,8 +200,8 @@ async def test_run_transfer_s3_to_postgres(
     "target_file_format, file_format_flavor",
     [
         pytest.param(
-            ("csv", {}),
-            "with_header",
+            ("csv", {"compression": "lz4"}),
+            "with_compression",
             id="csv",
         ),
         pytest.param(
@@ -203,6 +214,16 @@ async def test_run_transfer_s3_to_postgres(
             "with_header",
             id="excel",
         ),
+        pytest.param(
+            ("orc", {"compression": "none"}),
+            "with_compression",
+            id="orc",
+        ),
+        pytest.param(
+            ("parquet", {"compression": "brotli"}),
+            "with_compression",
+            id="parquet",
+        ),
     ],
     indirect=["target_file_format", "file_format_flavor"],
 )
@@ -213,7 +234,7 @@ async def test_run_transfer_postgres_to_s3(
     s3_file_df_connection: SparkS3,
     prepare_postgres,
     prepare_s3,
-    postgres_to_s3: Connection,
+    postgres_to_s3: Transfer,
    target_file_format,
     file_format_flavor: str,
 ):
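The `compression` values exercised in these tests ultimately map onto Spark's writer options when onetl writes the target files. A rough sketch of the equivalent raw PySpark call, for orientation only (`df` and the bucket path are placeholders, not names from this repository):

# Placeholder sketch: what a snappy-compressed Parquet write looks like in
# plain PySpark; df and the s3a path are hypothetical.
df.write.option("compression", "snappy").parquet("s3a://bucket/target/")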
