Skip to content

Commit 3fad112

Browse files
committed
Improve async writing orchestration (multithreading).
1 parent 9710f61 commit 3fad112

File tree

3 files changed

+28
-7
lines changed

3 files changed

+28
-7
lines changed

awswrangler/_utils.py

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@
77
import os
88
import random
99
import time
10-
from typing import Any, Callable, Dict, Generator, List, Optional, Tuple, Union, cast
10+
from concurrent.futures import FIRST_COMPLETED, Future, wait
11+
from typing import Any, Callable, Dict, Generator, List, Optional, Sequence, Tuple, Union, cast
1112

1213
import boto3
1314
import botocore.config
@@ -290,3 +291,21 @@ def get_even_chunks_sizes(total_size: int, chunk_size: int, upper_bound: bool) -
290291
i_cycled: int = i % len(sizes)
291292
sizes[i_cycled] += 1
292293
return tuple(sizes)
294+
295+
296+
def get_running_futures(seq: Sequence[Future]) -> Tuple[Future, ...]:  # type: ignore
    """Return the futures from ``seq`` that are currently being executed.

    A future is kept only when its ``running()`` method returns a truthy
    value; the relative order of ``seq`` is preserved in the result.
    """
    active: List[Future] = []  # type: ignore
    for future in seq:
        if future.running():
            active.append(future)
    return tuple(active)
299+
300+
301+
def wait_any_future_available(seq: Sequence[Future]) -> None:  # type: ignore
    """Block, without a timeout, until at least one future in ``seq`` completes.

    Delegates to ``concurrent.futures.wait`` with ``FIRST_COMPLETED``; the
    done/not-done sets it returns are discarded.
    """
    # ``timeout`` is left at its default (None), i.e. wait indefinitely.
    wait(seq, return_when=FIRST_COMPLETED)
304+
305+
306+
def block_waiting_available_thread(seq: Sequence[Future], max_workers: int) -> None:  # type: ignore
    """Block until fewer than ``max_workers`` futures from ``seq`` are in flight.

    A future counts as "in flight" while it is not done, i.e. it is either
    running or still pending. Pending futures are counted as well because
    counting only running ones lets a fast producer enqueue more than
    ``max_workers`` tasks before any of them is reported as running.

    Parameters
    ----------
    seq : Sequence[Future]
        Futures previously submitted by the caller.
    max_workers : int
        Maximum number of unfinished futures tolerated before returning.

    Returns
    -------
    None
        Returns as soon as the number of unfinished futures drops below
        ``max_workers`` (immediately if it already is, or if none are unfinished).
    """
    in_flight: Tuple[Future, ...] = tuple(f for f in seq if not f.done())  # type: ignore
    # The emptiness guard prevents an endless spin when max_workers <= 0:
    # with nothing in flight there is nothing left to wait for.
    while in_flight and len(in_flight) >= max_workers:
        wait(fs=in_flight, timeout=None, return_when=FIRST_COMPLETED)
        # Only futures that were in flight before can still be in flight now.
        in_flight = tuple(f for f in in_flight if not f.done())

awswrangler/s3/_fs.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -86,9 +86,9 @@ def __init__(self, use_threads: bool):
8686
self.closed = False
8787
self._exec: Optional[concurrent.futures.ThreadPoolExecutor]
8888
self._results: List[Dict[str, Union[str, int]]] = []
89-
cpus: int = _utils.ensure_cpu_count(use_threads=use_threads)
90-
if cpus > 1:
91-
self._exec = concurrent.futures.ThreadPoolExecutor(max_workers=cpus)
89+
self._cpus: int = _utils.ensure_cpu_count(use_threads=use_threads)
90+
if self._cpus > 1:
91+
self._exec = concurrent.futures.ThreadPoolExecutor(max_workers=self._cpus)
9292
self._futures: List[Any] = []
9393
else:
9494
self._exec = None
@@ -137,6 +137,7 @@ def upload(
137137
) -> None:
138138
"""Upload Part."""
139139
if self._exec is not None:
140+
_utils.block_waiting_available_thread(seq=self._futures, max_workers=self._cpus)
140141
future = self._exec.submit(
141142
_UploadProxy._caller,
142143
bucket=bucket,

awswrangler/s3/_write_concurrent.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,9 @@ class _WriteProxy:
1616
def __init__(self, use_threads: bool):
1717
self._exec: Optional[concurrent.futures.ThreadPoolExecutor]
1818
self._results: List[str] = []
19-
cpus: int = _utils.ensure_cpu_count(use_threads=use_threads)
20-
if cpus > 1:
21-
self._exec = concurrent.futures.ThreadPoolExecutor(max_workers=cpus)
19+
self._cpus: int = _utils.ensure_cpu_count(use_threads=use_threads)
20+
if self._cpus > 1:
21+
self._exec = concurrent.futures.ThreadPoolExecutor(max_workers=self._cpus)
2222
self._futures: List[Any] = []
2323
else:
2424
self._exec = None
@@ -35,6 +35,7 @@ def _caller(
3535
def write(self, func: Callable[..., List[str]], boto3_session: boto3.Session, **func_kwargs: Any) -> None:
3636
"""Write File."""
3737
if self._exec is not None:
38+
_utils.block_waiting_available_thread(seq=self._futures, max_workers=self._cpus)
3839
_logger.debug("Submitting: %s", func)
3940
future = self._exec.submit(
4041
_WriteProxy._caller,

0 commit comments

Comments
 (0)