diff --git a/README.md b/README.md
index ac4423b..3839751 100644
--- a/README.md
+++ b/README.md
@@ -50,12 +50,59 @@ gcloud auth application-default login
 
 2. Modify `src/cfg/run_cfg.yaml`, if required.
 
-### Capability Generation using the scientist LLM
+### Base Pipeline
 
-Generates capability names and descriptions in the first step. In the second step, for each capability, it generates tasks, solves them, and verifies the solutions.
+The base (non-agentic) pipeline consists of multiple stages that can be run sequentially or individually:
+
+- **Stage 0**: Experiment and domain setup
+- **Stage 1**: Area generation
+- **Stage 2**: Capability generation and filtering
+- **Stage 3**: Task generation (questions with options)
+- **Stage 4**: Solution generation (determine correct answers)
+- **Stage 5**: Task validation
+
+#### Run All Stages
+
+```bash
+python -m src.run_base_pipeline stage=all
+```
+
+#### Run Individual Stages
 
 ```bash
-python -m src.run_capability_generation
+# Stage 0: Setup
+python -m src.run_base_pipeline stage=0
+
+# Stage 1: Generate areas
+python -m src.run_base_pipeline stage=1
+
+# Stage 2: Generate capabilities (requires areas_tag from Stage 1)
+python -m src.run_base_pipeline stage=2 areas_tag=_YYYYMMDD_HHMMSS
+
+# Stage 3: Generate tasks (requires capabilities_tag from Stage 2)
+python -m src.run_base_pipeline stage=3 capabilities_tag=_YYYYMMDD_HHMMSS
+
+# Stage 4: Generate solutions (requires tasks_tag from Stage 3)
+python -m src.run_base_pipeline stage=4 tasks_tag=_YYYYMMDD_HHMMSS
+
+# Stage 5: Validate tasks (requires solution_tag from Stage 4)
+python -m src.run_base_pipeline stage=5 solution_tag=_YYYYMMDD_HHMMSS
+```
+
+#### Resume from Existing Runs
+
+```bash
+# Resume Stage 2 from existing capabilities_tag
+python -m src.run_base_pipeline stage=2 areas_tag=_YYYYMMDD_HHMMSS capabilities_tag=_YYYYMMDD_HHMMSS
+
+# Resume Stage 3 from existing tasks_tag
+python -m src.run_base_pipeline stage=3 capabilities_tag=_YYYYMMDD_HHMMSS tasks_tag=_YYYYMMDD_HHMMSS
+
+# Resume Stage 4 from existing solution_tag
+python -m src.run_base_pipeline stage=4 tasks_tag=_YYYYMMDD_HHMMSS solution_tag=_YYYYMMDD_HHMMSS
+
+# Resume Stage 5 from existing validation_tag
+python -m src.run_base_pipeline stage=5 solution_tag=_YYYYMMDD_HHMMSS validation_tag=_YYYYMMDD_HHMMSS
 ```
 
 ### Evaluation of subject LLM on generated capabilities
