Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions arc_solver/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,12 @@
"""ARC Solver Package
"""ARC Solver Package.

Enhanced ARC solver with neural guidance, episodic retrieval, and test-time training.
This package exposes the high-level :class:`ARCSolver` alongside common
utilities for interacting with ARC datasets. The solver integrates neural
guidance, episodic retrieval and test-time training into a cohesive system.
"""

from .solver import ARCSolver
from .io_utils import load_rerun_json, save_submission
from .grid import Array

__all__ = ["ARCSolver", "load_rerun_json", "save_submission", "Array"]
17 changes: 17 additions & 0 deletions arc_solver/grid.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,23 @@
# Type alias for clarity. ARC grids are small 2D arrays of integers.
Array = np.ndarray

__all__ = [
"Array",
"to_array",
"to_list",
"same_shape",
"rotate90",
"flip",
"transpose",
"pad_to",
"crop",
"translate",
"color_map",
"histogram",
"eq",
"bg_color",
]


def to_array(grid: List[List[int]]) -> Array:
"""Convert a nested Python list into a numpy array of dtype int16."""
Expand Down
17 changes: 15 additions & 2 deletions arc_solver/heuristics.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,25 @@

from __future__ import annotations

import logging
import numpy as np
from typing import List, Dict, Tuple, Optional

from .grid import Array, eq, rotate90, flip, histogram, bg_color, to_array
from .dsl import apply_program

logger = logging.getLogger(__name__)

__all__ = [
"infer_color_mapping",
"match_rotation_reflection",
"infer_translation",
"consistent_program_single_step",
"guess_output_shape",
"score_candidate",
"diversify_programs",
]


def infer_color_mapping(inp: Array, out: Array) -> Optional[Dict[int, int]]:
"""Try to infer a one-to-one color mapping between input and output grids.
Expand Down Expand Up @@ -123,8 +136,8 @@ def score_candidate(program: List[Tuple[str, Dict[str, int]]], train_pairs: List
try:
out = apply_program(a, program)
good += int(eq(out, b))
except Exception:
pass
except Exception as exc:
logger.warning("Program execution failed on training pair: %s", exc)
return good / len(train_pairs)


Expand Down
2 changes: 2 additions & 0 deletions arc_solver/io_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
"/kaggle/input/arc-agi-2/arc-agi_test_challenges.json",
]

__all__ = ["load_rerun_json", "save_submission"]


def load_rerun_json() -> Dict[str, Any]:
"""Load the JSON file containing all test tasks for the competition.
Expand Down
33 changes: 25 additions & 8 deletions arc_solver/ttt.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,18 @@

from __future__ import annotations

import logging
import numpy as np
from typing import List, Tuple, Dict, Any, Optional
from copy import deepcopy

from .grid import Array, eq
from .dsl import apply_program

logger = logging.getLogger(__name__)

__all__ = ["AdaptiveScorer", "TestTimeTrainer", "DataAugmentation"]


class AdaptiveScorer:
"""Adaptive scoring function that can be fine-tuned at test time."""
Expand Down Expand Up @@ -51,8 +56,11 @@ def extract_program_features(self, program: List[Tuple[str, Dict[str, Any]]],
# Compute partial match (e.g., correct shape)
if pred_out.shape == target_out.shape:
partial_matches += 1
except Exception:
pass
except Exception as exc:
logger.warning(
"Program execution failed during feature extraction: %s", exc
)
continue

features[2] = exact_matches / len(train_pairs)
features[3] = partial_matches / len(train_pairs)
Expand Down Expand Up @@ -167,8 +175,11 @@ def _evaluate_program(self, program: List[Tuple[str, Dict[str, Any]]],
pred_out = apply_program(inp, program)
if eq(pred_out, target_out):
successes += 1
except Exception:
pass
except Exception as exc:
logger.warning(
"Program evaluation failed during adaptation: %s", exc
)
continue

return successes / len(train_pairs) if train_pairs else 0.0

Expand Down Expand Up @@ -216,8 +227,11 @@ def augment_training_pairs(train_pairs: List[Tuple[Array, Array]],
aug_inp = np.rot90(inp, k)
aug_out = np.rot90(out, k)
augmented.append((aug_inp, aug_out))
except Exception:
pass
except Exception as exc:
logger.warning(
"Rotation augmentation failed (k=%s): %s", k, exc
)
continue

# Try reflections
for axis in [0, 1]:
Expand All @@ -227,8 +241,11 @@ def augment_training_pairs(train_pairs: List[Tuple[Array, Array]],
aug_inp = np.flip(inp, axis=axis)
aug_out = np.flip(out, axis=axis)
augmented.append((aug_inp, aug_out))
except Exception:
pass
except Exception as exc:
logger.warning(
"Reflection augmentation failed (axis=%s): %s", axis, exc
)
continue

return augmented[:max_augmentations]

Expand Down
164 changes: 164 additions & 0 deletions tools/colab_eval.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
"""Train and evaluate the ARC solver in Kaggle/Colab environments.

