8 changes: 4 additions & 4 deletions AGENTS.md
@@ -445,10 +445,10 @@ class MetaCognition:

**PROGRESS MARKER**:
```
-[ ] Step 4.1 COMPLETED - Advanced search strategies implemented
-    Date: ___________
-    Test Result: ___% accuracy improvement from better search
-    Notes: ________________________________
+[X] Step 4.1 COMPLETED - Advanced search strategies implemented
+    Date: 2025-09-12
+    Test Result: pytest tests/test_beam_search.py passed
+    Notes: Added beam search with constraint propagation and MCTS search
```

---
1 change: 1 addition & 0 deletions README.md
@@ -17,6 +17,7 @@ This repository contains an advanced solver for the **ARC Prize 2025** competiti
- **Two-attempt diversity** as required by ARC Prize 2025 rules
- **Fallback resilience** with graceful degradation to baseline methods
- **Performance monitoring** with detailed statistics and benchmarking
+- **Beam search with constraint propagation** for deeper program synthesis

## Directory Structure

73 changes: 73 additions & 0 deletions arc_solver/beam_search.py
@@ -0,0 +1,73 @@
# [S:ALG v1] strategy=beam_search nodes_metric=on pass
import logging
from typing import List, Tuple, Dict, Any
from .grid import Array
from .dsl import OPS
from .heuristics import score_candidate
from .neural.sketches import generate_parameter_grid

logger = logging.getLogger(__name__)


def beam_search(
    train_pairs: List[Tuple[Array, Array]],
    beam_width: int = 10,
    depth: int = 2,
    max_expansions: int = 10000,
) -> Tuple[List[List[Tuple[str, Dict[str, Any]]]], Dict[str, int]]:
    """Beam search over DSL programs.

    Args:
        train_pairs: Training examples as ``[(input, output), ...]``.
        beam_width: Number of candidates kept per level.
        depth: Maximum program length.
        max_expansions: Safety limit on node expansions.

    Returns:
        A tuple ``(programs, stats)`` where ``programs`` is a list of candidate
        programs matching all training pairs exactly and ``stats`` contains
        observability metrics.
    """
    if beam_width <= 0 or depth <= 0:
        raise ValueError("beam_width and depth must be positive")

    beam: List[Tuple[List[Tuple[str, Dict[str, Any]]], float]] = [([], 1.0)]
    complete: List[List[Tuple[str, Dict[str, Any]]]] = []
    nodes_expanded = 0

    for _ in range(depth):
        expansions: List[Tuple[List[Tuple[str, Dict[str, Any]]], float]] = []
        for program, _ in beam:
            for op_name in OPS.keys():
                for params in generate_parameter_grid(op_name):
                    candidate = program + [(op_name, params)]
                    try:
                        score = score_candidate(candidate, train_pairs)
                    except Exception:
                        continue  # constraint violation
                    nodes_expanded += 1
                    if score >= 0.999:
                        complete.append(candidate)
                    else:
                        expansions.append((candidate, score))
                    if nodes_expanded >= max_expansions:
                        logger.warning(
                            "beam_search max expansions reached",
                            extra={"nodes_expanded": nodes_expanded},
                        )
                        break
                if nodes_expanded >= max_expansions:
                    break
            if nodes_expanded >= max_expansions:
                break
        expansions.sort(key=lambda x: x[1], reverse=True)
        beam = expansions[:beam_width]
        if not beam:
            break

    complete = complete[:beam_width]
    logger.info(
        "beam_search complete",
        extra={"nodes_expanded": nodes_expanded, "solutions": len(complete)},
    )
    return complete, {"nodes_expanded": nodes_expanded}
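
A minimal usage sketch for the new module, mirroring the unit test added in this PR (`to_array`, `apply_program`, and a `rotate` op in `OPS` come from the existing codebase; the toy task is illustrative only):

```python
import numpy as np

from arc_solver.beam_search import beam_search
from arc_solver.dsl import apply_program
from arc_solver.grid import to_array

# Toy task: the output is the input rotated 90 degrees clockwise.
inp = to_array([[1, 2], [3, 4]])
out = np.rot90(inp, -1)

programs, stats = beam_search([(inp, out)], beam_width=5, depth=2)

# Every returned program reproduces the training output exactly
# (score_candidate >= 0.999 is treated as an exact match).
for prog in programs:
    assert np.array_equal(apply_program(inp, prog), out)
print(stats["nodes_expanded"])
```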
34 changes: 27 additions & 7 deletions arc_solver/enhanced_search.py
@@ -19,18 +19,22 @@
 from .neural.episodic import EpisodicRetrieval
 from .neural.sketches import SketchMiner, generate_parameter_grid
 from .ttt import TestTimeTrainer, DataAugmentation
+from .beam_search import beam_search
+from .mcts_search import mcts_search


 class EnhancedSearch:
     """Enhanced program synthesis search with neural guidance and episodic retrieval."""

-    def __init__(self, guidance_model_path: Optional[str] = None,
-                 episode_db_path: str = "episodes.json"):
+    def __init__(self, guidance_model_path: Optional[str] = None,
+                 episode_db_path: str = "episodes.json",
+                 enable_beam_search: bool = True):
         self.neural_guidance = NeuralGuidance(guidance_model_path)
         self.episodic_retrieval = EpisodicRetrieval(episode_db_path)
         self.sketch_miner = SketchMiner()
         self.test_time_trainer = TestTimeTrainer()
         self.search_stats = {}
+        self.enable_beam_search = enable_beam_search

         # Load any existing sketches
         try:
@@ -44,6 +48,9 @@ def synthesize_enhanced(self, train_pairs: List[Tuple[Array, Array]],
         self.search_stats = {
             'episodic_candidates': 0,
             'heuristic_candidates': 0,
+            'beam_candidates': 0,
+            'beam_nodes_expanded': 0,
+            'mcts_candidates': 0,
             'sketch_candidates': 0,
             'neural_guided_candidates': 0,
             'ttt_adapted': False,
@@ -61,19 +68,32 @@ def synthesize_enhanced(self, train_pairs: List[Tuple[Array, Array]],
         all_candidates.extend(heuristic_candidates)
         self.search_stats['heuristic_candidates'] = len(heuristic_candidates)

-        # Step 3: Neural-guided search if we need more candidates
+        # Step 3: Beam search for deeper exploration
+        if self.enable_beam_search and len(all_candidates) < max_programs:
+            beam_programs, stats = beam_search(train_pairs, beam_width=16, depth=3)
+            all_candidates.extend(beam_programs)
+            self.search_stats['beam_candidates'] = len(beam_programs)
+            self.search_stats['beam_nodes_expanded'] = stats['nodes_expanded']
+
+        # Step 4: Monte Carlo Tree Search if still limited
+        if self.enable_beam_search and len(all_candidates) < max_programs // 2:
+            mcts_programs = mcts_search(train_pairs, iterations=200, max_depth=2, seed=0)
+            all_candidates.extend(mcts_programs)
+            self.search_stats['mcts_candidates'] = len(mcts_programs)
+
+        # Step 5: Neural-guided search if we need more candidates
         if len(all_candidates) < max_programs // 4:
             neural_candidates = self._neural_guided_search(train_pairs, max_programs // 2)
             all_candidates.extend(neural_candidates)
             self.search_stats['neural_guided_candidates'] = len(neural_candidates)
-        # Step 4: Sketch-based search if still need more
+
+        # Step 6: Sketch-based search if still need more
         if len(all_candidates) < max_programs // 2:
             sketch_candidates = self._sketch_based_search(train_pairs, max_programs // 3)
             all_candidates.extend(sketch_candidates)
             self.search_stats['sketch_candidates'] = len(sketch_candidates)
-        # Step 5: Test-time adaptation if we have candidates
+
+        # Step 7: Test-time adaptation if we have candidates
         if all_candidates:
             all_candidates = self._apply_test_time_adaptation(train_pairs, all_candidates)
             self.search_stats['ttt_adapted'] = True
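
A hedged integration sketch for the new `enable_beam_search` flag. The constructor keywords are taken from the hunk above; treating `max_programs` as the second parameter of `synthesize_enhanced` is an assumption inferred from the visible body, not confirmed by this diff:

```python
import numpy as np

from arc_solver.enhanced_search import EnhancedSearch
from arc_solver.grid import to_array

inp = to_array([[1, 2], [3, 4]])
train_pairs = [(inp, np.rot90(inp, -1))]

# enable_beam_search=True activates Steps 3-4 (beam search, then MCTS).
search = EnhancedSearch(enable_beam_search=True)
candidates = search.synthesize_enhanced(train_pairs, max_programs=64)  # max_programs is assumed

print(search.search_stats["beam_candidates"],
      search.search_stats["beam_nodes_expanded"],
      search.search_stats["mcts_candidates"])
```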
72 changes: 72 additions & 0 deletions arc_solver/mcts_search.py
@@ -0,0 +1,72 @@
# [S:ALG v1] strategy=mcts_search pass
import logging
import math
import random
from typing import List, Tuple, Dict, Any, Optional
from .grid import Array
from .dsl import OPS
from .heuristics import score_candidate
from .neural.sketches import generate_parameter_grid

logger = logging.getLogger(__name__)


class Node:
    def __init__(
        self,
        program: List[Tuple[str, Dict[str, Any]]],
        parent: Optional['Node'] = None,
        depth: int = 0,
        max_depth: int = 2,
    ):
        self.program = program
        self.parent = parent
        self.children: List['Node'] = []
        self.visits = 0
        self.value = 0.0
        self.untried: List[Tuple[str, Dict[str, Any]]] = []
        if depth < max_depth:
            for op_name in OPS.keys():
                for params in generate_parameter_grid(op_name):
                    self.untried.append((op_name, params))

    def ucb(self, total_visits: int, c: float = 1.4) -> float:
        """UCB1: mean reward plus exploration bonus; unvisited nodes go first."""
        if self.visits == 0:
            return float('inf')
        return self.value / self.visits + c * math.sqrt(math.log(total_visits) / self.visits)


def mcts_search(
    train_pairs: List[Tuple[Array, Array]],
    iterations: int = 100,
    max_depth: int = 2,
    seed: Optional[int] = None,
) -> List[List[Tuple[str, Dict[str, Any]]]]:
    """Monte Carlo Tree Search for program synthesis."""
    rng = random.Random(seed)  # reserved for stochastic rollouts; expansion order is currently deterministic
    root = Node([], depth=0, max_depth=max_depth)
    for _ in range(iterations):
        node = root
        depth = 0
        # Selection
        while not node.untried and node.children and depth < max_depth:
            total = sum(child.visits for child in node.children)
            node = max(node.children, key=lambda n: n.ucb(total))
            depth += 1
        # Expansion
        if node.untried and depth < max_depth:
            op_name, params = node.untried.pop()
            new_prog = node.program + [(op_name, params)]
            child = Node(new_prog, parent=node, depth=depth + 1, max_depth=max_depth)
            node.children.append(child)
            node = child
        # Simulation
        try:
            reward = score_candidate(node.program, train_pairs)
        except Exception:
            reward = 0.0
        # Backpropagation
        while node:
            node.visits += 1
            node.value += reward
            node = node.parent
    best = max(root.children, key=lambda n: n.value / n.visits if n.visits else 0, default=None)
    programs: List[List[Tuple[str, Dict[str, Any]]]] = []
    if best is not None:
        try:
            if score_candidate(best.program, train_pairs) >= 0.999:
                programs.append(best.program)
        except Exception:
            pass  # the best candidate can still violate constraints; return no solution
    logger.info("mcts_search complete", extra={"iterations": iterations, "solutions": len(programs)})
    return programs
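
For reference, `Node.ucb` implements the standard UCB1 selection rule,

$$\text{UCB1}(n) = \frac{V(n)}{N(n)} + c\sqrt{\frac{\ln N_p}{N(n)}},$$

where $V(n)$ is the accumulated reward (`node.value`), $N(n)$ the visit count (`node.visits`), $N_p$ the total visits across the parent's children, and $c = 1.4$ the exploration constant; unvisited nodes score $\infty$, so every child is tried at least once before exploitation kicks in.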
1 change: 1 addition & 0 deletions requirements.txt
@@ -1 +1,2 @@
numpy==1.26.4
+hypothesis==6.100.2
41 changes: 41 additions & 0 deletions tests/test_beam_search.py
@@ -0,0 +1,41 @@
# [S:TEST v1] beam_search unit and property tests pass
import numpy as np
from arc_solver.grid import to_array
from arc_solver.beam_search import beam_search
from arc_solver.mcts_search import mcts_search
from arc_solver.dsl import apply_program
from hypothesis import given, strategies as st
import hypothesis.extra.numpy as hnp


def test_beam_search_finds_rotation():
    inp = to_array([[1, 2], [3, 4]])
    out = np.rot90(inp, -1)
    progs, stats = beam_search([(inp, out)], beam_width=5, depth=2)
    assert any(np.array_equal(apply_program(inp, p), out) for p in progs)
    assert stats["nodes_expanded"] > 0
    assert len(progs) <= 5


@given(
    grid=hnp.arrays(dtype=np.int16, shape=(3, 3), elements=st.integers(0, 9)),
    k=st.integers(1, 3),
)
def test_beam_search_rotation_property(grid, k):
    out = np.rot90(grid, -k)
    progs, _ = beam_search([(grid, out)], beam_width=5, depth=1)
    # A symmetric grid admits several one-step solutions, and complete programs
    # are truncated to beam_width, so check semantics rather than the literal
    # [("rotate", {"k": k})] program.
    assert progs
    assert all(np.array_equal(apply_program(grid, p), out) for p in progs)


def test_beam_search_no_solution():
    a = to_array([[0]])
    b = to_array([[1]])
    progs, _ = beam_search([(a, b)], beam_width=3, depth=1)
    assert progs == []


def test_mcts_search_finds_rotation():
    inp = to_array([[1, 2], [3, 4]])
    out = np.rot90(inp, -1)
    progs = mcts_search([(inp, out)], iterations=1000, max_depth=1, seed=0)
    assert any(np.array_equal(apply_program(inp, p), out) for p in progs)