@@ -222,3 +269,18 @@ Configure `wikipedia/cfg/static_vs_generated.yaml`:
 cd wikipedia
 python static_vs_generated.py
 ```
+
+
+## Development Guidelines
+
+When implementing new features or modifying existing pipeline stages:
+
+1. **Follow Schema Guidelines**: All data objects must use the schema classes defined in `src/schemas/`:
+   - Use `Domain`, `Area`, `Capability`, `Task`, `TaskSolution`, `ValidationResult` objects
+   - Load/save using schema IO functions from `src/schemas/io_utils.py` (e.g., `load_solution()`, `save_validation()`)
+   - See `src/schemas/GENERATION_PIPELINE_SCHEMAS.md` for detailed schema documentation
+
+2. **Use Model Call Utilities**: All LLM interactions must use the standardized model client utilities:
+   - Import from `src.utils.model_client_utils`
+   - Use `get_standard_model_client()` to initialize clients
+   - Use `async_call_model()` with appropriate `ModelCallMode` (e.g., `JSON_PARSE`, `TEXT`)
diff --git a/example_scripts/example_cfg/capability_score_visualization.yaml b/example_scripts/example_cfg/capability_score_visualization.yaml
index 0f1be7d..71d989f 100644
--- a/example_scripts/example_cfg/capability_score_visualization.yaml
+++ b/example_scripts/example_cfg/capability_score_visualization.yaml
@@ -13,11 +13,11 @@ capabilities_cfg:
   # Set to -1 to use all seed capabilities
   num_seed_capabilities: 1
   # Number of initial capabilities to generate using the scientist LLM
-  num_gen_capabilities: 100
+  num_capabilities: 100
   # Buffer for capability generation
-  num_gen_capabilities_buffer: 0.2
-  # Number of capability areas to generate
-  num_capability_areas: 10
+  num_capabilities_buffer: 0.2
+  # Number of areas to generate
+  num_areas: 10
   # Number of initial capabilities to generate per run
   num_gen_capabilities_per_run: 5
   # Number of tasks to generate for each capability
diff --git a/experimental/diverse_task_config.yaml b/experimental/diverse_task_config.yaml
deleted file mode 100644
index c261782..0000000
--- a/experimental/diverse_task_config.yaml
+++ /dev/null
@@ -1,37 +0,0 @@
-# Configuration for Diverse Task Generator
-
-# Model settings
-model:
-  name: gpt-4o  # OpenAI model to use
-  temperature: 1.0  # Temperature for all steps
-  max_tokens: 8192  # Max tokens for all steps
-  max_retries: 3  # Number of retry attempts for API calls
-  retry_delay: 2.0  # Initial delay between retries in seconds (exponential backoff)
-
-# Task generation settings
-generation:
-  tasks_per_blueprint: 3  # Number of tasks to generate per blueprint
-  min_subtopics: 3  # Suggested minimum number of sub-topics
-  max_subtopics: 8  # Suggested maximum number of sub-topics
-
-# Output settings
-output:
-  base_dir: diverse_task_outputs
-  save_intermediate_steps: true  # Save each step's output
-  pretty_print_json: true  # Indent JSON files
-
-# Input settings
-input:
-  capability_json_path: capability.json  # Default capability JSON file path
-
-# Verification criteria
-verification:
-  pass_threshold: 0.8  # Minimum pass rate to consider successful
-  strict_mode: false  # If true, all alignment criteria must pass
-
-# Example capability for quick testing
-example_capability:
-  name: "compound_interest_calculations"
-  description: "The ability to calculate compound interest for various scenarios, including different compounding frequencies (annually, semi-annually, quarterly, monthly), different time periods, and understanding how changes in principal, rate, or time affect the final amount."
- domain: "personal_finance" - area: "investing_and_savings" diff --git a/experimental/diverse_task_generator.py b/experimental/diverse_task_generator.py deleted file mode 100644 index 06595d3..0000000 --- a/experimental/diverse_task_generator.py +++ /dev/null @@ -1,347 +0,0 @@ -"""Standalone script for generating diverse tasks for a single capability.""" - -import argparse -import json -import logging -import os -from dataclasses import asdict -from datetime import datetime -from functools import partial -from pathlib import Path -from typing import Any - -import yaml -from diverse_task_dataclasses import ( - Blueprint, - Capability, - Combination, - SubTopic, - Task, - VerificationResult, -) -from extract_subtopics import extract_subtopics -from find_combinations import find_valid_combinations -from generate_blueprints import generate_blueprints -from model_utils import call_model -from openai import OpenAI -from verify_tasks import verify_tasks - -from generate_tasks import generate_tasks - - -# Configure logging -logging.basicConfig( - level=logging.INFO, - format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", -) -logger = logging.getLogger(__name__) - - -class DiverseTaskGenerator: - """Generate diverse tasks for a capability using multi-dimensional approach.""" - - def __init__( - self, - capability_dict: dict, - config: dict, - resume: bool = False, - resume_dir: str = None, - ) -> None: - """Initialize the diverse task generator.""" - # Extract example tasks from capability_data if present - example_tasks = ( - capability_dict.get("capability_data", [])[:3] - if "capability_data" in capability_dict - else [] - ) - - self.capability = Capability( - name=capability_dict["capability_name"], - description=capability_dict["capability_description"], - domain=capability_dict["capability_domain"], - area=capability_dict.get("capability_area"), - example_tasks=example_tasks, - ) - - # Store configuration - self.config = config - self.resume = resume - - # Use config values - self.model_name = self.config["model"]["name"] - self.temperature = self.config["model"]["temperature"] - self.max_tokens = self.config["model"]["max_tokens"] - self.max_retries = self.config["model"].get("max_retries", 3) - self.retry_delay = self.config["model"].get("retry_delay", 2.0) - self.output_dir = Path(self.config["output"]["base_dir"]) - - # Initialize OpenAI client - api_key = os.getenv("OPENAI_API_KEY") - if not api_key: - raise ValueError("OPENAI_API_KEY environment variable not set") - self.client = OpenAI(api_key=api_key) - - # Create or resume output directory - if resume and resume_dir: - self.run_output_dir = Path(resume_dir) - if not self.run_output_dir.exists(): - raise ValueError(f"Resume directory does not exist: {resume_dir}") - logger.info(f"Resuming from: {self.run_output_dir}") - else: - timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") - self.run_output_dir = ( - self.output_dir / f"{self.capability.name}_{timestamp}" - ) - self.run_output_dir.mkdir(parents=True, exist_ok=True) - - logger.info("=" * 80) - logger.info(f"Initialized DiverseTaskGenerator for: {self.capability.name}") - logger.info(f"Mode: {'RESUME' if resume else 'NEW RUN'}") - logger.info(f"Model: {self.model_name}") - logger.info(f"Temperature: {self.temperature}") - logger.info(f"Max tokens: {self.max_tokens}") - logger.info(f"Output directory: {self.run_output_dir}") - logger.info("=" * 80) - - # Create API caller with pre-configured parameters - self._call_api = partial( - call_model, - self.client, - 
model_name=self.model_name, - temperature=self.temperature, - max_tokens=self.max_tokens, - max_retries=self.max_retries, - retry_delay=self.retry_delay, - ) - - def _save_json(self, filename: str, data_key: str, data: Any) -> Path: - """Save data to JSON file.""" - output_file = self.run_output_dir / filename - # Convert dataclass objects to dicts if needed - if data and hasattr( - data[0] if isinstance(data, list) else data, "__dataclass_fields__" - ): - data = ( - [asdict(item) for item in data] - if isinstance(data, list) - else asdict(data) - ) - - with open(output_file, "w") as f: - json.dump({data_key: data} if data_key else data, f, indent=2) - logger.info(f"Saved to: {output_file}") - return output_file - - def _load_json(self, filename: str, data_key: str = None): - """Load data from JSON file if it exists.""" - file_path = self.run_output_dir / filename - if not file_path.exists(): - return None - - with open(file_path, "r") as f: - data = json.load(f) - - if data_key: - return data.get(data_key) - return data - - def _load_subtopics(self): - """Load subtopics from checkpoint.""" - data = self._load_json("subtopics.json", "sub_topics") - if data: - return [SubTopic(**item) for item in data] - return None - - def _load_combinations(self): - """Load combinations from checkpoint.""" - data = self._load_json("combinations.json", "valid_combinations") - if data: - return [Combination(**item) for item in data] - return None - - def _load_blueprints(self): - """Load blueprints from checkpoint.""" - data = self._load_json("blueprints.json", "blueprints") - if data: - return [Blueprint(**item) for item in data] - return None - - def _load_tasks(self): - """Load tasks from checkpoint.""" - data = self._load_json("tasks.json", "tasks") - if data: - return [Task(**item) for item in data] - return None - - def extract_and_save_subtopics(self) -> list[SubTopic]: - """Extract sub-topics and save results.""" - subtopics = extract_subtopics(self.capability, self._call_api) - self._save_json("subtopics.json", "sub_topics", subtopics) - return subtopics - - def find_and_save_combinations( - self, subtopics: list[SubTopic] - ) -> list[Combination]: - """Find valid combinations and save results.""" - combinations = find_valid_combinations( - self.capability, subtopics, self._call_api, self.config - ) - self._save_json("combinations.json", "valid_combinations", combinations) - return combinations - - def generate_and_save_blueprints( - self, combinations: list[Combination] - ) -> list[Blueprint]: - """Generate blueprints and save results.""" - blueprints = generate_blueprints( - self.capability, combinations, self._call_api, self.config - ) - self._save_json("blueprints.json", "blueprints", blueprints) - return blueprints - - def generate_and_save_tasks(self, blueprints: list[Blueprint]) -> list[Task]: - """Generate tasks and save results.""" - tasks_per_blueprint = self.config["generation"]["tasks_per_blueprint"] - tasks = generate_tasks( - self.capability, blueprints, self._call_api, tasks_per_blueprint - ) - self._save_json("tasks.json", "tasks", tasks) - return tasks - - def verify_and_save_tasks( - self, tasks: list[Task], blueprints: list[Blueprint] - ) -> VerificationResult: - """Verify tasks and save results.""" - verification = verify_tasks(self.capability, tasks, blueprints, self._call_api) - self._save_json("verification.json", None, verification) - return verification - - def run_full_pipeline(self) -> dict: - """Run the complete diverse task generation pipeline.""" - logger.info("=" * 
80) - logger.info("Starting Diverse Task Generation Pipeline") - logger.info(f"Capability: {self.capability.name}") - logger.info(f"Model: {self.model_name}") - logger.info("=" * 80) - - # Extract sub-topics - subtopics = self._load_subtopics() if self.resume else None - if subtopics: - logger.info(f"Loaded {len(subtopics)} subtopics from checkpoint") - else: - subtopics = self.extract_and_save_subtopics() - - # Find valid combinations - combinations = self._load_combinations() if self.resume else None - if combinations: - logger.info(f"Loaded {len(combinations)} combinations from checkpoint") - else: - combinations = self.find_and_save_combinations(subtopics) - - # Generate blueprints - blueprints = self._load_blueprints() if self.resume else None - if blueprints: - logger.info(f"Loaded {len(blueprints)} blueprints from checkpoint") - else: - blueprints = self.generate_and_save_blueprints(combinations) - - # Generate tasks - tasks = self._load_tasks() if self.resume else None - if tasks: - logger.info(f"Loaded {len(tasks)} tasks from checkpoint") - else: - tasks = self.generate_and_save_tasks(blueprints) - - # Verify tasks - verification = self.verify_and_save_tasks(tasks, blueprints) - - # Compile final results - results = { - "capability_name": self.capability.name, - "capability_description": self.capability.description, - "capability_domain": self.capability.domain, - "model_name": self.model_name, - "timestamp": datetime.now().isoformat(), - "subtopics": [asdict(st) for st in subtopics], - "combinations": [asdict(c) for c in combinations], - "blueprints": [asdict(bp) for bp in blueprints], - "tasks": [asdict(t) for t in tasks], - "verification": verification, - } - - # Save final results - self._save_json("final_results.json", None, results) - - logger.info("=" * 80) - logger.info("Pipeline Complete!") - logger.info(f"All results saved to: {self.run_output_dir}") - logger.info("=" * 80) - - return results - - -def load_capability_from_json(capability_json_path: str) -> dict: - """Load capability information from a JSON file.""" - with open(capability_json_path, "r") as f: - return json.load(f) - - -def main() -> None: - """Generate diverse tasks for a single capability.""" - parser = argparse.ArgumentParser( - description="Generate diverse tasks for a capability from JSON file" - ) - parser.add_argument( - "--capability-json-path", - type=str, - help="Path to capability JSON file (default: from config file)", - ) - parser.add_argument( - "--model-name", - type=str, - help="OpenAI model name (default: from config file)", - ) - parser.add_argument( - "--output-dir", - type=str, - help="Output directory (default: from config file)", - ) - parser.add_argument( - "--resume-dir", - type=str, - help="Resume from an existing run directory", - ) - - args = parser.parse_args() - - # Load config - config_file = Path(__file__).parent / "diverse_task_config.yaml" - with open(config_file, "r") as f: - config = yaml.safe_load(f) - - # Override config with command-line arguments - if args.model_name: - config["model"]["name"] = args.model_name - if args.output_dir: - config["output"]["base_dir"] = args.output_dir - if args.capability_json_path: - config["input"]["capability_json_path"] = args.capability_json_path - - logger.info(f"Loading capability from: {config['input']['capability_json_path']}") - capability_dict = load_capability_from_json(config["input"]["capability_json_path"]) - - # Initialize and run generator - # If resume_dir is provided, automatically enable resume mode - generator = 
DiverseTaskGenerator( - capability_dict=capability_dict, - resume=bool(args.resume_dir), - resume_dir=args.resume_dir, - config=config, - ) - generator.run_full_pipeline() - - logger.info("Done!") - - -if __name__ == "__main__": - main() diff --git a/experimental/extract_subtopics.py b/experimental/extract_subtopics.py deleted file mode 100644 index 5cbbe43..0000000 --- a/experimental/extract_subtopics.py +++ /dev/null @@ -1,41 +0,0 @@ -"""Extract sub-topics for a capability.""" - -import json -import logging -from typing import Callable - -from diverse_task_dataclasses import Capability, SubTopic -from diverse_task_prompts import format_subtopic_prompt - - -logger = logging.getLogger(__name__) - - -def extract_subtopics(capability: Capability, call_llm: Callable) -> list[SubTopic]: - """Extract sub-topics for the given capability.""" - logger.info("Extracting sub-topics...") - - system_prompt, user_prompt = format_subtopic_prompt( - capability_name=capability.name, - capability_description=capability.description, - capability_domain=capability.domain, - capability_area=capability.area, - ) - - response = call_llm( - system_prompt=system_prompt, - user_prompt=user_prompt, - response_format={"type": "json_object"}, - ) - - result = json.loads(response) - subtopic_names = result.get("sub_topics", []) - - # Create SubTopic objects - subtopics = [SubTopic(name=name) for name in subtopic_names] - - logger.info(f"Extracted {len(subtopics)} sub-topics:") - for st in subtopics: - logger.info(f" - {st.name}") - - return subtopics diff --git a/experimental/generate_blueprints.py b/experimental/generate_blueprints.py deleted file mode 100644 index 5ea4169..0000000 --- a/experimental/generate_blueprints.py +++ /dev/null @@ -1,65 +0,0 @@ -"""Generate task blueprints for each valid combination.""" - -import json -import logging -from typing import Callable - -from diverse_task_constants import BLOOMS_TAXONOMY, DIFFICULTY_LEVELS -from diverse_task_dataclasses import Blueprint, Capability, Combination -from diverse_task_prompts import format_blueprint_prompt - - -logger = logging.getLogger(__name__) - - -def generate_blueprints( - capability: Capability, - combinations: list[Combination], - call_llm: Callable, - config: dict, -) -> list[Blueprint]: - """Generate task blueprints for each valid combination.""" - logger.info("Generating task blueprints...") - - blueprints = [] - - for i, combo in enumerate(combinations): - logger.info( - f"Generating blueprint {i + 1}/{len(combinations)}: " - f"{combo.content} | {combo.difficulty} | {combo.reasoning}" - ) - - system_prompt, user_prompt = format_blueprint_prompt( - capability_name=capability.name, - capability_description=capability.description, - capability_domain=capability.domain, - capability_area=capability.area, - subtopic=combo.content, - difficulty=combo.difficulty, - difficulty_description=DIFFICULTY_LEVELS[combo.difficulty.lower()][ - "description" - ], - reasoning=combo.reasoning, - reasoning_description=BLOOMS_TAXONOMY[combo.reasoning]["description"], - ) - - response = call_llm( - system_prompt=system_prompt, - user_prompt=user_prompt, - response_format={"type": "json_object"}, - ) - - blueprint_data = json.loads(response) - blueprint = Blueprint( - combination_id=i, - subtopic=combo.content, - difficulty=combo.difficulty, - reasoning=combo.reasoning, - blueprint=blueprint_data["blueprint"], - rationale=combo.rationale, - ) - blueprints.append(blueprint) - - logger.info(f"Generated {len(blueprints)} blueprints") - - return blueprints diff --git 
a/experimental/generate_tasks.py b/experimental/generate_tasks.py deleted file mode 100644 index 4c89de8..0000000 --- a/experimental/generate_tasks.py +++ /dev/null @@ -1,85 +0,0 @@ -"""Generate multiple-choice questions for each blueprint.""" - -import json -import logging -from typing import Callable - -from diverse_task_dataclasses import Blueprint, Capability, Task -from diverse_task_prompts import format_task_prompt - - -logger = logging.getLogger(__name__) - - -def generate_tasks( - capability: Capability, - blueprints: list[Blueprint], - call_llm: Callable, - tasks_per_blueprint: int = 3, -) -> list[Task]: - """Generate multiple-choice questions for each blueprint.""" - logger.info("Generating tasks from blueprints...") - - all_tasks = [] - - for blueprint in blueprints: - logger.info( - f"Generating {tasks_per_blueprint} tasks for blueprint " - f"{blueprint.combination_id}: {blueprint.subtopic} | " - f"{blueprint.difficulty} | {blueprint.reasoning}" - ) - - # Generate multiple tasks for this blueprint - for j in range(tasks_per_blueprint): - task_id = f"task_{blueprint.combination_id}_{j}" - - try: - system_prompt, user_prompt = format_task_prompt( - capability_name=capability.name, - capability_description=capability.description, - capability_domain=capability.domain, - capability_area=capability.area, - blueprint_description=blueprint.blueprint, - ) - - response = call_llm( - system_prompt=system_prompt, - user_prompt=user_prompt, - response_format={"type": "json_object"}, - ) - - task_data = json.loads(response) - - # Create Task object - task = Task( - task_id=task_id, - blueprint_id=blueprint.combination_id, - subtopic=blueprint.subtopic, - difficulty=blueprint.difficulty, - reasoning=blueprint.reasoning, - question=task_data["question"], - choices=task_data["options"], - correct_answer=task_data["correct_answer"], - ) - all_tasks.append(task) - - except Exception as e: - logger.error(f" Failed to generate {task_id}: {e}") - # Create a task with error information - task = Task( - task_id=task_id, - blueprint_id=blueprint.combination_id, - subtopic=blueprint.subtopic, - difficulty=blueprint.difficulty, - reasoning=blueprint.reasoning, - question=f"ERROR: Failed to generate task - {str(e)}", - choices={"A": "N/A", "B": "N/A", "C": "N/A", "D": "N/A"}, - correct_answer="A", - ) - all_tasks.append(task) - - logger.info(f" Generated {tasks_per_blueprint} tasks") - - logger.info(f"Generated {len(all_tasks)} total tasks") - - return all_tasks diff --git a/experimental/model_utils.py b/experimental/model_utils.py deleted file mode 100644 index fdf0091..0000000 --- a/experimental/model_utils.py +++ /dev/null @@ -1,61 +0,0 @@ -"""Utilities for LLM API calls.""" - -import logging -import time - -from openai import OpenAI - - -logger = logging.getLogger(__name__) - - -def call_model( - client: OpenAI, - system_prompt: str, - user_prompt: str, - model_name: str, - temperature: float, - max_tokens: int, - response_format: dict = None, - max_retries: int = 3, - retry_delay: float = 2.0, -) -> str: - """Call LLM API with given prompts and automatic retries.""" - messages = [ - {"role": "system", "content": system_prompt}, - {"role": "user", "content": user_prompt}, - ] - - kwargs = { - "model": model_name, - "messages": messages, - "temperature": temperature, - "max_tokens": max_tokens, - } - if response_format: - kwargs["response_format"] = response_format - - last_error = None - for attempt in range(max_retries): - try: - response = client.chat.completions.create(**kwargs) - content = 
response.choices[0].message.content - - if content is None: - raise ValueError("Model returned empty response") - - return content - - except Exception as e: - last_error = e - if attempt < max_retries - 1: - wait_time = retry_delay * (2**attempt) - logger.warning( - f"API call failed (attempt {attempt + 1}/{max_retries}): {e}" - ) - logger.info(f"Retrying in {wait_time:.1f} seconds...") - time.sleep(wait_time) - else: - logger.error(f"API call failed after {max_retries} attempts: {e}") - - raise last_error diff --git a/experimental/verify_tasks.py b/experimental/verify_tasks.py deleted file mode 100644 index 504b823..0000000 --- a/experimental/verify_tasks.py +++ /dev/null @@ -1,146 +0,0 @@ -"""Verify that generated tasks align with intended dimensions.""" - -import json -import logging -from typing import Callable - -from diverse_task_dataclasses import Blueprint, Capability, Task, VerificationResult -from diverse_task_prompts import format_verification_prompt - - -logger = logging.getLogger(__name__) - - -def verify_tasks( - capability: Capability, - tasks: list[Task], - blueprints: list[Blueprint], - call_llm: Callable, -) -> VerificationResult: - """Verify that generated tasks align with intended dimensions.""" - logger.info("Verifying task alignment...") - - # Create blueprint lookup - blueprint_dict = {bp.combination_id: bp for bp in blueprints} - - verification_results = [] - - for i, task in enumerate(tasks): - logger.info(f"Verifying task {i + 1}/{len(tasks)}: {task.task_id}") - - try: - # Skip verification for tasks that failed generation - if task.question.startswith("ERROR:"): - logger.warning(" Skipping verification (task generation failed)") - verification = VerificationResult( - task_id=task.task_id, - subtopic_aligned=False, - difficulty_aligned=False, - reasoning_aligned=False, - choices_appropriate=False, - overall_aligned=False, - feedback="Task generation failed - verification skipped", - ) - verification_results.append(verification) - logger.info(" ✗ SKIPPED") - continue - - # Get blueprint for this task - blueprint = blueprint_dict.get(task.blueprint_id) - blueprint_text = blueprint.blueprint if blueprint else "N/A" - - system_prompt, user_prompt = format_verification_prompt( - capability_domain=capability.domain, - capability_area=capability.area, - capability_name=capability.name, - capability_description=capability.description, - task_blueprint=blueprint_text, - question=task.question, - option_a=task.choices.get("A", ""), - option_b=task.choices.get("B", ""), - option_c=task.choices.get("C", ""), - option_d=task.choices.get("D", ""), - correct_answer=task.correct_answer, - ) - - response = call_llm( - system_prompt=system_prompt, - user_prompt=user_prompt, - response_format={"type": "json_object"}, - ) - - verification_data = json.loads(response) - - # Map new verification format to old format - overall_aligned = verification_data.get("overall_verdict", "Fail") == "Pass" - - verification = VerificationResult( - task_id=task.task_id, - subtopic_aligned=verification_data.get("blueprint_alignment", "No") - == "Yes", - difficulty_aligned=verification_data.get( - "difficulty_reasoning_match", "No" - ) - == "Yes", - reasoning_aligned=verification_data.get("capability_alignment", "No") - == "Yes", - choices_appropriate=verification_data.get("single_correct_answer", "No") - == "Yes", - overall_aligned=overall_aligned, - feedback=verification_data.get("explanation", ""), - ) - verification_results.append(verification) - - status = "✓ PASS" if verification.overall_aligned 
else "✗ FAIL" - logger.info(f" {status}") - - except Exception as e: - logger.error(f" Failed to verify {task.task_id}: {e}") - # Create a verification result with error information - verification = VerificationResult( - task_id=task.task_id, - subtopic_aligned=False, - difficulty_aligned=False, - reasoning_aligned=False, - choices_appropriate=False, - overall_aligned=False, - feedback=f"Verification failed: {str(e)}", - ) - verification_results.append(verification) - logger.info(" ✗ ERROR") - - # Calculate statistics - total = len(verification_results) - passed = sum(1 for v in verification_results if v.overall_aligned) - failed = total - passed - - # Convert to dict for JSON serialization - verification_details_dict = [ - { - "task_id": v.task_id, - "subtopic_aligned": v.subtopic_aligned, - "difficulty_aligned": v.difficulty_aligned, - "reasoning_aligned": v.reasoning_aligned, - "choices_appropriate": v.choices_appropriate, - "overall_aligned": v.overall_aligned, - "feedback": v.feedback, - "suggested_improvements": v.suggested_improvements, - } - for v in verification_results - ] - - summary = { - "total_tasks": total, - "passed": passed, - "failed": failed, - "pass_rate": passed / total if total > 0 else 0, - "verification_details": verification_details_dict, - } - - logger.info("\nVerification Summary:") - logger.info(f" Total tasks: {total}") - logger.info(f" Passed: {passed}") - logger.info(f" Failed: {failed}") - logger.info(f" Pass rate: {summary['pass_rate']:.1%}") - - return summary diff --git a/src/base_stages/__init__.py b/src/base_stages/__init__.py new file mode 100644 index 0000000..ca1373c --- /dev/null +++ b/src/base_stages/__init__.py @@ -0,0 +1,48 @@ +"""Base (non-agentic) pipeline stages and utilities. + +This module contains all base pipeline stages and the utilities they use: + +Stages: +- stage0_setup: Experiment and domain setup +- stage1_areas: Area generation +- stage2_capabilities: Capability generation and filtering +- stage3_tasks: Task generation +- stage4_solutions: Solution generation +- stage5_validation: Task validation + +Utilities: +- generate_areas: Area generation using LLM +- generate_capabilities: Capability generation using LLM +- generate_diverse_tasks_pipeline: Orchestrates subtopic→combination→blueprint→task + pipeline +- generate_tasks_from_blueprints: Task (question + options) generation from blueprints +- solve_tasks: Task solving to determine correct answers +- validate_tasks: Task validation + +Supporting modules: +- task_constants: Bloom's taxonomy, difficulty levels +- task_dataclasses: SubTopic, Combination, Blueprint, etc. 
+- extract_subtopics: Sub-topic extraction +- find_combinations: Valid combination finding +- generate_blueprints: Blueprint generation + +Prompts: +- Prompts are in src/utils/base_generation_prompts.py +""" + +from src.base_stages.stage0_setup import run_stage0 +from src.base_stages.stage1_areas import run_stage1 +from src.base_stages.stage2_capabilities import run_stage2 +from src.base_stages.stage3_tasks import run_stage3 +from src.base_stages.stage4_solutions import run_stage4 +from src.base_stages.stage5_validation import run_stage5 + + +__all__ = [ + "run_stage0", + "run_stage1", + "run_stage2", + "run_stage3", + "run_stage4", + "run_stage5", +] diff --git a/src/base_stages/extract_subtopics.py b/src/base_stages/extract_subtopics.py new file mode 100644 index 0000000..eb88deb --- /dev/null +++ b/src/base_stages/extract_subtopics.py @@ -0,0 +1,62 @@ +"""Extract sub-topics for a capability.""" + +import asyncio +import logging + +from autogen_core.models import ChatCompletionClient + +from src.base_stages.task_dataclasses import SubTopic +from src.utils.base_generation_prompts import format_subtopic_prompt +from src.utils.model_client_utils import ModelCallMode, async_call_model + + +logger = logging.getLogger(__name__) + + +def extract_subtopics( + capability, + client: ChatCompletionClient, + min_subtopics: int = 3, + max_subtopics: int = 8, +) -> list[SubTopic]: + """Extract sub-topics for the given capability. + + Args: + capability: Capability object + client: ChatCompletionClient for API calls + min_subtopics: Minimum number of subtopics to generate + max_subtopics: Maximum number of subtopics to generate + + Returns + ------- + List of SubTopic objects + """ + logger.info(f"Extracting sub-topics (range: {min_subtopics}-{max_subtopics}) ...") + + system_prompt, user_prompt = format_subtopic_prompt( + capability_name=capability.capability_name, + capability_description=capability.capability_description, + capability_domain=capability.area.domain.domain_name, + capability_area=capability.area.area_name, + min_subtopics=min_subtopics, + max_subtopics=max_subtopics, + ) + + response = asyncio.run( + async_call_model( + client, + system_prompt=system_prompt, + user_prompt=user_prompt, + mode=ModelCallMode.JSON_PARSE, + ) + ) + + subtopic_names = response.get("sub_topics", []) + + subtopics = [SubTopic(name=name) for name in subtopic_names] + + logger.info(f"Extracted {len(subtopics)} sub-topics:") + for st in subtopics: + logger.info(f" - {st.name}") + + return subtopics diff --git a/experimental/find_combinations.py b/src/base_stages/find_combinations.py similarity index 58% rename from experimental/find_combinations.py rename to src/base_stages/find_combinations.py index adf9d4f..b90c817 100644 --- a/experimental/find_combinations.py +++ b/src/base_stages/find_combinations.py @@ -1,20 +1,36 @@ -"""Find valid combinations of (Content, Difficulty, Reasoning).""" +"""Find valid (Content, Difficulty, Reasoning) combinations.""" -import json +import asyncio import logging -from diverse_task_constants import BLOOMS_TAXONOMY, DIFFICULTY_LEVELS -from diverse_task_dataclasses import Capability, Combination, SubTopic -from diverse_task_prompts import format_combination_prompt +from autogen_core.models import ChatCompletionClient + +from src.base_stages.task_constants import ( + BLOOMS_TAXONOMY, + DIFFICULTY_LEVELS, +) +from src.base_stages.task_dataclasses import Combination, SubTopic +from src.utils.base_generation_prompts import format_combination_prompt +from src.utils.model_client_utils 
import ModelCallMode, async_call_model logger = logging.getLogger(__name__) def find_valid_combinations( - capability: Capability, subtopics: list[SubTopic], call_llm, config: dict + capability, subtopics: list[SubTopic], client: ChatCompletionClient ) -> list[Combination]: - """Find valid combinations of Content, Difficulty, and Reasoning.""" + """Find valid combinations of Content, Difficulty, and Reasoning. + + Args: + capability: Capability object + subtopics: List of SubTopic objects + client: ChatCompletionClient for API calls + + Returns + ------- + List of Combination objects + """ logger.info("Finding valid combinations...") # Get difficulty levels and reasoning types from constants @@ -36,7 +52,6 @@ def find_valid_combinations( logger.info(f"Generated {len(all_combinations)} total combinations to validate") - # Format combinations as a numbered list for the LLM content_list = "\n".join( [ f"{i + 1}. Content: {c['content']}, Difficulty: {c['difficulty']}, Reasoning: {c['reasoning']}" @@ -45,23 +60,24 @@ def find_valid_combinations( ) system_prompt, user_prompt = format_combination_prompt( - capability_name=capability.name, - capability_description=capability.description, - capability_domain=capability.domain, - capability_area=capability.area, + capability_name=capability.capability_name, + capability_description=capability.capability_description, + capability_domain=capability.area.domain.domain_name, + capability_area=capability.area.area_name, content_list=content_list, ) - response = call_llm( - system_prompt=system_prompt, - user_prompt=user_prompt, - response_format={"type": "json_object"}, + response = asyncio.run( + async_call_model( + client, + system_prompt=system_prompt, + user_prompt=user_prompt, + mode=ModelCallMode.JSON_PARSE, + ) ) - result = json.loads(response) - combinations_data = result.get("valid_combinations", []) + combinations_data = response.get("valid_combinations", []) - # Create Combination objects combinations = [ Combination( content=combo["content"], @@ -74,7 +90,7 @@ def find_valid_combinations( logger.info( f"Found {len(combinations)} valid combinations out of {len(all_combinations)} total:" ) - for i, combo in enumerate(combinations[:5]): # Show first 5 + for i, combo in enumerate(combinations[:5]): logger.info( f" {i + 1}. {combo.content} | {combo.difficulty} | {combo.reasoning}" ) diff --git a/src/base_stages/generate_areas.py b/src/base_stages/generate_areas.py new file mode 100644 index 0000000..29cac36 --- /dev/null +++ b/src/base_stages/generate_areas.py @@ -0,0 +1,68 @@ +"""Generate areas using the scientist LLM.""" + +import asyncio +import logging +from typing import List + +from autogen_core.models import ChatCompletionClient + +from src.schemas.area_schemas import Area +from src.schemas.domain_schemas import Domain +from src.utils.base_generation_prompts import ( + AREAS_GENERATION_RESPONSE_JSON_FORMAT, + AREAS_GENERATION_USER_PROMPT, +) +from src.utils.model_client_utils import ModelCallMode, async_call_model + + +logger = logging.getLogger(__name__) + + +def generate_areas( + domain: Domain, + num_areas: int, + num_capabilities_per_area: int, + client: ChatCompletionClient, +) -> List[Area]: + """Generate areas for the specified domain. 
+ + Args: + domain: Domain object + num_areas: Number of areas to generate + num_capabilities_per_area: Number of capabilities per area + client: ChatCompletionClient for API calls + + Returns + ------- + List of generated Area objects + """ + logger.info(f"Generating {num_areas} areas ...") + user_prompt = AREAS_GENERATION_USER_PROMPT.format( + num_areas=num_areas, + num_capabilities_per_area=num_capabilities_per_area, + domain=domain.domain_name, + response_json_format=AREAS_GENERATION_RESPONSE_JSON_FORMAT, + ) + + response = asyncio.run( + async_call_model( + client, + system_prompt="", + user_prompt=user_prompt, + mode=ModelCallMode.JSON_PARSE, + ) + ) + + areas = [] + for idx, area_name in enumerate(response.get("areas", [])): + area = Area( + area_name=area_name, + area_id=f"area_{idx:03d}", + domain=domain, + area_description="", + ) + areas.append(area) + + logger.info(f"Generated {len(areas)} areas") + + return areas diff --git a/src/base_stages/generate_blueprints.py b/src/base_stages/generate_blueprints.py new file mode 100644 index 0000000..0acf98b --- /dev/null +++ b/src/base_stages/generate_blueprints.py @@ -0,0 +1,90 @@ +"""Generate task blueprints for each combination.""" + +import asyncio +import logging + +from autogen_core.models import ChatCompletionClient + +from src.base_stages.task_constants import ( + BLOOMS_TAXONOMY, + DIFFICULTY_LEVELS, +) +from src.base_stages.task_dataclasses import Blueprint, Combination +from src.utils.base_generation_prompts import format_blueprint_prompt +from src.utils.model_client_utils import ModelCallMode, async_call_model + + +logger = logging.getLogger(__name__) + + +def generate_blueprints( + capability, + combinations: list[Combination], + client: ChatCompletionClient, +) -> list[Blueprint]: + """Generate task blueprints for each valid combination. + + Args: + capability: Capability object + combinations: List of Combination objects + client: ChatCompletionClient for API calls + + Returns + ------- + List of Blueprint objects + """ + logger.info("Generating task blueprints...") + + blueprints = [] + + for i, combo in enumerate(combinations): + logger.info( + f"Generating blueprint {i + 1}/{len(combinations)}: " + f"{combo.content} | {combo.difficulty} | {combo.reasoning}" + ) + + system_prompt, user_prompt = format_blueprint_prompt( + capability_name=capability.capability_name, + capability_description=capability.capability_description, + capability_domain=capability.area.domain.domain_name, + capability_area=capability.area.area_name, + subtopic=combo.content, + difficulty=combo.difficulty, + difficulty_description=DIFFICULTY_LEVELS[combo.difficulty.lower()][ + "description" + ], + reasoning=combo.reasoning, + reasoning_description=BLOOMS_TAXONOMY[combo.reasoning]["description"], + ) + + response = asyncio.run( + async_call_model( + client, + system_prompt=system_prompt, + user_prompt=user_prompt, + mode=ModelCallMode.JSON_PARSE, + ) + ) + + if "blueprint" not in response: + logger.error( + f"Response missing 'blueprint' key. 
Response keys: {response.keys()}" + ) + logger.error(f"Full response: {response}") + raise ValueError( + "Invalid blueprint response format: missing 'blueprint' key" + ) + + blueprint = Blueprint( + combination_id=i, + subtopic=combo.content, + difficulty=combo.difficulty, + reasoning=combo.reasoning, + blueprint=response["blueprint"], + rationale=combo.rationale, + ) + blueprints.append(blueprint) + + logger.info(f"Generated {len(blueprints)} blueprints") + + return blueprints diff --git a/src/base_stages/generate_capabilities.py b/src/base_stages/generate_capabilities.py new file mode 100644 index 0000000..6521cc3 --- /dev/null +++ b/src/base_stages/generate_capabilities.py @@ -0,0 +1,136 @@ +"""Generate capabilities using the scientist LLM.""" + +import asyncio +import logging +from typing import List + +import numpy as np +from autogen_core.models import ChatCompletionClient + +from src.schemas.area_schemas import Area +from src.schemas.capability_schemas import Capability +from src.utils.base_generation_prompts import ( + CAPABILITY_GENERATION_SYSTEM_PROMPT, + CAPABILITY_GENERATION_USER_PROMPT, +) +from src.utils.model_client_utils import ModelCallMode, async_call_model + + +logger = logging.getLogger(__name__) + + +def generate_capabilities( + area: Area, + num_capabilities: int, + num_capabilities_per_run: int, + client: ChatCompletionClient, +) -> List[Capability]: + """Generate capabilities for a given area. + + Args: + area: Area object + num_capabilities: Total number of capabilities to generate + num_capabilities_per_run: Number of capabilities per LLM call + client: ChatCompletionClient for API calls + + Returns + ------- + List of generated Capability objects + """ + capabilities = [] + + # Calculate number of runs needed + num_runs = int(np.ceil(num_capabilities / num_capabilities_per_run)) + + # Generate capabilities in batches + num_capabilities_left = num_capabilities + for run in range(num_runs): + logger.info(f"Capability generation for area: {area.area_name} at run {run}") + + run_capabilities = generate_capabilities_using_llm( + area=area, + num_capabilities=min(num_capabilities_per_run, num_capabilities_left), + client=client, + prev_capabilities=capabilities, + id_offset=len(capabilities), # Pass offset for unique IDs + ) + capabilities.extend(run_capabilities) + num_capabilities_left -= len(run_capabilities) + + return capabilities + + +def generate_capabilities_using_llm( + area: Area, + num_capabilities: int, + client: ChatCompletionClient, + prev_capabilities: List[Capability], + id_offset: int = 0, +) -> List[Capability]: + """Generate capabilities using LLM. 
+ + Args: + area: Area object + num_capabilities: Number of capabilities to generate + client: ChatCompletionClient for API calls + prev_capabilities: Previously generated capabilities + id_offset: Offset for capability IDs to ensure uniqueness across batches + + Returns + ------- + List of generated Capability objects + """ + sys_prompt = CAPABILITY_GENERATION_SYSTEM_PROMPT + user_prompt = CAPABILITY_GENERATION_USER_PROMPT.format( + area=area.area_name, + domain=area.domain.domain_name, + num_capabilities=num_capabilities, + prev_capabilities="\n".join([elm.capability_name for elm in prev_capabilities]), + ) + + response = asyncio.run( + async_call_model( + client, + system_prompt=sys_prompt, + user_prompt=user_prompt, + mode=ModelCallMode.JSON_PARSE, + ) + ) + + gen_capabilities_dict = response.get("capabilities", []) + capabilities = [] + + for idx, capability_dict in enumerate(gen_capabilities_dict): + try: + capability_id = f"cap_{(idx + id_offset):03d}" + capability = Capability( + capability_name=capability_dict["name"], + capability_id=capability_id, + area=area, + capability_description=capability_dict["description"], + ) + except Exception as e: + logger.warning( + f"Error creating capability {capability_dict['name']}, skipping: {e}" + ) + continue + else: + capabilities.append(capability) + + if len(capabilities) != len(gen_capabilities_dict): + logger.warning( + f"Only {len(capabilities)} capabilities were created out of " + f"{len(gen_capabilities_dict)} generated capabilities." + ) + + # Truncate to requested number if LLM returned more + if len(capabilities) > num_capabilities: + logger.info( + f"LLM returned {len(capabilities)} capabilities, " + f"truncating to requested {num_capabilities}" + ) + capabilities = capabilities[:num_capabilities] + + logger.info(f"Generated {len(capabilities)} capabilities.") + + return capabilities diff --git a/src/base_stages/generate_diverse_tasks_pipeline.py b/src/base_stages/generate_diverse_tasks_pipeline.py new file mode 100644 index 0000000..b0de030 --- /dev/null +++ b/src/base_stages/generate_diverse_tasks_pipeline.py @@ -0,0 +1,67 @@ +"""Generate diverse tasks using multi-dimensional approach.""" + +import logging +from typing import List + +from autogen_core.models import ChatCompletionClient + +from src.base_stages.extract_subtopics import extract_subtopics +from src.base_stages.find_combinations import find_valid_combinations +from src.base_stages.generate_blueprints import generate_blueprints +from src.base_stages.generate_tasks_from_blueprints import ( + generate_tasks_from_blueprints, +) +from src.schemas.capability_schemas import Capability +from src.schemas.task_schemas import Task + + +logger = logging.getLogger(__name__) + + +def generate_diverse_tasks_for_capability( + capability: Capability, + tasks_per_blueprint: int, + client: ChatCompletionClient, + min_subtopics: int = 3, + max_subtopics: int = 8, +) -> List[Task]: + """Generate diverse tasks for a single capability. + + This function generates Task objects (questions with 4 options). The + correct answer is NOT determined here — that happens in Stage 4 + (Solution Generation) where an LLM solves each task. 
+ + Args: + capability: Capability object + tasks_per_blueprint: Number of tasks to generate per blueprint + client: ChatCompletionClient for API calls + min_subtopics: Minimum number of subtopics to generate + max_subtopics: Maximum number of subtopics to generate + + Returns + ------- + List of Task objects (questions + options, no answers) + """ + logger.info( + f"Generating diverse tasks for capability: {capability.capability_name}" + ) + + logger.info("Step 1: Extracting sub-topics") + subtopics = extract_subtopics(capability, client, min_subtopics, max_subtopics) + logger.info(f"Extracted {len(subtopics)} sub-topics") + + logger.info("Step 2: Finding valid combinations") + combinations = find_valid_combinations(capability, subtopics, client) + logger.info(f"Found {len(combinations)} valid combinations") + + logger.info("Step 3: Generating blueprints") + blueprints = generate_blueprints(capability, combinations, client) + logger.info(f"Generated {len(blueprints)} blueprints") + + logger.info("Step 4: Generating tasks") + tasks = generate_tasks_from_blueprints( + capability, blueprints, client, tasks_per_blueprint + ) + logger.info(f"Generated {len(tasks)} tasks") + + return tasks diff --git a/src/base_stages/generate_tasks_from_blueprints.py b/src/base_stages/generate_tasks_from_blueprints.py new file mode 100644 index 0000000..f3a0f13 --- /dev/null +++ b/src/base_stages/generate_tasks_from_blueprints.py @@ -0,0 +1,146 @@ +"""Generate multiple-choice questions for each blueprint.""" + +import asyncio +import logging +from typing import List + +from autogen_core.models import ChatCompletionClient + +from src.base_stages.task_dataclasses import Blueprint +from src.schemas.task_schemas import Task +from src.utils.base_generation_prompts import ( + format_options_prompt, + format_question_prompt, +) +from src.utils.model_client_utils import ModelCallMode, async_call_model + + +logger = logging.getLogger(__name__) + + +def generate_tasks_from_blueprints( + capability, + blueprints: list[Blueprint], + client: ChatCompletionClient, + tasks_per_blueprint: int = 3, +) -> List[Task]: + """Generate multiple-choice questions for each blueprint. + + This function generates Task objects using a two-step process: + 1. Generate the question text + 2. Generate 4 options for the question + + The correct answer is NOT determined here — that happens in Stage 4 + (Solution Generation) where an LLM solves each task. 
+ + Args: + capability: Capability object + blueprints: List of Blueprint objects + client: ChatCompletionClient for API calls + tasks_per_blueprint: Number of tasks to generate per blueprint + + Returns + ------- + List of Task objects (questions + options, no answers) + """ + logger.info("Generating tasks from blueprints...") + + all_tasks = [] + + for blueprint in blueprints: + logger.info( + f"Generating {tasks_per_blueprint} tasks for blueprint " + f"{blueprint.combination_id}: {blueprint.subtopic} | " + f"{blueprint.difficulty} | {blueprint.reasoning}" + ) + + for _j in range(tasks_per_blueprint): + task_id = f"task_{len(all_tasks):03d}" + + try: + # Step 1: Generate the question + logger.debug(f" {task_id}: Generating question...") + question_system, question_user = format_question_prompt( + capability_name=capability.capability_name, + capability_description=capability.capability_description, + capability_domain=capability.area.domain.domain_name, + capability_area=capability.area.area_name, + blueprint_description=blueprint.blueprint, + ) + + question_response = asyncio.run( + async_call_model( + client, + system_prompt=question_system, + user_prompt=question_user, + mode=ModelCallMode.JSON_PARSE, + ) + ) + + question_text = question_response["question"] + logger.debug(f" {task_id}: Question generated") + + # Step 2: Generate the options + logger.debug(f" {task_id}: Generating options...") + options_system, options_user = format_options_prompt( + capability_name=capability.capability_name, + capability_description=capability.capability_description, + capability_domain=capability.area.domain.domain_name, + capability_area=capability.area.area_name, + question=question_text, + ) + + options_response = asyncio.run( + async_call_model( + client, + system_prompt=options_system, + user_prompt=options_user, + mode=ModelCallMode.JSON_PARSE, + ) + ) + + options = options_response["options"] + logger.debug(f" {task_id}: Options generated") + + # Combine question and options into task text + task_text = f"{question_text}\n\n" + for choice_key, choice_text in options.items(): + task_text += f"{choice_key}. {choice_text}\n" + + choices_structured = [ + {"label": label, "solution": text} + for label, text in options.items() + ] + + task = Task( + task_id=task_id, + task=task_text, + task_type="multiple_choice", + solution_type="multiple_choice", + difficulty=blueprint.difficulty, + bloom_level=blueprint.reasoning, + choices=choices_structured, + capability=capability, + generation_metadata={ + "method": "diverse_task_generation", + "blueprint_id": blueprint.combination_id, + "blueprint": blueprint.blueprint, + "subtopic": blueprint.subtopic, + }, + ) + all_tasks.append(task) + + except Exception as e: + logger.error(f" Failed to generate {task_id}: {e}") + continue + + tasks_for_blueprint = [ + t + for t in all_tasks + if t.generation_metadata.get("blueprint_id") == blueprint.combination_id + ] + logger.info(f" Generated {len(tasks_for_blueprint)} tasks for this blueprint") + + logger.info(f"Generated {len(all_tasks)} total tasks") + + return all_tasks diff --git a/src/base_stages/solve_tasks.py b/src/base_stages/solve_tasks.py new file mode 100644 index 0000000..70efc40 --- /dev/null +++ b/src/base_stages/solve_tasks.py @@ -0,0 +1,116 @@ +"""Solve tasks and generate TaskSolution objects (Stage 4). + +This module takes Task objects (questions with options) and determines +the correct answer by having an LLM solve each task, creating TaskSolution +objects with the solution and reasoning. 
+""" + +import asyncio +import logging +from typing import List, Optional + +from autogen_core.models import ChatCompletionClient + +from src.schemas.solution_schemas import TaskSolution +from src.schemas.task_schemas import Task +from src.utils.base_generation_prompts import format_solution_prompt +from src.utils.model_client_utils import ModelCallMode, async_call_model + + +logger = logging.getLogger(__name__) + + +def solve_tasks( + tasks: List[Task], + client: ChatCompletionClient, +) -> List[TaskSolution]: + """Solve tasks and generate TaskSolution objects. + + For each task, this function has an LLM solve the multiple-choice + question and creates a TaskSolution with the solution and reasoning. + + The solver determines the correct answer by actually solving the problem, + not by looking up any pre-stored answer. + + Args: + tasks: List of Task objects to solve + client: ChatCompletionClient for API calls + + Returns + ------- + List of TaskSolution objects + """ + logger.info(f"Solving {len(tasks)} tasks...") + + task_solutions = [] + + for i, task in enumerate(tasks): + logger.info(f"Solving task {i + 1}/{len(tasks)}: {task.task_id}") + + try: + capability = task.capability + task_gen_metadata = task.generation_metadata or {} + + system_prompt, user_prompt = format_solution_prompt( + capability_domain=capability.area.domain.domain_name, + capability_area=capability.area.area_name, + capability_name=capability.capability_name, + capability_description=capability.capability_description, + task_text=task.task, + ) + + response = asyncio.run( + async_call_model( + client, + system_prompt=system_prompt, + user_prompt=user_prompt, + mode=ModelCallMode.JSON_PARSE, + ) + ) + + solution = response.get("answer", "") + reasoning = response.get("reasoning", "") + + generation_metadata = { + "method": "solve_tasks", + # Include original task generation metadata + "task_generation_metadata": task_gen_metadata, + } + + task_solution = TaskSolution( + task_id=task.task_id, + task=task.task, + solution=solution, + reasoning=reasoning, + task_obj=task, + generation_metadata=generation_metadata, + ) + task_solutions.append(task_solution) + + logger.info(f" Solved: answer={solution}") + + except Exception as e: + logger.error(f" Failed to solve {task.task_id}: {e}") + continue + + logger.info(f"Solved {len(task_solutions)}/{len(tasks)} tasks") + + return task_solutions + + +def solve_single_task( + task: Task, + client: ChatCompletionClient, +) -> Optional[TaskSolution]: + """Solve a single task and return a TaskSolution. + + Args: + task: Task object to solve + client: ChatCompletionClient for API calls + + Returns + ------- + TaskSolution object if successful, None otherwise + """ + result = solve_tasks(tasks=[task], client=client) + return result[0] if result else None diff --git a/src/base_stages/stage0_setup.py b/src/base_stages/stage0_setup.py new file mode 100644 index 0000000..56ba00d --- /dev/null +++ b/src/base_stages/stage0_setup.py @@ -0,0 +1,84 @@ +"""Stage 0: Experiment and domain setup. + +This stage initializes the experiment and creates domain metadata. 
+""" + +import logging +from pathlib import Path + +from omegaconf import DictConfig, OmegaConf + +from src.schemas.domain_schemas import Domain +from src.schemas.experiment_schemas import Experiment +from src.schemas.io_utils import save_domain, save_experiment +from src.schemas.metadata_schemas import PipelineMetadata +from src.utils.data_utils import check_cfg +from src.utils.timestamp_utils import iso_timestamp + + +logger = logging.getLogger(__name__) + + +def run_stage0(cfg: DictConfig) -> None: + """Stage 0: Experiment and domain setup. + + Creates experiment.json and domain/domain.json files. + + Args: + cfg: Configuration object + """ + check_cfg(cfg, logger) + exp_id = cfg.exp_cfg.exp_id + output_base_dir = Path(cfg.global_cfg.output_dir) + domain_name = cfg.global_cfg.domain + pipeline_type = cfg.global_cfg.pipeline_type + + logger.info( + "Stage 0: exp_id=%s | domain=%s | output_base_dir=%s | pipeline_type=%s", + exp_id, + domain_name, + output_base_dir, + pipeline_type, + ) + + domain_id = "domain_000" + domain_obj = Domain( + domain_name=domain_name, + domain_id=domain_id, + domain_description=None, + ) + + # Convert entire config to dictionary for experiment configuration + config_dict = OmegaConf.to_container(cfg, resolve=True) + + experiment_obj = Experiment( + experiment_id=exp_id, + domain=domain_name, + domain_id=domain_id, + pipeline_type=pipeline_type, + configuration=config_dict, + ) + + metadata = PipelineMetadata( + experiment_id=exp_id, + output_base_dir=str(output_base_dir), + timestamp=iso_timestamp(), + input_stage_tag=None, + output_stage_tag=None, + resume=False, + ) + + save_experiment( + experiment=experiment_obj, + metadata=metadata, + output_path=output_base_dir / exp_id / "experiment.json", + ) + save_domain( + domain=domain_obj, + metadata=metadata, + output_path=output_base_dir / exp_id / "domain" / "domain.json", + ) + + logger.info( + "Stage 0: saved experiment and domain artifacts under %s", output_base_dir + ) diff --git a/src/base_stages/stage1_areas.py b/src/base_stages/stage1_areas.py new file mode 100644 index 0000000..7976eda --- /dev/null +++ b/src/base_stages/stage1_areas.py @@ -0,0 +1,86 @@ +"""Stage 1: Area generation. + +This stage generates capability areas for the domain. +""" + +import logging +from pathlib import Path + +from omegaconf import DictConfig + +from src.base_stages.generate_areas import generate_areas +from src.schemas.io_utils import load_domain, save_areas +from src.schemas.metadata_schemas import PipelineMetadata +from src.utils import constants +from src.utils.model_client_utils import get_standard_model_client +from src.utils.timestamp_utils import iso_timestamp, timestamp_tag + + +logger = logging.getLogger(__name__) + + +def run_stage1(cfg: DictConfig) -> str: + """Stage 1: Generate capability areas. 
+ + Args: + cfg: Configuration object + + Returns + ------- + The areas_tag for this generation + """ + experiment_id = cfg.exp_cfg.exp_id + output_base_dir = Path(cfg.global_cfg.output_dir) + + # Load domain from Stage 0 output + domain_path = output_base_dir / experiment_id / "domain" / "domain.json" + domain, _ = load_domain(domain_path) + + # Initialize scientist LLM client directly with generation parameters + scientist_llm_gen_cfg = dict(cfg.scientist_llm.generation_cfg.capability_generation) + scientist_llm_client = get_standard_model_client( + cfg.scientist_llm.name, + seed=scientist_llm_gen_cfg.get("seed", cfg.exp_cfg.seed), + temperature=scientist_llm_gen_cfg.get( + "temperature", constants.DEFAULT_TEMPERATURE + ), + max_tokens=scientist_llm_gen_cfg.get( + "max_tokens", constants.DEFAULT_MAX_TOKENS + ), + ) + + num_areas = cfg.areas_cfg.num_areas + logger.info( + f"Generating {num_areas} capability areas for domain: {domain.domain_name}" + ) + + areas = generate_areas( + domain=domain, + num_areas=num_areas, + num_capabilities_per_area=cfg.capabilities_cfg.num_capabilities // num_areas, + client=scientist_llm_client, + ) + + # Convert area names to Area objects + if len(areas) > num_areas: + logger.warning( + f"Generated {len(areas)} areas, but only {num_areas} are needed. " + f"Keeping the first {num_areas} areas." + ) + areas = areas[:num_areas] + + # Save areas + areas_tag = timestamp_tag() + areas_path = output_base_dir / experiment_id / "areas" / areas_tag / "areas.json" + metadata = PipelineMetadata( + experiment_id=experiment_id, + output_base_dir=str(output_base_dir), + timestamp=iso_timestamp(), + input_stage_tag=None, + output_stage_tag=areas_tag, + resume=False, + ) + save_areas(areas, metadata, areas_path) + + logger.info(f"Stage 1: saved {len(areas)} areas to {areas_path}") + return areas_tag diff --git a/src/base_stages/stage2_capabilities.py b/src/base_stages/stage2_capabilities.py new file mode 100644 index 0000000..f7dd1d0 --- /dev/null +++ b/src/base_stages/stage2_capabilities.py @@ -0,0 +1,174 @@ +"""Stage 2: Capability generation and filtering. + +This stage generates capabilities for each area, embeds them, and filters +by similarity to remove duplicates. +""" + +import logging +import math +from pathlib import Path + +from omegaconf import DictConfig + +from src.base_stages.generate_capabilities import generate_capabilities +from src.schemas.io_utils import load_areas, save_capabilities +from src.schemas.metadata_schemas import PipelineMetadata +from src.utils import constants +from src.utils.capability_management_utils import ( + filter_schema_capabilities_by_embeddings, +) +from src.utils.embedding_utils import generate_capability_embeddings +from src.utils.model_client_utils import get_standard_model_client +from src.utils.timestamp_utils import iso_timestamp, timestamp_tag + + +logger = logging.getLogger(__name__) + + +def run_stage2( + cfg: DictConfig, + areas_tag: str, + capabilities_tag: str = None, +) -> str: + """Stage 2: Generate capabilities, embed, and filter. 
+ + Args: + cfg: Configuration object + areas_tag: Tag from Stage 1 to load areas from + capabilities_tag: Optional resume tag + + Returns + ------- + The capabilities_tag for this generation + """ + experiment_id = cfg.exp_cfg.exp_id + output_base_dir = Path(cfg.global_cfg.output_dir) + + # Load areas from Stage 1 output + areas_path = output_base_dir / experiment_id / "areas" / areas_tag / "areas.json" + areas, _ = load_areas(areas_path) + logger.info(f"Loaded {len(areas)} area(s) from Stage 1") + + # Initialize scientist LLM client directly with generation parameters + scientist_llm_gen_cfg = dict(cfg.scientist_llm.generation_cfg.capability_generation) + scientist_llm_client = get_standard_model_client( + cfg.scientist_llm.name, + seed=scientist_llm_gen_cfg.get("seed", cfg.exp_cfg.seed), + temperature=scientist_llm_gen_cfg.get( + "temperature", constants.DEFAULT_TEMPERATURE + ), + max_tokens=scientist_llm_gen_cfg.get( + "max_tokens", constants.DEFAULT_MAX_TOKENS + ), + ) + + # Determine capabilities tag (resume or new) + is_resume = capabilities_tag is not None + if is_resume: + logger.info(f"Resuming Stage 2 with capabilities_tag: {capabilities_tag}") + else: + capabilities_tag = timestamp_tag() + logger.info(f"Starting new Stage 2 with capabilities_tag: {capabilities_tag}") + + # Calculate target capabilities per area + target_num_capabilities_per_area = math.ceil( + cfg.capabilities_cfg.num_capabilities / len(areas) + ) + num_capabilities_per_area = int( + target_num_capabilities_per_area + * (1 + cfg.capabilities_cfg.num_capabilities_buffer) + ) + + # Process each area + for area in areas: + # Check if capabilities already exist for this area (resume logic) + capabilities_path = ( + output_base_dir + / experiment_id + / "capabilities" + / capabilities_tag + / area.area_id + / "capabilities.json" + ) + + if is_resume and capabilities_path.exists(): + logger.info( + f"Skipping area {area.area_name} ({area.area_id}) - " + f"capabilities already exist at {capabilities_path}" + ) + continue + + logger.info( + f"Generating {num_capabilities_per_area} capabilities for area: " + f"{area.area_name} ({area.area_id}) [target: {target_num_capabilities_per_area}]" + ) + + # Generate capabilities + capabilities = generate_capabilities( + area=area, + num_capabilities=num_capabilities_per_area, + num_capabilities_per_run=cfg.capabilities_cfg.num_gen_capabilities_per_run, + client=scientist_llm_client, + ) + + # Sort capabilities + capabilities = sorted(capabilities, key=lambda x: x.capability_name) + if len(capabilities) < target_num_capabilities_per_area: + logger.warning( + f"Only {len(capabilities)} capabilities were created. " + f"Target number not reached: {target_num_capabilities_per_area}. " + "It is recommended to increase the buffer." + ) + + # Skip embedding/filtering if no capabilities were generated + if not capabilities: + logger.warning( + f"No capabilities generated for area {area.area_name}. Skipping." 
) + continue + + # Generate embeddings for capabilities + embeddings = generate_capability_embeddings( + capabilities=capabilities, + embedding_model_name=cfg.embedding_cfg.embedding_model, + embed_dimensions=cfg.embedding_cfg.embedding_size, + ) + + # Filter capabilities based on embedding similarity + filtered_capabilities, retained_indices = ( + filter_schema_capabilities_by_embeddings( + capabilities=capabilities, + embeddings=embeddings, + similarity_threshold=cfg.embedding_cfg.filtering_similarity_threshold, + ) + ) + + logger.info( + f"Capabilities retained after filtering: " + f"{len(filtered_capabilities)}/{len(capabilities)}" + ) + + for idx, cap in enumerate(filtered_capabilities): + cap.generation_metadata = { + "embedding_model": cfg.embedding_cfg.embedding_model, + "similarity_threshold": cfg.embedding_cfg.filtering_similarity_threshold, + "original_index": retained_indices[idx], + } + + # Save capabilities for this area + metadata = PipelineMetadata( + experiment_id=experiment_id, + output_base_dir=str(output_base_dir), + timestamp=iso_timestamp(), + input_stage_tag=areas_tag, + output_stage_tag=capabilities_tag, + resume=is_resume, + ) + + save_capabilities(filtered_capabilities, metadata, capabilities_path) + logger.info( + f"Stage 2: saved {len(filtered_capabilities)} capabilities to " + f"{capabilities_path}" + ) + + return capabilities_tag diff --git a/src/base_stages/stage3_tasks.py b/src/base_stages/stage3_tasks.py new file mode 100644 index 0000000..4e042c5 --- /dev/null +++ b/src/base_stages/stage3_tasks.py @@ -0,0 +1,162 @@ +"""Stage 3: Task generation. + +This stage generates tasks (questions with options) for each capability. +The correct answer is NOT determined here — that happens in Stage 4. +""" + +import logging +from pathlib import Path + +from omegaconf import DictConfig + +from src.base_stages.generate_diverse_tasks_pipeline import ( + generate_diverse_tasks_for_capability, +) +from src.schemas.io_utils import load_capabilities, save_tasks +from src.schemas.metadata_schemas import PipelineMetadata +from src.utils import constants +from src.utils.model_client_utils import get_standard_model_client +from src.utils.timestamp_utils import iso_timestamp, timestamp_tag + + +logger = logging.getLogger(__name__) + + +def run_stage3( + cfg: DictConfig, + capabilities_tag: str, + tasks_tag: str = None, +) -> str: + """Stage 3: Generate tasks for each capability. + + This stage generates Task objects (questions with options) using a + two-step process: + 1. Generate the question text + 2. Generate 4 options for the question + + The correct answer is NOT determined here — that happens in Stage 4 + (Solution Generation) where an LLM solves each task.
+ + Args: + cfg: Configuration object + capabilities_tag: Tag from Stage 2 to load capabilities from + tasks_tag: Optional resume tag + + Returns + ------- + The tasks_tag for this generation + """ + experiment_id = cfg.exp_cfg.exp_id + output_base_dir = Path(cfg.global_cfg.output_dir) + + # Determine tasks tag (resume or new) + is_resume = tasks_tag is not None + if is_resume: + logger.info(f"Resuming Stage 3 with tasks_tag: {tasks_tag}") + else: + tasks_tag = timestamp_tag() + logger.info(f"Starting new Stage 3 with tasks_tag: {tasks_tag}") + + # Initialize scientist LLM client using task_generation config + scientist_llm_gen_cfg = dict(cfg.scientist_llm.generation_cfg.task_generation) + scientist_llm_client = get_standard_model_client( + cfg.scientist_llm.name, + seed=scientist_llm_gen_cfg.get("seed", cfg.exp_cfg.seed), + temperature=scientist_llm_gen_cfg.get( + "temperature", constants.DEFAULT_TEMPERATURE + ), + max_tokens=scientist_llm_gen_cfg.get( + "max_tokens", constants.DEFAULT_MAX_TOKENS + ), + ) + + # Get task generation parameters from config + tasks_per_blueprint = cfg.task_generation_cfg.get("tasks_per_blueprint", 3) + min_subtopics = cfg.task_generation_cfg.get("min_subtopics", 3) + max_subtopics = cfg.task_generation_cfg.get("max_subtopics", 8) + + # Find all area directories under capabilities// + capabilities_base_dir = ( + output_base_dir / experiment_id / "capabilities" / capabilities_tag + ) + area_dirs = [d for d in capabilities_base_dir.iterdir() if d.is_dir()] + + logger.info(f"Found {len(area_dirs)} area directories") + + # Process each area + for area_dir in area_dirs: + area_id = area_dir.name + logger.info(f"Processing area: {area_id}") + + # Load capabilities for this area + capabilities_path = area_dir / "capabilities.json" + capabilities, _ = load_capabilities(capabilities_path) + logger.info(f"Loaded {len(capabilities)} capabilities from {area_id}") + + # Process each capability + for capability in capabilities: + capability_id = capability.capability_id + + # Check if tasks already exist for this capability (resume logic) + tasks_path = ( + output_base_dir + / experiment_id + / "tasks" + / tasks_tag + / area_id + / capability_id + / "tasks.json" + ) + + if is_resume and tasks_path.exists(): + logger.info( + f"Skipping {area_id}/{capability_id} - " + f"tasks already exist at {tasks_path}" + ) + continue + + logger.info( + f"Generating tasks for capability: {capability.capability_name} " + f"({area_id}/{capability_id})" + ) + + try: + # Generate diverse tasks + tasks = generate_diverse_tasks_for_capability( + capability=capability, + tasks_per_blueprint=tasks_per_blueprint, + client=scientist_llm_client, + min_subtopics=min_subtopics, + max_subtopics=max_subtopics, + ) + + logger.info( + f"Generated {len(tasks)} tasks for {capability.capability_name}" + ) + + # Save tasks + metadata = PipelineMetadata( + experiment_id=experiment_id, + output_base_dir=str(output_base_dir), + timestamp=iso_timestamp(), + input_stage_tag=capabilities_tag, + output_stage_tag=tasks_tag, + resume=is_resume, + ) + + save_tasks(tasks, metadata, tasks_path) + + logger.info( + f"Stage 3: saved {len(tasks)} tasks to " + f"tasks/{tasks_tag}/{area_id}/{capability_id}/tasks.json" + ) + + except Exception as e: + logger.error( + f"Error generating tasks for {area_id}/{capability_id}: {e}", + exc_info=True, + ) + # Continue with next capability instead of failing completely + continue + + return tasks_tag diff --git a/src/base_stages/stage4_solutions.py b/src/base_stages/stage4_solutions.py 
new file mode 100644 index 0000000..1cc80ba --- /dev/null +++ b/src/base_stages/stage4_solutions.py @@ -0,0 +1,159 @@ +"""Stage 4: Solution generation. + +This stage solves tasks by having an LLM determine the correct answer +for each multiple-choice question. +""" + +import logging +from pathlib import Path + +from omegaconf import DictConfig + +from src.base_stages.solve_tasks import solve_tasks +from src.schemas.io_utils import load_tasks, save_solution +from src.schemas.metadata_schemas import PipelineMetadata +from src.utils import constants +from src.utils.model_client_utils import get_standard_model_client +from src.utils.timestamp_utils import iso_timestamp, timestamp_tag + + +logger = logging.getLogger(__name__) + + +def run_stage4( + cfg: DictConfig, + tasks_tag: str, + solution_tag: str = None, +) -> str: + """Stage 4: Generate solutions for tasks. + + This stage takes Task objects and determines the correct answer by + having an LLM solve each task. + + Args: + cfg: Configuration object + tasks_tag: Tag from Stage 3 to load tasks from + solution_tag: Optional resume tag + + Returns + ------- + The solution_tag for this generation + """ + experiment_id = cfg.exp_cfg.exp_id + output_base_dir = Path(cfg.global_cfg.output_dir) + + # Determine solution tag (resume or new) + is_resume = solution_tag is not None + if is_resume: + logger.info(f"Resuming Stage 4 with solution_tag: {solution_tag}") + else: + solution_tag = timestamp_tag() + logger.info(f"Starting new Stage 4 with solution_tag: {solution_tag}") + + # Initialize solver LLM client using task_solve config + solver_llm_gen_cfg = dict(cfg.scientist_llm.generation_cfg.task_solve) + solver_llm_client = get_standard_model_client( + cfg.scientist_llm.name, + seed=solver_llm_gen_cfg.get("seed", cfg.exp_cfg.seed), + temperature=solver_llm_gen_cfg.get( + "temperature", constants.DEFAULT_TEMPERATURE + ), + max_tokens=solver_llm_gen_cfg.get("max_tokens", constants.DEFAULT_MAX_TOKENS), + ) + + # Find all task directories under tasks// + tasks_base_dir = output_base_dir / experiment_id / "tasks" / tasks_tag + + if not tasks_base_dir.exists(): + logger.error(f"Tasks directory not found: {tasks_base_dir}") + return solution_tag + + area_dirs = [d for d in tasks_base_dir.iterdir() if d.is_dir()] + logger.info(f"Found {len(area_dirs)} area directories") + + # Process each area + for area_dir in area_dirs: + area_id = area_dir.name + logger.info(f"Processing area: {area_id}") + + # Find all capability directories + capability_dirs = [d for d in area_dir.iterdir() if d.is_dir()] + + for capability_dir in capability_dirs: + capability_id = capability_dir.name + + # Check if solutions already exist for this capability (resume logic) + solutions_dir = ( + output_base_dir + / experiment_id + / "solutions" + / solution_tag + / area_id + / capability_id + ) + + if ( + is_resume + and solutions_dir.exists() + and any(solutions_dir.glob("*/solution.json")) + ): + logger.info( + f"Skipping {area_id}/{capability_id} - " + f"solutions already exist at {solutions_dir}" + ) + continue + + # Load tasks for this capability + tasks_path = capability_dir / "tasks.json" + if not tasks_path.exists(): + logger.warning(f"No tasks found at {tasks_path}") + continue + + tasks, _ = load_tasks(tasks_path) + logger.info(f"Loaded {len(tasks)} tasks for {area_id}/{capability_id}") + + try: + # Solve tasks + task_solutions = solve_tasks(tasks=tasks, client=solver_llm_client) + + logger.info( + f"Generated {len(task_solutions)} solutions for " + 
f"{area_id}/{capability_id}" + ) + + # Save each solution + metadata = PipelineMetadata( + experiment_id=experiment_id, + output_base_dir=str(output_base_dir), + timestamp=iso_timestamp(), + input_stage_tag=tasks_tag, + output_stage_tag=solution_tag, + resume=is_resume, + ) + + for task_solution in task_solutions: + solution_path = ( + output_base_dir + / experiment_id + / "solutions" + / solution_tag + / area_id + / capability_id + / task_solution.task_id + / "solution.json" + ) + save_solution(task_solution, metadata, solution_path) + + logger.info( + f"Stage 4: saved {len(task_solutions)} solutions to " + f"solutions/{solution_tag}/{area_id}/{capability_id}/" + ) + + except Exception as e: + logger.error( + f"Error solving tasks for {area_id}/{capability_id}: {e}", + exc_info=True, + ) + continue + + return solution_tag diff --git a/src/base_stages/stage5_validation.py b/src/base_stages/stage5_validation.py new file mode 100644 index 0000000..7b33715 --- /dev/null +++ b/src/base_stages/stage5_validation.py @@ -0,0 +1,168 @@ +"""Stage 5: Task validation. + +This stage validates generated task solutions to ensure they are correct +and align with the task requirements. +""" + +import logging +from pathlib import Path + +from omegaconf import DictConfig + +from src.base_stages.validate_tasks import validate_tasks +from src.schemas.io_utils import load_solution, save_validation +from src.schemas.metadata_schemas import PipelineMetadata +from src.utils import constants +from src.utils.model_client_utils import get_standard_model_client +from src.utils.timestamp_utils import iso_timestamp, timestamp_tag + + +logger = logging.getLogger(__name__) + + +def run_stage5( + cfg: DictConfig, + solution_tag: str, + validation_tag: str = None, +) -> str: + """Stage 5: Validate generated task solutions. 
+ + Args: + cfg: Configuration object + solution_tag: Tag from Stage 4 to load solutions from + validation_tag: Optional resume tag + + Returns + ------- + The validation_tag for this validation + """ + experiment_id = cfg.exp_cfg.exp_id + output_base_dir = Path(cfg.global_cfg.output_dir) + + # Determine validation tag (resume or new) + is_resume = validation_tag is not None + if is_resume: + logger.info(f"Resuming Stage 5 with validation_tag: {validation_tag}") + else: + validation_tag = timestamp_tag() + logger.info(f"Starting new Stage 5 with validation_tag: {validation_tag}") + + # Initialize validator LLM client using task_verify config + validator_llm_gen_cfg = dict(cfg.scientist_llm.generation_cfg.task_verify) + validator_llm_client = get_standard_model_client( + cfg.scientist_llm.name, + seed=validator_llm_gen_cfg.get("seed", cfg.exp_cfg.seed), + temperature=validator_llm_gen_cfg.get( + "temperature", constants.DEFAULT_TEMPERATURE + ), + max_tokens=validator_llm_gen_cfg.get( + "max_tokens", constants.DEFAULT_MAX_TOKENS + ), + ) + + # Find all solutions directories + solutions_base_dir = output_base_dir / experiment_id / "solutions" / solution_tag + + if not solutions_base_dir.exists(): + logger.error(f"Solutions directory not found: {solutions_base_dir}") + return validation_tag + + # Find all area directories + area_dirs = [d for d in solutions_base_dir.iterdir() if d.is_dir()] + logger.info(f"Found {len(area_dirs)} area directories") + + # Process each area + for area_dir in area_dirs: + area_id = area_dir.name + logger.info(f"Processing area: {area_id}") + + # Find all capability directories + capability_dirs = [d for d in area_dir.iterdir() if d.is_dir()] + + for capability_dir in capability_dirs: + capability_id = capability_dir.name + + # Check if validation already exists for this capability (resume logic) + validation_cap_dir = ( + output_base_dir + / experiment_id + / "validation" + / validation_tag + / area_id + / capability_id + ) + + if is_resume and validation_cap_dir.exists(): + existing_validations = list( + validation_cap_dir.glob("*/validation.json") + ) + if existing_validations: + logger.info( + f"Skipping {area_id}/{capability_id} - " + f"{len(existing_validations)} validations already exist" + ) + continue + + # Find all task solution directories (each task has its own directory) + task_dirs = [d for d in capability_dir.iterdir() if d.is_dir()] + + if not task_dirs: + logger.warning(f"No solutions found in {area_id}/{capability_id}") + continue + + logger.info( + f"Validating {len(task_dirs)} solutions for {area_id}/{capability_id}" + ) + + # Load all task solutions for this capability + task_solutions = [] + + for task_dir in task_dirs: + solution_file = task_dir / "solution.json" + if solution_file.exists(): + task_solution, _ = load_solution(solution_file) + task_solutions.append(task_solution) + + if not task_solutions: + logger.warning( + f"No valid solutions loaded for {area_id}/{capability_id}" + ) + continue + + # Validate all tasks for this capability + try: + validation_results = validate_tasks( + task_solutions=task_solutions, + client=validator_llm_client, + ) + + # Save individual validation results + for validation_result in validation_results: + task_id = validation_result.task_id + validation_path = validation_cap_dir / task_id / "validation.json" + + metadata = PipelineMetadata( + experiment_id=experiment_id, + output_base_dir=str(output_base_dir), + timestamp=iso_timestamp(), + input_stage_tag=solution_tag, + output_stage_tag=validation_tag, + 
resume=is_resume, + ) + + save_validation(validation_result, metadata, validation_path) + + logger.info( + f"Validated {area_id}/{capability_id}: " + f"{len(validation_results)} task(s) validated" + ) + + except Exception as e: + logger.error( + f"Error validating tasks for {area_id}/{capability_id}: {e}", + exc_info=True, + ) + continue + + logger.info(f"Stage 5 completed. Validation tag: {validation_tag}") + return validation_tag diff --git a/experimental/diverse_task_constants.py b/src/base_stages/task_constants.py similarity index 100% rename from experimental/diverse_task_constants.py rename to src/base_stages/task_constants.py diff --git a/experimental/diverse_task_dataclasses.py b/src/base_stages/task_dataclasses.py similarity index 82% rename from experimental/diverse_task_dataclasses.py rename to src/base_stages/task_dataclasses.py index b03aa15..57483f7 100644 --- a/experimental/diverse_task_dataclasses.py +++ b/src/base_stages/task_dataclasses.py @@ -4,17 +4,6 @@ from typing import Dict, List, Optional -@dataclass -class Capability: - """Represents a capability to be tested.""" - - name: str - description: str - domain: str - area: Optional[str] = None - example_tasks: List[Dict] = field(default_factory=list) - - @dataclass class SubTopic: """Represents a sub-topic within a capability.""" @@ -48,8 +37,8 @@ class Blueprint: @dataclass -class Task: - """Represents a generated multiple-choice task.""" +class GeneratedTask: + """Represents a generated multiple-choice task (internal use).""" task_id: str blueprint_id: int diff --git a/src/base_stages/validate_tasks.py b/src/base_stages/validate_tasks.py new file mode 100644 index 0000000..420dcb0 --- /dev/null +++ b/src/base_stages/validate_tasks.py @@ -0,0 +1,133 @@ +"""Validate that generated tasks align with intended dimensions.""" + +import asyncio +import logging + +from autogen_core.models import ChatCompletionClient + +from src.schemas.solution_schemas import TaskSolution +from src.schemas.validation_schemas import ValidationResult +from src.utils.base_generation_prompts import format_verification_prompt +from src.utils.model_client_utils import ModelCallMode, async_call_model + + +logger = logging.getLogger(__name__) + + +def validate_tasks( + task_solutions: list[TaskSolution], + client: ChatCompletionClient, +) -> list[ValidationResult]: + """Validate that generated tasks align with intended dimensions. 
+ + Args: + task_solutions: List of TaskSolution objects + client: ChatCompletionClient for API calls + + Returns + ------- + List of ValidationResult objects + """ + logger.info("Validating task alignment...") + + validation_results = [] + + for i, task_solution in enumerate(task_solutions): + logger.info( + f"Validating task {i + 1}/{len(task_solutions)}: {task_solution.task_id}" + ) + capability = task_solution.task_obj.capability + + try: + task_obj = task_solution.task_obj + + # Use structured fields from Task if available, fallback to metadata/parsing + blueprint_text = "N/A" + if task_obj.difficulty and task_obj.bloom_level: + blueprint_text = ( + f"Difficulty: {task_obj.difficulty}, " + f"Bloom's Level: {task_obj.bloom_level}" + ) + elif task_solution.generation_metadata: + blueprint_text = task_solution.generation_metadata.get( + "blueprint", "N/A" + ) + + # Extract question (first part before choices) + task_lines = task_solution.task.strip().split("\n") + question = task_lines[0] if task_lines else task_solution.task + + # Use structured choices if available, otherwise parse from text + choices = {} + if task_obj.choices: + for choice in task_obj.choices: + choices[choice["label"]] = choice["solution"] + else: + # Fallback: parse from task text + for task_line in task_lines[1:]: + line = task_line.strip() + if line and len(line) > 2 and line[1] == ".": + choice_letter = line[0] + choice_text = line[3:].strip() + choices[choice_letter] = choice_text + + system_prompt, user_prompt = format_verification_prompt( + capability_domain=capability.area.domain.domain_name, + capability_area=capability.area.area_name, + capability_name=capability.capability_name, + capability_description=capability.capability_description, + task_blueprint=blueprint_text, + question=question, + option_a=choices.get("A", ""), + option_b=choices.get("B", ""), + option_c=choices.get("C", ""), + option_d=choices.get("D", ""), + correct_answer=task_solution.solution, + ) + + response = asyncio.run( + async_call_model( + client, + system_prompt=system_prompt, + user_prompt=user_prompt, + mode=ModelCallMode.JSON_PARSE, + ) + ) + + overall_aligned = response.get("overall_verdict", "Fail") == "Pass" + + validation_result = ValidationResult( + task_id=task_solution.task_id, + task=task_solution.task, + task_solution=task_solution, + verification=overall_aligned, + feedback=response.get("explanation", ""), + generation_metadata={ + "method": "validate_tasks", + "subtopic_aligned": response.get("blueprint_alignment", "No") + == "Yes", + "difficulty_aligned": response.get( + "difficulty_reasoning_match", "No" + ) + == "Yes", + "reasoning_aligned": response.get("capability_alignment", "No") + == "Yes", + "choices_appropriate": response.get("single_correct_answer", "No") + == "Yes", + "suggested_improvements": response.get( + "suggested_improvements", "" + ), + **task_solution.generation_metadata, + }, + ) + validation_results.append(validation_result) + + status = "PASS" if overall_aligned else "FAIL" + logger.info(f" {status}") + + except Exception as e: + logger.error(f"Error validating {task_solution.task_id}: {e}") + logger.info("ERROR - Skipping this task") + continue + + return validation_results diff --git a/src/capability.py b/src/capability.py index 8443109..b6a7dc3 100644 --- a/src/capability.py +++ b/src/capability.py @@ -20,7 +20,6 @@ from src.utils.capability_utils import ( parse_python_class_str, read_score_inspect_json, - run_inspect_evals, ) from src.utils.data_utils import ( list_dir, @@ -1041,11 
+1040,10 @@ def _evaluate_using_inspect(self, subject_llm: Model, **kwargs: Any) -> None: return os.makedirs(log_dir, exist_ok=True) - run_inspect_evals( - path=self.name, - model=subject_llm, - log_dir=log_dir, - **kwargs, + # TODO: run_inspect_evals was removed - this legacy method needs updating + raise NotImplementedError( + "run_inspect_evals function was removed. " + "Use the new evaluation pipeline instead." ) # Transfer the logs to the GCP bucket diff --git a/src/cfg/run_cfg.yaml b/src/cfg/run_cfg.yaml index ac1fcb1..7924a76 100644 --- a/src/cfg/run_cfg.yaml +++ b/src/cfg/run_cfg.yaml @@ -58,17 +58,30 @@ subject_llm: prompt_cfg: sys_msg: Complete the given task to the best of your ability. +# Diverse task generation configuration (Stage 3) +task_generation_cfg: + tasks_per_blueprint: 1 # Number of tasks to generate per blueprint + min_subtopics: 1 # Suggested minimum number of sub-topics + max_subtopics: 1 # Suggested maximum number of sub-topics + +# Task verification configuration (Stage 5) +task_verification_cfg: + pass_threshold: 0.8 # Minimum pass rate to consider successful + strict_mode: false # If true, all alignment criteria must pass + +# Area generation configuration (Stage 1) +areas_cfg: + num_areas: 2 # Number of areas to generate + +# Capability generation configuration (Stage 2) capabilities_cfg: - capabilities_dir: /fs01/projects/aieng/public/ace/artifacts + capabilities_dir: ./ace-output/ results_dir: gs://ace-artifacts inspect_evals_dir: /fs01/projects/aieng/public/ace/inspect_evals/src/ace_evals - domain: math - method: "hierarchical" num_seed_capabilities: 1 - num_gen_capabilities: 100 - num_gen_capabilities_buffer: 0.0 - num_capability_areas: 10 - num_gen_capabilities_per_run: 5 + num_capabilities: 4 + num_capabilities_buffer: 0.5 # Raised from 0.0 to compensate for filtering + num_gen_capabilities_per_run: 1 # Lowered from 5; number of capabilities to generate per run num_gen_tasks_per_capability: 100 num_gen_tasks_buffer: 0.0 task_gen_few_shot: false @@ -91,7 +104,7 @@ lbo_cfg: embedding_cfg: embedding_model: "text-embedding-3-small" embedding_size: 256 - filtering_similarity_threshold: 0.7 + filtering_similarity_threshold: 0.85 # Raised from 0.7 to retain more capabilities (only near-duplicates are filtered out) dimensionality_reduction_cfg: reduce_dimensionality_method: "pca" @@ -102,7 +115,25 @@ dimensionality_reduction_cfg: exp_cfg: seed: 37 trial_run: false - exp_id: "" + exp_id: "test_exp" + +# Stage control +stage: "all" # Which stage to run: 0, 1, 2, 3, 4, 5, or "all" +areas_tag: null # Areas tag from Stage 1 (required for stage 2 standalone) +capabilities_tag: null # Capabilities tag from Stage 2 (required for stage 3 standalone) +tasks_tag: null # Tasks tag from Stage 3 (required for stage 4 standalone) +solution_tag: null # Solution tag from Stage 4 (required for stage 5 standalone) +validation_tag: null # Validation tag from Stage 5 (optional for resume) + +# Debug settings +use_langchain: false # Set to false for easier debugging (disables LangChain features) + +# Global configuration + +global_cfg: + domain: personal finance + output_dir: base_output/ # Base output directory for all pipeline outputs + pipeline_type: base defaults: - _self_ diff --git a/src/generate_capabilities.py b/src/generate_capabilities.py deleted file mode 100644 index 9dc954c..0000000 --- a/src/generate_capabilities.py +++ /dev/null @@ -1,468 +0,0 @@ -"""Generate capabilities using the scientist LLM.""" - -import json -import logging -import os -import shutil -from typing import Any, Dict, List, Optional, Union - -import
numpy as np -from langsmith import tracing_context -from tenacity import Retrying, stop_after_attempt - -from src.capability import Capability -from src.model import Model -from src.utils import constants, prompts -from src.utils.capability_management_utils import ( - _sample_seed_capabilities, - get_previous_capabilities, -) -from src.utils.capability_utils import extract_and_parse_response - - -logger = logging.getLogger(__name__) - - -def generate_capability_areas( - domain: str, - num_areas: int, - num_capabilities_per_area: int, - scientist_llm: Model, - user_prompt: str, - scientist_llm_gen_cfg: Dict[str, Any], - sys_prompt: Union[str, None] = None, - **kwargs: Any, -) -> Dict[str, Any]: - """ - Generate capability areas for the specified domain. - - Args - ---- - domain (str): The domain name. - num_areas (int): The number of capability areas to generate. - num_capabilities_per_area (int): The number of capabilities per area. - scientist_llm (Model): The scientist LLM model. - user_prompt (str): The user prompt for generating capability areas. - scientist_llm_gen_cfg (Dict[str, Any]): The generation configuration - for the scientist LLM. - sys_prompt (str | None): The system prompt for the scientist LLM. - **kwargs (Any): Additional keyword arguments. - - Returns - ------- - Dict[str, Any]: A dictionary containing the generated capability areas - and metadata about the generation process. - """ - logger.info(f"Generating {num_areas} capability areas ...") - # Generate output using the model with specified generation arguments - user_prompt = user_prompt.format( - num_areas=num_areas, - num_capabilities_per_area=num_capabilities_per_area, - domain=domain, - response_json_format=prompts.CAPABILITY_AREAS_GENERATION_RESPONSE_JSON_FORMAT, - ) - with tracing_context( - enabled=True, - tags=["generate_capability_areas"], - metadata={ - "ls_provider": scientist_llm.model_provider, - "ls_model_name": scientist_llm.get_model_name(with_provider=False), - "ls_model_type": "chat", - "exp_id": kwargs.get("run_id"), - "domain": domain, - "num_areas": num_areas, - **{f"ls_{k}": v for k, v in scientist_llm_gen_cfg.items()}, - }, - ): - response, metadata = scientist_llm.generate( - sys_prompt=sys_prompt if sys_prompt else "", - user_prompt=user_prompt, - generation_config=scientist_llm_gen_cfg, - ) - - parsed_response = extract_and_parse_response(response, has_thought=False) - capability_areas = parsed_response["parsed_response"] - - logger.info( - f"Capability areas generation tokens summary:\n{json.dumps(metadata, indent=4)}" - ) - - if len(capability_areas) > num_areas: - logger.warning( - f"Generated {len(capability_areas)} capability areas, but only {num_areas} are needed. " - + f"Keeping the first {num_areas} areas." - ) - capability_areas = capability_areas[:num_areas] - - logger.info(f"Generated capability areas:\n{capability_areas}") - - return { - "capability_areas": capability_areas, - "metadata": { - "model": scientist_llm.get_model_name(), - "api_metadata": metadata, - }, - } - - -def generate_capabilities( - domain: str, - num_capabilities: int, - num_capabilities_per_run: int, - base_capability_dir: str, - scientist_llm: Model, - num_seed_capabilities: int, - scientist_llm_gen_cfg: Dict[str, Any], - method: str = "flat", - include_seed_capability_names: Optional[List[str]] = None, - exclude_seed_capability_names: Optional[List[str]] = None, - **kwargs: Any, -) -> List[Capability]: - """ - Generate initial capabilities for the specified domain. 
- - Args - ---- - domain (str): The domain name. - num_capabilities (int): The number of capabilities to generate. - num_capabilities_per_run (int): The number of capabilities to generate per run. - base_capability_dir (str): The base directory to store - the generated capabilities for the specified domain. - scientist_llm (Model): The scientist LLM model. - num_seed_capabilities (int): The number of seed capabilities to use. - scientist_llm_gen_cfg (Dict[str, Any]): The generation configuration - for the scientist LLM. - method (str): The method to use for generating capabilities. - Choose from "flat" or "hierarchical". - include_seed_capability_names (List[str] | None): A list of seed capability - names to include in the generation process. - exclude_seed_capability_names (List[str] | None): A list of seed capability - names to exclude from the generation process. - **kwargs (Any): Additional keyword arguments. - - Returns - ------- - List[Capability]: The generated capabilities. - """ - gen_capabilities = [] - run_metadata = [] - - if method == "hierarchical": - assert "num_capability_areas" in kwargs, ( - "`num_capability_areas` should be specified for hierarchical generation." - ) - num_capability_areas = kwargs["num_capability_areas"] - assert num_capabilities >= num_capability_areas, ( - "Number of capabilities should be greater than or equal to the number of capability areas, " - + "so that each area can have at least one capability." - ) - # Uniformly distribute num_capabilities across num_capability_areas - num_capabilities_per_area = [ - num_capabilities // num_capability_areas - ] * num_capability_areas - for i in range(num_capabilities % num_capability_areas): - num_capabilities_per_area[i] += 1 - num_runs = [ - int(np.ceil(num / num_capabilities_per_run)) - for num in num_capabilities_per_area - ] - - # Generate capability areas for the specified domain - response = generate_capability_areas( - domain=domain, - num_areas=kwargs["num_capability_areas"], - num_capabilities_per_area=num_capabilities_per_area[0], - scientist_llm=scientist_llm, - user_prompt=prompts.HIERARCHICAL_CAPABILITY_AREAS_GENERATION_USER_PROMPT, - scientist_llm_gen_cfg=scientist_llm_gen_cfg, - **kwargs, - ) - capability_areas = response["capability_areas"] - # Select only the specified number of capability areas - # even if more are generated - capability_areas = capability_areas[:num_capability_areas] - else: - num_capabilities_per_area = [num_capabilities] - num_runs = [int(np.ceil(num_capabilities / num_capabilities_per_run))] - # No capability areas for flat generation, use the domain as the area - capability_areas = [domain] - - for idx, capability_area in enumerate(capability_areas): - if method == "hierarchical": - logger.info(f"Generating capabilities for area: {capability_area}") - # Fetch previously generated capabilities, if any - prev_capabilities = get_previous_capabilities( - capability_dir=base_capability_dir, capability_area=capability_area - ) - user_prompt = prompts.HIERARCHICAL_CAPABILITY_GENERATION_USER_PROMPT.format( - capability_area=capability_area, - ) - else: - prev_capabilities = get_previous_capabilities( - capability_dir=base_capability_dir - ) - user_prompt = prompts.CAPABILITY_GENERATION_USER_PROMPT - - # Add all seed capabilities to the list of prev_capabilities - seed_capability_dir = os.path.join( - constants.BASE_ARTIFACTS_DIR, "seed_capabilities", domain - ) - prev_capabilities.extend( - _sample_seed_capabilities( - seed_capability_dir=seed_capability_dir, - 
num_seed_capabilities=-1, - random_seed=int(kwargs.get("seed", constants.DEFAULT_RANDOM_SEED)), - ) - ) - - num_capabilities_left = num_capabilities_per_area[idx] - for run_id in range(num_runs[idx]): - logger.info(f"Run ID: {run_id}") - # Generate capabilities using the scientist LLM - - response = generate_capabilities_using_llm( - domain=domain, - num_capabilities=min( - num_capabilities_per_run, - num_capabilities_left, - ), - scientist_llm=scientist_llm, - sys_prompt=prompts.CAPABILITY_GENERATION_SYSTEM_PROMPT, - user_prompt=user_prompt, - num_seed_capabilities=num_seed_capabilities, - seed_capability_dir=seed_capability_dir, - prev_capabilities=prev_capabilities, - scientist_llm_gen_cfg=scientist_llm_gen_cfg, - base_capability_dir=base_capability_dir, - include_seed_capability_names=include_seed_capability_names, - exclude_seed_capability_names=exclude_seed_capability_names, - capability_area=capability_area if method == "hierarchical" else None, - local_run_id=run_id, - **kwargs, - ) - gen_capabilities.extend(response["capabilities"]) - num_capabilities_left -= len(response["capabilities"]) - run_metadata.append(response["metadata"]) - - # Update the list of previously generated capabilities - prev_capabilities.extend(response["capabilities"]) - - # Analyze tokens metadata for capability generation - total_input_tokens = sum([m["api_metadata"]["input_tokens"] for m in run_metadata]) - total_output_tokens = sum( - [m["api_metadata"]["output_tokens"] for m in run_metadata] - ) - tokens_summary = { - "total_input_tokens": total_input_tokens, - "total_output_tokens": total_output_tokens, - "total_tokens": total_input_tokens + total_output_tokens, - "input_tokens_per_run": int(total_input_tokens / sum(num_runs)), - "output_tokens_per_run": int(total_output_tokens / sum(num_runs)), - "total_tokens_per_run": int( - (total_input_tokens + total_output_tokens) / sum(num_runs) - ), - "input_tokens_per_capability": int(total_input_tokens / len(gen_capabilities)), - "output_tokens_per_capability": int( - total_output_tokens / len(gen_capabilities) - ), - "total_tokens_per_capability": int( - (total_input_tokens + total_output_tokens) / len(gen_capabilities) - ), - } - logger.info( - f"Capability generation tokens summary:\n{json.dumps(tokens_summary, indent=4)}" - ) - - return gen_capabilities - - -def generate_capabilities_using_llm( - domain: str, - num_capabilities: int, - scientist_llm: Model, - sys_prompt: str, - user_prompt: str, - num_seed_capabilities: int, - seed_capability_dir: str, - prev_capabilities: List[Capability], - scientist_llm_gen_cfg: Dict[str, Any], - base_capability_dir: str, - include_seed_capability_names: Optional[List[str]] = None, - exclude_seed_capability_names: Optional[List[str]] = None, - capability_area: Union[str, None] = None, - **kwargs: Any, -) -> Dict[str, Any]: - """ - Generate capabilities using the scientist LLM. - - Prompt the scientist LLM with instructions and - seed capabilities for the specified domain - to generate initial capabilities. - - Args - ---- - domain (str): The domain name. - num_capabilities (int): The number of capabilities to generate. - scientist_llm (Model): The scientist LLM model name. - sys_prompt (str): The system prompt. - user_prompt (str): The user prompt. - num_seed_capabilities (int): The number of seed capabilities to use. - seed_capability_dir (str): The directory containing the seed capabilities. - prev_capabilities (List[Capability]): The list of previously - generated capabilities. 
- scientist_llm_gen_cfg (Dict[str, Any]): The generation configuration - for the scientist LLM. - base_capability_dir (str): The base directory to store - the generated capabilities for the specified domain. - include_seed_capability_names (List[str] | None): A list of seed capability - names to include in the generation process. - exclude_seed_capability_names (List[str] | None): A list of seed capability - names to exclude from the generation process. - capability_area (str | None): The capability area for the generation - **kwargs (Any): Additional keyword arguments. - - Returns - ------- - Dict[str, Any]: A dictionary containing the generated capabilities - and metadata about the generation process. - """ - # Sample seed capabilities for the generation process - seed_capabilities = _sample_seed_capabilities( - seed_capability_dir=seed_capability_dir, - num_seed_capabilities=num_seed_capabilities, - include_capability_names=include_seed_capability_names, - exclude_capability_names=exclude_seed_capability_names, - random_seed=int(kwargs.get("seed", constants.DEFAULT_RANDOM_SEED)), - ) - # Get capability JSON strings (without scores) - seed_capabilities_repr = [ - capability.to_json_str() for capability in seed_capabilities - ] - - # LLM input - user_prompt = user_prompt.format( - sample_capability_json="\n".join(seed_capabilities_repr), - prev_capabilities="\n".join([elm.name for elm in prev_capabilities]), - domain=domain, - num_gen_capabilities=num_capabilities, - ) - - # Generate output using the model with specified generation arguments - num_attempts = kwargs.get( - "retry_attempts", constants.DEFAULT_CAPABILITY_GENERATION_RETRY_ATTEMPTS - ) - try: - # Retry the generation process if an error occurs - # Common errors: - # - [ill-formatted python class] - # - SyntaxError: unterminated triple-quoted string literal - for attempt in Retrying( - stop=stop_after_attempt(num_attempts), - reraise=True, - ): - with attempt: - # Update the seed for each attempt - scientist_llm_gen_cfg["seed"] += 1 - with tracing_context( - enabled=True, - tags=["generate_capabilities_using_llm"], - metadata={ - "ls_provider": scientist_llm.model_provider, - "ls_model_name": scientist_llm.get_model_name( - with_provider=False - ), - "ls_model_type": "chat", - "exp_id": kwargs.get("run_id"), - "run_id": kwargs.get("local_run_id"), - "domain": domain, - "capability_area": capability_area, - "num_capabilities": num_capabilities, - "seed_capabilities": [elm.name for elm in seed_capabilities], - "prev_capabilities": [elm.name for elm in prev_capabilities], - **{f"ls_{k}": v for k, v in scientist_llm_gen_cfg.items()}, - }, - ): - response, metadata = scientist_llm.generate( - sys_prompt=sys_prompt, - user_prompt=user_prompt, - generation_config=scientist_llm_gen_cfg, - ) - - parsed_response = extract_and_parse_response(response) - gen_capabilities = parsed_response["parsed_response"] - # Convert JSON string to dict if needed - gen_capabilities_dict = [] - for capability in gen_capabilities: - if isinstance(capability, dict): - capability_dict = capability - elif isinstance(capability, str): - try: - capability_dict = json.loads(capability) - except json.JSONDecodeError as e: - logger.warning( - f"Error decoding JSON string: {capability}: {repr(e)}" - ) - continue - else: - logger.warning( - f"Invalid capability format: {capability}. Expected str or dict." 
- ) - continue - gen_capabilities_dict.append(capability_dict) - gen_capabilities_clean = [] - for capability in gen_capabilities_dict: - try: - if capability_area is not None: - # Add the capability area to the generated capabilities - capability["area"] = capability_area - capability_obj = Capability.from_dict( - capability_dict=capability, - base_dir=base_capability_dir, - score_dir_suffix=(kwargs.get("run_id")), - ) - except FileExistsError: - # 1. Same name as existing capability - # Do not delete the capability directory if it already exists - logger.warning( - f"Capability {capability['name']} already exists. Skipping it." - ) - # Skip this capability - continue - except Exception as e: - # 2. "problem" replaced with "riddle" or some other keyword - # leads to KeyError - # 3. Ill-formatted `capability.py` file due to missing quotes - logger.warning( - f"Error creating capability object {capability['name']}, hence skipping it: {e}" - ) - # Delete the capability directory if it exists - capability_dir = os.path.join( - base_capability_dir, capability["name"] - ) - if os.path.exists(capability_dir): - shutil.rmtree(capability_dir) - # Skip this capability - continue - else: - gen_capabilities_clean.append(capability_obj) - if len(gen_capabilities_clean) != len(gen_capabilities): - logger.warning( - f"Only {len(gen_capabilities_clean)} capabilities were created out of {len(gen_capabilities)} generated capabilities." - ) - except Exception as e: - logger.error(f"Error generating capabilities: {e}") - logger.error(f"Response:\n{response}") - raise e - - logger.info( - f"Generated {len(gen_capabilities_clean)} capabilities:\n{gen_capabilities_clean}" - ) - - return { - "capabilities": gen_capabilities_clean, - "metadata": { - "model": scientist_llm.get_model_name(), - "thought": parsed_response["thought"], - "api_metadata": metadata, - }, - } diff --git a/src/run_base_pipeline.py b/src/run_base_pipeline.py new file mode 100644 index 0000000..a359b22 --- /dev/null +++ b/src/run_base_pipeline.py @@ -0,0 +1,246 @@ +"""Base pipeline for capability and task generation. + +This module orchestrates the complete base (non-agentic) generation pipeline: +- Stage 0: Experiment and domain setup +- Stage 1: Area generation +- Stage 2: Capability generation and filtering +- Stage 3: Task generation +- Stage 4: Solution generation +- Stage 5: Task validation + +Usage: + # Run all stages + python -m src.run_base_pipeline stage=all + + # Run specific stage + python -m src.run_base_pipeline stage=0 + python -m src.run_base_pipeline stage=1 + python -m src.run_base_pipeline stage=2 areas_tag=_YYYYMMDD_HHMMSS + python -m src.run_base_pipeline stage=3 capabilities_tag=_YYYYMMDD_HHMMSS + python -m src.run_base_pipeline stage=4 tasks_tag=_YYYYMMDD_HHMMSS + python -m src.run_base_pipeline stage=5 solution_tag=_YYYYMMDD_HHMMSS +""" + +import logging + +import hydra +from omegaconf import DictConfig + +from src.base_stages import ( + run_stage0, + run_stage1, + run_stage2, + run_stage3, + run_stage4, + run_stage5, +) + + +logger = logging.getLogger(__name__) + + +def _validate_stage_inputs( + stage: int | str, + areas_tag: str | None, + capabilities_tag: str | None, + tasks_tag: str | None, + solution_tag: str | None, +) -> bool: + """Validate required inputs for standalone stage execution. + + Returns True if validation passes, False otherwise. 
+ """ + if stage == 2 and not areas_tag: + logger.error("areas_tag is required when running stage 2 standalone") + logger.error( + "Usage: python -m src.run_base_pipeline stage=2 areas_tag=_YYYYMMDD_HHMMSS" + ) + logger.error( + "Optional: capabilities_tag=_YYYYMMDD_HHMMSS to resume from existing run" + ) + return False + + if stage == 3 and not capabilities_tag: + logger.error("capabilities_tag is required when running stage 3 standalone") + logger.error( + "Usage: python -m src.run_base_pipeline stage=3 " + "capabilities_tag=_YYYYMMDD_HHMMSS" + ) + logger.error("Optional: tasks_tag=_YYYYMMDD_HHMMSS to resume from existing run") + return False + + if stage == 4 and not tasks_tag: + logger.error("tasks_tag is required when running stage 4 standalone") + logger.error( + "Usage: python -m src.run_base_pipeline stage=4 tasks_tag=_YYYYMMDD_HHMMSS" + ) + logger.error( + "Optional: solution_tag=_YYYYMMDD_HHMMSS to resume from existing run" + ) + return False + + if stage == 5 and not solution_tag: + logger.error("solution_tag is required when running stage 5 standalone") + logger.error( + "Usage: python -m src.run_base_pipeline stage=5 " + "solution_tag=_YYYYMMDD_HHMMSS" + ) + logger.error( + "Optional: validation_tag=_YYYYMMDD_HHMMSS to resume from existing run" + ) + return False + + return True + + +@hydra.main(version_base=None, config_path="cfg", config_name="run_cfg") +def main(cfg: DictConfig) -> None: + """Run specific pipeline stages based on configuration. + + Stage 0: Experiment and domain setup + Stage 1: Area generation + Stage 2: Capability generation and filtering + Stage 3: Task generation + Stage 4: Solution generation + Stage 5: Task validation + "all": Run all stages sequentially + """ + # Suppress httpx and autogen_core INFO logs + logging.getLogger("httpx").setLevel(logging.WARNING) + logging.getLogger("autogen_core.events").setLevel(logging.WARNING) + + # Get stage from config (can be overridden via command line) + stage = cfg.get("stage", "all") + + # Convert string to int if numeric + if isinstance(stage, str) and stage.isdigit(): + stage = int(stage) + + logger.info(f"Running stage: {stage}") + + # Track tags across stages + areas_tag = cfg.get("areas_tag", None) + capabilities_tag = cfg.get("capabilities_tag", None) + tasks_tag = cfg.get("tasks_tag", None) + solution_tag = cfg.get("solution_tag", None) + + # Validate required inputs for standalone stages + if not _validate_stage_inputs( + stage, areas_tag, capabilities_tag, tasks_tag, solution_tag + ): + return + + # Stage 0: Experiment and Domain Setup + if stage in {0, "all"}: + logger.info("=" * 60) + logger.info("STAGE 0: Experiment and Domain Setup") + logger.info("=" * 60) + run_stage0(cfg) + if stage == 0: + return + + # Stage 1: Area Generation + if stage in {1, "all"}: + logger.info("=" * 60) + logger.info("STAGE 1: Area Generation") + logger.info("=" * 60) + areas_tag = run_stage1(cfg) + logger.info("Stage 1 areas tag: %s", areas_tag) + if stage == 1: + return + + # Stage 2: Capability Generation and Filtering + if stage in {2, "all"}: + logger.info("=" * 60) + logger.info("STAGE 2: Capability Generation and Filtering") + logger.info("=" * 60) + + # Check if resuming + resume_capabilities_tag = ( + cfg.get("capabilities_tag", None) if stage == 2 else None + ) + if resume_capabilities_tag: + logger.info( + f"Resume mode: Will skip areas that already have capabilities " + f"in tag {resume_capabilities_tag}" + ) + + capabilities_tag = run_stage2( + cfg=cfg, + areas_tag=areas_tag, + 
capabilities_tag=resume_capabilities_tag, + ) + logger.info("Stage 2 capabilities tag: %s", capabilities_tag) + if stage == 2: + return + + # Stage 3: Task Generation + if stage in {3, "all"}: + logger.info("=" * 60) + logger.info("STAGE 3: Task Generation") + logger.info("=" * 60) + + # Check if resuming + resume_tasks_tag = cfg.get("tasks_tag", None) if stage == 3 else None + if resume_tasks_tag: + logger.info( + f"Resume mode: Will skip capabilities that already have tasks " + f"in tag {resume_tasks_tag}" + ) + + tasks_tag = run_stage3( + cfg=cfg, + capabilities_tag=capabilities_tag, + tasks_tag=resume_tasks_tag, + ) + logger.info("Stage 3 tasks tag: %s", tasks_tag) + if stage == 3: + return + + # Stage 4: Solution Generation + if stage in {4, "all"}: + logger.info("=" * 60) + logger.info("STAGE 4: Solution Generation") + logger.info("=" * 60) + + # Check if resuming + resume_solution_tag = cfg.get("solution_tag", None) if stage == 4 else None + if resume_solution_tag: + logger.info( + f"Resume mode: Will skip tasks that already have solutions " + f"in tag {resume_solution_tag}" + ) + + solution_tag = run_stage4( + cfg=cfg, + tasks_tag=tasks_tag, + solution_tag=resume_solution_tag, + ) + logger.info("Stage 4 solution tag: %s", solution_tag) + if stage == 4: + return + + # Stage 5: Task Validation + if stage in {5, "all"}: + logger.info("=" * 60) + logger.info("STAGE 5: Task Validation") + logger.info("=" * 60) + + # Check if resuming + resume_validation_tag = cfg.get("validation_tag", None) if stage == 5 else None + if resume_validation_tag: + logger.info( + f"Resume mode: Will skip tasks that already have validations " + f"in tag {resume_validation_tag}" + ) + + validation_tag = run_stage5( + cfg=cfg, + solution_tag=solution_tag, + validation_tag=resume_validation_tag, + ) + logger.info("Stage 5 validation tag: %s", validation_tag) + + +if __name__ == "__main__": + main() diff --git a/src/run_capability_generation.py b/src/run_capability_generation.py deleted file mode 100644 index 3893432..0000000 --- a/src/run_capability_generation.py +++ /dev/null @@ -1,150 +0,0 @@ -"""Script to generate capabilities and tasks using the scientist LLM.""" - -import logging -import os - -import hydra -from omegaconf import DictConfig - -from src.generate_capabilities import ( - generate_capabilities, -) -from src.generate_tasks import ( - generate_tasks_using_llm, -) -from src.model import Model -from src.utils import constants -from src.utils.capability_management_utils import ( - filter_capabilities, - get_previous_capabilities, -) -from src.utils.data_utils import check_cfg, get_run_id -from src.utils.embedding_utils import ( - generate_and_set_capabilities_embeddings, -) - - -@hydra.main(version_base=None, config_path="cfg", config_name="run_cfg") -def main(cfg: DictConfig) -> None: - """ - Run capability generation with the specified configuration. - - This includes generating capabilities and generating, solving and verifying tasks. - - Args: - cfg (DictConfig): Configuration for the model. 
- """ - check_cfg(cfg, logger) - run_id = get_run_id(cfg) - logger.info(f"Run ID: {run_id}") - - # Initialize the scientist LLM model - scientist_llm = Model( - model_name=cfg.scientist_llm.name, - model_provider=cfg.scientist_llm.provider, - ) - scientist_llm_gen_cfg = cfg.scientist_llm.generation_cfg - - # Generate initial capabilities - # Set the base capability directory - base_capability_dir = os.path.join( - constants.BASE_ARTIFACTS_DIR, - f"capabilities_{run_id}", - cfg.capabilities_cfg.domain, - ) - target_num_capabilities = cfg.capabilities_cfg.num_gen_capabilities - if os.path.exists(base_capability_dir): - # Fetch previously generated capabilities - logger.info( - f"Base capability directory already exists: {base_capability_dir}. " - "Fetching previously generated capabilities." - ) - capabilities = get_previous_capabilities(capability_dir=base_capability_dir) - else: - os.makedirs(base_capability_dir, exist_ok=False) - logger.info("Starting capability generation ...") - num_capabilities = int( - target_num_capabilities - * (1 + cfg.capabilities_cfg.num_gen_capabilities_buffer) - ) - - capabilities = generate_capabilities( - domain=cfg.capabilities_cfg.domain, - num_capabilities=num_capabilities, - num_capabilities_per_run=cfg.capabilities_cfg.num_gen_capabilities_per_run, - base_capability_dir=base_capability_dir, - scientist_llm=scientist_llm, - num_seed_capabilities=cfg.capabilities_cfg.num_seed_capabilities, - scientist_llm_gen_cfg=dict(scientist_llm_gen_cfg.capability_generation), - method=cfg.capabilities_cfg.method, - num_capability_areas=cfg.capabilities_cfg.num_capability_areas, - exclude_seed_capability_names=["word_problems"], - run_id=run_id, - trial_run=cfg.exp_cfg.trial_run, - seed=cfg.exp_cfg.seed, - retry_attempts=cfg.capabilities_cfg.capabilities_gen_retry_attempts, - ) - capabilities = sorted(capabilities, key=lambda x: x.name) - logger.info(f"Capability names ({len(capabilities)}):\n{capabilities}") - if len(capabilities) < target_num_capabilities: - logger.warning( - f"Only {len(capabilities)} capabilities were created. " - f"Target number of capabilities not reached: {target_num_capabilities}. " - "It is recommended to increase the buffer." 
- ) - - task_gen_prompt_version = cfg.capabilities_cfg.task_gen_prompt_version - - if "mock_seed_capabilities" in run_id: - # Use all capabilities (skip filtering) for the mock_seed_capabilities runs - filtered_capabilities = capabilities - if "v2" in run_id: - # Use the task generation prompt version 2 for mock seed capabilities v2 - task_gen_prompt_version = "v2" - else: - # Embed capabilities using openai embedding model - generate_and_set_capabilities_embeddings( - capabilities=capabilities, - embedding_model_name=cfg.embedding_cfg.embedding_model, - embed_dimensions=cfg.embedding_cfg.embedding_size, - ) - # Filter capabilities based on their embeddings - filtered_capabilities = filter_capabilities( - capabilities, - embedding_model_name=cfg.embedding_cfg.embedding_model, - similarity_threshold=cfg.embedding_cfg.filtering_similarity_threshold, - ) - logger.info( - f"Capabilities retained after filtering ({len(filtered_capabilities)}/{len(capabilities)}): {filtered_capabilities}" - ) - - for capability in filtered_capabilities: - # Generate tasks for each capability - generate_tasks_using_llm( - capability=capability, - scientist_llm=scientist_llm, - num_tasks=cfg.capabilities_cfg.num_gen_tasks_per_capability, - num_tasks_buffer=cfg.capabilities_cfg.num_gen_tasks_buffer, - scientist_llm_gen_cfg_task_gen=dict(scientist_llm_gen_cfg.task_generation), - scientist_llm_gen_cfg_task_solve=dict(scientist_llm_gen_cfg.task_solve), - scientist_llm_gen_cfg_task_verify=dict(scientist_llm_gen_cfg.task_verify), - solve_sample_tasks=True, - few_shot=cfg.capabilities_cfg.task_gen_few_shot, - run_id=run_id, - tasks_gen_retry_attempts=cfg.capabilities_cfg.tasks_gen_retry_attempts, - concurrency_task_solver=cfg.capabilities_cfg.concurrency_task_solver, - concurrency_task_verifier=cfg.capabilities_cfg.concurrency_task_verifier, - seed=cfg.exp_cfg.seed, - task_gen_prompt_version=task_gen_prompt_version, - ) - if cfg.exp_cfg.trial_run: - logger.info( - f"Trial run enabled. Stopping after generating tasks for {capability.name}." 
- ) - break - - -if __name__ == "__main__": - logger = logging.getLogger(__name__) - - main() diff --git a/src/schemas/PIPELINE_SCHEMAS.md b/src/schemas/GENERATION_PIPELINE_SCHEMAS.md similarity index 89% rename from src/schemas/PIPELINE_SCHEMAS.md rename to src/schemas/GENERATION_PIPELINE_SCHEMAS.md index d2b8b86..059f2aa 100644 --- a/src/schemas/PIPELINE_SCHEMAS.md +++ b/src/schemas/GENERATION_PIPELINE_SCHEMAS.md @@ -244,39 +244,39 @@ All pipeline outputs include a `metadata` object (represented by the `PipelineMe **File:** [`domain_schemas.py`](domain_schemas.py) **Fields:** -- `name`: String (required, human-readable domain name) +- `domain_name`: String (required, human-readable domain name) - `domain_id`: String (required) -- `description`: String (optional, domain description) +- `domain_description`: String (optional, domain description) ### Area **File:** [`area_schemas.py`](area_schemas.py) **Fields:** -- `name`: String (required, human-readable area name) +- `area_name`: String (required, human-readable area name) - `area_id`: String (required) - `domain`: Domain (required, Domain dataclass object) -- `description`: String (required, area description) +- `area_description`: String (required, area description) - `generation_metadata`: Dict (optional, nested dictionary containing process-specific information) - This field can contain any generation-specific data (e.g., generation method, parameters, intermediate steps) - Structure is flexible and depends on the generation method -**Note:** When serialized to JSON, the `domain` object is flattened to `domain` (string) and `domain_id` (string) fields. +**Note:** When serialized to JSON, the `domain` object is flattened to `domain_name` and `domain_id` fields. ### Capability **File:** [`capability_schemas.py`](capability_schemas.py) **Fields:** -- `name`: String (required, capability name) +- `capability_name`: String (required, capability name) - `capability_id`: String (required) - `area`: Area (required, Area dataclass object) -- `description`: String (required, capability description) +- `capability_description`: String (required, capability description) - `generation_metadata`: Dict (optional, nested dictionary containing process-specific information) - This field can contain any generation-specific data (e.g., generation method, parameters, intermediate steps) - Structure is flexible and depends on the generation method -**Note:** When serialized to JSON, the `area` object is flattened to `area` (string), `area_id` (string), `domain` (string), and `domain_id` (string) fields. +**Note:** When serialized to JSON, the `area` object is flattened to `area_name`, `area_id`, `domain_name`, and `domain_id` fields. ### Task @@ -284,10 +284,16 @@ All pipeline outputs include a `metadata` object (represented by the `PipelineMe **Fields:** - `task_id`: String (required, unique within capability) -- `task`: String (required, the task/problem text) +- `task`: String (required, the task/problem text, includes MCQ stem and options text) +- `task_type`: String (optional, e.g., "multiple_choice", "open_ended") +- `solution_type`: String (optional, e.g., "multiple_choice", "open_ended") +- `difficulty`: String (optional, e.g., "easy", "medium", "hard") +- `bloom_level`: String (optional, e.g., "remember", "understand", "apply", "analyze", "evaluate", "create") +- `choices`: List[Dict] (optional, for MCQ) — each item `{ "label": "A", "solution": "