Skip to content

Commit 5343970

Browse files
committed
[DOP-30631] Add support for Iceberg REST Catalog + S3 delegated access
1 parent 7e2c59c commit 5343970

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

46 files changed

+921
-329
lines changed

poetry.lock

Lines changed: 138 additions & 145 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -263,7 +263,7 @@ max-annotation-complexity = 4
263263
max-returns = 5
264264
max-awaits = 5
265265
max-local-variables = 20
266-
max-name-length = 60
266+
max-name-length = 65
267267
# Max of expressions in a function
268268
max-expressions = 15
269269
# Max args in a function
@@ -281,7 +281,7 @@ max-cognitive-score = 20
281281
# Max amount of cognitive complexity per module
282282
max-cognitive-average = 25
283283
max-imports = 25
284-
max-imported-names = 50
284+
max-imported-names = 55
285285
# Max of expression usages in a module
286286
max-module-expressions = 15
287287
# Max of expression usages in a function

syncmaster/db/models/connection.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@
1818
class ConnectionType(StrEnum):
1919
POSTGRES = "postgres"
2020
HIVE = "hive"
21-
ICEBERG_REST_S3 = "iceberg_rest_s3"
21+
ICEBERG_REST_S3_DIRECT = "iceberg_rest_s3_direct"
22+
ICEBERG_REST_S3_DELEGATED = "iceberg_rest_s3_delegated"
2223
ORACLE = "oracle"
2324
CLICKHOUSE = "clickhouse"
2425
MSSQL = "mssql"
@@ -62,7 +63,7 @@ class Connection(Base, ResourceMixin, TimestampMixin):
6263
'simple',
6364
translate(coalesce(data->>'host', ''), './-_:\\', ' ')
6465
)
65-
""",
66+
""", # noqa: WPS342
6667
persisted=True,
6768
),
6869
nullable=False,

syncmaster/dto/connections.py

Lines changed: 50 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ class HiveConnectionDTO(ConnectionDTO):
7474

7575

7676
@dataclass
77-
class IcebergRESTCatalogS3ConnectionBaseDTO(ConnectionDTO):
77+
class IcebergRESTCatalogS3DirectConnectionBaseDTO(ConnectionDTO):
7878
metastore_url: str
7979
s3_warehouse_path: str
8080
s3_host: str
@@ -86,18 +86,18 @@ class IcebergRESTCatalogS3ConnectionBaseDTO(ConnectionDTO):
8686
s3_port: int | None
8787
s3_protocol: str
8888
s3_additional_params: dict
89-
type: ClassVar[str] = "iceberg_rest_s3"
89+
type: ClassVar[str] = "iceberg_rest_s3_direct"
9090

9191

9292
@dataclass(kw_only=True)
93-
class IcebergRESTCatalogBasicAuthS3DTO(IcebergRESTCatalogS3ConnectionBaseDTO):
93+
class IcebergRESTCatalogBasicAuthS3BasicDTO(IcebergRESTCatalogS3DirectConnectionBaseDTO):
9494
metastore_username: str
9595
metastore_password: str
9696
metastore_auth_type: Literal["basic"] = "basic"
9797

9898

