diff --git a/README.md b/README.md
index ed03110bb7..453f30c366 100644
--- a/README.md
+++ b/README.md
@@ -262,7 +262,73 @@ You will notice that the data is ingested into the `data/cache` directory and st
 These datasets are also stored as wandb artifacts in the project defined in the environment variable `WANDB_PROJECT` and can be accessed from the [wandb dashboard](https://wandb.ai/wandb/wandbot-dev).
 
-#### Ingestion pipeline debugging
+### Evaluating a file with precomputed answers
+
+Instead of hitting the wandbot endpoint, you can pass a `.json` file of precomputed answers to be evaluated by the `WandbotCorrectnessScorer`. To do so, set the `precomputed_answers_json_path` parameter in `EvalConfig` to the path of a JSON file containing a list of objects, each holding a question and its precomputed answer.
+
+Each precomputed object is matched to a row of the evaluation dataset (defined by `eval_dataset` in `EvalConfig`) by its `index` field, compared as a stripped string against the dataset row's `index`. If an object has no `index`, its position in the list is used as a fallback key.
+
+Each object in the JSON list should have the following structure:
+
+**Required Fields:**
+
+* `question` (string): The question text corresponding to the answer; it is carried through on the response for reference.
+* `generated_answer` (string): The precomputed answer text. This will be used as `EvalChatResponse.answer`.
+
+**Fields for Contextual Scoring:**
+To enable context-based scoring (e.g., by `WandbotCorrectnessScorer`), you can provide the context information through one of the following fields in each JSON object, although this is not essential:
+
+* `retrieved_contexts` (List of Dicts): A list of context documents. Each dictionary in the list should represent a document and ideally have `"source"` (string, URL) and `"content"` (string, text of the document) keys. Minimally, a `"content"` key is needed for the scorer.
+  *Example*: `[{"source": "http://example.com/doc1", "content": "Context snippet 1."}, {"content": "Context snippet 2."}]`
+* **OR** `source_documents` (string): A raw string representation of source documents that can be parsed by the system (specifically, by the `parse_text_to_json` function in `weave_model.py`). This string usually contains multiple documents, each prefixed by something like "source: http://...".
+
+If only `source_documents` (string) is provided and `retrieved_contexts` (list) is not, the system will attempt to parse `source_documents` to populate the `retrieved_contexts` field for the `EvalChatResponse`. If neither is provided, context-based scoring for that precomputed answer will operate with empty context.
+
+**Optional Fields (to fully populate `EvalChatResponse` and mimic live API calls):**
+
+* `system_prompt` (string): The system prompt used.
+* `sources` (string): A string listing sources (can be similar to `source_documents` or a different format).
+* `model` (string): The name of the model that generated the answer (e.g., "precomputed_gpt-4").
+* `total_tokens` (int): Total tokens used.
+* `prompt_tokens` (int): Prompt tokens used.
+* `completion_tokens` (int): Completion tokens used.
+* `time_taken` (float): Time taken for the call.
+* `api_call_statuses` (dict): Dictionary of API call statuses.
+* `start_time` (string): ISO 8601 formatted start time of the call (e.g., `"2023-10-27T10:00:00Z"`).
+* `end_time` (string): ISO 8601 formatted end time of the call.
+* `has_error` (boolean): Set to `true` if this precomputed item represents an error response.
+* `error_message` (string): The error message if `has_error` is `true`. + +**Example Object:** +```json +{ + "question": "What is Weights & Biases?", + "generated_answer": "Weights & Biases is an MLOps platform.", + "retrieved_contexts": [ + {"source": "http://example.com/docA", "content": "Content of document A talking about W&B."}, + {"content": "Another piece of context."} + ], + "model": "precomputed_from_file", + "system_prompt": "You are a helpful assistant.", + "total_tokens": 50, + "has_error": false +} +``` +Or using `source_documents`: +```json +{ + "question": "What is Weights & Biases?", + "generated_answer": "Weights & Biases is an MLOps platform.", + "source_documents": "source: http://example.com/docA\\nContent of document A talking about W&B.\\nsource: http://example.com/docB\\nContent of document B.", + "model": "precomputed_from_file", + "system_prompt": "You are a helpful assistant.", + "total_tokens": 50, + "has_error": false +} +``` + + +### Ingestion pipeline debugging To help with debugging, you can use the `steps` and `include_sources` flags to specify only sub-components of the pipeline and only certain documents sources to run. For example if you wanted to stop the pipeline before it creates the vector db and creates the artifacts and W&B report AND you only wanted to process the Weave documentation, you would do the following: @@ -291,4 +357,4 @@ B. If you don't compute a diff or want a simple way to do this [] **Deployment:** Clone the repo to a prod environment. Deploy updated version. Test via cli and slackbot that the endpoint is working and the correctg response is received. [] [] **GitHub:** Update evaluation table at top of README with latest eval score, weave Eval link and data ingestion Report link [] **GitHub:** Update git tag -[] **GitHub:** Create gthub release +[] **GitHub:** Create gthub release \ No newline at end of file diff --git a/src/wandbot/configs/chat_config.py b/src/wandbot/configs/chat_config.py index 3059100e12..b8763d0cea 100644 --- a/src/wandbot/configs/chat_config.py +++ b/src/wandbot/configs/chat_config.py @@ -44,10 +44,10 @@ class ChatConfig(BaseSettings): # Response synthesis model settings response_synthesizer_provider: str = "anthropic" - response_synthesizer_model: str = "claude-3-7-sonnet-20250219" + response_synthesizer_model: str = "claude-4-opus-20250514" # "claude-4-sonnet-20250514" #"claude-3-7-sonnet-20250219" response_synthesizer_temperature: float = 0.1 response_synthesizer_fallback_provider: str = "anthropic" - response_synthesizer_fallback_model: str = "claude-3-7-sonnet-20250219" + response_synthesizer_fallback_model: str = "claude-4-opus-20250514" # "claude-4-sonnet-20250514" # "claude-3-7-sonnet-20250219" response_synthesizer_fallback_temperature: float = 0.1 # Translation models settings diff --git a/src/wandbot/evaluation/data_utils.py b/src/wandbot/evaluation/data_utils.py new file mode 100644 index 0000000000..bfdba9a37e --- /dev/null +++ b/src/wandbot/evaluation/data_utils.py @@ -0,0 +1,139 @@ +import json +import logging +from typing import Dict, List, Optional, Any + +import weave + +# From eval.py +def sanitize_precomputed_item_recursive(item: Any) -> Any: + """Recursively sanitize an item by converting None values to empty strings.""" + if isinstance(item, dict): + return {k: sanitize_precomputed_item_recursive(v) for k, v in item.items()} + elif isinstance(item, list): + return [sanitize_precomputed_item_recursive(elem) for elem in item] + elif item is None: + return "" + return item + + +def 
load_and_prepare_precomputed_data( + file_path: Optional[str], logger: logging.Logger +) -> Optional[Dict[str, Dict]]: + """Loads, sanitizes, and prepares precomputed answers from a JSON file into a map.""" + if not file_path: + return None + + logger.info(f"Loading precomputed answers from: {file_path}") + try: + with open(file_path, "r") as f: + loaded_answers_raw = json.load(f) + + if not isinstance(loaded_answers_raw, list): + raise ValueError("Precomputed answers JSON must be a list of items.") + + loaded_answers_sanitized = [] + for raw_item in loaded_answers_raw: + if not isinstance(raw_item, dict): + raise ValueError(f"Skipping non-dictionary item in precomputed answers: {raw_item}") + sanitized_item = sanitize_precomputed_item_recursive(raw_item) + loaded_answers_sanitized.append(sanitized_item) + logger.debug(f"Sanitized precomputed item: {sanitized_item}") + + precomputed_answers_map = {} + for i, item in enumerate(loaded_answers_sanitized): + if not isinstance(item, dict): + raise ValueError( + f"Item at original index {i} in precomputed answers (post-sanitization) is not a dictionary." + ) + + item_index_str: str + raw_item_index = item.get("index") + + if raw_item_index is None or str(raw_item_index).strip() == "": + logger.warning( + f"Item (original index {i}) is missing 'index' or index is empty after sanitization. " + f"Content: {str(item.get('question', 'N/A'))[:50]+'...'}. Using list index {i} as fallback string key." + ) + item_index_str = str(i) + else: + item_index_str = str(raw_item_index).strip() + if not item_index_str: + logger.warning( + f"Item (original index {i}) had whitespace-only 'index' after sanitization. " + f"Content: {str(item.get('question', 'N/A'))[:50]+'...'}. Using list index {i} as fallback string key." + ) + item_index_str = str(i) + + if item_index_str in precomputed_answers_map: + logger.warning( + f"Duplicate string index '{item_index_str}' found in precomputed answers. " + f"Overwriting with item from original list at index {i}." + ) + precomputed_answers_map[item_index_str] = item + + logger.info( + f"Loaded {len(precomputed_answers_map)} precomputed answers into map from {len(loaded_answers_sanitized)} sanitized items." + ) + return precomputed_answers_map + + except FileNotFoundError: + logger.error(f"Precomputed answers JSON file not found: {file_path}") + raise + except ValueError as e: + logger.error(f"Invalid format in precomputed answers JSON: {e}") + raise + except Exception as e: + logger.error(f"Failed to load or parse precomputed answers JSON: {e}") + raise + + +def load_and_prepare_dataset_rows( + dataset_ref_uri: str, is_debug: bool, n_debug_samples: int, logger: logging.Logger +) -> List[Dict]: + """Loads dataset rows from a Weave reference, applies debug sampling, and prepares them for evaluation.""" + dataset_ref = weave.ref(dataset_ref_uri).get() + question_rows = dataset_ref.rows + + if is_debug: + question_rows = question_rows[:n_debug_samples] + + question_rows_for_eval = [] + for i, row in enumerate(question_rows): + if not isinstance(row, dict): + logger.warning(f"Dataset item at original index {i} is not a dictionary, skipping: {row}") + continue + + dataset_row_index_str: str + raw_dataset_index = row.get("index") + if raw_dataset_index is None or str(raw_dataset_index).strip() == "": + logger.warning( + f"Dataset item (original list index {i}, question: {str(row.get('question', 'N/A'))[:50] + '...'}) " + f"is missing 'index' or index is empty. Using list index {i} as fallback string key." 
+ ) + dataset_row_index_str = str(i) + else: + dataset_row_index_str = str(raw_dataset_index).strip() + + question = row.get("question") + ground_truth = row.get("answer") + notes = row.get("notes") + + if question is None: + logger.warning(f"Dataset item at index {dataset_row_index_str} is missing 'question'. Using empty string.") + question = "" + if ground_truth is None: + logger.warning(f"Dataset item at index {dataset_row_index_str} is missing 'answer'. Using empty string.") + ground_truth = "" + if notes is None: + logger.warning(f"Dataset item at index {dataset_row_index_str} is missing 'notes'. Using empty string.") + notes = "" + + question_rows_for_eval.append( + { + "index": dataset_row_index_str, + "question": str(question), + "ground_truth": str(ground_truth), + "notes": str(notes), + } + ) + return question_rows_for_eval \ No newline at end of file diff --git a/src/wandbot/evaluation/eval.py b/src/wandbot/evaluation/eval.py index a095cac851..4dd93e4d77 100644 --- a/src/wandbot/evaluation/eval.py +++ b/src/wandbot/evaluation/eval.py @@ -1,252 +1,39 @@ import asyncio import json -import logging import os -import re +from datetime import datetime from pathlib import Path -import httpx -import requests +import requests # Keep for get_wandbot_configs import weave from dotenv import load_dotenv -from tenacity import after_log, retry, retry_if_exception_type, stop_after_attempt, wait_exponential from weave import Evaluation +# Import new modules +from wandbot.evaluation import data_utils, weave_model, weave_scorer from wandbot.evaluation.eval_config import EvalConfig, get_eval_config -from wandbot.evaluation.eval_metrics.correctness import CorrectnessEvaluationResult, WandBotCorrectnessEvaluator + +# EvalChatResponse is now primarily used by weave_model.py, ensure it's correctly located/imported there +# from wandbot.evaluation.eval_schemas import EvalChatResponse from wandbot.utils import get_logger # Load environment variables from .env in project root -ENV_PATH = Path(__file__).parent.parent.parent.parent / '.env' +ENV_PATH = Path(__file__).parent.parent.parent.parent / ".env" load_dotenv(ENV_PATH, override=True) logger = get_logger(__name__) -def get_wandbot_configs(config: EvalConfig = None): + +def get_wandbot_configs(config: EvalConfig) -> dict: # config is now required """Get wandbot's configs and repo git info""" - url = config.wandbot_url if config else "http://0.0.0.0:8000" + url = config.wandbot_url # No default, use from config try: response = requests.get(f"{url}/configs") response.raise_for_status() return response.json() except requests.RequestException as e: logger.error(f"Error making request to wandbot configs: {e}") - return { - "chat_config": {}, - "vector_store_config": {}, - "git_info": {}, - "app_config": {} - } - -@retry( - stop=stop_after_attempt(5), - wait=wait_exponential(multiplier=1, min=10, max=300), - retry=retry_if_exception_type(httpx.HTTPError), - before_sleep=lambda retry_state: logger.warning( - f"Attempt {retry_state.attempt_number} failed. Retrying in {retry_state.next_action.sleep} seconds..." 
- ), - after=after_log(logger, logging.ERROR) -) -async def make_request(url: str, question: str, application: str = "api-eval", language: str = "en") -> dict: - """Make HTTP request to wandbot API with retry logic.""" - request_timeout = 120.0 - request_connect_timeout = 30.0 - async with httpx.AsyncClient(timeout=httpx.Timeout(timeout=request_timeout, connect=request_connect_timeout)) as client: - try: - response = await client.post( - f"{url}/chat/query", - json={"question": question, "application": application, "language": language} - ) - response.raise_for_status() - return response.json() - except httpx.ReadTimeout: - logger.error(f"Request timed out after {request_timeout} seconds") - raise - except httpx.ConnectTimeout: - logger.error(f"Connection timed out after {request_connect_timeout} seconds") - raise - -async def get_answer(question: str, wandbot_url: str, application: str = "api-eval", language: str = "en") -> str: - """Get answer from wandbot API.""" - try: - result = await make_request(wandbot_url, question, application, language) - return json.dumps(result) - except Exception as e: - logger.error(f"Failed to get answer: {str(e)}") - return json.dumps({ - "error": str(e), - "answer": "", - "system_prompt": "", - "source_documents": "", - "model": "", - "total_tokens": 0, - "prompt_tokens": 0, - "completion_tokens": 0, - "time_taken": 0 - }) - -def parse_text_to_json(text): - # Split the text into documents - documents = re.split(r"source: https?://", text)[1:] - result = [] - for doc in documents: - source_url = "https://" + doc.split("\n")[0].strip() - content = "\n".join(doc.split("\n")[1:]).strip() - document = {"source": source_url, "content": content} - result.append(document) - return result - - -@weave.op -async def get_record(question: str, wandbot_url: str, application: str = "api-eval", language: str = "en") -> dict: - try: - response = await get_answer(question, wandbot_url=wandbot_url, application=application, language=language) - response_dict = json.loads(response) - - if not response_dict: - try: - error_data = json.loads(response) - error_msg = error_data.get("error", "Unknown API error") - except json.JSONDecodeError: - error_msg = response if response else "Empty response from API" - - logger.error(error_msg) - return { - "system_prompt": "", - "generated_answer": "", - "response_synthesis_llm_messages": [], - "retrieved_contexts": [], - "model": "", - "total_tokens": 0, - "prompt_tokens": 0, - "completion_tokens": 0, - "time_taken": 0, - "has_error": True, - "api_call_statuses": {}, - "error_message": error_msg - } - - return { - "system_prompt": response_dict.get("system_prompt", ""), - "generated_answer": response_dict.get("answer", ""), - "response_synthesis_llm_messages": response_dict.get("response_synthesis_llm_messages", []), - "retrieved_contexts": parse_text_to_json( - response_dict.get("source_documents", "") - ), - "model": response_dict.get("model", ""), - "total_tokens": response_dict.get("total_tokens", 0), - "prompt_tokens": response_dict.get("prompt_tokens", 0), - "completion_tokens": response_dict.get("completion_tokens", 0), - "time_taken": response_dict.get("time_taken", 0), - "api_call_statuses": response_dict.get("api_call_statuses", {}), - "has_error": False, - "error_message": None - } - except Exception as e: - error_msg = f"Error getting response from wandbotAPI: {str(e)}" - logger.error(error_msg) - return { - "system_prompt": "", - "generated_answer": "", - "response_synthesis_llm_messages": [], - "retrieved_contexts": [], - 
"model": "", - "total_tokens": 0, - "prompt_tokens": 0, - "completion_tokens": 0, - "time_taken": 0, - "has_error": True, - "api_call_statuses": {}, - "error_message": error_msg - } - - -class WandbotModel(weave.Model): - language: str = "en" - application: str = "api-eval" - wandbot_url: str = "http://0.0.0.0:8000" - wandbot_config: dict = {} - - @weave.op - async def predict(self, question: str) -> dict: - prediction = await get_record(question, - wandbot_url=self.wandbot_url, - application=self.application, - language=self.language) - return prediction - - -class WandbotCorrectnessScorer(weave.Scorer): - config: EvalConfig - correctness_evaluator: WandBotCorrectnessEvaluator = None - debug: bool = False - - def __init__(self, config: EvalConfig): - super().__init__(config=config) - self.debug = config.debug - self.correctness_evaluator = WandBotCorrectnessEvaluator( - provider=config.eval_judge_provider, - model_name=config.eval_judge_model, - temperature=config.eval_judge_temperature, - max_retries=config.max_evaluator_retries, - timeout=config.evaluator_timeout, - ) - - @weave.op - async def score(self, question: str, ground_truth: str, notes: str, model_output: dict) -> dict: - - if self.debug: - if model_output is not None: - logger.debug(f"In WandbotCorrectnessScorer, model_output keys:\n{model_output.keys()}") - else: - logger.error("model_output is None") - - try: - contexts = [c["content"] for c in model_output.get("retrieved_contexts", [])] if model_output.get("retrieved_contexts") else [] - - if model_output.get("generated_answer", "") == "": - error_msg = "Generated answer is empty" - logger.error(error_msg) - return { - "answer_correct": False, - "reasoning": error_msg, - "score": 1.0, - "has_error": True, - "error_message": error_msg - } - if model_output.get("has_error", False): - error_msg = model_output.get("error_message", "Unknown error") - logger.error(error_msg) - return { - "answer_correct": False, - "reasoning": error_msg, - "score": 1.0, - "has_error": True, - "error_message": error_msg - } - - # If not error from wandbot generation, run the correctness evaluator - return await self.correctness_evaluator.aevaluate( - query=question, - response=model_output.get("generated_answer", ""), - reference=ground_truth, - contexts=contexts, - reference_notes=notes, - ) - - except Exception as e: - error_msg = f"Error evaluating answer: {str(e)}" - logger.error(error_msg) - return CorrectnessEvaluationResult( - query=question, - response=model_output.get("generated_answer", ""), - contexts=contexts, - passing=False, - score=1.0, - reasoning=error_msg, - has_error=True, - error_message=error_msg - ) + return {"chat_config": {}, "vector_store_config": {}, "git_info": {}, "app_config": {}} def main(): @@ -257,73 +44,97 @@ def main(): # Initialize weave with config weave.init(f"{config.wandb_entity}/{config.wandb_project}") - wandbot_info = get_wandbot_configs(config) - if wandbot_info: + # Use data_utils for loading precomputed data + precomputed_answers_map = data_utils.load_and_prepare_precomputed_data(config.precomputed_answers_json_path, logger) + + wandbot_info = get_wandbot_configs(config) # Pass config directly + if wandbot_info: logger.info(f"WandBot configs and git info:\n{wandbot_info}\n") else: logger.warning("Failed to get WandBot configs") os.environ["WEAVE_PARALLELISM"] = str(config.n_weave_parallelism) - dataset_ref = weave.ref(config.eval_dataset).get() - question_rows = dataset_ref.rows + # Use data_utils for loading and preparing dataset rows + 
question_rows_for_eval = data_utils.load_and_prepare_dataset_rows( + config.eval_dataset, config.debug, config.n_debug_samples, logger + ) - if config.debug: - question_rows = question_rows[:config.n_debug_samples] + if config.debug: # This logic is now inside load_and_prepare_dataset_rows, but experiment/eval name changes remain config.evaluation_name = f"{config.evaluation_name}_debug" config.experiment_name = f"{config.experiment_name}_debug" - question_rows = [ - { - "question": row["question"], - "ground_truth": row["answer"], - "notes": row["notes"], - } - for row in question_rows - ] + # Initialize Wandbot Model from weave_model.py + wandbot_model_instance = weave_model.WandbotModel( # Renamed variable to avoid conflict with module name + language=config.lang, + application=config.experiment_name, + wandbot_url=config.wandbot_url, + wandbot_config=wandbot_info, + precomputed_data_map=precomputed_answers_map, + ) - # Initialize Wandbot Model - wandbot = WandbotModel(language=config.lang, - application=config.experiment_name, - wandbot_url=config.wandbot_url, - wandbot_config=wandbot_info - ) - - # Initialize Correctness scorer - correctness_scorer = WandbotCorrectnessScorer(config=config) + # Initialize Correctness scorer from weave_scorer.py + correctness_scorer_instance = weave_scorer.WandbotCorrectnessScorer(config=config) # Renamed variable wandbot_evaluator = Evaluation( name=config.evaluation_name, - dataset=question_rows, - scorers=[correctness_scorer], - trials=config.n_trials + dataset=question_rows_for_eval, + scorers=[correctness_scorer_instance], # Use instance + trials=config.n_trials, ) - eval_config = { - "evaluation_strategy_name": config.experiment_name, - "n_samples": len(question_rows), - "n_trials": config.n_trials, - "language": config.lang, - "is_debug": config.debug, - "eval_judge_model": config.eval_judge_model, - "eval_judge_temperature": config.eval_judge_temperature, + eval_config_summary = { + "evaluation_strategy_name": config.experiment_name, + "n_samples": len(question_rows_for_eval), + "n_trials": config.n_trials, + "language": config.lang, + "is_debug": config.debug, + "eval_judge_model": config.eval_judge_model, + "eval_judge_temperature": config.eval_judge_temperature, } eval_attributes = { - "eval_config": eval_config, - "wandbot_chat_config": wandbot_info.get("chat_config", {}), - "wandbot_vectore_store_config": wandbot_info.get("vector_store_config", {}), - "wandbot_git_info": wandbot_info.get("git_info", {}), - "wandbot_app_config": wandbot_info.get("app_config", {}) - } + "eval_config": eval_config_summary, + "wandbot_chat_config": wandbot_info.get("chat_config", {}) if not config.precomputed_answers_json_path else {}, + "wandbot_vectore_store_config": wandbot_info.get("vector_store_config", {}) + if not config.precomputed_answers_json_path + else {}, + "wandbot_git_info": wandbot_info.get("git_info", {}) if not config.precomputed_answers_json_path else {}, + "wandbot_app_config": wandbot_info.get("app_config", {}), + } + # Ensure application name is set correctly even if precomputed answers are used + if "wandbot_app_config" not in eval_attributes: # Should always exist based on get_wandbot_configs + eval_attributes["wandbot_app_config"] = {} eval_attributes["wandbot_app_config"]["application"] = config.experiment_name - logger.info(f"Starting evaluation of {len(question_rows)} samples with {config.n_trials} trials, \ -{len(question_rows) * config.n_trials} calls in total.") + logger.info( + f"Starting evaluation of 
{len(question_rows_for_eval)} samples with {config.n_trials} trials, \ +{len(question_rows_for_eval) * config.n_trials} calls in total." + ) with weave.attributes(eval_attributes): - asyncio.run(wandbot_evaluator.evaluate( - model=wandbot, __weave={"display_name": config.experiment_name} - )) + asyncio.run( + wandbot_evaluator.evaluate(model=wandbot_model_instance, __weave={"display_name": config.experiment_name}) + ) # Use instance + + all_scored_data = correctness_scorer_instance.get_all_scored_results() # Use instance + if all_scored_data: + timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M") + + output_dir = Path(getattr(config, "eval_output_dir", ".")) + output_dir.mkdir(parents=True, exist_ok=True) + + output_filename = f"evaluation_results_{config.evaluation_name}_{timestamp}.json" + output_path = output_dir / output_filename + + try: + with output_path.open("w", encoding="utf-8") as f: + json.dump(all_scored_data, f, indent=2, ensure_ascii=False) + logger.info(f"Saved detailed evaluation results to: {output_path.resolve()}") + except Exception as e: + logger.error(f"Failed to save evaluation results to {output_path.resolve()}: {e}") + else: + logger.warning("No scored results were collected by the scorer to save.") + if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/src/wandbot/evaluation/eval_config.py b/src/wandbot/evaluation/eval_config.py index a4b02917a7..73d6112d81 100644 --- a/src/wandbot/evaluation/eval_config.py +++ b/src/wandbot/evaluation/eval_config.py @@ -22,12 +22,14 @@ class EvalConfig: n_debug_samples: int = 3 max_evaluator_retries: int = 3 evaluator_timeout: int = 60 + precomputed_answers_json_path: str | None = sp.field(default=None, help="Path to a JSON file containing precomputed answers. If provided, network calls to wandbot will be skipped.") + # Links to evaluation datasets stored in Weave @property def eval_dataset(self) -> str: if self.lang == "ja": - return "weave:///wandbot/wandbot-eval-jp/object/wandbot_eval_data_jp:oCWifIAtEVCkSjushP0bOEc5GnhsMUYXURwQznBeKLA" - return "weave:///wandbot/wandbot-eval/object/wandbot_eval_data:eCQQ0GjM077wi4ykTWYhLPRpuGIaXbMwUGEB7IyHlFU" + return "weave:///wandbot/wandbot-eval/object/wandbot_eval_data_jp:I2BlFnw1VnPn8lFG72obBWN1sCokB3EYk4G4vSKg23g" + return "weave:///wandbot/wandbot-eval/object/wandbot_eval_data:ZZUQa2CCAqPDFWiB90VANCm4EdT8qtc125NazaWUrdI" def get_eval_config() -> EvalConfig: return sp.parse(EvalConfig) diff --git a/src/wandbot/evaluation/eval_schemas.py b/src/wandbot/evaluation/eval_schemas.py new file mode 100644 index 0000000000..393c2cc65e --- /dev/null +++ b/src/wandbot/evaluation/eval_schemas.py @@ -0,0 +1,41 @@ +from datetime import datetime +from typing import Dict, List, Optional + +from pydantic import Field + +from wandbot.chat.schemas import ChatResponse + + +class EvalChatResponse(ChatResponse): + """ + An evaluation-specific ChatResponse that includes error tracking + and makes certain fields optional for error scenarios. + It also includes a field for pre-parsed retrieved_contexts. 
+ """ + + # Fields inherited from ChatResponse, overridden for error handling: + # question: str (remains mandatory from ChatResponse) + system_prompt: Optional[str] = "" + answer: Optional[str] = "" + model: Optional[str] = "" + sources: Optional[str] = "" + source_documents: Optional[str] = "" # Raw string from API / precomputed + total_tokens: Optional[int] = 0 + prompt_tokens: Optional[int] = 0 + completion_tokens: Optional[int] = 0 + time_taken: Optional[float] = 0.0 + start_time: Optional[datetime] = None + end_time: Optional[datetime] = None + api_call_statuses: Optional[Dict] = Field(default_factory=dict) + response_synthesis_llm_messages: Optional[List[Dict[str, str]]] = Field(default_factory=list) + + # New fields for evaluation context and error tracking + has_error: bool = False + error_message: Optional[str] = None + retrieved_contexts: Optional[List[Dict[str, str]]] = Field( + default_factory=list, + description="Parsed source documents into list of dicts for scorer", + ) + + class Config: + from_attributes = True diff --git a/src/wandbot/evaluation/weave_model.py b/src/wandbot/evaluation/weave_model.py new file mode 100644 index 0000000000..2913906a67 --- /dev/null +++ b/src/wandbot/evaluation/weave_model.py @@ -0,0 +1,257 @@ +import json +import logging +import re +from datetime import datetime, timezone +from typing import Dict, List, Optional + +import httpx +import weave +from tenacity import after_log, retry, retry_if_exception_type, stop_after_attempt, wait_exponential + +from wandbot.evaluation.eval_schemas import EvalChatResponse # Assuming EvalChatResponse is defined here or moved +from wandbot.utils import get_logger + +logger = get_logger(__name__) + + +# Moved from eval.py +@retry( + stop=stop_after_attempt(5), + wait=wait_exponential(multiplier=1, min=10, max=300), + retry=retry_if_exception_type(httpx.HTTPError), + before_sleep=lambda retry_state: logger.warning( + f"Attempt {retry_state.attempt_number} failed. Retrying in {retry_state.next_action.sleep} seconds..." 
+ ), + after=after_log(logger, logging.ERROR), +) +async def make_request(url: str, question: str, application: str = "api-eval", language: str = "en") -> dict: + """Make HTTP request to wandbot API with retry logic.""" + request_timeout = 120.0 + request_connect_timeout = 30.0 + async with httpx.AsyncClient( + timeout=httpx.Timeout(timeout=request_timeout, connect=request_connect_timeout) + ) as client: + try: + response = await client.post( + f"{url}/chat/query", json={"question": question, "application": application, "language": language} + ) + response.raise_for_status() + return response.json() + except httpx.ReadTimeout: + logger.error(f"Request timed out after {request_timeout} seconds") + raise + except httpx.ConnectTimeout: + logger.error(f"Connection timed out after {request_connect_timeout} seconds") + raise + + +async def get_answer(question: str, wandbot_url: str, application: str = "api-eval", language: str = "en") -> str: + """Get answer from wandbot API.""" + try: + result = await make_request(wandbot_url, question, application, language) + return json.dumps(result) + except Exception as e: + logger.error(f"Failed to get answer: {str(e)}") + return json.dumps( + { + "error": str(e), + "answer": "", + "system_prompt": "", + "source_documents": "", + "model": "", + "total_tokens": 0, + "prompt_tokens": 0, + "completion_tokens": 0, + "time_taken": 0, + } + ) + + +# Moved from eval.py +def parse_text_to_json(text: str) -> List[Dict[str, str]]: + # Split the text into documents + documents = re.split(r"source: https?://", text)[1:] + result = [] + for doc in documents: + source_url = "https://" + doc.split("\n")[0].strip() + content = "\n".join(doc.split("\n")[1:]).strip() + document = {"source": source_url, "content": content} + result.append(document) + return result + + +# Moved from eval.py +@weave.op +async def get_record( + question: str, wandbot_url: str, application: str = "api-eval", language: str = "en" +) -> EvalChatResponse: + _start_time = datetime.now(timezone.utc) + try: + response_str = await get_answer(question, wandbot_url=wandbot_url, application=application, language=language) + response_dict = json.loads(response_str) + _end_time = datetime.now(timezone.utc) + + if not response_dict or not response_dict.get("answer"): + _error_msg = "Empty answer from API" + try: + error_data = json.loads(response_str) + _error_msg = error_data.get("error", "Unknown API error or empty answer") + if not response_dict.get("answer"): + _error_msg = "Empty answer from API" + except json.JSONDecodeError: + _error_msg = response_str if response_str else "Empty response from API (JSONDecodeError)" + + logger.error(_error_msg) + return EvalChatResponse( + question=question, + has_error=True, + error_message=_error_msg, + model="api_error_or_empty_answer", + start_time=_start_time, # Provide start_time even for error + end_time=_end_time, # Provide end_time for error + ) + + raw_source_documents = response_dict.get("source_documents", "") + parsed_retrieved_contexts = parse_text_to_json(raw_source_documents) + + return EvalChatResponse( + question=question, + system_prompt=response_dict.get("system_prompt", ""), + answer=response_dict.get("answer", ""), + sources=response_dict.get("sources", ""), + source_documents=raw_source_documents, + retrieved_contexts=parsed_retrieved_contexts, + response_synthesis_llm_messages=response_dict.get("response_synthesis_llm_messages", []), + model=response_dict.get("model", ""), + total_tokens=response_dict.get("total_tokens", 0), + 
prompt_tokens=response_dict.get("prompt_tokens", 0), + completion_tokens=response_dict.get("completion_tokens", 0), + time_taken=response_dict.get("time_taken", 0.0), + api_call_statuses=response_dict.get("api_call_statuses", {}), + start_time=_start_time, + end_time=_end_time, + has_error=False, + ) + except Exception as e: + _error_msg = f"Error getting response from wandbotAPI: {str(e)}" + logger.error(_error_msg) + return EvalChatResponse( + question=question, + has_error=True, + error_message=_error_msg, + model="exception_in_get_record", + start_time=_start_time, + end_time=datetime.now(timezone.utc), + ) + + +# Moved from eval.py +class WandbotModel(weave.Model): + language: str = "en" + application: str = "api-eval" + wandbot_url: str = "http://0.0.0.0:8000" + wandbot_config: dict = {} + precomputed_data_map: Optional[Dict[str, Dict]] = None + + # Utility for parsing datetime strings within predict, can be a static method or defined outside + @staticmethod + def _parse_iso_datetime(dt_str: Optional[str], placeholder: str, item_index: str, field_name: str) -> str: + if dt_str: + try: + # Attempt to parse then reformat to ensure consistent ISO with Z + dt_obj = datetime.fromisoformat(str(dt_str).replace("Z", "+00:00")) + return dt_obj.isoformat() + except ValueError: + logger.warning( + f"Predict: Could not parse {field_name} '{dt_str}' for item index {item_index}. Using placeholder." + ) + return placeholder + + def _create_response_dict_from_precomputed(self, item: Dict, item_index: str) -> Dict: + _placeholder_datetime_str = datetime(1970, 1, 1, tzinfo=timezone.utc).isoformat() + + _precomp_start_time_iso = self._parse_iso_datetime( + item.get("start_time"), _placeholder_datetime_str, item_index, "start_time" + ) + _precomp_end_time_iso = self._parse_iso_datetime( + item.get("end_time"), _placeholder_datetime_str, item_index, "end_time" + ) + + raw_retrieved_contexts = item.get("retrieved_contexts", []) + final_retrieved_contexts = [] + if isinstance(raw_retrieved_contexts, list): + for ctx in raw_retrieved_contexts: + if isinstance(ctx, dict): + final_retrieved_contexts.append( + {"source": str(ctx.get("source", "")), "content": str(ctx.get("content", ""))} + ) + + has_error_val = item.get("has_error", False) + processed_has_error = bool( + str(has_error_val).lower() == "true" if isinstance(has_error_val, str) else has_error_val + ) + + return { + "question": str(item.get("question", "")), + "system_prompt": str(item.get("system_prompt", "")), + "answer": str(item.get("generated_answer", "")), + "model": str(item.get("model", "precomputed")), + "sources": str(item.get("sources", "")), + "source_documents": str(item.get("source_documents", "")), + "total_tokens": int(str(item.get("total_tokens", "0") or "0")), + "prompt_tokens": int(str(item.get("prompt_tokens", "0") or "0")), + "completion_tokens": int(str(item.get("completion_tokens", "0") or "0")), + "time_taken": float(str(item.get("time_taken", "0.0") or "0.0")), + "start_time": _precomp_start_time_iso, + "end_time": _precomp_end_time_iso, + "api_call_statuses": item.get("api_call_statuses", {}), + "response_synthesis_llm_messages": item.get("response_synthesis_llm_messages", []), + "has_error": str(processed_has_error), # Ensure boolean is stringified as per original logic + "error_message": str(item.get("error_message", "")), + "retrieved_contexts": final_retrieved_contexts, + } + + @weave.op + async def predict(self, index: str, question: str) -> dict: + _current_time_for_error_str = datetime.now(timezone.utc).isoformat() 
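+        # Two paths from here: if a precomputed answers map was supplied, look up
+        # this dataset row by its string "index" and return the stored response
+        # dict; otherwise fall back to a live wandbot API call via get_record.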
+ + if self.precomputed_data_map: + if index in self.precomputed_data_map: + precomputed_item = self.precomputed_data_map[index] + logger.debug(f"Precomputed item at index {index} for question '{question}': {precomputed_item}") + + response_data_dict = self._create_response_dict_from_precomputed(precomputed_item, index) + + if response_data_dict.get("has_error") == "True": # Check stringified boolean + logger.debug(f"Processing precomputed item (error case) for index {index}") + else: + logger.debug(f"Processing precomputed item (success case) for index {index}") + + return response_data_dict + else: + _error_msg = f"Index {index} (Question: '{question}') not found in precomputed_answers_json_path" + logger.warning(_error_msg) + return { + "question": str(question), + "system_prompt": "", + "answer": "", + "model": "precomputed_not_found", + "sources": "", + "source_documents": "", + "total_tokens": 0, + "prompt_tokens": 0, + "completion_tokens": 0, + "time_taken": 0.0, + "start_time": _current_time_for_error_str, + "end_time": _current_time_for_error_str, + "api_call_statuses": {}, + "response_synthesis_llm_messages": [], + "has_error": True, + "error_message": _error_msg, + "retrieved_contexts": [], + } + else: + eval_chat_response_obj = await get_record( + question, wandbot_url=self.wandbot_url, application=self.application, language=self.language + ) + return eval_chat_response_obj.model_dump(warnings=False, mode="json") diff --git a/src/wandbot/evaluation/weave_scorer.py b/src/wandbot/evaluation/weave_scorer.py new file mode 100644 index 0000000000..6c3b757654 --- /dev/null +++ b/src/wandbot/evaluation/weave_scorer.py @@ -0,0 +1,121 @@ +import json +from typing import Dict, List + +import weave + +from wandbot.evaluation.eval_config import EvalConfig +from wandbot.evaluation.eval_metrics.correctness import ( + CorrectnessEvaluationResult, + WandBotCorrectnessEvaluator, +) +from wandbot.utils import get_logger + +logger = get_logger(__name__) + +# Moved from eval.py +class WandbotCorrectnessScorer(weave.Scorer): + config: EvalConfig + correctness_evaluator: WandBotCorrectnessEvaluator = None + debug: bool = False + _all_scored_results: List[Dict] + + def __init__(self, config: EvalConfig): + super().__init__(config=config) + self.debug = config.debug + self.correctness_evaluator = WandBotCorrectnessEvaluator( + provider=config.eval_judge_provider, + model_name=config.eval_judge_model, + temperature=config.eval_judge_temperature, + max_retries=config.max_evaluator_retries, + timeout=config.evaluator_timeout, + ) + self._all_scored_results = [] + + @weave.op + async def score( + self, index: int, question: str, ground_truth: str, notes: str, model_output: dict # Expect dict + ) -> dict: + if self.debug: + if model_output is not None: + if str(model_output.get('has_error')).lower() == 'true': + logger.debug( + f"In WandbotCorrectnessScorer, model_output is dict (error) with message: {model_output.get('error_message')}" + ) + else: + logger.debug( + f"In WandbotCorrectnessScorer, model_output is dict (success) with answer: {model_output.get('answer', '')[:50] if model_output.get('answer') else 'None'}..." 
+ ) + else: + logger.error("model_output is None (should be a dict)") + + score_result_dict: Dict + try: + if str(model_output.get('has_error')).lower() == 'true': + _error_msg = model_output.get('error_message') or "Unknown error from model_output dict" + logger.error(f"Model output dict indicates an error: {_error_msg}") + score_result_dict = CorrectnessEvaluationResult( + reason=_error_msg, + score=1.0, + decision="incorrect", + answer_correct=False, + has_error=True, + error_message=_error_msg + ).model_dump(warnings=False) + elif not model_output.get('answer'): + _error_msg = "Generated answer is empty (from model_output dict)" + logger.error(_error_msg) + score_result_dict = CorrectnessEvaluationResult( + reason=_error_msg, + score=1.0, + decision="incorrect", + answer_correct=False, + has_error=True, + error_message=_error_msg + ).model_dump(warnings=False) + else: + actual_answer = model_output.get('answer', '') + retrieved_contexts_for_eval = model_output.get('retrieved_contexts') or [] + contexts_for_eval = [str(c.get("content", "")) for c in retrieved_contexts_for_eval if isinstance(c, dict)] + + evaluator_output = await self.correctness_evaluator.aevaluate( + query=question, response=actual_answer, reference=ground_truth, + contexts=contexts_for_eval, reference_notes=notes, + ) + score_result_dict = evaluator_output.model_dump(warnings=False) + + except Exception as e: + _error_msg = f"Error evaluating answer: {str(e)}" + logger.error(_error_msg, exc_info=True) + + _current_answer_for_error_payload = model_output.get('answer', '') if isinstance(model_output, dict) else "" + _current_retrieved_contexts = model_output.get('retrieved_contexts') if isinstance(model_output, dict) else [] + _current_contexts_for_error_payload = [str(c.get("content", "")) for c in (_current_retrieved_contexts or []) if isinstance(c, dict)] + + _error_message_for_payload = _error_msg + if isinstance(model_output, dict) and model_output.get('has_error') and model_output.get('error_message'): + _error_message_for_payload = model_output.get('error_message') + + eval_result_obj = CorrectnessEvaluationResult( + reason=str(_error_message_for_payload or _error_msg), + score=1.0, + decision="incorrect", + answer_correct=False, + has_error=True, error_message=str(_error_message_for_payload or _error_msg) + ) + score_result_dict = eval_result_obj.model_dump(warnings=False) + + score_result_dict["reason"] = str(score_result_dict.get("reason", "")) + score_result_dict["error_message"] = str(score_result_dict.get("error_message", "")) + + if self.debug: + logger.debug(f"Final score_result_dict for index {index} before returning to Weave: {score_result_dict}") + + self._all_scored_results.append({ + "index": index, "question": question, "ground_truth": ground_truth, + "notes": notes, "model_output": json.dumps(model_output), + "score_output": score_result_dict + }) + return score_result_dict + + def get_all_scored_results(self) -> List[Dict]: + return self._all_scored_results \ No newline at end of file