Skip to content

Commit bd1d75e

Browse files
ahgraber and jjmachan
authored
Feature: batched execution (#1589)
Provides batched execution support for `ragas.executor.Executor` and `ragas.async_utils.run_async_tasks`. This is useful, for example, for folks who are self-hosting their own models and therefore don't have the throughput that a model service has. In this circumstance, sending a long list of tasks to the Executor would previously result in timeouts as all `n` tasks were activated at the same time, but the LLM responses were queued. The batched approach only "activates" `batch_size` of the coroutines, and then waits until that batch has been completed before moving on. Batching may result in slower/longer execution times with LLM APIs, but is _disabled by default_ with `batch_size=None`. --- Note: `tqdm` support is a bit weird, and I haven't been able to resolve it after spending about a day and a half on it, so I'm giving up / forcing `leave=True`. - my current hypothesis is that there's a bug in `tqdm`'s `position` kwarg, so the inner progress bar gets overwritten by the other loop's when `leave=False` - as a result, it _seems_ that the inner bar doesn't refresh until it is complete; it shows up due to `leave=True` once the cursor drops to the next line for the next batch --------- Co-authored-by: jjmachan <[email protected]>
1 parent e0867d1 commit bd1d75e

File tree

15 files changed

+542
-202
lines changed

15 files changed

+542
-202
lines changed

docs/howtos/integrations/llamaindex.ipynb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -335,6 +335,7 @@
335335
"\n",
336336
"# init metrics with evaluator LLM\n",
337337
"from ragas.llms import LlamaIndexLLMWrapper\n",
338+
"\n",
338339
"evaluator_llm = LlamaIndexLLMWrapper(OpenAI(model=\"gpt-4o\"))\n",
339340
"metrics = [\n",
340341
" Faithfulness(llm=evaluator_llm),\n",

src/ragas/async_utils.py

Lines changed: 73 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,49 +1,89 @@
11
"""Async utils."""
22

33
import asyncio
4-
from typing import Any, Coroutine, List
4+
from typing import Any, Coroutine, List, Optional
5+
6+
from tqdm.auto import tqdm
7+
8+
from ragas.executor import is_event_loop_running
9+
from ragas.utils import batched
510

611

712
def run_async_tasks(
813
tasks: List[Coroutine],
9-
show_progress: bool = False,
14+
batch_size: Optional[int] = None,
15+
show_progress: bool = True,
1016
progress_bar_desc: str = "Running async tasks",
1117
) -> List[Any]:
12-
"""Run a list of async tasks."""
13-
tasks_to_execute: List[Any] = tasks
18+
"""
19+
Execute async tasks with optional batching and progress tracking.
20+
21+
NOTE: Order of results is not guaranteed!
22+
23+
Args:
24+
tasks: List of coroutines to execute
25+
batch_size: Optional size for batching tasks. If None, runs all concurrently
26+
show_progress: Whether to display progress bars
27+
"""
28+
29+
async def _run():
30+
total_tasks = len(tasks)
31+
results = []
1432

15-
# if running in notebook, use nest_asyncio to hijack the event loop
16-
try:
17-
loop = asyncio.get_running_loop()
33+
# If no batching, run all tasks concurrently with single progress bar
34+
if not batch_size:
35+
with tqdm(
36+
total=total_tasks,
37+
desc=progress_bar_desc,
38+
disable=not show_progress,
39+
) as pbar:
40+
for future in asyncio.as_completed(tasks):
41+
result = await future
42+
results.append(result)
43+
pbar.update(1)
44+
return results
45+
46+
# With batching, show nested progress bars
47+
batches = batched(tasks, batch_size) # generator
48+
n_batches = (total_tasks + batch_size - 1) // batch_size
49+
with (
50+
tqdm(
51+
total=total_tasks,
52+
desc=progress_bar_desc,
53+
disable=not show_progress,
54+
position=0,
55+
leave=True,
56+
) as overall_pbar,
57+
tqdm(
58+
total=batch_size,
59+
desc=f"Batch 1/{n_batches}",
60+
disable=not show_progress,
61+
position=1,
62+
leave=False,
63+
) as batch_pbar,
64+
):
65+
for i, batch in enumerate(batches, 1):
66+
batch_pbar.reset(total=len(batch))
67+
batch_pbar.set_description(f"Batch {i}/{n_batches}")
68+
for future in asyncio.as_completed(batch):
69+
result = await future
70+
results.append(result)
71+
overall_pbar.update(1)
72+
batch_pbar.update(1)
73+
74+
return results
75+
76+
if is_event_loop_running():
77+
# an event loop is running so call nested_asyncio to fix this
1878
try:
1979
import nest_asyncio
2080
except ImportError:
21-
raise RuntimeError(
22-
"nest_asyncio is required to run async tasks in jupyter. Please install it via `pip install nest_asyncio`." # noqa
81+
raise ImportError(
82+
"It seems like your running this in a jupyter-like environment. "
83+
"Please install nest_asyncio with `pip install nest_asyncio` to make it work."
2384
)
2485
else:
2586
nest_asyncio.apply()
26-
except RuntimeError:
27-
loop = asyncio.new_event_loop()
28-
29-
# gather tasks to run
30-
if show_progress:
31-
from tqdm.asyncio import tqdm
32-
33-
async def _gather() -> List[Any]:
34-
"gather tasks and show progress bar"
35-
return await tqdm.gather(*tasks_to_execute, desc=progress_bar_desc)
36-
37-
else: # don't show_progress
38-
39-
async def _gather() -> List[Any]:
40-
return await asyncio.gather(*tasks_to_execute)
41-
42-
try:
43-
outputs: List[Any] = loop.run_until_complete(_gather())
44-
except Exception as e:
45-
# run the operation w/o tqdm on hitting a fatal
46-
# may occur in some environments where tqdm.asyncio
47-
# is not supported
48-
raise RuntimeError("Fatal error occurred while running async tasks.", e) from e
49-
return outputs
87+
88+
results = asyncio.run(_run())
89+
return results

src/ragas/dataset_schema.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,7 @@ def to_pandas(self) -> PandasDataframe:
208208
def from_pandas(cls, dataframe: PandasDataframe):
209209
"""Creates an EvaluationDataset from a pandas DataFrame."""
210210
return cls.from_list(dataframe.to_dict(orient="records"))
211-
211+
212212
def features(self):
213213
"""Returns the features of the samples."""
214214
return self.samples[0].get_features()

src/ragas/evaluation.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@
5555
@track_was_completed
5656
def evaluate(
5757
dataset: t.Union[Dataset, EvaluationDataset],
58-
metrics: list[Metric] | None = None,
58+
metrics: t.Optional[t.Sequence[Metric]] = None,
5959
llm: t.Optional[BaseRagasLLM | LangchainLLM] = None,
6060
embeddings: t.Optional[BaseRagasEmbeddings | LangchainEmbeddings] = None,
6161
callbacks: Callbacks = None,
@@ -65,6 +65,7 @@ def evaluate(
6565
raise_exceptions: bool = False,
6666
column_map: t.Optional[t.Dict[str, str]] = None,
6767
show_progress: bool = True,
68+
batch_size: t.Optional[int] = None,
6869
) -> EvaluationResult:
6970
"""
7071
Run the evaluation on the dataset with different metrics
@@ -110,6 +111,8 @@ def evaluate(
110111
column_map can be given as {"contexts":"contexts_v1"}
111112
show_progress: bool, optional
112113
Whether to show the progress bar during evaluation. If set to False, the progress bar will be disabled. Default is True.
114+
batch_size: int, optional
115+
How large should batches be. If set to None (default), no batching is done.
113116
114117
Returns
115118
-------
@@ -223,6 +226,7 @@ def evaluate(
223226
raise_exceptions=raise_exceptions,
224227
run_config=run_config,
225228
show_progress=show_progress,
229+
batch_size=batch_size,
226230
)
227231

228232
# Ragas Callbacks

0 commit comments

Comments
 (0)