9999
@dataclass(kw_only=True)
100-
class IcebergRESTCatalogOAuth2ClientCredentialsS3DTO(IcebergRESTCatalogS3ConnectionBaseDTO):
100+
class IcebergRESTCatalogOAuth2ClientCredentialsS3BasicDTO(IcebergRESTCatalogS3DirectConnectionBaseDTO):
101101
metastore_oauth2_client_id: str
102102
metastore_oauth2_client_secret: str
103103
metastore_oauth2_scopes: list[str]
@@ -107,13 +107,54 @@ class IcebergRESTCatalogOAuth2ClientCredentialsS3DTO(IcebergRESTCatalogS3Connect
107107
metastore_auth_type: Literal["oauth2"] = "oauth2"
108108

109109

110+
IcebergRESTCatalogS3DirectConnectionDTO = (
111+
IcebergRESTCatalogBasicAuthS3BasicDTO | IcebergRESTCatalogOAuth2ClientCredentialsS3BasicDTO
112+
)
113+
114+
110115
# TODO: should be refactored
111-
class IcebergRESTCatalogS3ConnectionDTO:
112-
def __new__(cls, **data):
113-
if "metastore_oauth2_client_id" in data:
114-
return IcebergRESTCatalogOAuth2ClientCredentialsS3DTO(**data)
116+
def get_iceberg_rest_catalog_s3_direct_connection_dto(**data) -> IcebergRESTCatalogS3DirectConnectionDTO:
117+
if "metastore_oauth2_client_id" in data:
118+
return IcebergRESTCatalogOAuth2ClientCredentialsS3BasicDTO(**data)
119+
return IcebergRESTCatalogBasicAuthS3BasicDTO(**data)
120+
121+
122+
@dataclass
123+
class IcebergRESTCatalogS3DelegatedConnectionBaseDTO(ConnectionDTO):
124+
metastore_url: str
125+
s3_warehouse_name: str | None = None
126+
s3_access_delegation: Literal["vended-credentials", "remote-signing"] = "vended-credentials"
127+
type: ClassVar[str] = "iceberg_rest_s3_delegated"
128+
129+
130+
@dataclass(kw_only=True)
131+
class IcebergRESTCatalogBasicAuthS3DelegatedDTO(IcebergRESTCatalogS3DelegatedConnectionBaseDTO):
132+
metastore_username: str
133+
metastore_password: str
134+
metastore_auth_type: Literal["basic"] = "basic"
135+
115136

116-
return IcebergRESTCatalogBasicAuthS3DTO(**data)
137+
@dataclass(kw_only=True)
138+
class IcebergRESTCatalogOAuth2ClientCredentialsS3DelegatedDTO(IcebergRESTCatalogS3DelegatedConnectionBaseDTO):
139+
metastore_oauth2_client_id: str
140+
metastore_oauth2_client_secret: str
141+
metastore_oauth2_scopes: list[str]
142+
metastore_oauth2_resource: str | None = None
143+
metastore_oauth2_audience: str | None = None
144+
metastore_oauth2_server_uri: str | None = None
145+
metastore_auth_type: Literal["oauth2"] = "oauth2"
146+
147+
148+
IcebergRESTCatalogS3DelegatedConnectionDTO = (
149+
IcebergRESTCatalogBasicAuthS3DelegatedDTO | IcebergRESTCatalogOAuth2ClientCredentialsS3DelegatedDTO
150+
)
151+
152+
153+
# TODO: should be refactored
154+
def get_iceberg_rest_catalog_s3_delegated_connection_dto(**data) -> IcebergRESTCatalogS3DelegatedConnectionDTO:
155+
if "metastore_oauth2_client_id" in data:
156+
return IcebergRESTCatalogOAuth2ClientCredentialsS3DelegatedDTO(**data)
157+
return IcebergRESTCatalogBasicAuthS3DelegatedDTO(**data)
117158

118159

119160
@dataclass

syncmaster/dto/transfers.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -128,9 +128,19 @@ def __post_init__(self):
128128

129129

130130
@dataclass
131-
class IcebergRESTCatalogS3TransferDTO(DBTransferDTO):
132-
type: ClassVar[str] = "iceberg_rest_s3"
133-
catalog_name: str = field(default_factory=lambda: f"iceberg_rest_s3_{uuid4().hex[:8]}") # noqa: WPS237
131+
class IcebergRESTCatalogS3DelegatedTransferDTO(DBTransferDTO):
132+
type: ClassVar[str] = "iceberg_rest_s3_delegated"
133+
catalog_name: str = field(default_factory=lambda: f"iceberg_rest_s3_delegated_{uuid4().hex[:8]}") # noqa: WPS237
134+
135+
def __post_init__(self):
136+
super().__post_init__()
137+
self.options.setdefault("if_exists", "replace_overlapping_partitions")
138+
139+
140+
@dataclass
141+
class IcebergRESTCatalogS3DirectTransferDTO(DBTransferDTO):
142+
type: ClassVar[str] = "iceberg_rest_s3_direct"
143+
catalog_name: str = field(default_factory=lambda: f"iceberg_rest_s3_direct_{uuid4().hex[:8]}") # noqa: WPS237
134144

135145
def __post_init__(self):
136146
super().__post_init__()

syncmaster/schemas/v1/auth/iceberg/__init__.py

Lines changed: 0 additions & 2 deletions
This file was deleted.

syncmaster/schemas/v1/auth/iceberg/auth.py renamed to syncmaster/schemas/v1/auth/iceberg_rest_s3_delegated/__init__.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,28 +4,28 @@
44

55
from pydantic import Field
66

7-
from syncmaster.schemas.v1.auth.iceberg.basic import (
7+
from syncmaster.schemas.v1.auth.iceberg_rest_s3_delegated.basic import (
88
CreateIcebergRESTCatalogBasicAuthSchema,
99
ReadIcebergRESTCatalogBasicAuthSchema,
1010
UpdateIcebergRESTCatalogBasicAuthSchema,
1111
)
12-
from syncmaster.schemas.v1.auth.iceberg.oauth2_client_credentials import (
12+
from syncmaster.schemas.v1.auth.iceberg_rest_s3_delegated.oauth2_client_credentials import (
1313
CreateIcebergRESTCatalogOAuth2ClientCredentialsAuthSchema,
1414
ReadIcebergRESTCatalogOAuth2ClientCredentialsAuthSchema,
1515
UpdateIcebergRESTCatalogOAuth2ClientCredentialsAuthSchema,
1616
)
1717

18-
CreateIcebergRESTCatalogS3ConnectionAuthDataSchema = Annotated[
18+
CreateIcebergRESTCatalogS3DelegatedConnectionAuthDataSchema = Annotated[
1919
CreateIcebergRESTCatalogBasicAuthSchema | CreateIcebergRESTCatalogOAuth2ClientCredentialsAuthSchema,
2020
Field(discriminator="type"),
2121
]
2222

23-
ReadIcebergRESTCatalogS3ConnectionAuthDataSchema = Annotated[
23+
ReadIcebergRESTCatalogS3DelegatedConnectionAuthDataSchema = Annotated[
2424
ReadIcebergRESTCatalogBasicAuthSchema | ReadIcebergRESTCatalogOAuth2ClientCredentialsAuthSchema,
2525
Field(discriminator="type"),
2626
]
2727

28-
UpdateIcebergRESTCatalogS3ConnectionAuthDataSchema = Annotated[
28+
UpdateIcebergRESTCatalogS3DelegatedConnectionAuthDataSchema = Annotated[
2929
UpdateIcebergRESTCatalogBasicAuthSchema | UpdateIcebergRESTCatalogOAuth2ClientCredentialsAuthSchema,
3030
Field(discriminator="type"),
3131
]
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
# SPDX-FileCopyrightText: 2023-2024 MTS PJSC
2+
# SPDX-License-Identifier: Apache-2.0
3+
from typing import Literal
4+
5+
from pydantic import BaseModel, SecretStr
6+
7+
8+
class ReadIcebergRESTCatalogBasicAuthSchema(BaseModel):
9+
type: Literal["iceberg_rest_basic"]
10+
metastore_username: str
11+
12+
13+
class CreateIcebergRESTCatalogBasicAuthSchema(ReadIcebergRESTCatalogBasicAuthSchema):
14+
metastore_password: SecretStr
15+
16+
17+
class UpdateIcebergRESTCatalogBasicAuthSchema(ReadIcebergRESTCatalogBasicAuthSchema):
18+
metastore_password: SecretStr | None = None
19+
20+
def get_secret_fields(self) -> tuple[str, ...]:
21+
return ("metastore_password",)
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
# SPDX-FileCopyrightText: 2023-2024 MTS PJSC
2+
# SPDX-License-Identifier: Apache-2.0
3+
from typing import Literal
4+
5+
from pydantic import BaseModel, Field, SecretStr
6+
7+
8+
class ReadIcebergRESTCatalogOAuth2ClientCredentialsAuthSchema(BaseModel):
9+
type: Literal["iceberg_rest_oauth2_client_credentials"]
10+
metastore_oauth2_client_id: str
11+
metastore_oauth2_scopes: list[str] = Field(default_factory=list)
12+
metastore_oauth2_resource: str | None = None
13+
metastore_oauth2_audience: str | None = None
14+
metastore_oauth2_server_uri: str | None = None
15+
16+
17+
class CreateIcebergRESTCatalogOAuth2ClientCredentialsAuthSchema(
18+
ReadIcebergRESTCatalogOAuth2ClientCredentialsAuthSchema,
19+
):
20+
metastore_oauth2_client_secret: SecretStr
21+
22+
23+
class UpdateIcebergRESTCatalogOAuth2ClientCredentialsAuthSchema(
24+
ReadIcebergRESTCatalogOAuth2ClientCredentialsAuthSchema,
25+
):
26+
metastore_oauth2_client_secret: SecretStr | None = None
27+
28+
def get_secret_fields(self) -> tuple[str, ...]:
29+
return ("metastore_oauth2_client_secret",)
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
# SPDX-FileCopyrightText: 2023-2024 MTS PJSC
2+
# SPDX-License-Identifier: Apache-2.0
3+
from typing import Annotated
4+
5+
from pydantic import Field
6+
7+
from syncmaster.schemas.v1.auth.iceberg_rest_s3_direct.basic import (
8+
CreateIcebergRESTCatalogBasicS3BasicAuthSchema,
9+
ReadIcebergRESTCatalogBasicS3BasicAuthSchema,
10+
UpdateIcebergRESTCatalogBasicS3BasicAuthSchema,
11+
)
12+
from syncmaster.schemas.v1.auth.iceberg_rest_s3_direct.oauth2_client_credentials import (
13+
CreateIcebergRESTCatalogOAuth2ClientCredentialsS3BasicAuthSchema,
14+
ReadIcebergRESTCatalogOAuth2ClientCredentialsS3BasicAuthSchema,
15+
UpdateIcebergRESTCatalogOAuth2ClientCredentialsS3BasicAuthSchema,
16+
)
17+
18+
CreateIcebergRESTCatalogS3DirectConnectionAuthDataSchema = Annotated[
19+
CreateIcebergRESTCatalogBasicS3BasicAuthSchema | CreateIcebergRESTCatalogOAuth2ClientCredentialsS3BasicAuthSchema,
20+
Field(discriminator="type"),
21+
]
22+
23+
ReadIcebergRESTCatalogS3DirectConnectionAuthDataSchema = Annotated[
24+
ReadIcebergRESTCatalogBasicS3BasicAuthSchema | ReadIcebergRESTCatalogOAuth2ClientCredentialsS3BasicAuthSchema,
25+
Field(discriminator="type"),
26+
]
27+
28+
UpdateIcebergRESTCatalogS3DirectConnectionAuthDataSchema = Annotated[
29+
UpdateIcebergRESTCatalogBasicS3BasicAuthSchema | UpdateIcebergRESTCatalogOAuth2ClientCredentialsS3BasicAuthSchema,
30+
Field(discriminator="type"),
31+
]

0 commit comments

Comments
 (0)