Skip to content

Commit 5ae510e

Browse files
committed
Merge branch 'develop' into master
* develop: Update CHANGELOG.md [s3] Support very large buckets in iter_bucket, add client_kwargs (#908)
2 parents 7081e6f + 81bc361 commit 5ae510e

File tree

3 files changed

+94
-30
lines changed

3 files changed

+94
-30
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
# 7.5.0, 2025-11-08
2+
3+
- [s3] Support very large buckets in iter_bucket, add client_kwargs (PR [#908](https://github.com/piskvorky/smart_open/pull/908), [@ddelange](https://github.com/ddelange))
4+
15
# 7.4.4, 2025-11-04
26

37
- [http] Eliminate _read_iter and simplify read (PR [#906](https://github.com/piskvorky/smart_open/pull/906), [@ddelange](https://github.com/ddelange))

smart_open/concurrency.py

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,64 @@
1111
import concurrent.futures
1212
import contextlib
1313
import logging
14+
from collections import deque
15+
from concurrent.futures import ThreadPoolExecutor as _ThreadPoolExecutor
1416

1517
logger = logging.getLogger(__name__)
1618

1719

20+
class ThreadPoolExecutor(_ThreadPoolExecutor):
    """ThreadPoolExecutor extended with a lazy, order-preserving ``imap``."""

    def imap(self, fn, *iterables, timeout=None, queued_tasks_per_worker=2):
        """Map ``fn`` over ``iterables`` lazily, yielding results in submission order.

        Unlike ``Executor.map``, the inputs are consumed just-in-time: at most
        ``max_workers * (queued_tasks_per_worker + 1)`` tasks are ever pending,
        so arbitrarily long (even infinite) iterables are supported.

        References:
            https://gist.github.com/ddelange/c98b05437f80e4b16bf4fc20fde9c999

        Args:
            fn: Callable applied to each group of items (one item per iterable, zipped).
            iterables: One or more input iterables; ``fn`` receives one positional
                argument per iterable.
            timeout: Seconds to wait for each individual result, or None to wait
                indefinitely.
            queued_tasks_per_worker: Extra items per worker to pull from the input
                ahead of time; this determines the total queue size. Setting 0 gives
                strict just-in-time behaviour: a new task is only submitted after a
                result has been consumed from the generator. The default of 2 keeps
                a small backlog so workers never starve. Note that at startup the
                queue fills completely before the first result is yielded.

        Example:
            long_generator = itertools.count()
            with ThreadPoolExecutor(42) as pool:
                result_generator = pool.imap(fn, long_generator)
                for result in result_generator:
                    print(result)
        """
        pending = deque()
        # Maximum number of in-flight futures allowed at any moment.
        capacity = self._max_workers * (queued_tasks_per_worker + 1)

        def next_result():
            """Wait for the oldest pending task and return its result."""
            return pending.popleft().result(timeout)

        for arguments in zip(*iterables):
            pending.append(self.submit(fn, *arguments))
            if len(pending) == capacity:
                # Queue is full: hand one result back before submitting more.
                yield next_result()

        # Input exhausted; drain the remaining futures in order.
        while pending:
            yield next_result()
62+
63+
64+
# ConcurrentFuturesPool and create_pool were once used in smart_open.s3.iter_bucket.
65+
# Left here for backwards compatibility.
66+
67+
1868
class ConcurrentFuturesPool(object):
1969
"""A class that mimics multiprocessing.pool.Pool but uses concurrent futures instead of processes."""
2070
def __init__(self, max_workers):
21-
self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_workers)
71+
self.executor = ThreadPoolExecutor(max_workers=max_workers)
2272

2373
def imap_unordered(self, function, items):
2474
futures = [self.executor.submit(function, item) for item in items]

smart_open/s3.py

Lines changed: 39 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import http
1212
import io
1313
import functools
14+
import itertools
1415
import logging
1516
import time
1617
import warnings
@@ -1333,14 +1334,16 @@ def _accept_all(key):
13331334

13341335

13351336
def iter_bucket(
1336-
bucket_name,
1337-
prefix='',
1338-
accept_key=None,
1339-
key_limit=None,
1340-
workers=16,
1341-
retries=3,
1342-
max_threads_per_fileobj=4,
1343-
**session_kwargs):
1337+
bucket_name,
1338+
prefix='',
1339+
accept_key=None,
1340+
key_limit=None,
1341+
workers=16,
1342+
retries=3,
1343+
max_threads_per_fileobj=4,
1344+
client_kwargs=None,
1345+
**session_kwargs, # double star notation for backwards compatibility
1346+
):
13441347
"""
13451348
Iterate and download all S3 objects under `s3://bucket_name/prefix`.
13461349
@@ -1364,6 +1367,10 @@ def iter_bucket(
13641367
max_threads_per_fileobj: int, optional
13651368
The maximum number of download threads per worker. The maximum size of the
13661369
connection pool will be `workers * max_threads_per_fileobj + 1`. Default: 4
1370+
client_kwargs: dict, optional
1371+
Keyword arguments to pass when creating a new client.
1372+
For a list of available names and values, see:
1373+
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.client
13671374
session_kwargs: dict, optional
13681375
Keyword arguments to pass when creating a new session.
13691376
For a list of available names and values, see:
@@ -1411,17 +1418,20 @@ def iter_bucket(
14111418
if bucket_name is None:
14121419
raise ValueError('bucket_name may not be None')
14131420

1414-
total_size, key_no = 0, -1
1421+
total_size, key_no = 0, 0
14151422

14161423
# thread-safe client to share across _list_bucket and _download_key calls
14171424
# https://github.com/boto/boto3/blob/1.38.41/docs/source/guide/clients.rst?plain=1#L111
14181425
session = boto3.session.Session(**session_kwargs)
1419-
config = botocore.client.Config(
1420-
max_pool_connections=workers * max_threads_per_fileobj + 1, # 1 thread for _list_bucket
1421-
tcp_keepalive=True,
1422-
retries={"max_attempts": retries * 2, "mode": "adaptive"},
1423-
)
1424-
client = session.client('s3', config=config)
1426+
if client_kwargs is None:
1427+
client_kwargs = {}
1428+
if 'config' not in client_kwargs:
1429+
client_kwargs['config'] = botocore.client.Config(
1430+
max_pool_connections=workers * max_threads_per_fileobj + 1, # 1 thread for _list_bucket
1431+
tcp_keepalive=True,
1432+
retries={'max_attempts': retries * 2, 'mode': 'adaptive'},
1433+
)
1434+
client = session.client('s3', **client_kwargs)
14251435

14261436
transfer_config = boto3.s3.transfer.TransferConfig(max_concurrency=max_threads_per_fileobj)
14271437

@@ -1439,29 +1449,29 @@ def iter_bucket(
14391449
transfer_config=transfer_config,
14401450
)
14411451

1442-
with smart_open.concurrency.create_pool(workers) as pool:
1443-
result_iterator = pool.imap_unordered(download_key, key_iterator)
1444-
key_no = 0
1445-
while True:
1446-
try:
1447-
(key, content) = result_iterator.__next__()
1448-
except StopIteration:
1449-
break
1452+
# Limit the iterator ('infinite' iterators are supported, key_limit=None is supported)
1453+
key_iterator = itertools.islice(key_iterator, key_limit)
1454+
1455+
with smart_open.concurrency.ThreadPoolExecutor(workers) as executor:
1456+
result_iterator = executor.imap(download_key, key_iterator)
1457+
for key_no, (key, content) in enumerate(result_iterator, start=1):
14501458
# Skip deleted objects (404 responses)
14511459
if key is None:
14521460
continue
1461+
14531462
if key_no % 1000 == 0:
14541463
logger.info(
1455-
"yielding key #%i: %s, size %i (total %.1fMB)",
1464+
"yielding key #%i: %s, size %i (total %.1f MB)",
14561465
key_no, key, len(content), total_size / 1024.0 ** 2
14571466
)
1467+
14581468
yield key, content
14591469
total_size += len(content)
1460-
if key_limit is not None and key_no + 1 >= key_limit:
1461-
# we were asked to output only a limited number of keys => we're done
1462-
break
1463-
key_no += 1
1464-
logger.info("processed %i keys, total size %i" % (key_no + 1, total_size))
1470+
logger.info(
1471+
"processed %i keys, total size %.1f MB",
1472+
key_no,
1473+
total_size / 1024.0 ** 2,
1474+
)
14651475

14661476

14671477
def _list_bucket(

0 commit comments

Comments
 (0)