Skip to content

Commit f7c58f4

Browse files
ahgraber and anistark authored
feature: improve async / executor functionality (#2070)
- refactor: move is_event_loop_running and as_completed to async_utils This helps separate async features from executor and engine modules, and will prevent circular imports in these modules. - feat: add ProgressBarManager for enhanced progress tracking Introduces a new ProgressBarManager class to manage progress bars for both batch and non-batch execution. This includes methods for creating single and nested progress bars, as well as updating batch progress bars, improving user experience during long-running tasks. - feat: refactor async utilities for improved functionality - Refactored `run` function from repeated async, executor, and engine logic. - Changed `as_completed` from async to a regular function for better compatibility. - Added `process_futures` to handle futures with optional progress tracking. - feat: enhance async task management with ProgressBarManager - Refactored Executor to utilize new async_utils and ProgressBarManager for improved composition - Refactored run_async_tasks to utilize ProgressBarManager for improved progress tracking. - Modified tests to validate the new behavior functions - feat: enhance Executor with job indexing and exception handling - Added `_jobs_processed` attribute to maintain consistent job indexing across multiple runs. - Implemented `clear_jobs` method to reset job indices. - Updated tests to verify exception handling and job indexing after clearing jobs. - Adjusted Jupyter notebook tests for consistency in results retrieval. - feat: update engine's apply_transformations to recursively process nested Transformation or Parallel procedures - The apply_transforms() function has been updated to handle different types of transformations recursively: If transforms is a list, it recursively applies each transform in the list. If transforms is a Parallel instance, it recursively applies the transformations contained within the Parallel object. 
If transforms is a BaseGraphTransformation, it generates an execution plan (a list of coroutines), gets a description, and then runs the coroutines asynchronously using run_async_tasks(). If transforms is none of the above, it raises a ValueError indicating an invalid type. - Move apply_nest_asyncio to async_utils for better organization - Updated the Parallel class to support transformations with improved type hints. - Added unit tests - chore: use Sequence over List for type checking improvements; add debug logging --------- Co-authored-by: Ani <[email protected]>
1 parent b22112f commit f7c58f4

File tree

11 files changed

+708
-394
lines changed

11 files changed

+708
-394
lines changed

docs/references/executor.md

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,3 @@
33
members:
44
- Executor
55
- run_async_batch
6-
- is_event_loop_running

src/ragas/async_utils.py

Lines changed: 141 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -1,89 +1,171 @@
11
"""Async utils."""
22

33
import asyncio
4-
from typing import Any, Coroutine, List, Optional
4+
import logging
5+
import typing as t
56

6-
from tqdm.auto import tqdm
7+
logger = logging.getLogger(__name__)
78

8-
from ragas.executor import is_event_loop_running
9-
from ragas.utils import batched
9+
10+
def is_event_loop_running() -> bool:
    """
    Check if an event loop is currently running in this thread.
    """
    try:
        # get_running_loop raises RuntimeError when no loop is active.
        return asyncio.get_running_loop().is_running()
    except RuntimeError:
        return False
20+
21+
22+
def apply_nest_asyncio():
    """Patch the active event loop with ``nest_asyncio`` if one is running.

    Needed in Jupyter-like environments where an event loop is already
    running and a plain ``asyncio.run`` would otherwise fail. No-op when
    no loop is running.

    Raises:
        ImportError: if a loop is running but ``nest_asyncio`` is not installed.
    """
    if is_event_loop_running():
        # an event loop is running so call nested_asyncio to fix this
        try:
            import nest_asyncio
        except ImportError:
            # Fix: original message read "your running" — corrected to "you're running".
            raise ImportError(
                "It seems like you're running this in a jupyter-like environment. "
                "Please install nest_asyncio with `pip install nest_asyncio` to make it work."
            )

        nest_asyncio.apply()
34+
35+
36+
def as_completed(
    coroutines: t.Sequence[t.Coroutine],
    max_workers: int = -1,
    *,
    cancel_check: t.Optional[t.Callable[[], bool]] = None,
    cancel_pending: bool = True,
) -> t.Iterator[asyncio.Future]:
    """
    Wrap coroutines with a semaphore if max_workers is specified.

    Returns an iterator of futures that completes as tasks finish.

    Args:
        coroutines: Coroutines to schedule on the running event loop.
        max_workers: Maximum number of concurrently running tasks
            (-1 means unlimited).
        cancel_check: Optional callable polled before each future is yielded;
            a truthy result stops iteration early.
        cancel_pending: When True, cancel still-pending tasks once
            cancel_check fires.
    """
    if max_workers == -1:
        tasks = [asyncio.create_task(coro) for coro in coroutines]
    else:
        # Bound concurrency by routing every coroutine through a shared semaphore.
        semaphore = asyncio.Semaphore(max_workers)

        async def sema_coro(coro):
            async with semaphore:
                return await coro

        tasks = [asyncio.create_task(sema_coro(coro)) for coro in coroutines]

    ac_iter = asyncio.as_completed(tasks)

    if cancel_check is None:
        return ac_iter

    def _iter_with_cancel():
        for future in ac_iter:
            if cancel_check():
                if cancel_pending:
                    # Fix: loop variable renamed from `t` to `task` — the
                    # original shadowed the module-level `typing as t` alias.
                    for task in tasks:
                        if not task.done():
                            task.cancel()
                break
            yield future

    return _iter_with_cancel()
75+
76+
77+
async def process_futures(
    futures: t.Iterator[asyncio.Future],
) -> t.AsyncGenerator[t.Any, None]:
    """
    Await futures one by one in the given order, yielding each outcome.

    Args:
        futures: Iterator of asyncio futures to process (e.g., from asyncio.as_completed)

    Yields:
        Results from completed futures as they finish. An exception raised by
        a future (other than CancelledError) is yielded as a value rather than
        propagated, so one failing task does not stop the rest.
    """
    for future in futures:
        try:
            outcome = await future
        except asyncio.CancelledError:
            # Propagate cancellation so the caller's task can unwind cleanly.
            raise
        except Exception as exc:
            outcome = exc

        yield outcome
99+
100+
101+
def run(
    async_func: t.Union[
        t.Callable[[], t.Coroutine[t.Any, t.Any, t.Any]],
        t.Coroutine[t.Any, t.Any, t.Any],
    ],
) -> t.Any:
    """
    Run an async function in the current event loop or a new one if not running.
    """
    # Patch the loop first (common in Jupyter-like environments) so that
    # asyncio.run works even when a loop is already active.
    apply_nest_asyncio()

    # Accept either a zero-arg callable producing a coroutine, or a coroutine.
    if callable(async_func):
        coro = async_func()
    else:
        coro = async_func
    return asyncio.run(coro)
10117

11118

12119
def run_async_tasks(
    tasks: t.Sequence[t.Coroutine],
    batch_size: t.Optional[int] = None,
    show_progress: bool = True,
    progress_bar_desc: str = "Running async tasks",
    max_workers: int = -1,
    *,
    cancel_check: t.Optional[t.Callable[[], bool]] = None,
) -> t.List[t.Any]:
    """
    Execute async tasks with optional batching and progress tracking.

    NOTE: Order of results is not guaranteed!

    Args:
        tasks: Sequence of coroutines to execute
        batch_size: Optional size for batching tasks. If None, runs all concurrently
        show_progress: Whether to display progress bars
        progress_bar_desc: Description shown on the overall progress bar
        max_workers: Maximum number of concurrent tasks (-1 for unlimited)
        cancel_check: Optional callable polled between completions; a truthy
            result cancels the remaining work.

    Returns:
        List of results in completion order; exceptions raised by individual
        tasks are returned as values (see process_futures).
    """
    # Imported lazily to avoid circular imports between async_utils and utils.
    from ragas.utils import ProgressBarManager, batched

    async def _run():
        total_tasks = len(tasks)
        results = []
        pbm = ProgressBarManager(progress_bar_desc, show_progress)

        if not batch_size:
            # No batching: one progress bar across all tasks.
            with pbm.create_single_bar(total_tasks) as pbar:
                async for result in process_futures(
                    as_completed(tasks, max_workers, cancel_check=cancel_check)
                ):
                    results.append(result)
                    pbar.update(1)
        else:
            # Batching: nested bars (overall progress + per-batch progress).
            # Fix: dropped the redundant re-assignment of total_tasks here —
            # it is already computed at the top of _run.
            batches = batched(tasks, batch_size)
            overall_pbar, batch_pbar, n_batches = pbm.create_nested_bars(
                total_tasks, batch_size
            )
            with overall_pbar, batch_pbar:
                for i, batch in enumerate(batches, 1):
                    pbm.update_batch_bar(batch_pbar, i, n_batches, len(batch))
                    async for result in process_futures(
                        as_completed(batch, max_workers, cancel_check=cancel_check)
                    ):
                        results.append(result)
                        batch_pbar.update(1)
                    overall_pbar.update(len(batch))

        return results

    return run(_run)

0 commit comments

Comments
 (0)