This script provides a minimal end-to-end pipeline for training the neural
guidance classifier and producing Kaggle-compatible submission files. When
ground-truth solutions are provided, it also reports accuracy and per-task
differences between predictions and targets.
"""

from __future__ import annotations

import argparse
import json
import sys
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import numpy as np

# Ensure repository root is on the path so arc_solver can be imported when this
# script runs in Kaggle/Colab notebooks.
sys.path.append(str(Path(__file__).parent.parent))

from arc_solver.solver import ARCSolver
from arc_solver.grid import to_array, eq
from arc_solver.io_utils import save_submission
from train_guidance import (
load_training_data,
extract_training_features_and_labels,
train_classifier,
save_classifier,
)


def train_guidance_model(
    train_json: str,
    solutions_json: Optional[str],
    model_path: str,
    epochs: int = 100,
) -> str:
    """Fit the neural guidance classifier and persist it to disk.

    The training tasks are loaded, turned into feature/label arrays, used to
    train a classifier, and the resulting classifier is saved at
    ``model_path``. Missing parent directories of ``model_path`` are created.

    Args:
        train_json: Path to the ARC training challenges JSON.
        solutions_json: Optional path to training solutions for supervised
            labels.
        model_path: Destination file for the trained classifier.
        epochs: Number of training epochs handed to the trainer.

    Returns:
        ``model_path`` unchanged, so callers can chain on the saved location.
    """
    training_tasks = load_training_data(train_json, solutions_json)
    feature_set, label_set = extract_training_features_and_labels(training_tasks)
    model = train_classifier(feature_set, label_set, epochs)

    # Make sure the destination directory exists before the model is written.
    target_dir = Path(model_path).parent
    target_dir.mkdir(parents=True, exist_ok=True)

    save_classifier(model, model_path)
    return model_path


def _grid_diff(
    pred: Optional[List[List[int]]],
    target: List[List[int]],
) -> Tuple[bool, List[List[int]]]:
    """Compare one predicted grid against its target grid.

    Args:
        pred: Predicted grid as nested lists, or ``None`` when the solver
            produced no prediction for this test output.
        target: Ground-truth grid as nested lists.

    Returns:
        A ``(matches, diff)`` pair. ``diff`` is a 0/1 grid with the target's
        shape where 1 marks a differing cell. When the prediction is missing
        or has a different shape, the grids cannot be compared cell by cell,
        so every cell of the target is marked as wrong.
    """
    ta = to_array(target)
    pa = None if pred is None else to_array(pred)

    # Guard before the elementwise comparison: with mismatched shapes NumPy
    # would either broadcast silently or fail, neither of which is a valid
    # per-cell diff.
    if pa is None or pa.shape != ta.shape:
        return False, np.ones(ta.shape, dtype=int).tolist()

    return bool(eq(pa, ta)), (pa != ta).astype(int).tolist()


def evaluate_solver(
    test_json: str,
    model_path: str,
    solutions_json: Optional[str],
    out_path: str,
) -> Tuple[float, Dict[str, List[List[List[int]]]]]:
    """Run the solver on evaluation tasks and optionally score against solutions.

    Every task in ``test_json`` is solved and the predictions are written to
    ``out_path``. For tasks that also appear in the solutions file, the
    ``"attempt_1"`` predictions are compared with the target outputs. A task
    counts as correct only when every target grid is matched exactly; a
    missing prediction or a prediction of the wrong shape counts as a
    mismatch instead of aborting the evaluation. Predictions beyond the
    number of targets are ignored.

    Args:
        test_json: Path to evaluation challenges JSON.
        model_path: Path to trained guidance model.
        solutions_json: Optional path to ground-truth solutions for scoring.
            If the path is ``None`` or does not exist, nothing is scored.
        out_path: Where to write the Kaggle submission JSON.

    Returns:
        Tuple of overall accuracy (0-1, computed over the scored tasks only;
        0.0 when no task was scored) and a mapping of task ids to diff grids.
    """
    solver = ARCSolver(use_enhancements=True, guidance_model_path=model_path)

    with open(test_json, "r", encoding="utf-8") as f:
        test_tasks: Dict[str, Any] = json.load(f)

    solutions: Dict[str, Any] = {}
    if solutions_json and Path(solutions_json).exists():
        with open(solutions_json, "r", encoding="utf-8") as f:
            solutions = json.load(f)

    predictions: Dict[str, Dict[str, List[List[List[int]]]]] = {}
    diffs: Dict[str, List[List[List[int]]]] = {}
    correct = 0
    total = 0

    for task_id, task in test_tasks.items():
        result = solver.solve_task(task)
        predictions[task_id] = result

        if task_id not in solutions:
            continue

        target_grids = [pair["output"] for pair in solutions[task_id]["test"]]
        pred_grids = result["attempt_1"]
        diff_grids: List[List[List[int]]] = []
        all_match = True

        # Iterate over the targets (not zip) so that a short prediction list
        # is scored as a miss rather than silently truncating the comparison.
        for idx, target in enumerate(target_grids):
            pred = pred_grids[idx] if idx < len(pred_grids) else None
            matched, diff = _grid_diff(pred, target)
            all_match = all_match and matched
            diff_grids.append(diff)

        if all_match:
            correct += 1
        diffs[task_id] = diff_grids
        total += 1

    save_submission(predictions, out_path)
    accuracy = correct / total if total else 0.0
    return accuracy, diffs


def _build_arg_parser() -> argparse.ArgumentParser:
    """Create the command-line parser used by :func:`main`."""
    cli = argparse.ArgumentParser(description="Train and evaluate ARC solver")
    cli.add_argument("--train-json", help="Path to training challenges JSON")
    cli.add_argument(
        "--train-solutions",
        default=None,
        help="Path to training solutions JSON",
    )
    cli.add_argument(
        "--model-path",
        default="neural_guidance_model.json",
        help="Where to save or load the guidance model",
    )
    cli.add_argument("--test-json", required=True, help="Path to evaluation JSON")
    cli.add_argument(
        "--test-solutions",
        default=None,
        help="Path to evaluation solutions JSON for scoring",
    )
    cli.add_argument(
        "--out",
        default="submission.json",
        help="Output path for submission JSON",
    )
    cli.add_argument("--epochs", type=int, default=100, help="Training epochs")
    return cli


def main() -> None:
    """Command-line entry point: optionally train, then evaluate and report.

    Training runs only when ``--train-json`` is supplied. Evaluation always
    runs and writes the submission file. Accuracy and a per-task status line
    are printed only when ``--test-solutions`` is supplied.
    """
    opts = _build_arg_parser().parse_args()

    # Training is optional; without it the model at --model-path is used as is.
    if opts.train_json:
        train_guidance_model(
            opts.train_json,
            opts.train_solutions,
            opts.model_path,
            opts.epochs,
        )

    accuracy, diffs = evaluate_solver(
        opts.test_json,
        opts.model_path,
        opts.test_solutions,
        opts.out,
    )

    if opts.test_solutions:
        print(f"Accuracy: {accuracy * 100:.2f}%")
        for task_id, grids in diffs.items():
            # A task is wrong as soon as any diff grid has a non-zero cell.
            has_mismatch = any(np.any(np.array(grid)) for grid in grids)
            status = "incorrect" if has_mismatch else "correct"
            print(f"Task {task_id}: {status}")

    print(f"Submission file written to {opts.out}")


# Run the command-line pipeline only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()

Loading