Skip to content

Commit 019e074

Browse files
committed
update
1 parent a31408f commit 019e074

File tree

1 file changed

+80
-15
lines changed

1 file changed

+80
-15
lines changed

src/auditwheel/pool.py

Lines changed: 80 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,99 @@
1+
import time
12
from concurrent.futures import Future, ThreadPoolExecutor
23
from pathlib import Path
34
from typing import Any, Callable, Optional
45

56

67
class FileTaskExecutor:
7-
def __init__(self, concurrent: bool = False):
8-
self.executor = ThreadPoolExecutor() if concurrent else None
8+
"""A task executor that manages concurrent file operations with deduplication.
9+
10+
This executor ensures that only one task per file path runs at a time, even if
11+
multiple tasks are submitted for the same file. It executes tasks with `concurrent`
12+
threads when `concurrent` >= 1, specially when `concurrent` is 1, it will execute
13+
tasks sequentially. When `concurrent` < 1, it will use the default setting of
14+
ThreadPoolExecutor.
15+
16+
Args:
17+
concurrent (int): Number of concurrent threads to use. Defaults to 1.
18+
Example:
19+
>>> executor = FileTaskExecutor(concurrent=2)
20+
>>> future = executor.submit(Path("file.txt"), process_file, "file.txt")
21+
>>> executor.wait() # Wait for all tasks to complete
22+
"""
23+
24+
def __init__(self, concurrent: int = 1):
25+
self.executor = (
26+
None
27+
if concurrent == 1
28+
else ThreadPoolExecutor(concurrent if concurrent > 1 else None)
29+
)
930
self.working_map: dict[Path, Future[tuple[str, str]]] = {}
1031

1132
def submit(
1233
self, path: Path, fn: Callable[[Any], Any], /, *args: Any, **kwargs: Any
13-
) -> Future[Any]:
34+
) -> None:
35+
if not path.is_absolute():
36+
path = path.absolute()
37+
1438
future: Future[Any]
1539
if self.executor is None:
1640
future = Future()
1741
future.set_result(fn(*args, **kwargs))
18-
return future
19-
assert path not in self.working_map
20-
future = self.executor.submit(fn, *args, **kwargs)
21-
future.add_done_callback(lambda f: self.working_map.pop(path))
22-
self.working_map[path] = future
23-
return future
42+
return
43+
44+
if path not in self.working_map:
45+
future = self.executor.submit(fn, *args, **kwargs)
46+
self.working_map[path] = future
47+
else:
48+
future = self.working_map[path]
49+
future.add_done_callback(lambda _: self.working_map.pop(path, None))
50+
future.add_done_callback(lambda _: self.submit(path, fn, *args, **kwargs))
2451

2552
def wait(self, path: Optional[Path] = None) -> None:
53+
"""Wait for tasks to complete.
54+
55+
If a path is specified, waits only for that specific file's task to complete.
56+
Otherwise, waits for all tasks to complete.
57+
58+
Args:
59+
path (Optional[Path]): The specific file path to wait for. If None,
60+
waits for all tasks to complete.
61+
"""
2662
if self.executor is None:
2763
return
28-
if path is not None:
29-
if path in self.working_map:
30-
self.working_map.pop(path).result()
31-
return
32-
33-
for path in self.working_map:
64+
if path is not None and path in self.working_map:
65+
self.working_map.pop(path, None).result()
66+
# may have chained callback, so we need to wait again
3467
self.wait(path)
68+
69+
while self.working_map:
70+
# Process one task for each for-loop
71+
# for map might be changed during the loop
72+
for path in self.working_map:
73+
self.wait(path)
74+
break
75+
76+
77+
def fake_job(i: int) -> int:
78+
print(f"start {i}")
79+
time.sleep(i)
80+
print(f"end {i}")
81+
82+
83+
if __name__ == "__main__":
84+
executor = FileTaskExecutor(concurrent=0)
85+
for i in range(10):
86+
executor.submit(Path(f"test{i}.txt"), fake_job, i)
87+
for i in range(10):
88+
executor.submit(Path(f"test{i}.txt"), fake_job, i)
89+
for i in range(10):
90+
executor.submit(Path(f"test{i}.txt"), fake_job, i)
91+
for i in range(10):
92+
executor.submit(Path(f"test{i}.txt"), fake_job, i)
93+
for i in range(10):
94+
executor.submit(Path(f"test{i}.txt"), fake_job, i)
95+
for i in range(10):
96+
executor.submit(Path(f"test{i}.txt"), fake_job, i)
97+
for i in range(10):
98+
executor.submit(Path(f"test{i}.txt"), fake_job, i)
99+
executor.wait()

0 commit comments

Comments
 (0)