Commit b0d25d2

feat: cancellable task (#2242)
## Issue Link / Problem Description

- Closes #2103

## Changes Made

- Tasks can now be cancelled
- Executor import updated
- Added tests
- Added docs for the cancellable-task feature

## Testing

### How to Test

- [x] Automated tests added/updated
- [x] Manual testing steps:
    1. `make test` to run the full test suite.
    2. `uv run pytest tests/unit/test_cancellation.py` to test task cancellation.
1 parent 866c662 commit b0d25d2

File tree

9 files changed: +817 additions, −22 deletions

Lines changed: 370 additions & 0 deletions
@@ -0,0 +1,370 @@
# Cancelling Long-Running Tasks

When working with large datasets or complex evaluations, some Ragas operations can take significant time to complete. The cancellation feature allows you to gracefully terminate these long-running tasks when needed, which is especially important in production environments.

## Overview

Ragas provides cancellation support for:

- **`evaluate()`** - Evaluation of datasets with metrics
- **`generate_with_langchain_docs()`** - Test set generation from documents

The cancellation mechanism is thread-safe and allows graceful termination with partial results when possible.
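A thread-safe cancellation flag can be pictured as a `threading.Event` shared between the caller and the worker. The sketch below is a hypothetical model of the executor's `cancel()`/`is_cancelled()` interface, not Ragas's actual implementation:

```python
import threading

class CancellableRunner:
    """Hypothetical model of a cancellable executor (not Ragas internals)."""

    def __init__(self):
        self._cancel = threading.Event()  # thread-safe flag

    def cancel(self):
        self._cancel.set()

    def is_cancelled(self):
        return self._cancel.is_set()

    def run(self, jobs):
        """Run jobs in order, checking the flag between each one."""
        done = []
        for job in jobs:
            if self._cancel.is_set():  # cooperative check between tasks
                break
            done.append(job())
        return done

runner = CancellableRunner()
# The second job requests cancellation, so the third never runs
partial = runner.run([lambda: 1, lambda: (runner.cancel(), 2)[1], lambda: 3])
print(partial)  # [1, 2]
```

Because the flag is an `Event`, `cancel()` can safely be called from any thread while `run()` is in progress.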
## Basic Usage

### Cancellable Evaluation

Instead of running evaluation directly, you can get an executor that allows cancellation:

```py
from ragas import evaluate
from ragas.dataset_schema import EvaluationDataset

# Your dataset and metrics
dataset = EvaluationDataset(...)
metrics = [...]

# Get executor instead of running evaluation immediately
executor = evaluate(
    dataset=dataset,
    metrics=metrics,
    return_executor=True  # Key parameter
)

# Now you can:
# - Cancel: executor.cancel()
# - Check status: executor.is_cancelled()
# - Get results: executor.results()  # This blocks until completion
```
### Cancellable Test Set Generation

The same approach works for test set generation:

```py
from ragas.testset.synthesizers.generate import TestsetGenerator

generator = TestsetGenerator(...)

# Get executor for cancellable generation
executor = generator.generate_with_langchain_docs(
    documents=documents,
    testset_size=100,
    return_executor=True  # Returns the executor so you can cancel
)

# Use the same cancellation interface
executor.cancel()
```
## Production Patterns

### 1. Timeout Pattern

Automatically cancel operations that exceed a time limit:

```py
import threading
import time

def evaluate_with_timeout(dataset, metrics, timeout_seconds=300):
    """Run evaluation with an automatic timeout."""
    # Get cancellable executor
    executor = evaluate(dataset=dataset, metrics=metrics, return_executor=True)

    results = None
    exception = None

    def run_evaluation():
        nonlocal results, exception
        try:
            results = executor.results()
        except Exception as e:
            exception = e

    # Start evaluation in a background thread
    thread = threading.Thread(target=run_evaluation)
    thread.start()

    # Wait for completion or timeout
    thread.join(timeout=timeout_seconds)

    if thread.is_alive():
        print(f"Evaluation exceeded {timeout_seconds}s timeout, cancelling...")
        executor.cancel()
        thread.join(timeout=10)  # Grace period for shutdown; adjust as needed
        return None, "timeout"

    return results, exception

# Usage
results, error = evaluate_with_timeout(dataset, metrics, timeout_seconds=600)
if error == "timeout":
    print("Evaluation was cancelled due to timeout")
else:
    print(f"Evaluation completed: {results}")
```
### 2. Signal Handler Pattern (Ctrl+C)

Allow users to cancel with a keyboard interrupt:

```py
import signal
import sys

def setup_cancellation_handler():
    """Set up graceful cancellation on Ctrl+C."""
    state = {"executor": None}

    def signal_handler(signum, frame):
        executor = state["executor"]
        if executor and not executor.is_cancelled():
            print("\nReceived interrupt signal, cancelling evaluation...")
            executor.cancel()
            print("Cancellation requested. Waiting for graceful shutdown...")
        sys.exit(0)

    # Register signal handler
    signal.signal(signal.SIGINT, signal_handler)

    # Return a setter so the handler can see the executor once it exists
    def set_executor(executor):
        state["executor"] = executor

    return set_executor

# Usage
set_executor = setup_cancellation_handler()

executor = evaluate(dataset=dataset, metrics=metrics, return_executor=True)
set_executor(executor)

print("Running evaluation... Press Ctrl+C to cancel gracefully")
try:
    results = executor.results()
    print("Evaluation completed successfully")
except KeyboardInterrupt:
    print("Evaluation was cancelled")
```
### 3. Web Application Pattern

For web applications, expose an endpoint that cancels a running evaluation:

```py
from flask import Flask, request
import threading
import uuid

app = Flask(__name__)
active_evaluations = {}

@app.route('/evaluate', methods=['POST'])
def start_evaluation():
    # Create unique evaluation ID
    eval_id = str(uuid.uuid4())

    # Get dataset and metrics from the request
    # (get_dataset_from_request / get_metrics_from_request are app-specific helpers)
    dataset = get_dataset_from_request(request)
    metrics = get_metrics_from_request(request)

    # Start cancellable evaluation
    executor = evaluate(dataset=dataset, metrics=metrics, return_executor=True)
    active_evaluations[eval_id] = executor

    # Run evaluation in the background
    def run_eval():
        try:
            results = executor.results()
            store_results(eval_id, results)  # app-specific persistence
        except Exception as e:
            store_error(eval_id, str(e))
        finally:
            active_evaluations.pop(eval_id, None)

    threading.Thread(target=run_eval).start()

    return {"evaluation_id": eval_id, "status": "started"}

@app.route('/evaluate/<eval_id>/cancel', methods=['POST'])
def cancel_evaluation(eval_id):
    executor = active_evaluations.get(eval_id)
    if executor:
        executor.cancel()
        return {"status": "cancelled"}
    return {"error": "Evaluation not found"}, 404
```
## Advanced Usage

### Checking Cancellation Status

```py
import threading
import time

executor = evaluate(dataset=dataset, metrics=metrics, return_executor=True)

# Monitor in the background (the loop exits once cancellation is requested)
def monitor_evaluation():
    while not executor.is_cancelled():
        print("Evaluation still running...")
        time.sleep(5)
    print("Evaluation was cancelled")

threading.Thread(target=monitor_evaluation, daemon=True).start()

# Cancel after some condition
if some_condition():
    executor.cancel()
```
### Partial Results

When cancellation occurs during execution, you may get partial results:

```py
executor = evaluate(dataset=dataset, metrics=metrics, return_executor=True)

try:
    results = executor.results()
    print(f"Completed {len(results)} evaluations")
except Exception as e:
    if executor.is_cancelled():
        print("Evaluation was cancelled - partial results may be available")
    else:
        print(f"Evaluation failed: {e}")
```
### Custom Cancellation Logic

```py
class EvaluationManager:
    def __init__(self):
        self.executors = []

    def start_evaluation(self, dataset, metrics):
        executor = evaluate(dataset=dataset, metrics=metrics, return_executor=True)
        self.executors.append(executor)
        return executor

    def cancel_all(self):
        """Cancel all running evaluations."""
        for executor in self.executors:
            if not executor.is_cancelled():
                executor.cancel()
        print(f"Cancelled {len(self.executors)} evaluations")

    def cleanup_cancelled(self):
        """Drop executors whose evaluations were cancelled."""
        self.executors = [ex for ex in self.executors if not ex.is_cancelled()]

# Usage
manager = EvaluationManager()

# Start multiple evaluations
exec1 = manager.start_evaluation(dataset1, metrics)
exec2 = manager.start_evaluation(dataset2, metrics)

# Cancel all if needed
manager.cancel_all()
```
## Best Practices

### 1. Always Use Timeouts in Production

```py
# Good: always set a reasonable timeout
results, error = evaluate_with_timeout(dataset, metrics, timeout_seconds=1800)  # 30 minutes

# Avoid: indefinite blocking
results = executor.results()  # Could block forever
```
### 2. Handle Cancellation Gracefully

```py
try:
    results = executor.results()
    process_results(results)
except Exception as e:
    if executor.is_cancelled():
        log_cancellation()
        cleanup_partial_work()
    else:
        log_error(e)
        handle_failure()
```
### 3. Provide User Feedback

```py
def run_with_progress_and_cancellation(executor):
    print("Starting evaluation... Press Ctrl+C to cancel")

    # Show a progress indicator in the background
    def show_progress():
        while not executor.is_cancelled():
            print(".", end="", flush=True)
            time.sleep(1)

    progress_thread = threading.Thread(target=show_progress)
    progress_thread.daemon = True
    progress_thread.start()

    try:
        return executor.results()
    except KeyboardInterrupt:
        print("\nCancelling...")
        executor.cancel()
        return None
```
### 4. Clean Up Resources

```py
def managed_evaluation(dataset, metrics):
    executor = None
    try:
        executor = evaluate(dataset=dataset, metrics=metrics, return_executor=True)
        return executor.results()
    except Exception:
        if executor:
            executor.cancel()
        raise
    finally:
        # Clean up any temporary resources
        cleanup_temp_files()
```
## Limitations

- **Async Operations**: Cancellation works at the task level, not within individual LLM calls
- **Partial State**: Cancelled operations may leave partial results or temporary files behind
- **Timing**: Cancellation is cooperative - tasks must check for cancellation periodically
- **Dependencies**: Some external services may not respect cancellation immediately
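The cooperative model can be illustrated with plain Python: a worker checks a shared flag between steps, so a step already in flight (e.g. a single LLM call) always runs to completion before the loop exits. This is a hypothetical sketch, not Ragas internals:

```python
import threading
import time

cancel_flag = threading.Event()

def long_job(steps, step_time=0.01):
    """Check the flag between steps; an in-flight step cannot be interrupted."""
    completed = 0
    for _ in range(steps):
        if cancel_flag.is_set():
            break                # cooperative: only honored between steps
        time.sleep(step_time)    # stand-in for one uninterruptible call
        completed += 1
    return completed

t = threading.Thread(target=lambda: print("finished steps:", long_job(100)))
t.start()
time.sleep(0.05)   # let a few steps run
cancel_flag.set()  # request cancellation
t.join()           # the current step still finishes before the loop exits
```

The coarser the steps, the longer the delay between calling `cancel()` and the task actually stopping.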
## Troubleshooting

### Cancellation Not Working

```py
# Check whether cancellation has been requested
if executor.is_cancelled():
    print("Cancellation was requested")
else:
    print("Cancellation not requested yet")

# Ensure you're actually calling cancel()
executor.cancel()
assert executor.is_cancelled()
```

### Tasks Still Running After Cancellation

```py
# Give tasks time to detect the cancellation and shut down
executor.cancel()
time.sleep(2)

# Force cleanup if needed (only has an effect inside a running event loop)
import asyncio
try:
    loop = asyncio.get_running_loop()
    for task in asyncio.all_tasks(loop):
        task.cancel()
except RuntimeError:
    pass  # No event loop running
```
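Note that `task.cancel()` only requests cancellation; the task should then be awaited so its `CancelledError` cleanup can run. A minimal, general-Python sketch (nothing here is Ragas-specific):

```python
import asyncio

async def worker():
    try:
        await asyncio.sleep(60)  # stand-in for a long-running call
    except asyncio.CancelledError:
        # cleanup runs here before the cancellation propagates
        print("worker cancelled")
        raise

async def main():
    task = asyncio.create_task(worker())
    await asyncio.sleep(0.1)     # let the worker start
    task.cancel()                # request cancellation
    try:
        await task               # wait for it to actually finish
    except asyncio.CancelledError:
        pass
    return task.cancelled()

print(asyncio.run(main()))  # True
```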
The cancellation feature provides robust control over long-running Ragas operations, enabling production-ready deployments with proper resource management and user experience.

docs/howtos/customizations/index.md

Lines changed: 1 addition & 0 deletions

@@ -6,6 +6,7 @@ How to customize various aspects of Ragas to suit your needs.

 - [Customize models](customize_models.md)
 - [Customize timeouts, retries and others](./_run_config.md)
+- [Cancelling long-running tasks](cancellation.md)

 ## Metrics
 - [Modify prompts in metrics](./metrics/_modifying-prompts-metrics.md)

mkdocs.yml

Lines changed: 2 additions & 1 deletion

@@ -97,7 +97,8 @@ nav:
       - Customise models: howtos/customizations/customize_models.md
       - Run Config: howtos/customizations/_run_config.md
       - Caching: howtos/customizations/_caching.md
-      - Iterate and Improve Prompts: howtos/customizations/iterate_prompt.md
+      - Cancelling Tasks: howtos/customizations/cancellation.md
+      - Iterate Prompts: howtos/customizations/iterate_prompt.md
       - Metrics:
       - Modify Prompts: howtos/customizations/metrics/_modifying-prompts-metrics.md
