Skip to content

Commit c163640

Browse files
author
Ilyas Gasanov
committed
[DOP-29428] Add support for Iceberg with REST catalog and S3 warehouse
1 parent 676bb9f commit c163640

File tree

13 files changed

+200
-49
lines changed
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Added support for Iceberg with REST catalog and S3 warehouse

poetry.lock

Lines changed: 38 additions & 34 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,8 @@ pyjwt = { version = "^2.10.1", optional = true }
5656
jinja2 = { version = "^3.1.6", optional = true }
5757
python-multipart = { version = "^0.0.20", optional = true }
5858
celery = { version = "^5.5.0", optional = true }
59-
onetl = { version = ">=0.13.5,<0.15.0", extras = ["all"], optional = true }
59+
onetl = { git = "https://github.com/MobileTeleSystems/onetl.git", branch = "develop", extras = ["all"], optional = true }
60+
# TODO: revert before next syncmaster release
6061
pyspark = { version = "<4.0.0", optional = true }
6162
pyyaml = { version = "*", optional = true }
6263
psycopg2-binary = { version = "^2.9.10", optional = true }

syncmaster/db/models/connection.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
class ConnectionType(StrEnum):
1919
POSTGRES = "postgres"
2020
HIVE = "hive"
21-
ICEBERG = "iceberg_rest_s3"
21+
ICEBERG_REST_S3 = "iceberg_rest_s3"
2222
ORACLE = "oracle"
2323
CLICKHOUSE = "clickhouse"
2424
MSSQL = "mssql"

syncmaster/dto/connections.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,23 @@ class HiveConnectionDTO(ConnectionDTO):
7373
type: ClassVar[str] = "hive"
7474

7575

76+
@dataclass
77+
class IcebergRESTCatalogS3ConnectionDTO(ConnectionDTO):
78+
metastore_url: str
79+
s3_warehouse_path: str
80+
s3_host: str
81+
s3_bucket: str
82+
s3_region: str
83+
s3_access_key: str
84+
s3_secret_key: str
85+
metastore_username: str
86+
metastore_password: str
87+
s3_port: int | None = None
88+
s3_protocol: str = "https"
89+
s3_path_style_access: bool = False
90+
type: ClassVar[str] = "iceberg_rest_s3"
91+
92+
7693
@dataclass
7794
class HDFSConnectionDTO(ConnectionDTO):
7895
user: str

syncmaster/dto/transfers.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,16 @@ def __post_init__(self):
107107
self.options.setdefault("if_exists", "replace_overlapping_partitions")
108108

109109

110+
@dataclass
111+
class IcebergRESTCatalogS3TransferDTO(DBTransferDTO):
112+
type: ClassVar[str] = "iceberg_rest_s3"
113+
catalog_name: str | None = None
114+
115+
def __post_init__(self):
116+
super().__post_init__()
117+
self.options.setdefault("if_exists", "replace_overlapping_partitions")
118+
119+
110120
@dataclass
111121
class S3TransferDTO(FileTransferDTO):
112122
type: ClassVar[str] = "s3"

syncmaster/schemas/v1/connection_types.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
from typing import Literal
44

55
HIVE_TYPE = Literal["hive"]
6-
ICEBERG_TYPE = Literal["iceberg_rest_s3"]
6+
ICEBERG_REST_S3_TYPE = Literal["iceberg_rest_s3"]
77
ORACLE_TYPE = Literal["oracle"]
88
POSTGRES_TYPE = Literal["postgres"]
99
CLICKHOUSE_TYPE = Literal["clickhouse"]
@@ -17,6 +17,7 @@
1717
WEBDAV_TYPE = Literal["webdav"]
1818
SAMBA_TYPE = Literal["samba"]
1919

