@@ -194,6 +194,7 @@ def _read_scan_chunked(
194194
195195 deserializer = boto3 .dynamodb .types .TypeDeserializer ()
196196 next_token = "init_token" # Dummy token
197+ total_items = 0
197198
198199 kwargs = dict (kwargs )
199200 if segment is not None :
@@ -208,8 +209,12 @@ def _read_scan_chunked(
208209 {k : v ["B" ] if list (v .keys ())[0 ] == "B" else deserializer .deserialize (v ) for k , v in d .items ()}
209210 for d in response .get ("Items" , [])
210211 ]
212+ total_items += len (items )
211213 yield _utils .list_to_arrow_table (mapping = items ) if as_dataframe else items
212214
215+ if ("Limit" in kwargs ) and (total_items >= kwargs ["Limit" ]):
216+ break
217+
213218 next_token = response .get ("LastEvaluatedKey" , None ) # type: ignore[assignment]
214219 if next_token :
215220 kwargs ["ExclusiveStartKey" ] = next_token
@@ -237,14 +242,22 @@ def _read_query_chunked(
237242 table_name : str , boto3_session : Optional [boto3 .Session ] = None , ** kwargs : Any
238243) -> Iterator [_ItemsListType ]:
239244 table = get_table (table_name = table_name , boto3_session = boto3_session )
240- response = table . query ( ** kwargs )
241- yield response . get ( "Items" , [])
245+ next_token = "init_token" # Dummy token
246+ total_items = 0
242247
243248 # Handle pagination
244- while "LastEvaluatedKey" in response :
245- kwargs ["ExclusiveStartKey" ] = response ["LastEvaluatedKey" ]
249+ while next_token :
246250 response = table .query (** kwargs )
247- yield response .get ("Items" , [])
251+ items = response .get ("Items" , [])
252+ total_items += len (items )
253+ yield items
254+
255+ if ("Limit" in kwargs ) and (total_items >= kwargs ["Limit" ]):
256+ break
257+
258+ next_token = response .get ("LastEvaluatedKey" , None ) # type: ignore[assignment]
259+ if next_token :
260+ kwargs ["ExclusiveStartKey" ] = next_token
248261
249262
250263@_handle_reserved_keyword_error
@@ -352,9 +365,10 @@ def _read_items(
352365 boto3_session : Optional [boto3 .Session ] = None ,
353366 ** kwargs : Any ,
354367) -> Union [pd .DataFrame , Iterator [pd .DataFrame ], _ItemsListType , Iterator [_ItemsListType ]]:
355- # Extract 'Keys' and 'IndexName ' from provided kwargs: if needed, will be reinserted later on
368+ # Extract 'Keys', 'IndexName' and 'Limit ' from provided kwargs: if needed, will be reinserted later on
356369 keys = kwargs .pop ("Keys" , None )
357370 index = kwargs .pop ("IndexName" , None )
371+ limit = kwargs .pop ("Limit" , None )
358372
359373 # Conditionally define optimal reading strategy
360374 use_get_item = (keys is not None ) and (len (keys ) == 1 )
@@ -372,6 +386,11 @@ def _read_items(
372386 items = _read_batch_items (table_name , chunked , boto3_session , ** kwargs )
373387
374388 else :
389+ if limit :
390+ kwargs ["Limit" ] = limit
391+ _logger .debug ("`max_items_evaluated` argument detected, setting use_threads to False" )
392+ use_threads = False
393+
375394 if index :
376395 kwargs ["IndexName" ] = index
377396
@@ -438,6 +457,11 @@ def read_items( # pylint: disable=too-many-branches
438457 of the table or index.
439458 See: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Scan.html#Scan.ParallelScan
440459
460+ Note
461+ ----
462+ If `max_items_evaluated` is specified, then `use_threads=False` is enforced. This is because
463+ it's not possible to limit the number of items in a Query/Scan operation across threads.
464+
441465 Parameters
442466 ----------
443467 table_name : str
@@ -466,6 +490,7 @@ def read_items( # pylint: disable=too-many-branches
466490 If True, allow full table scan without any filtering. Defaults to False.
467491 max_items_evaluated : int, optional
468492 Limit the number of items evaluated in case of query or scan operations. Defaults to None (all matching items).
493+ When set, `use_threads` is enforced to False.
469494 dtype_backend: str, optional
470495 Which dtype_backend to use, e.g. whether a DataFrame should have NumPy arrays,
471496 nullable dtypes are used for all dtypes that have a nullable implementation when
0 commit comments