88import pandas as pd
99from boto3 .dynamodb .types import TypeDeserializer
1010
11+ from awswrangler import _utils
1112from awswrangler ._config import apply_configs
1213
13- from ._utils import get_client
14-
1514_logger : logging .Logger = logging .getLogger (__name__ )
1615
1716
@@ -42,22 +41,25 @@ def _get_terms_groups(terms: List[str]) -> Tuple[List[str], List[str], List[str]
4241 return select_terms , from_terms , where_terms
4342
4443
45- def _get_scan_response (table_name : str , select_terms : List [str ], client : boto3 .resource ) -> List [Dict [str , Any ]]:
44+ def _get_scan_response (
45+ table_name : str , select_terms : List [str ], boto3_session : Optional [boto3 .Session ] = None
46+ ) -> List [Dict [str , Any ]]:
4647 """Perform a scan to the Dynamo DB table and returns the data fetched."""
48+ client_dynamodb = _utils .client (service_name = "dynamodb" , session = boto3_session )
4749 scan_config : Dict [str , Any ] = {"TableName" : table_name }
4850 if len (select_terms ) > 1 or select_terms [0 ] != "*" :
4951 scan_config ["AttributesToGet" ] = select_terms
5052 # get all responses even if pagination is necessary
51- response = client .scan (** scan_config )
53+ response = client_dynamodb .scan (** scan_config )
5254 data : List [Dict [str , Any ]] = response ["Items" ]
5355 while "LastEvaluatedKey" in response :
5456 scan_config ["ExclusiveStartKey" ] = response ["LastEvaluatedKey" ]
55- response = client .scan (** scan_config )
57+ response = client_dynamodb .scan (** scan_config )
5658 data .extend (response ["Items" ])
5759 return data
5860
5961
60- def _get_items (client : boto3 . resource , query : str ) -> List [Dict [str , Any ]]:
62+ def _get_items (query : str , boto3_session : Optional [ boto3 . Session ] = None ) -> List [Dict [str , Any ]]:
6163 # clean input query from possible excessive whitespace
6264 query = re .sub (" +" , " " , query ).strip ()
6365 # generate terms list from query
@@ -70,8 +72,7 @@ def _get_items(client: boto3.resource, query: str) -> List[Dict[str, Any]]:
7072 if len (from_terms ) == 0 :
7173 raise ValueError ("The PartiQL query contains no tables." )
7274 table_name = from_terms [0 ]
73- data = _get_scan_response (table_name , select_terms , client )
74- return data
75+ return _get_scan_response (table_name = table_name , select_terms = select_terms , boto3_session = boto3_session )
7576
7677
7778def _deserialize_value (value : Any ) -> Any :
@@ -87,11 +88,13 @@ def _deserialize_data(df: pd.DataFrame, columns: pd.Index) -> pd.DataFrame:
8788 return df
8889
8990
90- def _parse_dynamodb_results (results : List [Dict [str , Any ]]) -> pd .DataFrame :
91- df = pd .DataFrame (results )
92- columns = df .columns
93- df = _deserialize_data (df , columns )
94- return df
91+ def _parse_dynamodb_items (
92+ items : List [Dict [str , Any ]],
93+ dtype : Optional [Dict [str , str ]] = None ,
94+ ) -> pd .DataFrame :
95+ df = pd .DataFrame (items )
96+ df = _deserialize_data (df , df .columns )
97+ return df .astype (dtype = dtype ) if dtype else df
9598
9699
97100@apply_configs
@@ -140,10 +143,6 @@ def read_partiql_query(
140143 ... dtype={'key': int}
141144 ... )
142145 """
143- client = get_client (boto3_session )
144146 _logger .debug ("Reading results for PartiQL query: %s" , query )
145- items = _get_items (client , query )
146- df = _parse_dynamodb_results (items )
147- if dtype :
148- df = df .astype (dtype = dtype )
149- return df
147+ items = _get_items (query = query , boto3_session = boto3_session )
148+ return _parse_dynamodb_items (items = items , dtype = dtype )
0 commit comments