Commit addbd17
Author: Ilyas Gasanov (committed)
[DOP-29428] Add support for Iceberg with REST catalog and S3 warehouse
Parent: 085e567

File tree

9 files changed: +188 −42 lines

Lines changed: 1 addition & 0 deletions

@@ -0,0 +1 @@
+Added support for Iceberg with REST catalog and S3 warehouse

poetry.lock

Lines changed: 38 additions & 34 deletions
Some generated files are not rendered by default.

pyproject.toml

Lines changed: 1 addition & 1 deletion
@@ -56,7 +56,7 @@ pyjwt = { version = "^2.10.1", optional = true }
 jinja2 = { version = "^3.1.6", optional = true }
 python-multipart = { version = "^0.0.20", optional = true }
 celery = { version = "^5.5.0", optional = true }
-onetl = { version = ">=0.13.5,<0.15.0", extras = ["all"], optional = true }
+onetl = { git = "https://github.com/MobileTeleSystems/onetl.git", branch = "develop", extras = ["all"], optional = true }
 pyspark = { version = "<4.0.0", optional = true }
 pyyaml = { version = "*", optional = true }
 psycopg2-binary = { version = "^2.9.10", optional = true }

syncmaster/dto/connections.py

Lines changed: 17 additions & 0 deletions
@@ -73,6 +73,23 @@ class HiveConnectionDTO(ConnectionDTO):
     type: ClassVar[str] = "hive"


+@dataclass
+class IcebergRESTCatalogS3ConnectionDTO(ConnectionDTO):
+    metastore_url: str
+    s3_warehouse_path: str
+    s3_host: str
+    s3_bucket: str
+    s3_region: str
+    s3_access_key: str
+    s3_secret_key: str
+    metastore_username: str
+    metastore_password: str
+    s3_port: int | None = None
+    s3_protocol: str = "https"
+    s3_path_style_access: bool = False
+    type: ClassVar[str] = "iceberg_rest_s3"
+
+
 @dataclass
 class HDFSConnectionDTO(ConnectionDTO):
     user: str
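For orientation, an illustrative set of values for the new connection fields; every endpoint, bucket and credential below is made up, and the plain dict simply mirrors the DTO fields rather than any project schema:

# Illustrative values only; none of these endpoints or credentials are real.
iceberg_connection_data = {
    "metastore_url": "http://rest-catalog.internal:8181",
    "metastore_username": "syncmaster",
    "metastore_password": "***",
    "s3_warehouse_path": "warehouse",
    "s3_host": "minio.internal",
    "s3_port": 9000,
    "s3_protocol": "http",
    "s3_bucket": "iceberg-data",
    "s3_region": "us-east-1",
    "s3_access_key": "minio",
    "s3_secret_key": "***",
    "s3_path_style_access": True,
}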

syncmaster/dto/transfers.py

Lines changed: 10 additions & 0 deletions
@@ -107,6 +107,16 @@ def __post_init__(self):
         self.options.setdefault("if_exists", "replace_overlapping_partitions")


+@dataclass
+class IcebergRESTCatalogS3TransferDTO(DBTransferDTO):
+    type: ClassVar[str] = "iceberg_rest_s3"
+    catalog_name: str | None = None
+
+    def __post_init__(self):
+        super().__post_init__()
+        self.options.setdefault("if_exists", "replace_overlapping_partitions")
+
+
 @dataclass
 class S3TransferDTO(FileTransferDTO):
     type: ClassVar[str] = "s3"
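The new transfer DTO reuses the same defaulting pattern as the Hive transfer above. A self-contained sketch of that pattern, with a hypothetical toy dataclass standing in for DBTransferDTO:

# Sketch of the __post_init__ defaulting behaviour; ToyTransferDTO is hypothetical.
from dataclasses import dataclass, field


@dataclass
class ToyTransferDTO:
    table_name: str
    options: dict = field(default_factory=dict)

    def __post_init__(self):
        # Only set a default when the caller did not pass "if_exists" explicitly.
        self.options.setdefault("if_exists", "replace_overlapping_partitions")


dto = ToyTransferDTO(table_name="db.orders")
assert dto.options["if_exists"] == "replace_overlapping_partitions"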

syncmaster/schemas/v1/connections/connection_base.py

Lines changed: 9 additions & 3 deletions
@@ -2,11 +2,17 @@
 # SPDX-License-Identifier: Apache-2.0
 from pydantic import BaseModel, ConfigDict, Field

-from syncmaster.schemas.v1.auth import ReadBasicAuthSchema, ReadS3AuthSchema
-from syncmaster.schemas.v1.auth.samba import ReadSambaAuthSchema
+from syncmaster.schemas.v1.auth import (
+    ReadBasicAuthSchema,
+    ReadIcebergRESTCatalogBasicAuthSchema,
+    ReadS3AuthSchema,
+    ReadSambaAuthSchema,
+)
 from syncmaster.schemas.v1.types import NameConstr

-ReadConnectionAuthDataSchema = ReadBasicAuthSchema | ReadS3AuthSchema | ReadSambaAuthSchema
+ReadConnectionAuthDataSchema = (
+    ReadBasicAuthSchema | ReadS3AuthSchema | ReadSambaAuthSchema | ReadIcebergRESTCatalogBasicAuthSchema
+)


 class CreateConnectionBaseSchema(BaseModel):
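For illustration, validating a payload against such a union with pydantic can look like the sketch below; both toy models and their fields are hypothetical and are not the project's real auth schemas:

# Minimal, self-contained union-validation sketch in the spirit of
# ReadConnectionAuthDataSchema above (toy models, made-up fields).
from pydantic import BaseModel, TypeAdapter


class ToyBasicAuth(BaseModel):
    type: str
    user: str


class ToyS3Auth(BaseModel):
    type: str
    access_key: str


ToyAuthData = ToyBasicAuth | ToyS3Auth

auth = TypeAdapter(ToyAuthData).validate_python({"type": "s3", "access_key": "..."})
print(type(auth).__name__)  # ToyS3Auth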

syncmaster/worker/controller.py

Lines changed: 13 additions & 0 deletions
@@ -17,6 +17,7 @@
     FTPSConnectionDTO,
     HDFSConnectionDTO,
     HiveConnectionDTO,
+    IcebergRESTCatalogS3ConnectionDTO,
     MSSQLConnectionDTO,
     MySQLConnectionDTO,
     OracleConnectionDTO,
@@ -33,6 +34,7 @@
     FTPTransferDTO,
     HDFSTransferDTO,
     HiveTransferDTO,
+    IcebergRESTCatalogS3TransferDTO,
     MSSQLTransferDTO,
     MySQLTransferDTO,
     OracleTransferDTO,
@@ -49,6 +51,7 @@
 from syncmaster.worker.handlers.base import Handler
 from syncmaster.worker.handlers.db.clickhouse import ClickhouseHandler
 from syncmaster.worker.handlers.db.hive import HiveHandler
+from syncmaster.worker.handlers.db.iceberg import IcebergHandler
 from syncmaster.worker.handlers.db.mssql import MSSQLHandler
 from syncmaster.worker.handlers.db.mysql import MySQLHandler
 from syncmaster.worker.handlers.db.oracle import OracleHandler
@@ -72,6 +75,12 @@
         HiveTransferDTO,
         RunDTO,
     ),
+    "iceberg_rest_s3": (
+        IcebergHandler,
+        IcebergRESTCatalogS3ConnectionDTO,
+        IcebergRESTCatalogS3TransferDTO,
+        RunDTO,
+    ),
     "oracle": (
         OracleHandler,
         OracleConnectionDTO,
@@ -269,6 +278,10 @@ def _perform_incremental_transfer(self) -> None:
     def _get_transfer_hwm_name(self) -> str:
         if self.source_handler.connection_dto.type in FILE_CONNECTION_TYPES:
             hwm_name_suffix = self.source_handler.transfer_dto.directory_path
+        elif self.source_handler.connection_dto.type == "iceberg_rest_s3":
+            hwm_name_suffix = (
+                f"{self.source_handler.transfer_dto.catalog_name}.{self.source_handler.transfer_dto.table_name}"
+            )
         else:
             hwm_name_suffix = self.source_handler.transfer_dto.table_name
         hwm_name = "_".join(
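The effect of the new branch in _get_transfer_hwm_name can be sketched standalone; the helper function and example values below are illustrative only, and file connections (which use the directory path) are omitted:

from types import SimpleNamespace


def get_hwm_name_suffix(connection_type: str, transfer) -> str:
    # Mirrors the branch added above: Iceberg sources prefix the table with the catalog.
    if connection_type == "iceberg_rest_s3":
        return f"{transfer.catalog_name}.{transfer.table_name}"
    return transfer.table_name


transfer = SimpleNamespace(catalog_name="analytics", table_name="sales.orders")
print(get_hwm_name_suffix("iceberg_rest_s3", transfer))  # analytics.sales.orders
print(get_hwm_name_suffix("postgres", transfer))         # sales.orders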

syncmaster/worker/handlers/db/base.py

Lines changed: 4 additions & 4 deletions
@@ -39,11 +39,8 @@ def read(self) -> DataFrame:
         reader_params = {}
         if self.transfer_dto.strategy.type == "incremental":
             self.transfer_dto.strategy.increment_by = self._quote_field(self.transfer_dto.strategy.increment_by)
-            hwm_name = (
-                f"{self.transfer_dto.id}_{self.connection_dto.type}_{self.transfer_dto.table_name}"  # noqa: WPS237
-            )
             reader_params["hwm"] = DBReader.AutoDetectHWM(
-                name=hwm_name,
+                name=self._get_hwm_name(),
                 expression=self.transfer_dto.strategy.increment_by,
             )
@@ -137,5 +134,8 @@ def _get_reading_options(self) -> dict:

         return options

+    def _get_hwm_name(self):
+        return f"{self.transfer_dto.id}_{self.connection_dto.type}_{self.transfer_dto.table_name}"  # noqa: WPS237
+
     def _quote_field(self, field: str) -> str:
         return f'"{field}"'
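Extracting the name into _get_hwm_name lets subclasses change the HWM naming scheme; the Iceberg handler below overrides it to prefix the table with its catalog. An illustrative comparison, with made-up ids, types and table names computed by hand rather than by the DTOs:

# Illustrative HWM names only.
transfer_id, table_name = 42, "public.orders"

base_hwm_name = f"{transfer_id}_postgres_{table_name}"
iceberg_hwm_name = f"{transfer_id}_iceberg_rest_s3_analytics.{table_name}"

print(base_hwm_name)     # 42_postgres_public.orders
print(iceberg_hwm_name)  # 42_iceberg_rest_s3_analytics.public.orders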
syncmaster/worker/handlers/db/iceberg.py (new file)

Lines changed: 95 additions & 0 deletions

# SPDX-FileCopyrightText: 2023-2024 MTS PJSC
# SPDX-License-Identifier: Apache-2.0

from __future__ import annotations

from typing import TYPE_CHECKING

from onetl.connection import Iceberg
from onetl.hooks import slot, support_hooks

from syncmaster.dto.connections import IcebergRESTCatalogS3ConnectionDTO
from syncmaster.dto.transfers import IcebergRESTCatalogS3TransferDTO
from syncmaster.worker.handlers.db.base import DBHandler

if TYPE_CHECKING:
    from pyspark.sql import SparkSession
    from pyspark.sql.dataframe import DataFrame


@support_hooks
class IcebergHandler(DBHandler):
    connection: Iceberg
    connection_dto: IcebergRESTCatalogS3ConnectionDTO
    transfer_dto: IcebergRESTCatalogS3TransferDTO
    _operators = {
        "regexp": "RLIKE",
        **DBHandler._operators,
    }

    def connect(self, spark: SparkSession):
        self.connection = Iceberg(
            spark=spark,
            catalog_name=self.transfer_dto.catalog_name,
            catalog=Iceberg.RESTCatalog(
                uri=self.connection_dto.metastore_url,
                auth=Iceberg.RESTCatalog.BasicAuth(
                    user=self.connection_dto.metastore_username,
                    password=self.connection_dto.metastore_password,
                ),
            ),
            warehouse=Iceberg.S3Warehouse(
                path=self.connection_dto.s3_warehouse_path,
                host=self.connection_dto.s3_host,
                port=self.connection_dto.s3_port,
                protocol=self.connection_dto.s3_protocol,
                bucket=self.connection_dto.s3_bucket,
                path_style_access=self.connection_dto.s3_path_style_access,
                region=self.connection_dto.s3_region,
                access_key=self.connection_dto.s3_access_key,
                secret_key=self.connection_dto.s3_secret_key,
            ),
        ).check()

    @slot
    def read(self) -> DataFrame:
        return super().read()

    @slot
    def write(self, df: DataFrame) -> None:
        return super().write(df)

    def _normalize_column_names(self, df: DataFrame) -> DataFrame:
        for column_name in df.columns:
            df = df.withColumnRenamed(column_name, column_name.lower())
        return df

    def _make_rows_filter_expression(self, filters: list[dict]) -> str | None:
        expressions = []
        for filter in filters:
            op = self._operators[filter["type"]]
            field = self._quote_field(filter["field"])
            value = filter.get("value")

            if value is None:
                expressions.append(f"{field} {op}")
                continue

            if op == "ILIKE":
                expressions.append(f"LOWER({field}) LIKE LOWER('{value}')")
            elif op == "NOT ILIKE":
                expressions.append(f"NOT LOWER({field}) LIKE LOWER('{value}')")
            else:
                expressions.append(f"{field} {op} '{value}'")

        return " AND ".join(expressions) or None

    def _get_reading_options(self) -> dict:
        return {}

    def _get_hwm_name(self):
        table = f"{self.transfer_dto.catalog_name}.{self.transfer_dto.table_name}"
        return f"{self.transfer_dto.id}_{self.connection_dto.type}_{table}"  # noqa: WPS237

    def _quote_field(self, field: str) -> str:
        return f"`{field}`"