20+
ICEBERG_REST_S3_CONNECTION_TYPE = "iceberg_rest_s3"
2021
CONNECTION_TYPES = [
2122
"oracle",
2223
"postgres",

syncmaster/schemas/v1/connections/connection_base.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,17 @@
22
# SPDX-License-Identifier: Apache-2.0
33
from pydantic import BaseModel, ConfigDict, Field
44

5-
from syncmaster.schemas.v1.auth import ReadBasicAuthSchema, ReadS3AuthSchema
6-
from syncmaster.schemas.v1.auth.samba import ReadSambaAuthSchema
5+
from syncmaster.schemas.v1.auth import (
6+
ReadBasicAuthSchema,
7+
ReadIcebergRESTCatalogBasicAuthSchema,
8+
ReadS3AuthSchema,
9+
ReadSambaAuthSchema,
10+
)
711
from syncmaster.schemas.v1.types import NameConstr
812

9-
ReadConnectionAuthDataSchema = ReadBasicAuthSchema | ReadS3AuthSchema | ReadSambaAuthSchema
13+
ReadConnectionAuthDataSchema = (
14+
ReadBasicAuthSchema | ReadS3AuthSchema | ReadSambaAuthSchema | ReadIcebergRESTCatalogBasicAuthSchema
15+
)
1016

1117

1218
class CreateConnectionBaseSchema(BaseModel):

syncmaster/schemas/v1/connections/iceberg.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
ReadIcebergRESTCatalogBasicAuthSchema,
1111
UpdateIcebergRESTCatalogBasicAuthSchema,
1212
)
13-
from syncmaster.schemas.v1.connection_types import ICEBERG_TYPE
13+
from syncmaster.schemas.v1.connection_types import ICEBERG_REST_S3_TYPE
1414
from syncmaster.schemas.v1.connections.connection_base import (
1515
CreateConnectionBaseSchema,
1616
ReadConnectionBaseSchema,
@@ -40,7 +40,7 @@ class ReadIcebergRESTCatalogS3ConnectionDataSchema(BaseModel):
4040

4141

4242
class CreateIcebergConnectionSchema(CreateConnectionBaseSchema):
43-
type: ICEBERG_TYPE = Field(description="Connection type")
43+
type: ICEBERG_REST_S3_TYPE = Field(description="Connection type")
4444
data: CreateIcebergRESTCatalogS3ConnectionDataSchema = Field(
4545
...,
4646
alias="connection_data",
@@ -54,7 +54,7 @@ class CreateIcebergConnectionSchema(CreateConnectionBaseSchema):
5454

5555

5656
class ReadIcebergConnectionSchema(ReadConnectionBaseSchema):
57-
type: ICEBERG_TYPE
57+
type: ICEBERG_REST_S3_TYPE
5858
data: ReadIcebergRESTCatalogS3ConnectionDataSchema = Field(alias="connection_data")
5959
auth_data: ReadIcebergRESTCatalogBasicAuthSchema | None = None
6060

syncmaster/worker/controller.py

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
FTPSConnectionDTO,
1818
HDFSConnectionDTO,
1919
HiveConnectionDTO,
20+
IcebergRESTCatalogS3ConnectionDTO,
2021
MSSQLConnectionDTO,
2122
MySQLConnectionDTO,
2223
OracleConnectionDTO,
@@ -33,6 +34,7 @@
3334
FTPTransferDTO,
3435
HDFSTransferDTO,
3536
HiveTransferDTO,
37+
IcebergRESTCatalogS3TransferDTO,
3638
MSSQLTransferDTO,
3739
MySQLTransferDTO,
3840
OracleTransferDTO,
@@ -45,10 +47,14 @@
4547
from syncmaster.dto.transfers_resources import Resources
4648
from syncmaster.dto.transfers_strategy import Strategy
4749
from syncmaster.exceptions.connection import ConnectionTypeNotRecognizedError
48-
from syncmaster.schemas.v1.connection_types import FILE_CONNECTION_TYPES
50+
from syncmaster.schemas.v1.connection_types import (
51+
FILE_CONNECTION_TYPES,
52+
ICEBERG_REST_S3_CONNECTION_TYPE,
53+
)
4954
from syncmaster.worker.handlers.base import Handler
5055
from syncmaster.worker.handlers.db.clickhouse import ClickhouseHandler
5156
from syncmaster.worker.handlers.db.hive import HiveHandler
57+
from syncmaster.worker.handlers.db.iceberg import IcebergRESTCatalogS3Handler
5258
from syncmaster.worker.handlers.db.mssql import MSSQLHandler
5359
from syncmaster.worker.handlers.db.mysql import MySQLHandler
5460
from syncmaster.worker.handlers.db.oracle import OracleHandler
@@ -72,6 +78,12 @@
7278
HiveTransferDTO,
7379
RunDTO,
7480
),
81+
"iceberg_rest_s3": (
82+
IcebergRESTCatalogS3Handler,
83+
IcebergRESTCatalogS3ConnectionDTO,
84+
IcebergRESTCatalogS3TransferDTO,
85+
RunDTO,
86+
),
7587
"oracle": (
7688
OracleHandler,
7789
OracleConnectionDTO,
@@ -269,6 +281,10 @@ def _perform_incremental_transfer(self) -> None:
269281
def _get_transfer_hwm_name(self) -> str:
270282
if self.source_handler.connection_dto.type in FILE_CONNECTION_TYPES:
271283
hwm_name_suffix = self.source_handler.transfer_dto.directory_path
284+
elif self.source_handler.connection_dto.type == ICEBERG_REST_S3_CONNECTION_TYPE:
285+
hwm_name_suffix = (
286+
f"{self.source_handler.transfer_dto.catalog_name}.{self.source_handler.transfer_dto.table_name}"
287+
)
272288
else:
273289
hwm_name_suffix = self.source_handler.transfer_dto.table_name
274290
hwm_name = "_".join(

0 commit comments

Comments
 (0)