Commit f14eb80

Misc improvements (#174)
* Enhance parallel evaluation with robust processing and progress tracking
  - Implement parallel processing for evaluation pairs and question-answer pairs
  - Add ProgressTracker class to monitor and log evaluation progress
  - Introduce configurable timeouts and worker count via environment variables
  - Improve error handling and logging for the parallel evaluation process
  - Refactor the fast_eval function to support concurrent execution and detailed tracking
* Fix 502 error
* Make deployed app faster
* Adjust Gradio chatbot interface height for improved visual layout
* Disable autoscroll in Gradio chatbot interface for better user control
1 parent 1057271 commit f14eb80
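
The parallel evaluation described in the commit message is built on Python's concurrent.futures. The sketch below is not code from the repository; it only distills the fan-out/fan-in pattern the updated fast_eval step applies (submit work to a ThreadPoolExecutor, collect results with as_completed, bound each result with a timeout). run_parallel and worker_fn are illustrative names.

import concurrent.futures


def run_parallel(items, worker_fn, max_workers=5, timeout=600):
    """Fan worker_fn out over items and collect results as they finish."""
    results = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Map each future back to the item it processes so failures can be attributed
        futures = {executor.submit(worker_fn, item): item for item in items}
        for future in concurrent.futures.as_completed(futures):
            item = futures[future]
            try:
                # Bound how long we wait on any single result, as the commit does
                results.append(future.result(timeout=timeout))
            except concurrent.futures.TimeoutError:
                print(f"Timed out on {item!r} after {timeout}s")
            except Exception as exc:
                print(f"Failed on {item!r}: {exc}")
    return results

In the commit this pattern is applied twice: once over EVALUATION_DATA_PAIRS and once over the question-answer pairs inside each evaluation pair, with the worker counts and timeouts read from environment variables.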

File tree: 3 files changed, +276 -39 lines

llm-agents/README.md
Lines changed: 1 addition & 1 deletion

@@ -37,7 +37,7 @@ ZenML stores the outputs of all the steps in an artifact store that you configur
 
 ### Data Sources
 
-We thought about the kinds of data sources we'd want to be queried when users entered their queries. We take a lot of pride and effort to maintain our documentation so that was an obvious one to include. We considered adding [the `README` files of our `examples`](https://github.com/zenml-io/zenml/tree/main/examples) since that's often a starting point for our users, and we thought [our release notes](https://github.com/zenml-io/zenml/blob/main/RELEASE_NOTES.md) would also be useful to be part of the context.
+We thought about the kinds of data sources we'd want to be queried when users entered their queries. We take a lot of pride and effort to maintain our documentation so that was an obvious one to include. We considered adding [the `README` files of our `examples`](https://github.com/zenml-io/zenml/tree/main/examples) since that's often a starting point for our users, and we thought [our release notes](https://github.com/zenml-io/zenml/releases) would also be useful to be part of the context.
 
 Getting all of this together was not too hard, and LangChain has functions that help with obtaining Slack messages and documentation from Gitbook. In the end we wanted more flexibility than was available from the pre-built document loaders, so we just used the generic loader for web content along with a custom scraper to get a list of all URLs available for our docs and examples READMEs.
 
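
The README passage above mentions using LangChain's generic web loader together with a custom scraper to collect the docs and examples READMEs. That loader code is not part of this commit; the snippet below is only a minimal sketch of what such loading can look like, assuming the langchain_community WebBaseLoader API and placeholder URLs.

from langchain_community.document_loaders import WebBaseLoader

# Placeholder URLs; in practice a custom scraper would produce the full list
# of docs pages and examples README pages to load.
urls = [
    "https://docs.zenml.io/",
    "https://github.com/zenml-io/zenml/tree/main/examples",
]

loader = WebBaseLoader(urls)
documents = loader.load()  # one Document per URL, with page_content and metadata
print(f"Loaded {len(documents)} documents")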

llm-complete-guide/deployment_hf.py
Lines changed: 5 additions & 2 deletions

@@ -124,8 +124,9 @@ def predict(message, history):
     try:
         return process_input_with_retrieval(
             input=message,
-            n_items_retrieved=20,
-            use_reranking=True,
+            n_items_retrieved=7,
+            use_reranking=False,
+            model="gpt-4o-mini",
             prompt=prompt,
             tracing_tags=["gradio", "web-interface", APP_ENVIRONMENT],
         )
@@ -137,6 +138,8 @@ def predict(message, history):
 with gr.Blocks() as interface:
     custom_chatbot = gr.Chatbot(
         type="messages",
+        height=600,
+        autoscroll=False,
     )
 
     gr.ChatInterface(
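
Taken together, the deployment_hf.py changes retrieve fewer items without reranking, pin the chat model to gpt-4o-mini, and give the chatbot a fixed height with autoscroll disabled. A self-contained sketch of the resulting Gradio wiring, with a stub standing in for process_input_with_retrieval, might look like this:

import gradio as gr


def predict(message, history):
    # Stand-in for the process_input_with_retrieval(...) call shown in the diff above
    return f"You asked: {message}"


with gr.Blocks() as interface:
    custom_chatbot = gr.Chatbot(
        type="messages",   # chat history as a list of role/content messages
        height=600,        # fixed height for a tidier layout
        autoscroll=False,  # don't snap to the bottom while the user scrolls back
    )

    gr.ChatInterface(
        fn=predict,
        chatbot=custom_chatbot,
    )

if __name__ == "__main__":
    interface.launch()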

llm-complete-guide/steps/eval_langfuse.py
Lines changed: 270 additions & 36 deletions

@@ -1,7 +1,10 @@
+import concurrent.futures
 import io
 import json
 import logging
+import os
 import sys
+import time
 import traceback
 from collections import defaultdict
 from dataclasses import dataclass
@@ -246,59 +249,290 @@ def get_langfuse_scores(
     return qa_pairs, good_response, bad_response
 
 
+def process_question_answer_pair(
+    question: str,
+    old_response: str,
+    eval_criteria: str,
+    sample_good_response: str,
+    sample_bad_response: str,
+    langfuse_score_identifier: str,
+    prompt: str,
+    logger: logging.Logger,
+) -> Optional[Dict[str, Any]]:
+    """Process a single question-answer pair for evaluation.
+
+    Args:
+        question: The question to evaluate
+        old_response: The old response to compare against
+        eval_criteria: The criteria for evaluation
+        sample_good_response: An example of a good response
+        sample_bad_response: An example of a bad response
+        langfuse_score_identifier: The identifier for the Langfuse score
+        prompt: The prompt to use for generating the new response
+        logger: The logger to use for logging
+
+    Returns:
+        Optional dictionary containing the evaluation result
+    """
+    try:
+        # Generate new response using current implementation
+        new_response = process_input_with_retrieval(
+            input=question,
+            prompt=prompt,
+            tracing_tags=["evaluation"],
+        )
+
+        eval_input = EvaluationInput(
+            question=question,
+            llm_response=f"OLD RESPONSE:\n{old_response}\n\nNEW RESPONSE:\n{new_response}",
+            eval_criteria=eval_criteria,
+            sample_good_response=sample_good_response,
+            sample_bad_response=sample_bad_response,
+        )
+
+        if result := evaluate_single_response(eval_input, logger):
+            result.update(
+                {
+                    "experiment_name": langfuse_score_identifier,
+                    "old_response": old_response,
+                    "new_response": new_response,
+                }
+            )
+            return result
+        return None
+    except Exception as e:
+        logger.error(f"Error processing question-answer pair: {e}")
+        return None
+
+
+# Default to using 4 workers for parallel processing, but allow overriding via environment variable
+DEFAULT_MAX_WORKERS = 5
+MAX_WORKERS = int(os.environ.get("EVAL_MAX_WORKERS", DEFAULT_MAX_WORKERS))
+# Default to using 2 workers for question-answer pairs, but allow overriding via environment variable
+QA_MAX_WORKERS = int(os.environ.get("EVAL_QA_MAX_WORKERS", 4))
+# Default timeouts (in seconds)
+EVAL_PAIR_TIMEOUT = int(os.environ.get("EVAL_PAIR_TIMEOUT", 600))  # 10 minutes
+QA_PAIR_TIMEOUT = int(os.environ.get("QA_PAIR_TIMEOUT", 300))  # 5 minutes
+
+
+class ProgressTracker:
+    """A simple class to track progress of parallel evaluations."""
+
+    def __init__(self, total_pairs: int, total_qa_pairs: Dict[str, int]):
+        self.total_pairs = total_pairs
+        self.total_qa_pairs = total_qa_pairs
+        self.completed_pairs = 0
+        self.completed_qa_pairs = defaultdict(int)
+        self.start_time = time.time()
+        self.pair_start_times = {}
+
+    def start_pair(self, pair_id: str):
+        """Mark the start of processing an evaluation pair."""
+        self.pair_start_times[pair_id] = time.time()
+
+    def complete_pair(self, pair_id: str, results_count: int):
+        """Mark the completion of processing an evaluation pair."""
+        self.completed_pairs += 1
+        duration = time.time() - self.pair_start_times.get(
+            pair_id, self.start_time
+        )
+        return (
+            f"Completed evaluation pair '{pair_id}' with {results_count} results "
+            f"({self.completed_pairs}/{self.total_pairs}, {duration:.1f}s)"
+        )
+
+    def complete_qa(self, pair_id: str):
+        """Mark the completion of processing a question-answer pair."""
+        self.completed_qa_pairs[pair_id] += 1
+        total = self.total_qa_pairs.get(pair_id, 0)
+        completed = self.completed_qa_pairs[pair_id]
+        return f"Progress for '{pair_id}': {completed}/{total} ({completed / total * 100:.1f}%)"
+
+    def get_overall_progress(self) -> str:
+        """Get the overall progress of the evaluation."""
+        elapsed = time.time() - self.start_time
+        if self.completed_pairs == 0:
+            eta = "unknown"
+        else:
+            avg_time_per_pair = elapsed / self.completed_pairs
+            remaining_pairs = self.total_pairs - self.completed_pairs
+            eta_seconds = avg_time_per_pair * remaining_pairs
+            eta = f"{eta_seconds:.1f}s"
+
+        return (
+            f"Overall progress: {self.completed_pairs}/{self.total_pairs} pairs "
+            f"({self.completed_pairs / self.total_pairs * 100:.1f}%), "
+            f"elapsed: {elapsed:.1f}s, ETA: {eta}"
+        )
+
+
+def process_evaluation_pair(
+    pair: Dict[str, str],
+    prompt: str,
+    logger: logging.Logger,
+    progress_tracker: Optional[ProgressTracker] = None,
+) -> List[Dict[str, Any]]:
+    """Process a single evaluation pair.
+
+    Args:
+        pair: The evaluation pair to process
+        prompt: The prompt to use for generating the new response
+        logger: The logger to use for logging
+        progress_tracker: Optional progress tracker
+
+    Returns:
+        List of evaluation results
+    """
+    results = []
+    langfuse_score_identifier = pair["langfuse_score_identifier"]
+    eval_criteria = pair["eval_criteria"]
+
+    try:
+        langfuse_score_data, sample_good_response, sample_bad_response = (
+            get_langfuse_scores(langfuse_score_identifier)
+        )
+
+        if progress_tracker:
+            progress_tracker.start_pair(langfuse_score_identifier)
+
+        logger.info(
+            f"Processing {len(langfuse_score_data)} question-answer pairs for '{langfuse_score_identifier}' with {QA_MAX_WORKERS} workers"
+        )
+
+        # Process each question-answer pair in parallel
+        with concurrent.futures.ThreadPoolExecutor(
+            max_workers=QA_MAX_WORKERS
+        ) as executor:
+            futures = {}
+            for data in langfuse_score_data:
+                question = data["question"]
+                old_response = data["old_response"]
+
+                future = executor.submit(
+                    process_question_answer_pair,
+                    question,
+                    old_response,
+                    eval_criteria,
+                    sample_good_response,
+                    sample_bad_response,
+                    langfuse_score_identifier,
+                    prompt,
+                    logger,
+                )
+                futures[future] = question
+
+            # Collect results as they complete
+            for future in concurrent.futures.as_completed(futures):
+                question = futures[future]
+                try:
+                    # Add timeout to prevent hanging on a single evaluation
+                    result = future.result(timeout=QA_PAIR_TIMEOUT)
+                    if result:
+                        logger.info(
+                            f"Completed evaluation for question: '{question[:50]}...'"
+                        )
+                        results.append(result)
+                    else:
+                        logger.warning(
+                            f"No result for question: '{question[:50]}...'"
+                        )
+                except concurrent.futures.TimeoutError:
+                    logger.error(
+                        f"Timeout processing question '{question[:50]}...' after {QA_PAIR_TIMEOUT} seconds"
+                    )
+                except Exception as e:
+                    logger.error(
+                        f"Error processing question '{question[:50]}...': {e}"
+                    )
+
+                # Update progress
+                if progress_tracker:
+                    progress_msg = progress_tracker.complete_qa(
+                        langfuse_score_identifier
+                    )
+                    logger.info(progress_msg)
+    except Exception as e:
+        logger.error(
+            f"Error processing evaluation pair '{langfuse_score_identifier}': {e}"
+        )
+
+    logger.info(
+        f"Completed processing '{langfuse_score_identifier}' with {len(results)} results"
+    )
+    return results
+
+
 @step(enable_cache=False)
 def fast_eval(prompt: str) -> List[Dict[str, Any]]:
-    """Evaluate LLM responses by comparing old vs new responses.
+    """Evaluate LLM responses by comparing old vs new responses in parallel.
+
+    Args:
+        prompt: The prompt to use for generating the new response
 
     Returns:
         List of evaluation results comparing old and new responses
     """
     logger = logging.getLogger(__name__)
-    results: List[Dict[str, Any]] = []
+    all_results = []
 
+    # Initialize progress tracking
+    total_qa_pairs = {}
     for pair in EVALUATION_DATA_PAIRS:
-        langfuse_score_identifier = pair["langfuse_score_identifier"]
-        eval_criteria = pair["eval_criteria"]
+        try:
+            langfuse_score_data, _, _ = get_langfuse_scores(
+                pair["langfuse_score_identifier"]
+            )
+            total_qa_pairs[pair["langfuse_score_identifier"]] = len(
+                langfuse_score_data
+            )
+        except Exception:
+            total_qa_pairs[pair["langfuse_score_identifier"]] = 0
 
-        langfuse_score_data, sample_good_response, sample_bad_response = (
-            get_langfuse_scores(langfuse_score_identifier)
-        )
+    progress_tracker = ProgressTracker(
+        len(EVALUATION_DATA_PAIRS), total_qa_pairs
+    )
 
-        for data in langfuse_score_data:
-            question = data["question"]
-            old_response = data["old_response"]
+    logger.info(
+        f"Starting parallel evaluation with {MAX_WORKERS} workers for {len(EVALUATION_DATA_PAIRS)} evaluation pairs"
+    )
 
-            # Generate new response using current implementation
+    # Process each evaluation pair in parallel
+    with concurrent.futures.ThreadPoolExecutor(
+        max_workers=MAX_WORKERS
+    ) as executor:
+        futures = {}
+        for pair in EVALUATION_DATA_PAIRS:
+            future = executor.submit(
+                process_evaluation_pair, pair, prompt, logger, progress_tracker
+            )
+            futures[future] = pair["langfuse_score_identifier"]
+
+        # Collect results as they complete
+        for future in concurrent.futures.as_completed(futures):
+            identifier = futures[future]
             try:
-                new_response = process_input_with_retrieval(
-                    input=question,
-                    prompt=prompt,
-                    tracing_tags=["evaluation"],
+                # Add timeout to prevent hanging on a single evaluation pair
+                results = future.result(timeout=EVAL_PAIR_TIMEOUT)
+                progress_msg = progress_tracker.complete_pair(
+                    identifier, len(results)
+                )
+                logger.info(progress_msg)
+                logger.info(progress_tracker.get_overall_progress())
+                all_results.extend(results)
+            except concurrent.futures.TimeoutError:
+                logger.error(
+                    f"Timeout processing evaluation pair '{identifier}' after {EVAL_PAIR_TIMEOUT} seconds"
                 )
             except Exception as e:
-                logger.error(f"Error generating new response: {e}")
-                continue
-
-            eval_input = EvaluationInput(
-                question=question,
-                llm_response=f"OLD RESPONSE:\n{old_response}\n\nNEW RESPONSE:\n{new_response}",
-                eval_criteria=eval_criteria,
-                sample_good_response=sample_good_response,
-                sample_bad_response=sample_bad_response,
-            )
-
-            if result := evaluate_single_response(eval_input, logger):
-                result.update(
-                    {
-                        "experiment_name": langfuse_score_identifier,
-                        "old_response": old_response,
-                        "new_response": new_response,
-                    }
+                logger.error(
+                    f"Error processing evaluation pair '{identifier}': {e}"
                 )
-                results.append(result)
 
-    logger.info("All evaluations completed with %d results", len(results))
-    return results
+    total_time = time.time() - progress_tracker.start_time
+    logger.info(
+        f"All evaluations completed with {len(all_results)} total results in {total_time:.1f} seconds"
+    )
+    return all_results
 
 
 @step
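
The worker counts and timeouts introduced in eval_langfuse.py are read once, at module import time, via os.environ.get, so they can be tuned without code changes. A minimal example of overriding them before the step module is imported (the values here are arbitrary):

import os

# Must be exported before eval_langfuse is imported, because MAX_WORKERS,
# QA_MAX_WORKERS, EVAL_PAIR_TIMEOUT and QA_PAIR_TIMEOUT are evaluated at import.
os.environ["EVAL_MAX_WORKERS"] = "8"     # evaluation pairs processed concurrently
os.environ["EVAL_QA_MAX_WORKERS"] = "2"  # question-answer pairs per evaluation pair
os.environ["EVAL_PAIR_TIMEOUT"] = "900"  # upper bound (s) when collecting a pair's results
os.environ["QA_PAIR_TIMEOUT"] = "450"    # upper bound (s) when collecting a single answer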
