diff --git a/CHANGELOG.md b/CHANGELOG.md index 2c7218f..0726dfd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,14 @@ All notable changes to this project will be documented in this file. +## [Unreleased] + +### Added + +* Added circuit-tracer CLT loading support for HuggingFace, local safetensors, and circuit-tracer cache sources in attribution workflows. +* Added conversion utilities for saving circuit-tracer attribution graphs and feature metadata in the existing CLT-Forge visual interface format. +* Added a notebook showing how to load open-source circuit-tracer CLTs and visualize them with the CLT-Forge interface. + ## [0.1.0] - 2026-02-16 ### Added diff --git a/README.md b/README.md index aeed251..95075b9 100644 --- a/README.md +++ b/README.md @@ -91,6 +91,28 @@ graph = runner.run( ) ``` +You can also run attribution with an open-source circuit-tracer CLT instead of +a CLT-Forge checkpoint: + +``` python +from clt_forge.attribution.attribution import AttributionRunner + +runner = AttributionRunner.from_circuit_tracer_hub( + hf_ref = "mntss/clt-gemma-2-2b-426k", + model_name = "google/gemma-2-2b", +) + +graph = runner.run( + input_string = "The capital of France is", + folder_name = "where/to/save", + run_interventions = False, +) +``` + +See `notebooks/load_open_source_circuit_tracer_clt.ipynb` for an end-to-end +example that saves a CLT-Forge-compatible graph and opens the existing visual +interface. + ------------------------------------------------------------------------ ### 5. Start the Visual-Interface diff --git a/notebooks/load_open_source_circuit_tracer_clt.ipynb b/notebooks/load_open_source_circuit_tracer_clt.ipynb new file mode 100644 index 0000000..75b0e55 --- /dev/null +++ b/notebooks/load_open_source_circuit_tracer_clt.ipynb @@ -0,0 +1,192 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Load an open-source circuit-tracer CLT in CLT-Forge\n", + "\n", + "This notebook loads a trained circuit-tracer CLT from HuggingFace, runs attribution through CLT-Forge's attribution runner, saves a CLT-Forge-compatible graph artifact, optionally converts circuit-tracer feature metadata into CLT-Forge feature JSON files, and opens the existing CLT-Forge visual interface.\n", + "\n", + "It does not change the visual interface. The bridge happens in the Python library layer.\n", + "\n", + "Run it from an environment with CLT-Forge installed, or install the local checkout from the repository root with `python -m pip install -e .`. The first code cell checks for the required notebook dependencies before importing CLT-Forge." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from pathlib import Path\n", + "import importlib.util\n", + "import sys\n", + "\n", + "def find_repo_root(start: Path) -> Path | None:\n", + " for candidate in (start, *start.parents):\n", + " if (candidate / \"src\" / \"clt_forge\").exists():\n", + " return candidate\n", + " return None\n", + "\n", + "repo_root = find_repo_root(Path.cwd().resolve())\n", + "if repo_root is not None:\n", + " sys.path.insert(0, str(repo_root / \"src\"))\n", + "\n", + "required_modules = {\n", + " \"clt_forge\": \"clt-forge\",\n", + " \"torch\": \"torch\",\n", + " \"transformers\": \"transformers\",\n", + " \"transformer_lens\": \"transformer-lens\",\n", + " \"huggingface_hub\": \"huggingface-hub\",\n", + " \"safetensors\": \"safetensors\",\n", + " \"dash\": \"dash\",\n", + " \"dash_cytoscape\": \"dash-cytoscape\",\n", + "}\n", + "missing_packages = [\n", + " package_name\n", + " for module_name, package_name in required_modules.items()\n", + " if importlib.util.find_spec(module_name) is None\n", + "]\n", + "if missing_packages:\n", + " raise RuntimeError(\n", + " \"Missing notebook dependencies. Install them from the CLT-Forge repository root with:\\n\\n\"\n", + " \"python -m pip install -e .\\n\"\n", + " f\"python -m pip install {' '.join(missing_packages)}\\n\\n\"\n", + " \"Then restart this notebook kernel.\"\n", + " )\n", + "\n", + "import torch\n", + "\n", + "from clt_forge.attribution.attribution import AttributionRunner\n", + "from clt_forge.attribution.circuit_tracer_features import (\n", + " download_clt_forge_feature_dicts_for_graph,\n", + ")\n", + "from clt_forge.frontend.app import main\n", + "from clt_forge.frontend.config.settings import AppConfig" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "device = \"cuda\" if torch.cuda.is_available() else \"cpu\"\n", + "dtype = torch.bfloat16 if device == \"cuda\" else torch.float32\n", + "\n", + "# circuit-tracer open-source CLT refs listed in the vendored circuit-tracer README:\n", + "# - mntss/clt-gemma-2-2b-426k\n", + "# - mntss/clt-gemma-2-2b-2.5M\n", + "# - mntss/clt-llama-3.2-1b-524k\n", + "model_name = \"google/gemma-2-2b\"\n", + "circuit_tracer_clt = \"mntss/clt-gemma-2-2b-426k\"\n", + "\n", + "output_base = repo_root if repo_root is not None else Path.cwd()\n", + "output_dir = output_base / \"outputs\" / \"circuit_tracer_gemma_demo\"\n", + "graph_path = output_dir / \"attribution_graph.pt\"\n", + "feature_dict_dir = output_dir / \"feature_dicts\"\n", + "output_dir.mkdir(parents=True, exist_ok=True)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import time\n", + "\n", + "print(\n", + " f\"Loading circuit-tracer CLT {circuit_tracer_clt!r} \"\n", + " f\"with model {model_name!r} on {device}...\",\n", + " flush=True,\n", + ")\n", + "start_time = time.time()\n", + "\n", + "runner = AttributionRunner.from_circuit_tracer_hub(\n", + " hf_ref=circuit_tracer_clt,\n", + " model_name=model_name,\n", + " device=device,\n", + " dtype=dtype,\n", + " backend=\"transformerlens\",\n", + " lazy_encoder=False,\n", + " lazy_decoder=True,\n", + " debug=False,\n", + ")\n", + "\n", + "print(f\"Runner ready in {time.time() - start_time:.1f}s\", flush=True)\n", + "\n", + "print(\"Running attribution...\", flush=True)\n", + "start_time = time.time()\n", + "\n", + "result = runner.run(\n", + " input_string=\"The capital of France is\",\n", + " folder_name=str(output_dir),\n", + " graph_name=graph_path.name,\n", + " max_n_logits=5,\n", + " max_feature_nodes=4096,\n", + " batch_size=128,\n", + " offload=\"cpu\",\n", + " run_interventions=False,\n", + ")\n", + "\n", + "print(f\"Attribution ready in {time.time() - start_time:.1f}s\", flush=True)\n", + "\n", + "graph_path" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Optional: pull circuit-tracer feature examples for the graph's active features\n", + "# and convert them to the CLT-Forge frontend feature JSON layout.\n", + "# Start with a small max_features while exploring; downloading every active\n", + "# feature can be slow for large graphs.\n", + "written_feature_files = download_clt_forge_feature_dicts_for_graph(\n", + " graph_result=result,\n", + " scan=result.get(\"circuit_tracer_scan\", circuit_tracer_clt),\n", + " output_dir=feature_dict_dir,\n", + " max_features=50,\n", + " strict=False,\n", + ")\n", + "\n", + "len(written_feature_files)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "cfg = AppConfig(\n", + " attr_graph_path=str(graph_path),\n", + " dict_base_folder=str(feature_dict_dir),\n", + " clt_checkpoint=\"\",\n", + " model_name=model_name,\n", + " model_class_name=\"HookedTransformer\",\n", + " port=8106,\n", + ")\n", + "\n", + "main(cfg)" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "name": "python", + "pygments_lexer": "ipython3" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/src/clt_forge/attribution/attribution.py b/src/clt_forge/attribution/attribution.py index 8411118..fd2f49b 100644 --- a/src/clt_forge/attribution/attribution.py +++ b/src/clt_forge/attribution/attribution.py @@ -1,7 +1,10 @@ from clt_forge import logger +from clt_forge.attribution.conversion import build_clt_forge_attribution_result from clt_forge.attribution.loading import ( - load_circuit_tracing_clt_from_local, + CircuitTracerCLTSource, + _resolve_torch_dtype, + load_attribution_clt, test_clt_performance_on_prompt, compare_reconstruction_with_local_clt_class, ) @@ -10,20 +13,33 @@ run_intervention_per_feature, ) -from clt_forge.vendor.circuit_tracer.circuit_tracer import ReplacementModel, attribute -from clt_forge.vendor.circuit_tracer.circuit_tracer.graph import prune_graph, compute_graph_scores +import clt_forge.vendor.circuit_tracer # noqa: F401 +from circuit_tracer import ReplacementModel, attribute +from circuit_tracer.graph import prune_graph, compute_graph_scores import os import torch -from typing import List, Dict, Any +from typing import Any, Dict, List, Literal class AttributionRunner: def __init__( self, - clt_checkpoint: str, + clt_checkpoint: str | None = None, model_name: str = "gpt2", device: str = "cuda", + dtype: str | torch.dtype = torch.float32, + backend: Literal["nnsight", "transformerlens"] = "transformerlens", + clt_source: CircuitTracerCLTSource = "clt_forge", + circuit_tracer_clt: str | None = None, + lazy_encoder: bool = False, + lazy_decoder: bool = True, + cache_dir: str | None = None, + use_cache: bool = True, + feature_input_hook: str = "hook_resid_mid", + feature_output_hook: str = "hook_mlp_out", + scan: str | list[str] | None = None, debug: bool = False, + model_kwargs: Dict[str, Any] | None = None, ): self.debug = debug @@ -32,49 +48,105 @@ def log(msg): logger.info(msg) self.log = log + torch_dtype = _resolve_torch_dtype(dtype) + + clt_ref = circuit_tracer_clt or clt_checkpoint + if clt_ref is None: + raise ValueError( + "Pass clt_checkpoint for CLT-Forge checkpoints or " + "circuit_tracer_clt for circuit-tracer CLTs." + ) self.log("Loading CLT...") - self.clt = load_circuit_tracing_clt_from_local( - clt_checkpoint, device=device, debug=debug + self.clt = load_attribution_clt( + clt_ref=clt_ref, + source=clt_source, + device=device, + dtype=torch_dtype, + lazy_encoder=lazy_encoder, + lazy_decoder=lazy_decoder, + cache_dir=cache_dir, + use_cache=use_cache, + feature_input_hook=feature_input_hook, + feature_output_hook=feature_output_hook, + scan=scan, + debug=debug, ) self.log("Loading model...") self.model = ReplacementModel.from_pretrained_and_transcoders( model_name=model_name, transcoders=self.clt, + backend=backend, + device=torch.device(device), + dtype=torch_dtype, + **(model_kwargs or {}), ) self.clt_checkpoint = clt_checkpoint + self.clt_ref = clt_ref + self.clt_source = clt_source self.model_name = model_name + self.backend = backend + + @classmethod + def from_circuit_tracer_hub( + cls, + hf_ref: str, + model_name: str, + **kwargs: Any, + ) -> "AttributionRunner": + """Create a runner from an open-source circuit-tracer CLT on HuggingFace.""" + return cls( + model_name=model_name, + clt_source="circuit_tracer_hub", + circuit_tracer_clt=hf_ref, + **kwargs, + ) - def _build_result(self, graph, prune_result, input_string) -> Dict[str, Any]: - sparse_adjacency = prune_result.edge_mask.float() - - active_feature = torch.stack( - [ - graph.active_features[:, 1], - graph.active_features[:, 0], - graph.active_features[:, 2], - ], - dim=1, + @classmethod + def from_circuit_tracer_local( + cls, + clt_path: str, + model_name: str, + **kwargs: Any, + ) -> "AttributionRunner": + """Create a runner from a local circuit-tracer safetensors CLT directory.""" + return cls( + model_name=model_name, + clt_source="circuit_tracer_local", + circuit_tracer_clt=clt_path, + **kwargs, ) - token_string = [self.model.tokenizer.decode(t) for t in graph.input_tokens] - logit_token_strings = [self.model.tokenizer.decode(t) for t in graph.logit_tokens] - - return { - "adjacency_matrix": graph.adjacency_matrix.cpu(), - "feature_indices": active_feature.cpu(), - "sparse_pruned_adj": sparse_adjacency.cpu(), - "feature_mask": prune_result.node_mask.cpu(), - "edge_mask": prune_result.edge_mask.cpu(), - "logit_tokens": graph.logit_tokens.cpu(), - "logit_probabilities": graph.logit_probabilities.cpu(), - "input_tokens": graph.input_tokens.cpu(), - "input_string": input_string, - "token_string": token_string, - "logit_token_strings": logit_token_strings, - } + @classmethod + def from_circuit_tracer_cache( + cls, + hf_ref: str, + model_name: str, + **kwargs: Any, + ) -> "AttributionRunner": + """Create a runner from a circuit-tracer CLT already saved in local cache.""" + return cls( + model_name=model_name, + clt_source="circuit_tracer_cache", + circuit_tracer_clt=hf_ref, + **kwargs, + ) + + def _build_result(self, graph, prune_result, input_string) -> Dict[str, Any]: + return build_clt_forge_attribution_result( + graph=graph, + prune_result=prune_result, + tokenizer=self.model.tokenizer, + input_string=input_string, + metadata={ + "clt_source": self.clt_source, + "clt_ref": self.clt_ref, + "model_name": self.model_name, + "backend": self.backend, + }, + ) def run( self, @@ -101,14 +173,15 @@ def run( input_string, self.clt, self.model, debug=self.debug ) - compare_reconstruction_with_local_clt_class( - self.clt_checkpoint, - input_string, - self.clt, - self.model, - self.model_name, - debug=self.debug, - ) + if self.clt_source == "clt_forge" and self.clt_checkpoint is not None: + compare_reconstruction_with_local_clt_class( + self.clt_checkpoint, + input_string, + self.clt, + self.model, + self.model_name, + debug=self.debug, + ) graph = attribute( prompt=input_string, @@ -135,7 +208,7 @@ def run( if self.debug: self.log(f"Sparse adjacency shape: {prune_result.edge_mask.shape}") - n_features = graph.active_features.shape[0] + n_features = graph.selected_features.shape[0] self.log(f"Number of features before pruning: {n_features}") self.log(f"Number of feature after pruning (not counting error nodes): {prune_result.node_mask[:n_features].sum().item()}") diff --git a/src/clt_forge/attribution/circuit_tracer_features.py b/src/clt_forge/attribution/circuit_tracer_features.py new file mode 100644 index 0000000..29e2b9e --- /dev/null +++ b/src/clt_forge/attribution/circuit_tracer_features.py @@ -0,0 +1,415 @@ +import gzip +import json +import zlib +from pathlib import Path +from typing import Any, Iterable, Mapping + +from clt_forge import logger + + +def cantor_pair(layer: int, feature_idx: int) -> int: + """Return the feature id convention used by circuit-tracer's frontend.""" + return (layer + feature_idx) * (layer + feature_idx + 1) // 2 + feature_idx + + +def cantor_unpair(feature_id: int) -> tuple[int, int]: + """Inverse of ``cantor_pair`` as ``(layer, feature_idx)``.""" + w = int(((8 * feature_id + 1) ** 0.5 - 1) // 2) + t = (w * w + w) // 2 + feature_idx = feature_id - t + layer = w - feature_idx + return layer, feature_idx + + +def _decode_binary_feature_record(record: bytes) -> dict[str, Any]: + if len(record) < 4: + raise ValueError("Circuit-tracer binary feature record is too short") + + data_length = int.from_bytes(record[:4], byteorder="little", signed=False) + compressed = record[4 : 4 + data_length] + + try: + payload = zlib.decompress(compressed) + except zlib.error: + payload = gzip.decompress(compressed) + + return json.loads(payload.decode("utf-8")) + + +def load_circuit_tracer_feature_data(path: str | Path) -> dict[str, Any]: + """Load a circuit-tracer feature record from JSON, gzipped JSON, or binary form.""" + feature_path = Path(path) + + if feature_path.suffix == ".json": + with open(feature_path, "r") as f: + return json.load(f) + + if feature_path.suffix == ".gz": + with gzip.open(feature_path, "rt") as f: + return json.load(f) + + with open(feature_path, "rb") as f: + return _decode_binary_feature_record(f.read()) + + +def _maybe_feature_coordinates( + feature_data: Mapping[str, Any], + layer: int | None, + feature_idx: int | None, +) -> tuple[int, int]: + if layer is not None and feature_idx is not None: + return int(layer), int(feature_idx) + + if "layer" in feature_data: + layer = int(feature_data["layer"]) + if "feature_index" in feature_data: + feature_idx = int(feature_data["feature_index"]) + + if layer is not None and feature_idx is not None: + return int(layer), int(feature_idx) + + feature_id = feature_data.get("index") + if feature_id is None: + raise ValueError( + "Could not infer feature coordinates. Pass layer and feature_idx " + "explicitly or provide a circuit-tracer feature record with an index." + ) + + return cantor_unpair(int(feature_id)) + + +def _highlight_tokens(tokens: list[str], acts: list[float], threshold_ratio: float) -> str: + if not tokens: + return "" + if not acts or len(acts) != len(tokens): + return "".join(tokens) + + max_act = max(acts) + if max_act <= 0: + return "".join(tokens) + + threshold = max_act * threshold_ratio + parts = [] + in_highlight = False + + for token, act in zip(tokens, acts): + should_highlight = act >= threshold and act > 0 + if should_highlight and not in_highlight: + parts.append("<<") + in_highlight = True + elif not should_highlight and in_highlight: + parts.append(">>") + in_highlight = False + + parts.append(token) + + if in_highlight: + parts.append(">>") + + return "".join(parts) + + +def _iter_examples(feature_data: Mapping[str, Any]) -> Iterable[tuple[str, Mapping[str, Any]]]: + for quantile in feature_data.get("examples_quantiles", []) or []: + quantile_name = quantile.get("quantile_name", "") + for example in quantile.get("examples", []) or []: + yield quantile_name, example + + +def _top_activating_tokens( + examples: Iterable[Mapping[str, Any]], + top_k: int, +) -> list[dict[str, Any]]: + stats: dict[str, dict[str, float]] = {} + + for example in examples: + tokens = example.get("tokens", []) or [] + acts = example.get("tokens_acts_list", []) or [] + for token, act in zip(tokens, acts): + act_float = float(act) + if act_float <= 0: + continue + token_stats = stats.setdefault(token, {"count": 0.0, "total": 0.0}) + token_stats["count"] += 1 + token_stats["total"] += act_float + + ranking = [ + { + "token": token, + "frequency": int(values["count"]), + "average_activation": values["total"] / values["count"], + } + for token, values in stats.items() + ] + ranking.sort( + key=lambda item: (item["frequency"], item["average_activation"]), + reverse=True, + ) + return ranking[:top_k] + + +def circuit_tracer_feature_to_clt_forge_feature_dict( + feature_data: Mapping[str, Any], + layer: int | None = None, + feature_idx: int | None = None, + max_examples: int | None = None, + top_k_tokens: int = 10, + highlight_threshold_ratio: float = 0.6, +) -> dict[str, Any]: + """Convert circuit-tracer feature examples to CLT-Forge frontend JSON schema.""" + layer, feature_idx = _maybe_feature_coordinates(feature_data, layer, feature_idx) + + top_examples: list[str] = [] + top_examples_tks: list[dict[str, Any]] = [] + examples_for_tokens: list[Mapping[str, Any]] = [] + positive_acts: list[float] = [] + + for quantile_name, example in _iter_examples(feature_data): + tokens = list(example.get("tokens", []) or []) + acts = [float(act) for act in example.get("tokens_acts_list", []) or []] + examples_for_tokens.append(example) + + if acts: + positive_acts.extend(act for act in acts if act > 0) + + top_examples.append( + _highlight_tokens( + tokens=tokens, + acts=acts, + threshold_ratio=highlight_threshold_ratio, + ) + ) + top_examples_tks.append( + { + "tokens": tokens, + "activations": acts, + "max_val": max(acts) if acts else 0.0, + "quantile": quantile_name, + "train_token_ind": example.get("train_token_ind"), + "is_repeated_datapoint": example.get("is_repeated_datapoint", False), + } + ) + + if max_examples is not None and len(top_examples) >= max_examples: + break + + average_activation = ( + sum(positive_acts) / len(positive_acts) if positive_acts else 0.0 + ) + + return { + "layer": int(layer), + "feature_index": int(feature_idx), + "average_activation": float( + feature_data.get("average_activation", average_activation) + ), + "top_examples": top_examples, + "top_examples_tks": top_examples_tks, + "top_activating_tokens": _top_activating_tokens( + examples=examples_for_tokens, + top_k=top_k_tokens, + ), + "description": feature_data.get("description") + or feature_data.get("label") + or "Unknown", + "explanation": feature_data.get("explanation") + or "No explanation generated", + "raw_explanation": feature_data.get("raw_explanation", ""), + "source": "circuit_tracer", + "transcoder_id": feature_data.get("transcoder_id"), + "circuit_tracer_index": feature_data.get("index", cantor_pair(layer, feature_idx)), + "activation_frequency": feature_data.get("activation_frequency"), + "top_logits": feature_data.get("top_logits", []), + "bottom_logits": feature_data.get("bottom_logits", []), + "act_min": feature_data.get("act_min"), + "act_max": feature_data.get("act_max"), + "quantile_values": feature_data.get("quantile_values", []), + "histogram": feature_data.get("histogram", []), + } + + +def write_clt_forge_feature_dict( + feature_dict: Mapping[str, Any], + output_dir: str | Path, +) -> Path: + """Write one CLT-Forge feature JSON file in the frontend's expected layout.""" + layer = int(feature_dict["layer"]) + feature_idx = int(feature_dict["feature_index"]) + path = ( + Path(output_dir) + / f"layer{layer}" + / f"feature_{feature_idx}_complete.json" + ) + path.parent.mkdir(parents=True, exist_ok=True) + with open(path, "w") as f: + json.dump(feature_dict, f, indent=2) + return path + + +def convert_circuit_tracer_feature_file( + input_path: str | Path, + output_dir: str | Path, + layer: int | None = None, + feature_idx: int | None = None, + **kwargs: Any, +) -> Path: + """Convert one circuit-tracer feature file to a CLT-Forge feature JSON file.""" + feature_data = load_circuit_tracer_feature_data(input_path) + feature_dict = circuit_tracer_feature_to_clt_forge_feature_dict( + feature_data=feature_data, + layer=layer, + feature_idx=feature_idx, + **kwargs, + ) + return write_clt_forge_feature_dict(feature_dict, output_dir) + + +def _parse_scan_ref(scan: str) -> tuple[str, str | None, str | None]: + """Parse circuit-tracer scan strings into ``(repo_id, subfolder, revision)``.""" + if scan == "gemma": + scan = "mwhanna/gemma-scope-transcoders" + elif scan == "llama": + scan = "mntss/transcoder-Llama-3.2-1B" + + revision = None + subfolder = None + + if "//" in scan: + repo_id, rest = scan.split("//", 1) + if "@" in rest: + subfolder, revision = rest.rsplit("@", 1) + else: + subfolder = rest + return repo_id, subfolder, revision + + if "@" in scan: + scan, revision = scan.rsplit("@", 1) + + parts = scan.split("/") + if len(parts) > 2: + repo_id = "/".join(parts[:2]) + subfolder = "/".join(parts[2:]) + else: + repo_id = scan + + return repo_id, subfolder, revision + + +def download_circuit_tracer_feature_from_hub( + scan: str, + layer: int, + feature_idx: int, + cache_dir: str | None = None, +) -> dict[str, Any]: + """Download one circuit-tracer feature record from HuggingFace feature storage. + + circuit-tracer feature data may be stored either as ``features/{id}.json`` or + in a binary file indexed by ``features/index.json.gz``. This helper supports + both layouts. + """ + from huggingface_hub import hf_hub_download + + repo_id, subfolder, revision = _parse_scan_ref(scan) + prefix = f"{subfolder}/features" if subfolder else "features" + feature_id = cantor_pair(layer, feature_idx) + + try: + json_path = hf_hub_download( + repo_id=repo_id, + filename=f"{prefix}/{feature_id}.json", + revision=revision, + cache_dir=cache_dir, + ) + return load_circuit_tracer_feature_data(json_path) + except Exception as json_error: + try: + index_path = hf_hub_download( + repo_id=repo_id, + filename=f"{prefix}/index.json.gz", + revision=revision, + cache_dir=cache_dir, + ) + with gzip.open(index_path, "rt") as f: + index_data = json.load(f) + + layer_data = index_data[str(layer)] if isinstance(index_data, dict) else index_data[layer] + offsets = layer_data["offsets"] + filename = layer_data["filename"] + start_byte = offsets[feature_idx] + end_byte = offsets[feature_idx + 1] + + bin_path = hf_hub_download( + repo_id=repo_id, + filename=f"{prefix}/{filename}", + revision=revision, + cache_dir=cache_dir, + ) + with open(bin_path, "rb") as f: + f.seek(start_byte) + record = f.read(end_byte - start_byte) + + return _decode_binary_feature_record(record) + except Exception as index_error: + raise FileNotFoundError( + "Could not load circuit-tracer feature data for " + f"layer {layer}, feature {feature_idx} from {scan!r}" + ) from index_error or json_error + + +def download_clt_forge_feature_dicts_for_graph( + graph_result: Mapping[str, Any], + scan: str, + output_dir: str | Path, + max_features: int | None = None, + cache_dir: str | None = None, + skip_existing: bool = True, + strict: bool = False, + **kwargs: Any, +) -> list[Path]: + """Download CT feature data for a CLT-Forge graph and write frontend JSONs.""" + seen: set[tuple[int, int]] = set() + written: list[Path] = [] + + for _, layer, feature_idx in graph_result["feature_indices"].tolist(): + key = (int(layer), int(feature_idx)) + if key in seen: + continue + seen.add(key) + + if max_features is not None and len(written) >= max_features: + break + + output_path = ( + Path(output_dir) + / f"layer{key[0]}" + / f"feature_{key[1]}_complete.json" + ) + if skip_existing and output_path.exists(): + written.append(output_path) + continue + + try: + feature_data = download_circuit_tracer_feature_from_hub( + scan=scan, + layer=key[0], + feature_idx=key[1], + cache_dir=cache_dir, + ) + feature_dict = circuit_tracer_feature_to_clt_forge_feature_dict( + feature_data=feature_data, + layer=key[0], + feature_idx=key[1], + **kwargs, + ) + written.append(write_clt_forge_feature_dict(feature_dict, output_dir)) + except Exception as exc: + if strict: + raise + logger.warning( + "Could not convert circuit-tracer feature %s from %s: %s", + key, + scan, + exc, + ) + + return written diff --git a/src/clt_forge/attribution/conversion.py b/src/clt_forge/attribution/conversion.py new file mode 100644 index 0000000..93147cf --- /dev/null +++ b/src/clt_forge/attribution/conversion.py @@ -0,0 +1,149 @@ +from pathlib import Path +from typing import Any, Dict + +import torch + +import clt_forge.vendor.circuit_tracer # noqa: F401 +from circuit_tracer.graph import ( + Graph, + PruneResult, + prune_graph, +) + + +def _decode_token(tokenizer: Any, token_id: int | torch.Tensor) -> str: + token_int = int(token_id) + try: + return tokenizer.decode([token_int]) + except TypeError: + return tokenizer.decode(token_int) + + +def _logit_token_strings(graph: Graph, tokenizer: Any) -> list[str]: + strings: list[str] = [] + for target in graph.logit_targets: + if target.token_str: + strings.append(target.token_str) + elif target.vocab_idx < graph.vocab_size: + strings.append(_decode_token(tokenizer, target.vocab_idx)) + else: + strings.append(str(target.vocab_idx)) + return strings + + +def selected_feature_indices(graph: Graph) -> torch.Tensor: + """Return selected graph features as ``(pos, layer, feature_idx)`` rows. + + circuit-tracer stores all non-zero features in ``graph.active_features`` as + ``(layer, pos, feature_idx)``. The graph adjacency only contains + ``graph.selected_features``. CLT-Forge's frontend expects the selected rows + in ``(pos, layer, feature_idx)`` order. + """ + active_features = graph.active_features + selected_features = getattr(graph, "selected_features", None) + + if selected_features is not None: + active_features = active_features[selected_features] + + return torch.stack( + [ + active_features[:, 1], + active_features[:, 0], + active_features[:, 2], + ], + dim=1, + ) + + +def build_clt_forge_attribution_result( + graph: Graph, + prune_result: PruneResult, + tokenizer: Any, + input_string: str | None = None, + metadata: Dict[str, Any] | None = None, +) -> Dict[str, Any]: + """Convert a circuit-tracer graph into the CLT-Forge frontend ``.pt`` schema.""" + input_tokens = graph.input_tokens.reshape(-1) + logit_tokens = graph.logit_token_ids.cpu() + + result: Dict[str, Any] = { + "adjacency_matrix": graph.adjacency_matrix.cpu(), + "feature_indices": selected_feature_indices(graph).cpu(), + "sparse_pruned_adj": prune_result.edge_mask.float().cpu(), + "feature_mask": prune_result.node_mask.cpu(), + "edge_mask": prune_result.edge_mask.cpu(), + "logit_tokens": logit_tokens, + "logit_probabilities": graph.logit_probabilities.cpu(), + "input_tokens": input_tokens.cpu(), + "input_string": input_string if input_string is not None else graph.input_string, + "token_string": [_decode_token(tokenizer, token) for token in input_tokens], + "logit_token_strings": _logit_token_strings(graph, tokenizer), + "n_layers": int(graph.cfg.n_layers), + } + + if metadata: + result["metadata"] = metadata + + if graph.scan is not None: + result["circuit_tracer_scan"] = graph.scan + + return result + + +def convert_circuit_tracer_graph_to_clt_forge_result( + graph: Graph, + tokenizer: Any, + node_threshold: float = 0.8, + edge_threshold: float = 0.95, + input_string: str | None = None, + metadata: Dict[str, Any] | None = None, +) -> Dict[str, Any]: + """Prune and convert a circuit-tracer ``Graph`` for CLT-Forge visualization.""" + prune_result = prune_graph( + graph=graph, + node_threshold=node_threshold, + edge_threshold=edge_threshold, + ) + return build_clt_forge_attribution_result( + graph=graph, + prune_result=prune_result, + tokenizer=tokenizer, + input_string=input_string, + metadata=metadata, + ) + + +def convert_circuit_tracer_graph_file_to_clt_forge_result( + graph_path: str | Path, + tokenizer: Any | None = None, + node_threshold: float = 0.8, + edge_threshold: float = 0.95, + map_location: str = "cpu", + metadata: Dict[str, Any] | None = None, +) -> Dict[str, Any]: + """Load a circuit-tracer ``Graph.to_pt`` file and convert it for CLT-Forge.""" + graph = Graph.from_pt(str(graph_path), map_location=map_location) + + if tokenizer is None: + from transformers import AutoTokenizer + + tokenizer = AutoTokenizer.from_pretrained(graph.cfg.tokenizer_name) + + return convert_circuit_tracer_graph_to_clt_forge_result( + graph=graph, + tokenizer=tokenizer, + node_threshold=node_threshold, + edge_threshold=edge_threshold, + metadata=metadata, + ) + + +def save_clt_forge_attribution_result( + result: Dict[str, Any], + output_path: str | Path, +) -> Path: + """Save a CLT-Forge-compatible attribution result dict.""" + path = Path(output_path) + path.parent.mkdir(parents=True, exist_ok=True) + torch.save(result, path) + return path diff --git a/src/clt_forge/attribution/intervention.py b/src/clt_forge/attribution/intervention.py index 6adbe5c..43109ef 100644 --- a/src/clt_forge/attribution/intervention.py +++ b/src/clt_forge/attribution/intervention.py @@ -3,7 +3,8 @@ import torch from typing import List, Dict, Any, Tuple -from clt_forge.vendor.circuit_tracer.circuit_tracer import ReplacementModel +import clt_forge.vendor.circuit_tracer # noqa: F401 +from circuit_tracer import ReplacementModel def _decode_top_tokens(model, probs, top_k: int): top_tokens = torch.topk(probs, top_k) diff --git a/src/clt_forge/attribution/loading.py b/src/clt_forge/attribution/loading.py index 3642e8c..07754dd 100644 --- a/src/clt_forge/attribution/loading.py +++ b/src/clt_forge/attribution/loading.py @@ -1,9 +1,42 @@ -import torch from pathlib import Path +from importlib import import_module +from typing import Literal + +import torch + from clt_forge import logger from clt_forge.clt import CLT -from clt_forge.vendor.circuit_tracer.circuit_tracer.transcoder.cross_layer_transcoder import CrossLayerTranscoder -from clt_forge.vendor.circuit_tracer.circuit_tracer import ReplacementModel +from clt_forge.utils import DTYPE_MAP +import clt_forge.vendor.circuit_tracer # noqa: F401 +from circuit_tracer import ReplacementModel +from circuit_tracer.transcoder.cross_layer_transcoder import ( + CrossLayerTranscoder, + load_clt, +) + +CircuitTracerCLTSource = Literal[ + "clt_forge", + "circuit_tracer_hub", + "circuit_tracer_local", + "circuit_tracer_cache", +] + + +def _resolve_torch_dtype(dtype: str | torch.dtype) -> torch.dtype: + if isinstance(dtype, torch.dtype): + return dtype + try: + return DTYPE_MAP[dtype] + except KeyError as exc: + allowed = ", ".join(sorted(DTYPE_MAP)) + raise ValueError(f"Unknown dtype {dtype!r}. Expected one of: {allowed}") from exc + + +def _resolve_torch_device(device: str | torch.device | None) -> torch.device | None: + if device is None or isinstance(device, torch.device): + return device + return torch.device(device) + def load_circuit_tracing_clt_from_local( clt_checkpoint: str, @@ -105,6 +138,199 @@ def log(msg): return instance +def load_clt_forge_checkpoint_as_circuit_tracer_clt( + clt_checkpoint: str, + device: str = "cuda", + debug: bool = False, +) -> CrossLayerTranscoder: + """Alias for the historical CLT-Forge-checkpoint-to-circuit-tracer loader.""" + return load_circuit_tracing_clt_from_local( + clt_checkpoint=clt_checkpoint, + device=device, + debug=debug, + ) + + +def load_circuit_tracer_clt_from_hub( + hf_ref: str, + device: str | torch.device | None = "cuda", + dtype: str | torch.dtype = torch.float32, + lazy_encoder: bool = False, + lazy_decoder: bool = True, + cache_dir: str | None = None, + use_cache: bool = True, + debug: bool = False, +) -> CrossLayerTranscoder: + """Load an open-source circuit-tracer CLT from HuggingFace. + + ``hf_ref`` can be any reference accepted by circuit-tracer, including + repository IDs, ``repo/subfolder@revision`` references, and the circuit-tracer + convenience aliases. + """ + torch_device = _resolve_torch_device(device) + torch_dtype = _resolve_torch_dtype(dtype) + + if debug: + logger.info( + "Loading circuit-tracer CLT from HuggingFace: " + f"{hf_ref} ({torch_device=}, {torch_dtype=})" + ) + + load_transcoder_from_hub = getattr( + import_module("circuit_tracer.utils.hf_utils"), + "load_transcoder_from_hub", + ) + transcoders, _ = load_transcoder_from_hub( + hf_ref=hf_ref, + device=torch_device, + dtype=torch_dtype, + lazy_encoder=lazy_encoder, + lazy_decoder=lazy_decoder, + cache_dir=cache_dir, + use_cache=use_cache, + ) + + if not isinstance(transcoders, CrossLayerTranscoder): + raise TypeError( + "Expected a circuit-tracer CrossLayerTranscoder, but " + f"{hf_ref!r} loaded {type(transcoders).__name__}. " + "Use a circuit-tracer CLT repo rather than a per-layer transcoder set." + ) + + return transcoders + + +def load_circuit_tracer_clt_from_local_safetensors( + clt_path: str, + device: str | torch.device | None = "cuda", + dtype: str | torch.dtype = torch.float32, + lazy_encoder: bool = False, + lazy_decoder: bool = True, + feature_input_hook: str = "hook_resid_mid", + feature_output_hook: str = "hook_mlp_out", + scan: str | list[str] | None = None, + debug: bool = False, +) -> CrossLayerTranscoder: + """Load a circuit-tracer CLT already saved in safetensors directory format.""" + torch_device = _resolve_torch_device(device) + torch_dtype = _resolve_torch_dtype(dtype) + + if debug: + logger.info( + "Loading circuit-tracer CLT from local safetensors: " + f"{clt_path} ({torch_device=}, {torch_dtype=})" + ) + + return load_clt( + clt_path=clt_path, + device=torch_device, + dtype=torch_dtype, + lazy_encoder=lazy_encoder, + lazy_decoder=lazy_decoder, + feature_input_hook=feature_input_hook, + feature_output_hook=feature_output_hook, + scan=scan, + ) + + +def load_circuit_tracer_clt_from_cache( + hf_ref: str, + device: str | torch.device | None = "cuda", + dtype: str | torch.dtype = torch.float32, + lazy_encoder: bool = False, + lazy_decoder: bool = True, + cache_dir: str | None = None, + debug: bool = False, +) -> CrossLayerTranscoder: + """Load a circuit-tracer CLT from circuit-tracer's local cache.""" + torch_device = _resolve_torch_device(device) + torch_dtype = _resolve_torch_dtype(dtype) + + if debug: + logger.info( + "Loading circuit-tracer CLT from cache: " + f"{hf_ref} ({torch_device=}, {torch_dtype=})" + ) + + load_transcoders_from_cache = getattr( + import_module("circuit_tracer.utils.caching"), + "load_transcoders_from_cache", + ) + transcoders, _ = load_transcoders_from_cache( + hf_ref=hf_ref, + cache_dir=cache_dir, + device=torch_device, + dtype=torch_dtype, + lazy_encoder=lazy_encoder, + lazy_decoder=lazy_decoder, + ) + + if not isinstance(transcoders, CrossLayerTranscoder): + raise TypeError( + "Expected a cached circuit-tracer CrossLayerTranscoder, but " + f"{hf_ref!r} loaded {type(transcoders).__name__}." + ) + + return transcoders + + +def load_attribution_clt( + clt_ref: str, + source: CircuitTracerCLTSource = "clt_forge", + device: str | torch.device | None = "cuda", + dtype: str | torch.dtype = torch.float32, + lazy_encoder: bool = False, + lazy_decoder: bool = True, + cache_dir: str | None = None, + use_cache: bool = True, + feature_input_hook: str = "hook_resid_mid", + feature_output_hook: str = "hook_mlp_out", + scan: str | list[str] | None = None, + debug: bool = False, +) -> CrossLayerTranscoder: + """Load a CLT object suitable for circuit-tracer attribution.""" + if source == "clt_forge": + return load_circuit_tracing_clt_from_local( + clt_checkpoint=clt_ref, + device=str(device or "cpu"), + debug=debug, + ) + if source == "circuit_tracer_hub": + return load_circuit_tracer_clt_from_hub( + hf_ref=clt_ref, + device=device, + dtype=dtype, + lazy_encoder=lazy_encoder, + lazy_decoder=lazy_decoder, + cache_dir=cache_dir, + use_cache=use_cache, + debug=debug, + ) + if source == "circuit_tracer_local": + return load_circuit_tracer_clt_from_local_safetensors( + clt_path=clt_ref, + device=device, + dtype=dtype, + lazy_encoder=lazy_encoder, + lazy_decoder=lazy_decoder, + feature_input_hook=feature_input_hook, + feature_output_hook=feature_output_hook, + scan=scan, + debug=debug, + ) + if source == "circuit_tracer_cache": + return load_circuit_tracer_clt_from_cache( + hf_ref=clt_ref, + device=device, + dtype=dtype, + lazy_encoder=lazy_encoder, + lazy_decoder=lazy_decoder, + cache_dir=cache_dir, + debug=debug, + ) + raise ValueError(f"Unknown CLT source: {source}") + + def test_clt_performance_on_prompt( inputs: str, clt: CrossLayerTranscoder, diff --git a/tests/attribution/test_circuit_tracer_bridge.py b/tests/attribution/test_circuit_tracer_bridge.py new file mode 100644 index 0000000..4293600 --- /dev/null +++ b/tests/attribution/test_circuit_tracer_bridge.py @@ -0,0 +1,250 @@ +import gzip +import json +import zlib +from pathlib import Path +from types import SimpleNamespace + +import pytest +import torch + +from clt_forge.attribution.circuit_tracer_features import ( + cantor_pair, + circuit_tracer_feature_to_clt_forge_feature_dict, + convert_circuit_tracer_feature_file, + download_circuit_tracer_feature_from_hub, + load_circuit_tracer_feature_data, + write_clt_forge_feature_dict, +) +from clt_forge.attribution.conversion import build_clt_forge_attribution_result +from clt_forge.attribution.loading import load_circuit_tracer_clt_from_hub +from clt_forge.frontend.config.settings import AppConfig +from clt_forge.frontend.data.loaders import DataLoader +import clt_forge.vendor.circuit_tracer # noqa: F401 +from circuit_tracer.attribution.targets import LogitTarget +from circuit_tracer.graph import Graph, PruneResult +from circuit_tracer.transcoder.cross_layer_transcoder import ( + CrossLayerTranscoder, +) +from circuit_tracer.utils.tl_nnsight_mapping import UnifiedConfig + + +class ToyTokenizer: + def decode(self, token): + if isinstance(token, list): + return "".join(f"tok{int(t)}" for t in token) + return f"tok{int(token)}" + + +def _toy_graph() -> Graph: + n_layers = 2 + n_tokens = 3 + n_features = 2 + n_logits = 1 + total_nodes = n_features + n_layers * n_tokens + n_tokens + n_logits + + cfg = UnifiedConfig( + n_layers=n_layers, + d_model=4, + d_head=2, + n_heads=2, + d_mlp=8, + d_vocab=100, + tokenizer_name="toy-tokenizer", + model_name="toy-model", + original_architecture="toy", + ) + + return Graph( + input_string="tok10tok11tok12", + input_tokens=torch.tensor([10, 11, 12]), + active_features=torch.tensor( + [ + [0, 0, 5], + [1, 1, 7], + [1, 2, 9], + ], + dtype=torch.long, + ), + selected_features=torch.tensor([0, 2], dtype=torch.long), + activation_values=torch.tensor([1.0, 2.0, 3.0]), + adjacency_matrix=torch.ones(total_nodes, total_nodes), + cfg=cfg, + logit_targets=[LogitTarget(token_str="target", vocab_idx=42)], + logit_probabilities=torch.tensor([1.0]), + scan="mntss/clt-gemma-2-2b-426k", + ) + + +def _toy_prune_result(graph: Graph) -> PruneResult: + n_nodes = graph.adjacency_matrix.shape[0] + return PruneResult( + node_mask=torch.ones(n_nodes, dtype=torch.bool), + edge_mask=torch.ones(n_nodes, n_nodes, dtype=torch.bool), + cumulative_scores=torch.ones(n_nodes), + ) + + +def _feature_data() -> dict: + return { + "index": cantor_pair(2, 17), + "transcoder_id": "mntss/clt-gemma-2-2b-426k", + "examples_quantiles": [ + { + "quantile_name": "top", + "examples": [ + { + "tokens": ["The", " capital", " is"], + "tokens_acts_list": [0.0, 1.5, 0.2], + "train_token_ind": 1, + "is_repeated_datapoint": False, + } + ], + } + ], + "top_logits": [" Paris"], + "bottom_logits": [" London"], + "act_min": 0.0, + "act_max": 2.0, + "quantile_values": [0.1, 1.0], + "histogram": [0, 2, 1], + "activation_frequency": 0.001, + } + + +def test_graph_conversion_matches_frontend_dataloader_contract(tmp_path: Path): + graph = _toy_graph() + result = build_clt_forge_attribution_result( + graph=graph, + prune_result=_toy_prune_result(graph), + tokenizer=ToyTokenizer(), + ) + + assert result["n_layers"] == 2 + assert result["feature_indices"].tolist() == [[0, 0, 5], [2, 1, 9]] + + graph_path = tmp_path / "attribution_graph.pt" + torch.save(result, graph_path) + + cfg = AppConfig( + attr_graph_path=str(graph_path), + dict_base_folder=str(tmp_path / "feature_dicts"), + clt_checkpoint="", + model_name="toy-model", + model_class_name="toy", + ) + graph_data = DataLoader(cfg).preprocess_data() + + assert graph_data.n_layers == 2 + assert graph_data.prompt_length == 3 + assert graph_data.feature_indices.tolist() == [[0, 0, 5], [2, 1, 9]] + assert graph_data.top5_logit_tokens == ["target"] + + +def test_circuit_tracer_feature_conversion_writes_frontend_json(tmp_path: Path): + feature_dict = circuit_tracer_feature_to_clt_forge_feature_dict(_feature_data()) + + assert feature_dict["layer"] == 2 + assert feature_dict["feature_index"] == 17 + assert feature_dict["source"] == "circuit_tracer" + assert "<< capital>>" in feature_dict["top_examples"][0] + assert feature_dict["top_activating_tokens"][0]["token"] == " capital" + + path = write_clt_forge_feature_dict(feature_dict, tmp_path) + assert path == tmp_path / "layer2" / "feature_17_complete.json" + assert json.loads(path.read_text())["feature_index"] == 17 + + +def test_loads_circuit_tracer_binary_feature_record(tmp_path: Path): + payload = json.dumps(_feature_data()).encode("utf-8") + compressed = zlib.compress(payload) + record = len(compressed).to_bytes(4, byteorder="little") + compressed + + feature_path = tmp_path / "feature.bin" + feature_path.write_bytes(record) + + assert load_circuit_tracer_feature_data(feature_path)["index"] == cantor_pair(2, 17) + + +def test_convert_circuit_tracer_feature_file_accepts_explicit_coordinates(tmp_path: Path): + source_path = tmp_path / "feature.json" + source_path.write_text(json.dumps({**_feature_data(), "index": 123})) + + out_path = convert_circuit_tracer_feature_file( + source_path, + tmp_path / "converted", + layer=4, + feature_idx=5, + ) + + assert out_path == tmp_path / "converted" / "layer4" / "feature_5_complete.json" + assert json.loads(out_path.read_text())["layer"] == 4 + + +def test_download_circuit_tracer_feature_supports_binary_index( + tmp_path: Path, + monkeypatch: pytest.MonkeyPatch, +): + payload = json.dumps(_feature_data()).encode("utf-8") + compressed = zlib.compress(payload) + record = len(compressed).to_bytes(4, byteorder="little") + compressed + + index_path = tmp_path / "index.json.gz" + offsets = [0] * 19 + offsets[18] = len(record) + with gzip.open(index_path, "wt") as f: + json.dump({"2": {"filename": "layer2.bin", "offsets": offsets}}, f) + + bin_path = tmp_path / "layer2.bin" + bin_path.write_bytes(record) + + def fake_hf_hub_download(**kwargs): + filename = kwargs["filename"] + if filename.endswith(".json"): + raise FileNotFoundError(filename) + if filename.endswith("index.json.gz"): + return str(index_path) + if filename.endswith("layer2.bin"): + return str(bin_path) + raise AssertionError(filename) + + monkeypatch.setattr("huggingface_hub.hf_hub_download", fake_hf_hub_download) + + data = download_circuit_tracer_feature_from_hub( + scan="mntss/clt-gemma-2-2b-426k", + layer=2, + feature_idx=17, + ) + + assert data["index"] == cantor_pair(2, 17) + + +def test_hub_loader_returns_cross_layer_transcoder(monkeypatch: pytest.MonkeyPatch): + clt = CrossLayerTranscoder( + n_layers=1, + d_transcoder=2, + d_model=3, + device=torch.device("cpu"), + dtype=torch.float32, + lazy_decoder=True, + lazy_encoder=False, + ) + + def fake_load_transcoder_from_hub(**kwargs): + assert kwargs["hf_ref"] == "mntss/clt-gemma-2-2b-426k" + assert kwargs["device"] == torch.device("cpu") + assert kwargs["dtype"] == torch.float32 + return clt, {"model_kind": "cross_layer_transcoder"} + + def fake_import_module(name): + assert name == "circuit_tracer.utils.hf_utils" + return SimpleNamespace(load_transcoder_from_hub=fake_load_transcoder_from_hub) + + monkeypatch.setattr("clt_forge.attribution.loading.import_module", fake_import_module) + + loaded = load_circuit_tracer_clt_from_hub( + "mntss/clt-gemma-2-2b-426k", + device="cpu", + dtype="float32", + ) + + assert loaded is clt