Commit 8d9a7bb

Author: Ilyas Gasanov (committed)
[DOP-29428] Add support for Iceberg with REST catalog and S3 warehouse
1 parent 496c19d

File tree: 12 files changed, +188 -46 lines

Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
+Added support for Iceberg with REST catalog and S3 warehouse

poetry.lock

Lines changed: 38 additions & 34 deletions
Some generated files are not rendered by default.

pyproject.toml

Lines changed: 2 additions & 1 deletion
@@ -56,7 +56,8 @@ pyjwt = { version = "^2.10.1", optional = true }
 jinja2 = { version = "^3.1.6", optional = true }
 python-multipart = { version = "^0.0.20", optional = true }
 celery = { version = "^5.5.0", optional = true }
-onetl = { version = ">=0.13.5,<0.15.0", extras = ["all"], optional = true }
+onetl = { git = "https://github.com/MobileTeleSystems/onetl.git", branch = "develop", extras = ["all"], optional = true }
+# TODO: revert before next syncmaster release
 pyspark = { version = "<4.0.0", optional = true }
 pyyaml = { version = "*", optional = true }
 psycopg2-binary = { version = "^2.9.10", optional = true }

syncmaster/db/models/connection.py

Lines changed: 1 addition & 1 deletion
@@ -18,7 +18,7 @@
 class ConnectionType(StrEnum):
     POSTGRES = "postgres"
     HIVE = "hive"
-    ICEBERG = "iceberg_rest_s3"
+    ICEBERG_REST_S3 = "iceberg_rest_s3"
     ORACLE = "oracle"
     CLICKHOUSE = "clickhouse"
     MSSQL = "mssql"
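The member is renamed from ICEBERG to ICEBERG_REST_S3 while its string value stays "iceberg_rest_s3", so values already stored in the database still resolve to the enum. A minimal, self-contained sketch of that behaviour (not the real model module):

from enum import StrEnum

class ConnectionType(StrEnum):
    # Only the member name changed; the stored value is unchanged
    ICEBERG_REST_S3 = "iceberg_rest_s3"

assert ConnectionType("iceberg_rest_s3") is ConnectionType.ICEBERG_REST_S3
assert ConnectionType.ICEBERG_REST_S3 == "iceberg_rest_s3"  # StrEnum members compare equal to their value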

syncmaster/dto/connections.py

Lines changed: 17 additions & 0 deletions
@@ -73,6 +73,23 @@ class HiveConnectionDTO(ConnectionDTO):
     type: ClassVar[str] = "hive"


+@dataclass
+class IcebergRESTCatalogS3ConnectionDTO(ConnectionDTO):
+    metastore_url: str
+    s3_warehouse_path: str
+    s3_host: str
+    s3_bucket: str
+    s3_region: str
+    s3_access_key: str
+    s3_secret_key: str
+    metastore_username: str
+    metastore_password: str
+    s3_port: int | None = None
+    s3_protocol: str = "https"
+    s3_path_style_access: bool = False
+    type: ClassVar[str] = "iceberg_rest_s3"
+
+
 @dataclass
 class HDFSConnectionDTO(ConnectionDTO):
     user: str
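For illustration only, a hedged sketch of constructing the new DTO with made-up values (assuming the ConnectionDTO base adds no further required fields; real values come from the stored connection and decrypted auth data):

dto = IcebergRESTCatalogS3ConnectionDTO(
    metastore_url="http://rest-catalog:8181",     # hypothetical REST catalog endpoint
    s3_warehouse_path="s3a://warehouse/iceberg",  # hypothetical warehouse location
    s3_host="minio.example.internal",
    s3_bucket="warehouse",
    s3_region="us-east-1",
    s3_access_key="ACCESS_KEY",
    s3_secret_key="SECRET_KEY",
    metastore_username="admin",
    metastore_password="password",
    s3_port=9000,
    s3_protocol="http",
    s3_path_style_access=True,
)
assert dto.type == "iceberg_rest_s3"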

syncmaster/dto/transfers.py

Lines changed: 12 additions & 1 deletion
@@ -1,8 +1,9 @@
 # SPDX-FileCopyrightText: 2023-2024 MTS PJSC
 # SPDX-License-Identifier: Apache-2.0
 import json
-from dataclasses import dataclass
+from dataclasses import dataclass, field
 from typing import ClassVar
+from uuid import uuid4

 from onetl.file.format import CSV, JSON, ORC, XML, Excel, JSONLine, Parquet

@@ -107,6 +108,16 @@ def __post_init__(self):
         self.options.setdefault("if_exists", "replace_overlapping_partitions")


+@dataclass
+class IcebergRESTCatalogS3TransferDTO(DBTransferDTO):
+    type: ClassVar[str] = "iceberg_rest_s3"
+    catalog_name: str = field(default_factory=lambda: f"iceberg_rest_s3_{uuid4().hex[:8]}")
+
+    def __post_init__(self):
+        super().__post_init__()
+        self.options.setdefault("if_exists", "replace_overlapping_partitions")
+
+
 @dataclass
 class S3TransferDTO(FileTransferDTO):
     type: ClassVar[str] = "s3"
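The catalog_name default factory gives every transfer DTO its own short random suffix, presumably so that concurrently running transfers do not collide on a catalog name. A standalone sketch of the pattern (not the actual DTO, which also inherits DBTransferDTO fields):

from dataclasses import dataclass, field
from uuid import uuid4

@dataclass
class Example:
    # New suffix on every instantiation, e.g. "iceberg_rest_s3_1a2b3c4d"
    catalog_name: str = field(default_factory=lambda: f"iceberg_rest_s3_{uuid4().hex[:8]}")

a, b = Example(), Example()
assert a.catalog_name != b.catalog_name  # distinct with overwhelming probability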

syncmaster/schemas/v1/connection_types.py

Lines changed: 1 addition & 1 deletion
@@ -3,7 +3,7 @@
 from typing import Literal

 HIVE_TYPE = Literal["hive"]
-ICEBERG_TYPE = Literal["iceberg_rest_s3"]
+ICEBERG_REST_S3_TYPE = Literal["iceberg_rest_s3"]
 ORACLE_TYPE = Literal["oracle"]
 POSTGRES_TYPE = Literal["postgres"]
 CLICKHOUSE_TYPE = Literal["clickhouse"]
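ICEBERG_REST_S3_TYPE is simply a renamed Literal alias; used as a pydantic field type it accepts only the exact string "iceberg_rest_s3". A minimal sketch with a toy model (not the real connection schema):

from typing import Literal
from pydantic import BaseModel, ValidationError

ICEBERG_REST_S3_TYPE = Literal["iceberg_rest_s3"]

class Demo(BaseModel):
    type: ICEBERG_REST_S3_TYPE

Demo(type="iceberg_rest_s3")  # accepted
try:
    Demo(type="iceberg")      # rejected: only the literal value is allowed
except ValidationError:
    pass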

syncmaster/schemas/v1/connections/connection_base.py

Lines changed: 9 additions & 3 deletions
@@ -2,11 +2,17 @@
 # SPDX-License-Identifier: Apache-2.0
 from pydantic import BaseModel, ConfigDict, Field

-from syncmaster.schemas.v1.auth import ReadBasicAuthSchema, ReadS3AuthSchema
-from syncmaster.schemas.v1.auth.samba import ReadSambaAuthSchema
+from syncmaster.schemas.v1.auth import (
+    ReadBasicAuthSchema,
+    ReadIcebergRESTCatalogBasicAuthSchema,
+    ReadS3AuthSchema,
+    ReadSambaAuthSchema,
+)
 from syncmaster.schemas.v1.types import NameConstr

-ReadConnectionAuthDataSchema = ReadBasicAuthSchema | ReadS3AuthSchema | ReadSambaAuthSchema
+ReadConnectionAuthDataSchema = (
+    ReadBasicAuthSchema | ReadS3AuthSchema | ReadSambaAuthSchema | ReadIcebergRESTCatalogBasicAuthSchema
+)


 class CreateConnectionBaseSchema(BaseModel):
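ReadConnectionAuthDataSchema is a plain (non-discriminated) union, so pydantic returns whichever member validates the stored auth payload. A hedged sketch of the idea with stand-in models (the field names and type strings below are assumptions, not the real auth schemas):

from typing import Literal
from pydantic import BaseModel, TypeAdapter

# Stand-ins for the real auth read schemas
class ReadBasicAuthSchema(BaseModel):
    type: Literal["basic"] = "basic"
    user: str

class ReadIcebergRESTCatalogBasicAuthSchema(BaseModel):
    type: Literal["iceberg_rest_catalog_basic"] = "iceberg_rest_catalog_basic"
    user: str

ReadConnectionAuthDataSchema = ReadBasicAuthSchema | ReadIcebergRESTCatalogBasicAuthSchema

auth = TypeAdapter(ReadConnectionAuthDataSchema).validate_python(
    {"type": "iceberg_rest_catalog_basic", "user": "admin"},
)
assert isinstance(auth, ReadIcebergRESTCatalogBasicAuthSchema)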

syncmaster/schemas/v1/connections/iceberg.py

Lines changed: 3 additions & 3 deletions
@@ -10,7 +10,7 @@
     ReadIcebergRESTCatalogBasicAuthSchema,
     UpdateIcebergRESTCatalogBasicAuthSchema,
 )
-from syncmaster.schemas.v1.connection_types import ICEBERG_TYPE
+from syncmaster.schemas.v1.connection_types import ICEBERG_REST_S3_TYPE
 from syncmaster.schemas.v1.connections.connection_base import (
     CreateConnectionBaseSchema,
     ReadConnectionBaseSchema,
@@ -40,7 +40,7 @@ class ReadIcebergRESTCatalogS3ConnectionDataSchema(BaseModel):


 class CreateIcebergConnectionSchema(CreateConnectionBaseSchema):
-    type: ICEBERG_TYPE = Field(description="Connection type")
+    type: ICEBERG_REST_S3_TYPE = Field(description="Connection type")
     data: CreateIcebergRESTCatalogS3ConnectionDataSchema = Field(
         ...,
         alias="connection_data",
@@ -54,7 +54,7 @@ class CreateIcebergConnectionSchema(CreateConnectionBaseSchema):


 class ReadIcebergConnectionSchema(ReadConnectionBaseSchema):
-    type: ICEBERG_TYPE
+    type: ICEBERG_REST_S3_TYPE
     data: ReadIcebergRESTCatalogS3ConnectionDataSchema = Field(alias="connection_data")
     auth_data: ReadIcebergRESTCatalogBasicAuthSchema | None = None

syncmaster/worker/controller.py

Lines changed: 12 additions & 1 deletion
@@ -17,6 +17,7 @@
     FTPSConnectionDTO,
     HDFSConnectionDTO,
     HiveConnectionDTO,
+    IcebergRESTCatalogS3ConnectionDTO,
     MSSQLConnectionDTO,
     MySQLConnectionDTO,
     OracleConnectionDTO,
@@ -33,6 +34,7 @@
     FTPTransferDTO,
     HDFSTransferDTO,
     HiveTransferDTO,
+    IcebergRESTCatalogS3TransferDTO,
     MSSQLTransferDTO,
     MySQLTransferDTO,
     OracleTransferDTO,
@@ -45,10 +47,13 @@
 from syncmaster.dto.transfers_resources import Resources
 from syncmaster.dto.transfers_strategy import Strategy
 from syncmaster.exceptions.connection import ConnectionTypeNotRecognizedError
-from syncmaster.schemas.v1.connection_types import FILE_CONNECTION_TYPES
+from syncmaster.schemas.v1.connection_types import (
+    FILE_CONNECTION_TYPES,
+)
 from syncmaster.worker.handlers.base import Handler
 from syncmaster.worker.handlers.db.clickhouse import ClickhouseHandler
 from syncmaster.worker.handlers.db.hive import HiveHandler
+from syncmaster.worker.handlers.db.iceberg import IcebergRESTCatalogS3Handler
 from syncmaster.worker.handlers.db.mssql import MSSQLHandler
 from syncmaster.worker.handlers.db.mysql import MySQLHandler
 from syncmaster.worker.handlers.db.oracle import OracleHandler
@@ -72,6 +77,12 @@
         HiveTransferDTO,
         RunDTO,
     ),
+    "iceberg_rest_s3": (
+        IcebergRESTCatalogS3Handler,
+        IcebergRESTCatalogS3ConnectionDTO,
+        IcebergRESTCatalogS3TransferDTO,
+        RunDTO,
+    ),
     "oracle": (
         OracleHandler,
         OracleConnectionDTO,
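The controller dispatches on the connection type string to a tuple of (handler, connection DTO, transfer DTO, run DTO) classes; registering "iceberg_rest_s3" here is what wires the new classes into transfer runs. A hedged sketch of the lookup pattern (the registry and helper names below are hypothetical, and RunDTO is left out because its import path is not shown in this diff):

from syncmaster.dto.connections import IcebergRESTCatalogS3ConnectionDTO
from syncmaster.dto.transfers import IcebergRESTCatalogS3TransferDTO
from syncmaster.exceptions.connection import ConnectionTypeNotRecognizedError
from syncmaster.worker.handlers.db.iceberg import IcebergRESTCatalogS3Handler

# Hypothetical stand-in for the controller's internal registry
handler_registry = {
    "iceberg_rest_s3": (
        IcebergRESTCatalogS3Handler,
        IcebergRESTCatalogS3ConnectionDTO,
        IcebergRESTCatalogS3TransferDTO,
    ),
}

def resolve(connection_type: str):
    try:
        return handler_registry[connection_type]
    except KeyError:
        # Unknown connection types surface as a recognizable domain error
        raise ConnectionTypeNotRecognizedError

handler_cls, connection_dto_cls, transfer_dto_cls = resolve("iceberg_rest_s3")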
