Skip to content

Commit bd51887

Browse files
a-slice-of-pySilvio Lugarojaidisidomalachi-constantkukushking
authored
feat: add read_items to dynamodb module (#1877)
* feat: add read_items to dynamodb module (#1867) Co-authored-by: Silvio Lugaro <[email protected]> Co-authored-by: jaidisido <[email protected]> Co-authored-by: Lucas Hanson <[email protected]> Co-authored-by: kukushking <[email protected]>
1 parent 5e6bc4e commit bd51887

File tree

5 files changed

+574
-6
lines changed

5 files changed

+574
-6
lines changed

awswrangler/dynamodb/__init__.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,17 @@
11
"""Amazon DynamoDB Write Module."""
22

33
from awswrangler.dynamodb._delete import delete_items
4-
from awswrangler.dynamodb._read import read_partiql_query
4+
from awswrangler.dynamodb._read import read_items, read_partiql_query
55
from awswrangler.dynamodb._utils import get_table
66
from awswrangler.dynamodb._write import put_csv, put_df, put_items, put_json
77

8-
__all__ = ["delete_items", "get_table", "put_csv", "put_df", "put_items", "put_json", "read_partiql_query"]
8+
__all__ = [
9+
"delete_items",
10+
"get_table",
11+
"put_csv",
12+
"put_df",
13+
"put_items",
14+
"put_json",
15+
"read_partiql_query",
16+
"read_items",
17+
]

awswrangler/dynamodb/_read.py

Lines changed: 336 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,17 @@
22

33
import logging
44
import re
5-
from typing import Any, Dict, List, Optional, Tuple
5+
from functools import wraps
6+
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, TypeVar, Union, cast
67

78
import boto3
89
import pandas as pd
10+
from boto3.dynamodb.conditions import ConditionBase
911
from boto3.dynamodb.types import TypeDeserializer
12+
from botocore.exceptions import ClientError
1013

11-
from awswrangler import _utils
12-
from awswrangler._config import apply_configs
14+
from awswrangler import _utils, exceptions
15+
from awswrangler.dynamodb._utils import get_table
1316

1417
_logger: logging.Logger = logging.getLogger(__name__)
1518

@@ -97,7 +100,6 @@ def _parse_dynamodb_items(
97100
return df.astype(dtype=dtype) if dtype else df
98101

99102

100-
@apply_configs
101103
def read_partiql_query(
102104
query: str,
103105
dtype: Optional[Dict[str, str]] = None,
@@ -146,3 +148,333 @@ def read_partiql_query(
146148
_logger.debug("Reading results for PartiQL query: %s", query)
147149
items = _get_items(query=query, boto3_session=boto3_session)
148150
return _parse_dynamodb_items(items=items, dtype=dtype)
151+
152+
153+
def _get_invalid_kwarg(msg: str) -> Optional[str]:
154+
"""Detect which kwarg contains reserved keywords based on given error message.
155+
156+
Parameters
157+
----------
158+
msg : str
159+
Botocore client error message.
160+
161+
Returns
162+
-------
163+
str, optional
164+
Detected invalid kwarg if any, None otherwise.
165+
"""
166+
for kwarg in ("ProjectionExpression", "KeyConditionExpression", "FilterExpression"):
167+
if msg.startswith(f"Invalid {kwarg}: Attribute name is a reserved keyword; reserved keyword: "):
168+
return kwarg
169+
return None
170+
171+
172+
# SEE: https://stackoverflow.com/a/72295070
173+
CustomCallable = TypeVar("CustomCallable", bound=Callable[[Any], Sequence[Dict[str, Any]]])
174+
175+
176+
def _handle_reserved_keyword_error(func: CustomCallable) -> CustomCallable:
177+
"""Handle automatic replacement of DynamoDB reserved keywords.
178+
179+
For reserved keywords reference:
180+
https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/ReservedWords.html.
181+
"""
182+
183+
@wraps(func)
184+
def wrapper(*args: Any, **kwargs: Any) -> Sequence[Dict[str, Any]]:
185+
try:
186+
return func(*args, **kwargs)
187+
except ClientError as e:
188+
error_code, error_message = (e.response["Error"]["Code"], e.response["Error"]["Message"])
189+
# Check catched error to verify its message
190+
kwarg = _get_invalid_kwarg(error_message)
191+
if (error_code == "ValidationException") and kwarg:
192+
reserved_keyword = error_message.split("keyword: ")[-1]
193+
sanitized_keyword = f"#{reserved_keyword}"
194+
kwargs[kwarg] = kwargs[kwarg].replace(reserved_keyword, sanitized_keyword)
195+
kwargs["ExpressionAttributeNames"] = {
196+
**kwargs.get("ExpressionAttributeNames", {}),
197+
sanitized_keyword: reserved_keyword,
198+
}
199+
# SEE: recursive approach guarantees that each reserved keyword will be properly replaced,
200+
# even if it will require as many calls as the reserved keywords involved (not so efficient...)
201+
return wrapper(*args, **kwargs)
202+
# Otherwise raise it
203+
raise e
204+
205+
# SEE: https://github.com/python/mypy/issues/3157#issue-221120895
206+
return cast(CustomCallable, wrapper)
207+
208+
209+
@_handle_reserved_keyword_error
210+
def _read_items(
211+
table_name: str, boto3_session: Optional[boto3.Session] = None, **kwargs: Any
212+
) -> Sequence[Dict[str, Any]]:
213+
"""Read items from given DynamoDB table.
214+
215+
This function set the optimal reading strategy based on the received kwargs.
216+
217+
Parameters
218+
----------
219+
table_name : str
220+
DynamoDB table name.
221+
boto3_session : boto3.Session, optional
222+
Boto3 Session. Defaults to None (the default boto3 Session will be used).
223+
224+
Returns
225+
-------
226+
Sequence[Mapping[str, Any]]
227+
Retrieved items.
228+
"""
229+
# Get DynamoDB resource and Table instance
230+
resource = _utils.resource(service_name="dynamodb", session=boto3_session)
231+
table = get_table(table_name=table_name, boto3_session=boto3_session)
232+
233+
# Extract 'Keys' from provided kwargs: if needed, will be reinserted later on
234+
keys = kwargs.pop("Keys", None)
235+
236+
# Conditionally define optimal reading strategy
237+
use_get_item = (keys is not None) and (len(keys) == 1)
238+
use_batch_get_item = (keys is not None) and (len(keys) > 1)
239+
use_query = (keys is None) and ("KeyConditionExpression" in kwargs)
240+
use_scan = (keys is None) and ("KeyConditionExpression" not in kwargs)
241+
242+
# Read items
243+
if use_get_item:
244+
kwargs["Key"] = keys[0]
245+
items = [table.get_item(**kwargs).get("Item", {})]
246+
elif use_batch_get_item:
247+
kwargs["Keys"] = keys
248+
response = resource.batch_get_item(RequestItems={table_name: kwargs})
249+
items = response.get("Responses", {table_name: []}).get(table_name, [])
250+
# SEE: handle possible unprocessed keys. As suggested in Boto3 docs,
251+
# this approach should involve exponential backoff, but this should be
252+
# already managed by AWS SDK itself, as stated
253+
# [here](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Programming.Errors.html)
254+
while response["UnprocessedKeys"]:
255+
kwargs["Keys"] = response["UnprocessedKeys"][table_name]["Keys"]
256+
response = resource.batch_get_item(RequestItems={table_name: kwargs})
257+
items.extend(response.get("Responses", {table_name: []}).get(table_name, []))
258+
elif use_query or use_scan:
259+
_read_method = table.query if use_query else table.scan
260+
response = _read_method(**kwargs)
261+
items = response.get("Items", [])
262+
263+
# Handle pagination
264+
while "LastEvaluatedKey" in response:
265+
kwargs["ExclusiveStartKey"] = response["LastEvaluatedKey"]
266+
response = _read_method(**kwargs)
267+
items.extend(response.get("Items", []))
268+
269+
return items
270+
271+
272+
def read_items(
273+
table_name: str,
274+
partition_values: Optional[Sequence[Any]] = None,
275+
sort_values: Optional[Sequence[Any]] = None,
276+
filter_expression: Optional[Union[ConditionBase, str]] = None,
277+
key_condition_expression: Optional[Union[ConditionBase, str]] = None,
278+
expression_attribute_names: Optional[Dict[str, str]] = None,
279+
expression_attribute_values: Optional[Dict[str, Any]] = None,
280+
consistent: bool = False,
281+
columns: Optional[Sequence[str]] = None,
282+
allow_full_scan: bool = False,
283+
max_items_evaluated: Optional[int] = None,
284+
as_dataframe: bool = True,
285+
boto3_session: Optional[boto3.Session] = None,
286+
) -> Union[pd.DataFrame, List[Dict[str, Any]]]:
287+
"""Read items from given DynamoDB table.
288+
289+
This function aims to gracefully handle (some of) the complexity of read actions
290+
available in Boto3 towards a DynamoDB table, abstracting it away while providing
291+
a single, unified entrypoint.
292+
293+
Under the hood, it wraps all the four available read actions: get_item, batch_get_item,
294+
query and scan.
295+
296+
Parameters
297+
----------
298+
table_name : str
299+
DynamoDB table name.
300+
partition_values : Sequence[Any], optional
301+
Partition key values to retrieve. Defaults to None.
302+
sort_values : Sequence[Any], optional
303+
Sort key values to retrieve. Defaults to None.
304+
filter_expression : Union[ConditionBase, str], optional
305+
Filter expression as string or combinations of boto3.dynamodb.conditions.Attr conditions. Defaults to None.
306+
key_condition_expression : Union[ConditionBase, str], optional
307+
Key condition expression as string or combinations of boto3.dynamodb.conditions.Key conditions.
308+
Defaults to None.
309+
expression_attribute_names : Mapping[str, str], optional
310+
Mapping of placeholder and target attributes. Defaults to None.
311+
expression_attribute_values : Mapping[str, Any], optional
312+
Mapping of placeholder and target values. Defaults to None.
313+
consistent : bool
314+
If True, ensure that the performed read operation is strongly consistent, otherwise eventually consistent.
315+
Defaults to False.
316+
columns : Sequence[str], optional
317+
Attributes to retain in the returned items. Defaults to None (all attributes).
318+
allow_full_scan : bool
319+
If True, allow full table scan without any filtering. Defaults to False.
320+
max_items_evaluated : int, optional
321+
Limit the number of items evaluated in case of query or scan operations. Defaults to None (all matching items).
322+
as_dataframe : bool
323+
If True, return items as pd.DataFrame, otherwise as list/dict. Defaults to True.
324+
boto3_session : boto3.Session, optional
325+
Boto3 Session. Defaults to None (the default boto3 Session will be used).
326+
327+
Raises
328+
------
329+
exceptions.InvalidArgumentType
330+
When the specified table has also a sort key but only the partition values are specified.
331+
exceptions.InvalidArgumentCombination
332+
When both partition and sort values sequences are specified but they have different lengths,
333+
or when provided parameters are not enough informative to proceed with a read operation.
334+
335+
Returns
336+
-------
337+
Union[pd.DataFrame, List[Mapping[str, Any]]]
338+
A Data frame containing the retrieved items, or a dictionary of returned items.
339+
340+
Examples
341+
--------
342+
Reading 5 random items from a table
343+
344+
>>> import awswrangler as wr
345+
>>> df = wr.dynamodb.read_items(table_name='my-table', max_items_evaluated=5)
346+
347+
Strongly-consistent reading of a given partition value from a table
348+
349+
>>> import awswrangler as wr
350+
>>> df = wr.dynamodb.read_items(table_name='my-table', partition_values=['my-value'], consistent=True)
351+
352+
Reading items pairwise-identified by partition and sort values, from a table with a composite primary key
353+
354+
>>> import awswrangler as wr
355+
>>> df = wr.dynamodb.read_items(
356+
... table_name='my-table',
357+
... partition_values=['pv_1', 'pv_2'],
358+
... sort_values=['sv_1', 'sv_2']
359+
... )
360+
361+
Reading items while retaining only specified attributes, automatically handling possible collision
362+
with DynamoDB reserved keywords
363+
364+
>>> import awswrangler as wr
365+
>>> df = wr.dynamodb.read_items(
366+
... table_name='my-table',
367+
... partition_values=['my-value'],
368+
... columns=['connection', 'other_col'] # connection is a reserved keyword, managed under the hood!
369+
... )
370+
371+
Reading all items from a table explicitly allowing full scan
372+
373+
>>> import awswrangler as wr
374+
>>> df = wr.dynamodb.read_items(table_name='my-table', allow_full_scan=True)
375+
376+
Reading items matching a KeyConditionExpression expressed with boto3.dynamodb.conditions.Key
377+
378+
>>> import awswrangler as wr
379+
>>> from boto3.dynamodb.conditions import Key
380+
>>> df = wr.dynamodb.read_items(
381+
... table_name='my-table',
382+
... key_condition_expression=(Key('key_1').eq('val_1') & Key('key_2').eq('val_2'))
383+
... )
384+
385+
Same as above, but with KeyConditionExpression as string
386+
387+
>>> import awswrangler as wr
388+
>>> df = wr.dynamodb.read_items(
389+
... table_name='my-table',
390+
... key_condition_expression='key_1 = :v1 and key_2 = :v2',
391+
... expression_attribute_values={':v1': 'val_1', ':v2': 'val_2'},
392+
... )
393+
394+
Reading items matching a FilterExpression expressed with boto3.dynamodb.conditions.Attr
395+
396+
>>> import awswrangler as wr
397+
>>> from boto3.dynamodb.conditions import Attr
398+
>>> df = wr.dynamodb.read_items(
399+
... table_name='my-table',
400+
... filter_expression=Attr('my_attr').eq('this-value')
401+
... )
402+
403+
Same as above, but with FilterExpression as string
404+
405+
>>> import awswrangler as wr
406+
>>> df = wr.dynamodb.read_items(
407+
... table_name='my-table',
408+
... filter_expression='my_attr = :v',
409+
... expression_attribute_values={':v': 'this-value'}
410+
... )
411+
412+
Reading items involving an attribute which collides with DynamoDB reserved keywords
413+
414+
>>> import awswrangler as wr
415+
>>> df = wr.dynamodb.read_items(
416+
... table_name='my-table',
417+
... filter_expression='#operator = :v',
418+
... expression_attribute_names={'#operator': 'operator'},
419+
... expression_attribute_values={':v': 'this-value'}
420+
... )
421+
422+
"""
423+
# Extract key schema
424+
table_key_schema = get_table(table_name=table_name, boto3_session=boto3_session).key_schema
425+
426+
# Detect sort key, if any
427+
if len(table_key_schema) == 1:
428+
partition_key, sort_key = table_key_schema[0]["AttributeName"], None
429+
else:
430+
partition_key, sort_key = (
431+
next(filter(lambda x: x["KeyType"] == "HASH", table_key_schema))["AttributeName"],
432+
next(filter(lambda x: x["KeyType"] == "RANGE", table_key_schema))["AttributeName"],
433+
)
434+
435+
# Build kwargs shared by read methods
436+
kwargs: Dict[str, Any] = {"ConsistentRead": consistent}
437+
if partition_values:
438+
if sort_key is None:
439+
keys = [{partition_key: pv} for pv in partition_values]
440+
else:
441+
if not sort_values:
442+
raise exceptions.InvalidArgumentType(
443+
f"Kwarg sort_values must be specified: table {table_name} has {sort_key} as sort key."
444+
)
445+
if len(sort_values) != len(partition_values):
446+
raise exceptions.InvalidArgumentCombination("Partition and sort values must have the same length.")
447+
keys = [{partition_key: pv, sort_key: sv} for pv, sv in zip(partition_values, sort_values)]
448+
kwargs["Keys"] = keys
449+
if key_condition_expression:
450+
kwargs["KeyConditionExpression"] = key_condition_expression
451+
if filter_expression:
452+
kwargs["FilterExpression"] = filter_expression
453+
if columns:
454+
kwargs["ProjectionExpression"] = ", ".join(columns)
455+
if expression_attribute_names:
456+
kwargs["ExpressionAttributeNames"] = expression_attribute_names
457+
if expression_attribute_values:
458+
kwargs["ExpressionAttributeValues"] = expression_attribute_values
459+
if max_items_evaluated:
460+
kwargs["Limit"] = max_items_evaluated
461+
462+
_logger.debug("kwargs: %s", kwargs)
463+
# If kwargs are sufficiently informative, proceed with actual read op
464+
if any((partition_values, key_condition_expression, filter_expression, allow_full_scan, max_items_evaluated)):
465+
items = _read_items(table_name, boto3_session, **kwargs)
466+
# Raise otherwise
467+
else:
468+
_args = (
469+
"partition_values",
470+
"key_condition_expression",
471+
"filter_expression",
472+
"allow_full_scan",
473+
"max_items_evaluated",
474+
)
475+
raise exceptions.InvalidArgumentCombination(
476+
f"Please provide at least one of these arguments: {', '.join(_args)}."
477+
)
478+
479+
# Enforce DataFrame type if requested
480+
return pd.DataFrame(items) if as_dataframe else items

docs/source/api.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,7 @@ DynamoDB
303303

304304
delete_items
305305
get_table
306+
read_items
306307
put_csv
307308
put_df
308309
put_items

0 commit comments

Comments
 (0)