Skip to content

Commit 38ebb19

Browse files
jayceslesargruuya
andauthored
Robustify boto3 session handling (DynamoDB, RestCatalog) (apache#2071)
3 changes here... 1. Basically a mirror of apache#1920 but for dynamo 2. Allow users to pass in an existing client/pass initialize client with properties in `RestCatalog` with sigv4 enabled. (closes apache#2070, closes apache#2008) 3. Re-use of the client in `add_headers` in `RestCatalog` with sigv4. (closes apache#2069 ) --------- Co-authored-by: Marko Grujic <[email protected]>
1 parent dea5f77 commit 38ebb19

File tree

6 files changed

+1639
-1262
lines changed

6 files changed

+1639
-1262
lines changed

poetry.lock

Lines changed: 1556 additions & 1212 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyiceberg/catalog/dynamodb.py

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,8 @@
6666

6767
if TYPE_CHECKING:
6868
import pyarrow as pa
69+
from mypy_boto3_dynamodb.client import DynamoDBClient
70+
6971

7072
DYNAMODB_CLIENT = "dynamodb"
7173

@@ -94,18 +96,28 @@
9496

9597

9698
class DynamoDbCatalog(MetastoreCatalog):
97-
def __init__(self, name: str, **properties: str):
99+
def __init__(self, name: str, client: Optional["DynamoDBClient"] = None, **properties: str):
100+
"""Dynamodb catalog.
101+
102+
Args:
103+
name: Name to identify the catalog.
104+
client: An optional boto3 dynamodb client.
105+
properties: Properties for dynamodb client construction and configuration.
106+
"""
98107
super().__init__(name, **properties)
108+
if client is not None:
109+
self.dynamodb = client
110+
else:
111+
session = boto3.Session(
112+
profile_name=properties.get(DYNAMODB_PROFILE_NAME),
113+
region_name=get_first_property_value(properties, DYNAMODB_REGION, AWS_REGION),
114+
botocore_session=properties.get(BOTOCORE_SESSION),
115+
aws_access_key_id=get_first_property_value(properties, DYNAMODB_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID),
116+
aws_secret_access_key=get_first_property_value(properties, DYNAMODB_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY),
117+
aws_session_token=get_first_property_value(properties, DYNAMODB_SESSION_TOKEN, AWS_SESSION_TOKEN),
118+
)
119+
self.dynamodb = session.client(DYNAMODB_CLIENT)
99120

100-
session = boto3.Session(
101-
profile_name=properties.get(DYNAMODB_PROFILE_NAME),
102-
region_name=get_first_property_value(properties, DYNAMODB_REGION, AWS_REGION),
103-
botocore_session=properties.get(BOTOCORE_SESSION),
104-
aws_access_key_id=get_first_property_value(properties, DYNAMODB_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID),
105-
aws_secret_access_key=get_first_property_value(properties, DYNAMODB_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY),
106-
aws_session_token=get_first_property_value(properties, DYNAMODB_SESSION_TOKEN, AWS_SESSION_TOKEN),
107-
)
108-
self.dynamodb = session.client(DYNAMODB_CLIENT)
109121
self.dynamodb_table_name = self.properties.get(DYNAMODB_TABLE_NAME, DYNAMODB_TABLE_NAME_DEFAULT)
110122
self._ensure_catalog_table_exists_or_create()
111123

pyiceberg/catalog/glue.py

Lines changed: 33 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -30,15 +30,6 @@
3030

3131
import boto3
3232
from botocore.config import Config
33-
from mypy_boto3_glue.client import GlueClient
34-
from mypy_boto3_glue.type_defs import (
35-
ColumnTypeDef,
36-
DatabaseInputTypeDef,
37-
DatabaseTypeDef,
38-
StorageDescriptorTypeDef,
39-
TableInputTypeDef,
40-
TableTypeDef,
41-
)
4233

4334
from pyiceberg.catalog import (
4435
BOTOCORE_SESSION,
@@ -101,6 +92,15 @@
10192

10293
if TYPE_CHECKING:
10394
import pyarrow as pa
95+
from mypy_boto3_glue.client import GlueClient
96+
from mypy_boto3_glue.type_defs import (
97+
ColumnTypeDef,
98+
DatabaseInputTypeDef,
99+
DatabaseTypeDef,
100+
StorageDescriptorTypeDef,
101+
TableInputTypeDef,
102+
TableTypeDef,
103+
)
104104

105105

106106
# There is a unique Glue metastore in each AWS account and each AWS region. By default, GlueCatalog chooses the Glue
@@ -140,7 +140,7 @@
140140

141141

142142
def _construct_parameters(
143-
metadata_location: str, glue_table: Optional[TableTypeDef] = None, prev_metadata_location: Optional[str] = None
143+
metadata_location: str, glue_table: Optional["TableTypeDef"] = None, prev_metadata_location: Optional[str] = None
144144
) -> Properties:
145145
new_parameters = glue_table.get("Parameters", {}) if glue_table else {}
146146
new_parameters.update({TABLE_TYPE: ICEBERG.upper(), METADATA_LOCATION: metadata_location})
@@ -190,15 +190,15 @@ def primitive(self, primitive: PrimitiveType) -> str:
190190
return GLUE_PRIMITIVE_TYPES[primitive_type]
191191

192192

193-
def _to_columns(metadata: TableMetadata) -> List[ColumnTypeDef]:
194-
results: Dict[str, ColumnTypeDef] = {}
193+
def _to_columns(metadata: TableMetadata) -> List["ColumnTypeDef"]:
194+
results: Dict[str, "ColumnTypeDef"] = {}
195195

196196
def _append_to_results(field: NestedField, is_current: bool) -> None:
197197
if field.name in results:
198198
return
199199

200200
results[field.name] = cast(
201-
ColumnTypeDef,
201+
"ColumnTypeDef",
202202
{
203203
"Name": field.name,
204204
"Type": visit(field.field_type, _IcebergSchemaToGlueType()),
@@ -230,10 +230,10 @@ def _construct_table_input(
230230
metadata_location: str,
231231
properties: Properties,
232232
metadata: TableMetadata,
233-
glue_table: Optional[TableTypeDef] = None,
233+
glue_table: Optional["TableTypeDef"] = None,
234234
prev_metadata_location: Optional[str] = None,
235-
) -> TableInputTypeDef:
236-
table_input: TableInputTypeDef = {
235+
) -> "TableInputTypeDef":
236+
table_input: "TableInputTypeDef" = {
237237
"Name": table_name,
238238
"TableType": EXTERNAL_TABLE,
239239
"Parameters": _construct_parameters(metadata_location, glue_table, prev_metadata_location),
@@ -249,8 +249,8 @@ def _construct_table_input(
249249
return table_input
250250

251251

252-
def _construct_rename_table_input(to_table_name: str, glue_table: TableTypeDef) -> TableInputTypeDef:
253-
rename_table_input: TableInputTypeDef = {"Name": to_table_name}
252+
def _construct_rename_table_input(to_table_name: str, glue_table: "TableTypeDef") -> "TableInputTypeDef":
253+
rename_table_input: "TableInputTypeDef" = {"Name": to_table_name}
254254
# use the same Glue info to create the new table, pointing to the old metadata
255255
assert glue_table["TableType"]
256256
rename_table_input["TableType"] = glue_table["TableType"]
@@ -264,16 +264,16 @@ def _construct_rename_table_input(to_table_name: str, glue_table: TableTypeDef)
264264
# It turns out the output of StorageDescriptor is not the same as the input type
265265
# because the Column can have a different type, but for now it seems to work, so
266266
# silence the type error.
267-
rename_table_input["StorageDescriptor"] = cast(StorageDescriptorTypeDef, glue_table["StorageDescriptor"])
267+
rename_table_input["StorageDescriptor"] = cast("StorageDescriptorTypeDef", glue_table["StorageDescriptor"])
268268

269269
if "Description" in glue_table:
270270
rename_table_input["Description"] = glue_table["Description"]
271271

272272
return rename_table_input
273273

274274

275-
def _construct_database_input(database_name: str, properties: Properties) -> DatabaseInputTypeDef:
276-
database_input: DatabaseInputTypeDef = {"Name": database_name}
275+
def _construct_database_input(database_name: str, properties: Properties) -> "DatabaseInputTypeDef":
276+
database_input: "DatabaseInputTypeDef" = {"Name": database_name}
277277
parameters = {}
278278
for k, v in properties.items():
279279
if k == "Description":
@@ -286,7 +286,7 @@ def _construct_database_input(database_name: str, properties: Properties) -> Dat
286286
return database_input
287287

288288

289-
def _register_glue_catalog_id_with_glue_client(glue: GlueClient, glue_catalog_id: str) -> None:
289+
def _register_glue_catalog_id_with_glue_client(glue: "GlueClient", glue_catalog_id: str) -> None:
290290
"""
291291
Register the Glue Catalog ID (AWS Account ID) as a parameter on all Glue client methods.
292292
@@ -303,9 +303,9 @@ def add_glue_catalog_id(params: Dict[str, str], **kwargs: Any) -> None:
303303

304304

305305
class GlueCatalog(MetastoreCatalog):
306-
glue: GlueClient
306+
glue: "GlueClient"
307307

308-
def __init__(self, name: str, client: Optional[GlueClient] = None, **properties: Any):
308+
def __init__(self, name: str, client: Optional["GlueClient"] = None, **properties: Any):
309309
"""Glue Catalog.
310310
311311
You either need to provide a boto3 glue client, or one will be constructed from the properties.
@@ -317,7 +317,7 @@ def __init__(self, name: str, client: Optional[GlueClient] = None, **properties:
317317
"""
318318
super().__init__(name, **properties)
319319

320-
if client:
320+
if client is not None:
321321
self.glue = client
322322
else:
323323
retry_mode_prop_value = get_first_property_value(properties, GLUE_RETRY_MODE)
@@ -344,7 +344,7 @@ def __init__(self, name: str, client: Optional[GlueClient] = None, **properties:
344344
if glue_catalog_id := properties.get(GLUE_ID):
345345
_register_glue_catalog_id_with_glue_client(self.glue, glue_catalog_id)
346346

347-
def _convert_glue_to_iceberg(self, glue_table: TableTypeDef) -> Table:
347+
def _convert_glue_to_iceberg(self, glue_table: "TableTypeDef") -> Table:
348348
properties: Properties = glue_table["Parameters"]
349349

350350
assert glue_table["DatabaseName"]
@@ -380,15 +380,15 @@ def _convert_glue_to_iceberg(self, glue_table: TableTypeDef) -> Table:
380380
catalog=self,
381381
)
382382

383-
def _create_glue_table(self, database_name: str, table_name: str, table_input: TableInputTypeDef) -> None:
383+
def _create_glue_table(self, database_name: str, table_name: str, table_input: "TableInputTypeDef") -> None:
384384
try:
385385
self.glue.create_table(DatabaseName=database_name, TableInput=table_input)
386386
except self.glue.exceptions.AlreadyExistsException as e:
387387
raise TableAlreadyExistsError(f"Table {database_name}.{table_name} already exists") from e
388388
except self.glue.exceptions.EntityNotFoundException as e:
389389
raise NoSuchNamespaceError(f"Database {database_name} does not exist") from e
390390

391-
def _update_glue_table(self, database_name: str, table_name: str, table_input: TableInputTypeDef, version_id: str) -> None:
391+
def _update_glue_table(self, database_name: str, table_name: str, table_input: "TableInputTypeDef", version_id: str) -> None:
392392
try:
393393
self.glue.update_table(
394394
DatabaseName=database_name,
@@ -403,7 +403,7 @@ def _update_glue_table(self, database_name: str, table_name: str, table_input: T
403403
f"Cannot commit {database_name}.{table_name} because Glue detected concurrent update to table version {version_id}"
404404
) from e
405405

406-
def _get_glue_table(self, database_name: str, table_name: str) -> TableTypeDef:
406+
def _get_glue_table(self, database_name: str, table_name: str) -> "TableTypeDef":
407407
try:
408408
load_table_response = self.glue.get_table(DatabaseName=database_name, Name=table_name)
409409
return load_table_response["Table"]
@@ -496,7 +496,7 @@ def commit_table(
496496
table_identifier = table.name()
497497
database_name, table_name = self.identifier_to_database_and_table(table_identifier, NoSuchTableError)
498498

499-
current_glue_table: Optional[TableTypeDef]
499+
current_glue_table: Optional["TableTypeDef"]
500500
glue_table_version_id: Optional[str]
501501
current_table: Optional[Table]
502502
try:
@@ -702,7 +702,7 @@ def list_tables(self, namespace: Union[str, Identifier]) -> List[Identifier]:
702702
NoSuchNamespaceError: If a namespace with the given name does not exist, or the identifier is invalid.
703703
"""
704704
database_name = self.identifier_to_database(namespace, NoSuchNamespaceError)
705-
table_list: List[TableTypeDef] = []
705+
table_list: List["TableTypeDef"] = []
706706
next_token: Optional[str] = None
707707
try:
708708
while True:
@@ -730,7 +730,7 @@ def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> List[Identi
730730
if namespace:
731731
return []
732732

733-
database_list: List[DatabaseTypeDef] = []
733+
database_list: List["DatabaseTypeDef"] = []
734734
next_token: Optional[str] = None
735735

736736
while True:
@@ -806,5 +806,5 @@ def view_exists(self, identifier: Union[str, Identifier]) -> bool:
806806
raise NotImplementedError
807807

808808
@staticmethod
809-
def __is_iceberg_table(table: TableTypeDef) -> bool:
809+
def __is_iceberg_table(table: "TableTypeDef") -> bool:
810810
return table.get("Parameters", {}).get(TABLE_TYPE, "").lower() == ICEBERG

pyiceberg/catalog/rest/__init__.py

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232

3333
from pyiceberg import __version__
3434
from pyiceberg.catalog import (
35+
BOTOCORE_SESSION,
3536
TOKEN,
3637
URI,
3738
WAREHOUSE_LOCATION,
@@ -53,6 +54,7 @@
5354
TableAlreadyExistsError,
5455
UnauthorizedError,
5556
)
57+
from pyiceberg.io import AWS_ACCESS_KEY_ID, AWS_REGION, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN
5658
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec, assign_fresh_partition_spec_ids
5759
from pyiceberg.schema import Schema, assign_fresh_schema_ids
5860
from pyiceberg.table import (
@@ -72,7 +74,7 @@
7274
from pyiceberg.typedef import EMPTY_DICT, UTF8, IcebergBaseModel, Identifier, Properties
7375
from pyiceberg.types import transform_dict_value_to_str
7476
from pyiceberg.utils.deprecated import deprecation_message
75-
from pyiceberg.utils.properties import get_header_properties, property_as_bool
77+
from pyiceberg.utils.properties import get_first_property_value, get_header_properties, property_as_bool
7678

7779
if TYPE_CHECKING:
7880
import pyarrow as pa
@@ -390,11 +392,17 @@ class SigV4Adapter(HTTPAdapter):
390392
def __init__(self, **properties: str):
391393
super().__init__()
392394
self._properties = properties
395+
self._boto_session = boto3.Session(
396+
region_name=get_first_property_value(self._properties, AWS_REGION),
397+
botocore_session=self._properties.get(BOTOCORE_SESSION),
398+
aws_access_key_id=get_first_property_value(self._properties, AWS_ACCESS_KEY_ID),
399+
aws_secret_access_key=get_first_property_value(self._properties, AWS_SECRET_ACCESS_KEY),
400+
aws_session_token=get_first_property_value(self._properties, AWS_SESSION_TOKEN),
401+
)
393402

394403
def add_headers(self, request: PreparedRequest, **kwargs: Any) -> None: # pylint: disable=W0613
395-
boto_session = boto3.Session()
396-
credentials = boto_session.get_credentials().get_frozen_credentials()
397-
region = self._properties.get(SIGV4_REGION, boto_session.region_name)
404+
credentials = self._boto_session.get_credentials().get_frozen_credentials()
405+
region = self._properties.get(SIGV4_REGION, self._boto_session.region_name)
398406
service = self._properties.get(SIGV4_SERVICE, "execute-api")
399407

400408
url = str(request.url).split("?")[0]

pyproject.toml

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,11 +67,10 @@ pandas = { version = ">=1.0.0,<3.0.0", optional = true }
6767
duckdb = { version = ">=0.5.0,<2.0.0", optional = true }
6868
ray = [
6969
{ version = "==2.10.0", python = "<3.9", optional = true },
70-
{ version = ">=2.10.0,<3.0.0", python = ">=3.9", optional = true },
70+
{ version = ">=2.10.0,<=2.44.0", python = ">=3.9", optional = true },
7171
]
7272
python-snappy = { version = ">=0.6.0,<1.0.0", optional = true }
7373
thrift = { version = ">=0.13.0,<1.0.0", optional = true }
74-
mypy-boto3-glue = { version = ">=1.28.18", optional = true }
7574
boto3 = { version = ">=1.24.59", optional = true }
7675
s3fs = { version = ">=2023.1.0", optional = true }
7776
adlfs = { version = ">=2023.1.0", optional = true }
@@ -102,6 +101,8 @@ cython = "3.1.2"
102101
deptry = ">=0.14,<0.24"
103102
datafusion = ">=44,<48"
104103
docutils = "!=0.21.post1" # https://github.com/python-poetry/poetry/issues/9248#issuecomment-2026240520
104+
mypy-boto3-glue = ">=1.28.18"
105+
mypy-boto3-dynamodb = ">=1.28.18"
105106

106107
[tool.poetry.group.docs.dependencies]
107108
# for mkdocs
@@ -217,6 +218,10 @@ ignore_missing_imports = true
217218
module = "mypy_boto3_glue.*"
218219
ignore_missing_imports = true
219220

221+
[[tool.mypy.overrides]]
222+
module = "mypy_boto3_dynamodb.*"
223+
ignore_missing_imports = true
224+
220225
[[tool.mypy.overrides]]
221226
module = "moto"
222227
ignore_missing_imports = true
@@ -299,7 +304,7 @@ snappy = ["python-snappy"]
299304
hive = ["thrift"]
300305
hive-kerberos = ["thrift", "thrift_sasl", "kerberos"]
301306
s3fs = ["s3fs"]
302-
glue = ["boto3", "mypy-boto3-glue"]
307+
glue = ["boto3"]
303308
adlfs = ["adlfs"]
304309
dynamodb = ["boto3"]
305310
zstandard = ["zstandard"]

tests/catalog/test_dynamodb.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -626,3 +626,11 @@ def test_table_exists(
626626
assert test_catalog.table_exists(identifier) is True
627627
# Act and Assert for an non-existing table
628628
assert test_catalog.table_exists(("non", "exist")) is False
629+
630+
631+
@mock_aws
632+
def test_dynamodb_client_override() -> None:
633+
catalog_name = "glue"
634+
test_client = boto3.client("dynamodb", region_name="us-west-2")
635+
test_catalog = DynamoDbCatalog(catalog_name, test_client)
636+
assert test_catalog.dynamodb is test_client

0 commit comments

Comments
 (0)