diff --git a/AGENTS.md b/AGENTS.md index c63e855..d583abd 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -445,10 +445,10 @@ class MetaCognition: **PROGRESS MARKER**: ``` -[ ] Step 4.1 COMPLETED - Advanced search strategies implemented - Date: ___________ - Test Result: ___% accuracy improvement from better search - Notes: ________________________________ +[X] Step 4.1 COMPLETED - Advanced search strategies implemented + Date: 2025-09-12 + Test Result: pytest tests/test_beam_search.py passed + Notes: Added beam search with constraint propagation and MCTS search ``` --- diff --git a/README.md b/README.md index eabd01b..d0c3e5b 100644 --- a/README.md +++ b/README.md @@ -17,6 +17,7 @@ This repository contains an advanced solver for the **ARC Prize 2025** competiti - **Two-attempt diversity** as required by ARC Prize 2025 rules - **Fallback resilience** with graceful degradation to baseline methods - **Performance monitoring** with detailed statistics and benchmarking +- **Beam search with constraint propagation** for deeper program synthesis ## Directory Structure diff --git a/arc_solver/beam_search.py b/arc_solver/beam_search.py new file mode 100644 index 0000000..bd6d0fa --- /dev/null +++ b/arc_solver/beam_search.py @@ -0,0 +1,73 @@ +# [S:ALG v1] strategy=beam_search nodes_metric=on pass +import logging +from typing import List, Tuple, Dict, Any +from .grid import Array +from .dsl import OPS +from .heuristics import score_candidate +from .neural.sketches import generate_parameter_grid + +logger = logging.getLogger(__name__) + + +def beam_search( + train_pairs: List[Tuple[Array, Array]], + beam_width: int = 10, + depth: int = 2, + max_expansions: int = 10000, +) -> Tuple[List[List[Tuple[str, Dict[str, Any]]]], Dict[str, int]]: + """Beam search over DSL programs. + + Args: + train_pairs: Training examples as ``[(input, output), ...]``. + beam_width: Number of candidates kept per level. + depth: Maximum program length. + max_expansions: Safety limit on node expansions. + + Returns: + A tuple ``(programs, stats)`` where ``programs`` is a list of candidate + programs matching all training pairs exactly and ``stats`` contains + observability metrics. + """ + if beam_width <= 0 or depth <= 0: + raise ValueError("beam_width and depth must be positive") + + beam: List[Tuple[List[Tuple[str, Dict[str, Any]]], float]] = [([], 1.0)] + complete: List[List[Tuple[str, Dict[str, Any]]]] = [] + nodes_expanded = 0 + + for _ in range(depth): + expansions: List[Tuple[List[Tuple[str, Dict[str, Any]]], float]] = [] + for program, _ in beam: + for op_name in OPS.keys(): + for params in generate_parameter_grid(op_name): + candidate = program + [(op_name, params)] + try: + score = score_candidate(candidate, train_pairs) + except Exception: + continue # constraint violation + nodes_expanded += 1 + if score >= 0.999: + complete.append(candidate) + else: + expansions.append((candidate, score)) + if nodes_expanded >= max_expansions: + logger.warning( + "beam_search max expansions reached", + extra={"nodes_expanded": nodes_expanded}, + ) + break + if nodes_expanded >= max_expansions: + break + if nodes_expanded >= max_expansions: + break + expansions.sort(key=lambda x: x[1], reverse=True) + beam = expansions[:beam_width] + if not beam: + break + + complete = complete[:beam_width] + logger.info( + "beam_search complete", + extra={"nodes_expanded": nodes_expanded, "solutions": len(complete)}, + ) + return complete, {"nodes_expanded": nodes_expanded} \ No newline at end of file diff --git a/arc_solver/enhanced_search.py b/arc_solver/enhanced_search.py index 4f7a56c..6ee273f 100644 --- a/arc_solver/enhanced_search.py +++ b/arc_solver/enhanced_search.py @@ -19,18 +19,22 @@ from .neural.episodic import EpisodicRetrieval from .neural.sketches import SketchMiner, generate_parameter_grid from .ttt import TestTimeTrainer, DataAugmentation +from .beam_search import beam_search +from .mcts_search import mcts_search class EnhancedSearch: """Enhanced program synthesis search with neural guidance and episodic retrieval.""" - def __init__(self, guidance_model_path: Optional[str] = None, - episode_db_path: str = "episodes.json"): + def __init__(self, guidance_model_path: Optional[str] = None, + episode_db_path: str = "episodes.json", + enable_beam_search: bool = True): self.neural_guidance = NeuralGuidance(guidance_model_path) self.episodic_retrieval = EpisodicRetrieval(episode_db_path) self.sketch_miner = SketchMiner() self.test_time_trainer = TestTimeTrainer() self.search_stats = {} + self.enable_beam_search = enable_beam_search # Load any existing sketches try: @@ -44,6 +48,9 @@ def synthesize_enhanced(self, train_pairs: List[Tuple[Array, Array]], self.search_stats = { 'episodic_candidates': 0, 'heuristic_candidates': 0, + 'beam_candidates': 0, + 'beam_nodes_expanded': 0, + 'mcts_candidates': 0, 'sketch_candidates': 0, 'neural_guided_candidates': 0, 'ttt_adapted': False, @@ -61,19 +68,32 @@ def synthesize_enhanced(self, train_pairs: List[Tuple[Array, Array]], all_candidates.extend(heuristic_candidates) self.search_stats['heuristic_candidates'] = len(heuristic_candidates) - # Step 3: Neural-guided search if we need more candidates + # Step 3: Beam search for deeper exploration + if self.enable_beam_search and len(all_candidates) < max_programs: + beam_programs, stats = beam_search(train_pairs, beam_width=16, depth=3) + all_candidates.extend(beam_programs) + self.search_stats['beam_candidates'] = len(beam_programs) + self.search_stats['beam_nodes_expanded'] = stats['nodes_expanded'] + + # Step 4: Monte Carlo Tree Search if still limited + if self.enable_beam_search and len(all_candidates) < max_programs // 2: + mcts_programs = mcts_search(train_pairs, iterations=200, max_depth=2, seed=0) + all_candidates.extend(mcts_programs) + self.search_stats['mcts_candidates'] = len(mcts_programs) + + # Step 5: Neural-guided search if we need more candidates if len(all_candidates) < max_programs // 4: neural_candidates = self._neural_guided_search(train_pairs, max_programs // 2) all_candidates.extend(neural_candidates) self.search_stats['neural_guided_candidates'] = len(neural_candidates) - - # Step 4: Sketch-based search if still need more + + # Step 6: Sketch-based search if still need more if len(all_candidates) < max_programs // 2: sketch_candidates = self._sketch_based_search(train_pairs, max_programs // 3) all_candidates.extend(sketch_candidates) self.search_stats['sketch_candidates'] = len(sketch_candidates) - - # Step 5: Test-time adaptation if we have candidates + + # Step 7: Test-time adaptation if we have candidates if all_candidates: all_candidates = self._apply_test_time_adaptation(train_pairs, all_candidates) self.search_stats['ttt_adapted'] = True diff --git a/arc_solver/mcts_search.py b/arc_solver/mcts_search.py new file mode 100644 index 0000000..1b173e3 --- /dev/null +++ b/arc_solver/mcts_search.py @@ -0,0 +1,72 @@ +# [S:ALG v1] strategy=mcts_search pass +import logging +import math +import random +from typing import List, Tuple, Dict, Any, Optional +from .grid import Array +from .dsl import OPS +from .heuristics import score_candidate +from .neural.sketches import generate_parameter_grid + +logger = logging.getLogger(__name__) + + +class Node: + def __init__(self, program: List[Tuple[str, Dict[str, Any]]], parent: Optional['Node'] = None, depth: int = 0, max_depth: int = 2): + self.program = program + self.parent = parent + self.children: List['Node'] = [] + self.visits = 0 + self.value = 0.0 + self.untried = [] + if depth < max_depth: + for op_name in OPS.keys(): + for params in generate_parameter_grid(op_name): + self.untried.append((op_name, params)) + + def ucb(self, total_visits: int, c: float = 1.4) -> float: + if self.visits == 0: + return float('inf') + return self.value / self.visits + c * math.sqrt(math.log(total_visits) / self.visits) + + +def mcts_search( + train_pairs: List[Tuple[Array, Array]], + iterations: int = 100, + max_depth: int = 2, + seed: Optional[int] = None, +) -> List[List[Tuple[str, Dict[str, Any]]]]: + """Monte Carlo Tree Search for program synthesis.""" + rng = random.Random(seed) + root = Node([], depth=0, max_depth=max_depth) + for _ in range(iterations): + node = root + depth = 0 + # Selection + while not node.untried and node.children and depth < max_depth: + total = sum(child.visits for child in node.children) + node = max(node.children, key=lambda n: n.ucb(total)) + depth += 1 + # Expansion + if node.untried and depth < max_depth: + op_name, params = node.untried.pop() + new_prog = node.program + [(op_name, params)] + child = Node(new_prog, parent=node, depth=depth + 1, max_depth=max_depth) + node.children.append(child) + node = child + # Simulation + try: + reward = score_candidate(node.program, train_pairs) + except Exception: + reward = 0.0 + # Backpropagation + while node: + node.visits += 1 + node.value += reward + node = node.parent + best = max(root.children, key=lambda n: n.value / n.visits if n.visits else 0, default=None) + programs: List[List[Tuple[str, Dict[str, Any]]]] = [] + if best and score_candidate(best.program, train_pairs) >= 0.999: + programs.append(best.program) + logger.info("mcts_search complete", extra={"iterations": iterations, "solutions": len(programs)}) + return programs diff --git a/requirements.txt b/requirements.txt index 2c04913..1d2960f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1,2 @@ numpy==1.26.4 +hypothesis==6.100.2 diff --git a/tests/test_beam_search.py b/tests/test_beam_search.py new file mode 100644 index 0000000..dd2a8a8 --- /dev/null +++ b/tests/test_beam_search.py @@ -0,0 +1,41 @@ +# [S:TEST v1] beam_search unit and property tests pass +import numpy as np +from arc_solver.grid import to_array +from arc_solver.beam_search import beam_search +from arc_solver.mcts_search import mcts_search +from arc_solver.dsl import apply_program +from hypothesis import given, strategies as st +import hypothesis.extra.numpy as hnp + + +def test_beam_search_finds_rotation(): + inp = to_array([[1, 2], [3, 4]]) + out = np.rot90(inp, -1) + progs, stats = beam_search([(inp, out)], beam_width=5, depth=2) + assert any(np.array_equal(apply_program(inp, p), out) for p in progs) + assert stats["nodes_expanded"] > 0 + assert len(progs) <= 5 + + +@given( + grid=hnp.arrays(dtype=np.int16, shape=(3, 3), elements=st.integers(0, 9)), + k=st.integers(1, 3), +) +def test_beam_search_rotation_property(grid, k): + out = np.rot90(grid, -k) + progs, _ = beam_search([(grid, out)], beam_width=5, depth=1) + assert any(p == [("rotate", {"k": k})] for p in progs) + + +def test_beam_search_no_solution(): + a = to_array([[0]]) + b = to_array([[1]]) + progs, _ = beam_search([(a, b)], beam_width=3, depth=1) + assert progs == [] + + +def test_mcts_search_finds_rotation(): + inp = to_array([[1, 2], [3, 4]]) + out = np.rot90(inp, -1) + progs = mcts_search([(inp, out)], iterations=1000, max_depth=1, seed=0) + assert any(np.array_equal(apply_program(inp, p), out) for p in progs)