diff --git a/slime/README.md b/slime/README.md index 0492875..c5683c2 100644 --- a/slime/README.md +++ b/slime/README.md @@ -12,3 +12,16 @@ Then you can run: ``` modal run slime/modal_train.py::train_multi_node --config qwen-8b-multi ``` + +## Code-golf RL example (SLIME + Harbor + Modal sandboxes) + +A dedicated example lives in: + +`slime/code_golf_harbor_modal` + +It adds: + +- MBPP -> Harbor task conversion +- custom `--custom-rm-path` reward that runs Harbor-style verification in Modal sandboxes +- size-aware code-golf reward shaping +- checkpoint serving + Harbor eval utilities diff --git a/slime/code_golf_harbor_modal/README.md b/slime/code_golf_harbor_modal/README.md new file mode 100644 index 0000000..d754f33 --- /dev/null +++ b/slime/code_golf_harbor_modal/README.md @@ -0,0 +1,96 @@ +# SLIME + Harbor + Modal code-golf example (Qwen 8B) + +This example trains **Qwen3-8B** with SLIME on Modal for Python code golf. + +It does two things in reward: + +1. **Correctness** via Harbor task verification. +2. **Code-size pressure** via a length bonus on top of correctness. + +## What this example adds + +- MBPP (`Muennighoff/mbpp`) conversion into: + - **Harbor task project format** (`tasks//...`) + - **SLIME parquet prompt format** (`slime/train.parquet`, `slime/test.parquet`) +- Regex function-name extraction from MBPP `code` field and injection into prompt. +- Custom SLIME reward model (`--custom-rm-path`) that runs Harbor tests in **Modal sandboxes**. +- Harbor eval path with: + - a custom single-shot code agent (`harbor_litellm_agent.py`) + - a checkpoint server (SGLang) that serves the latest checkpoint from Modal volume + - high-concurrency Harbor runs (`-n` with `--env modal`) + +## Files + +- `modal_train.py` – Modal entrypoints for dataset prep, training, serving, and eval. +- `dataset_to_harbor.py` – MBPP -> Harbor + SLIME conversion function. +- `custom_rm.py` – SLIME custom reward function (Harbor+Modal sandbox scoring + length bonus). 
+- `harbor_litellm_agent.py` – Harbor custom agent that calls an OpenAI-compatible endpoint.
+- `configs/qwen_8b_multi.py` – Qwen3-8B multi-node config based on the Modal SLIME config style.
+
+## 1) Prepare dataset in Modal volume
+
+Run once:
+
+`modal run slime/code_golf_harbor_modal/modal_train.py::prepare_dataset`
+
+Optional small smoke conversion:
+
+`modal run slime/code_golf_harbor_modal/modal_train.py::prepare_dataset --limit 32 --train-size 24`
+
+Data lands in Modal volume `slime-code-golf-harbor-data` under:
+
+- `/data/mbpp_harbor/tasks`
+- `/data/mbpp_harbor/slime/train.parquet`
+- `/data/mbpp_harbor/slime/test.parquet`
+
+Each Harbor task uses `environment/Dockerfile` with:
+
+`FROM python:3.11-slim`
+
+## 2) Download base model
+
+Run once:
+
+`modal run slime/code_golf_harbor_modal/modal_train.py::download_model --config qwen-8b-multi`
+
+## 3) Train with SLIME
+
+`modal run slime/code_golf_harbor_modal/modal_train.py::train_multi_node --config qwen-8b-multi`
+
+The config uses:
+
+- `--custom-rm-path custom_rm.batched_custom_rm`
+- MBPP parquet generated by `prepare_dataset`
+- Multi-node Qwen3-8B settings in the same style as the existing SLIME Modal configs
+
+## 4) Serve latest checkpoint
+
+Start a long-lived server endpoint from latest checkpoint:
+
+`modal serve slime/code_golf_harbor_modal/modal_train.py::serve_latest_checkpoint`
+
+Or one-shot eval (starts local server inside function, runs Harbor, exits):
+
+`modal run slime/code_golf_harbor_modal/modal_train.py::eval_latest_checkpoint --n-concurrent 256 --n-tasks 500`
+
+## 5) Run Harbor eval against server (Modal sandboxes)
+
+Given a running server URL:
+
+`modal run slime/code_golf_harbor_modal/modal_train.py::run_harbor_eval --server-base-url https://<your-modal-server-url> --n-concurrent 256 --n-tasks 500`
+
+This uses Harbor with:
+
+- `--env modal` (trial environments are Modal sandboxes)
+- custom agent import path `harbor_litellm_agent.SingleShotCodeAgent`
+- OpenAI-compatible API calls to the served
checkpoint endpoint + +## Reward behavior + +The custom RM in `custom_rm.py` computes: + +- `pass_rate` from Harbor task verifier output. +- `length_bonus` from `reference_bytes / candidate_bytes` (capped). +- Final reward: correctness scaled by a positive length bonus. + +So passing tests remains primary; shorter passing programs get higher reward. diff --git a/slime/code_golf_harbor_modal/code_utils.py b/slime/code_golf_harbor_modal/code_utils.py new file mode 100644 index 0000000..1222486 --- /dev/null +++ b/slime/code_golf_harbor_modal/code_utils.py @@ -0,0 +1,34 @@ +from __future__ import annotations + +import re + +_CODE_FENCE_RE = re.compile(r"```(?:python)?\s*(.*?)```", re.IGNORECASE | re.DOTALL) +_FUNCTION_RE = re.compile(r"^\s*def\s+([A-Za-z_]\w*)\s*\(", re.MULTILINE) + + +def normalize_code(text: str) -> str: + return text.replace("\r\n", "\n").replace("\r", "\n") + + +def extract_function_name_from_code(code: str) -> str: + match = _FUNCTION_RE.search(normalize_code(code)) + if match is None: + raise ValueError("No Python function definition found in code field.") + return match.group(1) + + +def extract_python_code(text: str) -> str: + normalized = normalize_code(text).strip() + fenced = _CODE_FENCE_RE.findall(normalized) + if fenced: + candidates = [block.strip() for block in fenced if block.strip()] + if candidates: + for candidate in reversed(candidates): + if _FUNCTION_RE.search(candidate): + return candidate + return candidates[-1] + return normalized + + +def code_size_bytes(code: str) -> int: + return len(normalize_code(code).encode("utf-8")) diff --git a/slime/code_golf_harbor_modal/configs/__init__.py b/slime/code_golf_harbor_modal/configs/__init__.py new file mode 100644 index 0000000..126c3f5 --- /dev/null +++ b/slime/code_golf_harbor_modal/configs/__init__.py @@ -0,0 +1,32 @@ +from __future__ import annotations + +import importlib +from pathlib import Path +from typing import Callable + +from .base import RLConfig + +_CONFIGS: dict[str, 
Callable[[], RLConfig]] = {} +_CONFIGS_DIR = Path(__file__).parent +_EXCLUDE = {"__init__.py", "base.py"} + + +def get_config(name: str) -> RLConfig: + if name not in _CONFIGS: + available = ", ".join(sorted(_CONFIGS.keys())) + raise ValueError(f"Unknown config: {name}. Available configs: {available}") + return _CONFIGS[name]() + + +def list_configs() -> list[str]: + return sorted(_CONFIGS.keys()) + + +for file_path in _CONFIGS_DIR.glob("*.py"): + if file_path.name in _EXCLUDE: + continue + module_name = file_path.stem + config_name = module_name.replace("_", "-") + module = importlib.import_module(f".{module_name}", package="configs") + if hasattr(module, "get_config"): + _CONFIGS[config_name] = module.get_config diff --git a/slime/code_golf_harbor_modal/configs/base.py b/slime/code_golf_harbor_modal/configs/base.py new file mode 100644 index 0000000..09f3225 --- /dev/null +++ b/slime/code_golf_harbor_modal/configs/base.py @@ -0,0 +1,88 @@ +from __future__ import annotations + +from dataclasses import dataclass, field +from pathlib import Path + + +@dataclass +class RLConfig: + model_name: str + model_id: str + + app_name: str = "slime-code-golf" + n_nodes: int = 4 + gpu: str = "H100:8" + sync: bool = True + + wandb_project: str = "slime-code-golf" + wandb_run_name_prefix: str = "qwen8b-mbpp" + + slime_args: str = "" + extra_args: list[str] = field(default_factory=list) + + # Custom RM runtime settings + harbor_task_root: str = "/data/mbpp_harbor/tasks" + harbor_data_volume_name: str = "slime-code-golf-harbor-data" + harbor_rm_modal_app: str = "slime-harbor-rm" + harbor_rm_max_concurrency: int = 64 + harbor_rm_timeout_sec: int = 120 + harbor_length_bonus_weight: float = 0.2 + + @property + def train_script(self) -> str: + return "slime/train.py" if self.sync else "slime/train_async.py" + + def _clean_args(self, args: str) -> str: + lines: list[str] = [] + for raw_line in args.splitlines(): + line = raw_line.split("#", 1)[0].strip() + if line: + lines.append(line) 
+ return " ".join(lines) + + def generate_train_args( + self, + hf_model_path: str, + checkpoints_path: Path, + data_path: Path, + ) -> str: + base_args = f"--hf-checkpoint {hf_model_path} --ref-load {hf_model_path}" + + cleaned = self._clean_args(self.slime_args) + cleaned = cleaned.replace("{data_path}", str(data_path)) + cleaned = cleaned.replace("{checkpoints_path}", str(checkpoints_path)) + + extra = " ".join(self.extra_args) if self.extra_args else "" + return f"{base_args} {cleaned} {extra}".strip() + + +QWEN3_8B_MODEL_ARGS = """ + --num-layers 36 --hidden-size 4096 --ffn-hidden-size 12288 + --num-attention-heads 32 --group-query-attention --num-query-groups 8 + --kv-channels 128 --vocab-size 151936 + --normalization RMSNorm --norm-epsilon 1e-6 --swiglu + --disable-bias-linear --qk-layernorm + --use-rotary-position-embeddings --rotary-base 1000000 +""" + +DEFAULT_TRAINING_ARGS = """ + --tensor-model-parallel-size 2 --sequence-parallel + --recompute-granularity full --recompute-method uniform --recompute-num-layers 1 + --use-dynamic-batch-size --max-tokens-per-gpu 9216 + --megatron-to-hf-mode bridge + --attention-dropout 0.0 --hidden-dropout 0.0 + --accumulate-allreduce-grads-in-fp32 --attention-softmax-in-fp32 +""" + +DEFAULT_OPTIMIZER_ARGS = """ + --optimizer adam + --lr 1e-6 --lr-decay-style constant + --weight-decay 0.1 --adam-beta1 0.9 --adam-beta2 0.98 +""" + +DEFAULT_GRPO_ARGS = """ + --advantage-estimator grpo + --use-kl-loss --kl-loss-coef 0.00 --kl-loss-type low_var_kl + --entropy-coef 0.00 + --eps-clip 0.2 --eps-clip-high 0.28 +""" diff --git a/slime/code_golf_harbor_modal/configs/qwen_8b_multi.py b/slime/code_golf_harbor_modal/configs/qwen_8b_multi.py new file mode 100644 index 0000000..e68ec5e --- /dev/null +++ b/slime/code_golf_harbor_modal/configs/qwen_8b_multi.py @@ -0,0 +1,68 @@ +from __future__ import annotations + +from .base import ( + DEFAULT_GRPO_ARGS, + DEFAULT_OPTIMIZER_ARGS, + DEFAULT_TRAINING_ARGS, + QWEN3_8B_MODEL_ARGS, + RLConfig, 
+) + + +def get_config() -> RLConfig: + return RLConfig( + model_name="Qwen3-8B", + model_id="Qwen/Qwen3-8B", + app_name="slime-qwen8b-code-golf", + n_nodes=4, + gpu="H100:8", + sync=True, + wandb_project="slime-code-golf", + wandb_run_name_prefix="qwen8b-mbpp-harbor", + slime_args=f""" + # Model + {QWEN3_8B_MODEL_ARGS} + + # Training + optimizer + GRPO + {DEFAULT_TRAINING_ARGS} + {DEFAULT_OPTIMIZER_ARGS} + {DEFAULT_GRPO_ARGS} + + # Dataset format + --input-key messages + --label-key label + --apply-chat-template + --prompt-data {{data_path}}/mbpp_harbor/slime/train.parquet + --eval-prompt-data mbpp {{data_path}}/mbpp_harbor/slime/test.parquet + + # Rollout / batching + --num-rollout 2000 + --rollout-batch-size 128 + --n-samples-per-prompt 8 + --global-batch-size 1024 + --rollout-max-response-len 1024 + --rollout-temperature 0.9 + --eval-max-response-len 1024 + --n-samples-per-eval-prompt 8 + + # Custom reward model (Harbor + Modal sandbox scoring) + --rm-type math + --custom-rm-path custom_rm.batched_custom_rm + + # SGLang rollout engines + --rollout-num-gpus-per-engine 2 + --sglang-mem-fraction-static 0.7 + + # Distributed orchestration + --actor-num-nodes 4 + --actor-num-gpus-per-node 8 + --colocate + + # Eval cadence + --eval-interval 20 + --eval-top-p 1 + + # Save checkpoints to volume + --save {{checkpoints_path}}/qwen8b_code_golf + """, + ) diff --git a/slime/code_golf_harbor_modal/custom_rm.py b/slime/code_golf_harbor_modal/custom_rm.py new file mode 100644 index 0000000..0437eca --- /dev/null +++ b/slime/code_golf_harbor_modal/custom_rm.py @@ -0,0 +1,167 @@ +from __future__ import annotations + +import asyncio +import json +import os +from pathlib import Path +from typing import Any + +import modal + +from code_utils import code_size_bytes, extract_python_code + +_DEFAULT_MODAL_APP_NAME = "slime-harbor-rm" +_DEFAULT_SANDBOX_IMAGE = "python:3.11-slim" +_DEFAULT_TASK_ROOT = "/data/mbpp_harbor/tasks" + +_app_cache: modal.App | None = None +_sandbox_image = 
modal.Image.from_registry(_DEFAULT_SANDBOX_IMAGE).entrypoint([]) + + +def _get_env_int(name: str, default: int) -> int: + value = os.environ.get(name) + if value is None: + return default + return int(value) + + +def _get_env_float(name: str, default: float) -> float: + value = os.environ.get(name) + if value is None: + return default + return float(value) + + +async def _get_modal_app() -> modal.App: + global _app_cache + if _app_cache is None: + app_name = os.environ.get("HARBOR_RM_MODAL_APP", _DEFAULT_MODAL_APP_NAME) + _app_cache = await modal.App.lookup.aio(app_name, create_if_missing=True) + return _app_cache + + +async def _read_remote_file(sandbox: modal.Sandbox, remote_path: str) -> str: + async with await sandbox.open.aio(remote_path, "rb") as handle: + chunks: list[bytes] = [] + while True: + chunk = await handle.read.aio(8192) + if not chunk: + break + chunks.append(chunk) + return b"".join(chunks).decode("utf-8") + + +async def _write_remote_file(sandbox: modal.Sandbox, remote_path: str, content: str) -> None: + parent = str(Path(remote_path).parent) + await sandbox.exec.aio("bash", "-lc", f"mkdir -p {parent}") + async with await sandbox.open.aio(remote_path, "wb") as handle: + await handle.write.aio(content.encode("utf-8")) + + +def _extract_label(sample: Any) -> dict[str, Any]: + if not getattr(sample, "label", None): + return {} + try: + return json.loads(sample.label) + except json.JSONDecodeError: + return {} + + +def _harbor_task_abs_path(label_payload: dict[str, Any]) -> str: + root = Path(os.environ.get("HARBOR_TASKS_ROOT", _DEFAULT_TASK_ROOT)) + rel = label_payload.get("harbor_task_rel") + if rel: + return str((root.parent / rel).resolve()) + task_id = label_payload.get("task_id") + if task_id is None: + raise ValueError("Missing task_id and harbor_task_rel in sample label.") + return str((root / f"mbpp_{int(task_id):04d}").resolve()) + + +def _compose_reward( + pass_rate: float, + candidate_size: int, + reference_size: int, + length_weight: 
float, +) -> float: + if pass_rate <= 0: + return 0.0 + ratio = float(reference_size) / float(max(candidate_size, 1)) + capped_ratio = min(2.0, ratio) + size_bonus = max(0.0, capped_ratio - 1.0) + return pass_rate * (1.0 + length_weight * size_bonus) + + +async def _score_sample(sample: Any, semaphore: asyncio.Semaphore) -> float: + timeout_sec = _get_env_int("HARBOR_RM_TIMEOUT_SEC", 120) + length_weight = _get_env_float("HARBOR_LENGTH_BONUS_WEIGHT", 0.2) + + label_payload = _extract_label(sample) + task_path = _harbor_task_abs_path(label_payload) + reference_size = int(label_payload.get("reference_bytes", 1)) + candidate_code = extract_python_code(getattr(sample, "response", "")) + candidate_size = code_size_bytes(candidate_code) + + app = await _get_modal_app() + volume_name = os.environ.get("HARBOR_DATA_VOLUME_NAME", "").strip() + volumes = {"/data": modal.Volume.from_name(volume_name)} if volume_name else None + + async with semaphore: + sandbox = await modal.Sandbox.create.aio( + app=app, + image=_sandbox_image, + timeout=timeout_sec, + cpu=1, + memory=2048, + volumes=volumes, + ) + try: + await _write_remote_file(sandbox, "/workspace/solution.py", candidate_code) + + test_script = f"{task_path}/tests/test.sh" + process = await sandbox.exec.aio( + "bash", + "-lc", + f"mkdir -p /logs/verifier && bash {test_script}", + timeout=timeout_sec, + ) + await process.stdout.read.aio() + await process.stderr.read.aio() + await process.wait.aio() + + reward_json = await _read_remote_file(sandbox, "/logs/verifier/reward.json") + result = json.loads(reward_json) + pass_rate = float(result.get("pass_rate", result.get("reward", 0.0))) + reward = _compose_reward(pass_rate, candidate_size, reference_size, length_weight) + + if not isinstance(sample.metadata, dict): + sample.metadata = {} + sample.metadata["harbor_rm"] = { + "pass_rate": pass_rate, + "reference_bytes": reference_size, + "candidate_bytes": candidate_size, + "reward": reward, + } + return reward + except Exception 
as exc: + if not isinstance(sample.metadata, dict): + sample.metadata = {} + sample.metadata["harbor_rm_error"] = repr(exc) + return 0.0 + finally: + try: + await sandbox.terminate.aio() + except Exception: + pass + + +async def batched_custom_rm(args: Any, samples: list[Any], **kwargs: Any) -> list[float]: + max_concurrency = _get_env_int("HARBOR_RM_MAX_CONCURRENCY", 64) + semaphore = asyncio.Semaphore(max_concurrency) + tasks = [_score_sample(sample, semaphore) for sample in samples] + return list(await asyncio.gather(*tasks)) + + +async def custom_rm(args: Any, sample: Any, **kwargs: Any) -> float: + rewards = await batched_custom_rm(args=args, samples=[sample], **kwargs) + return rewards[0] diff --git a/slime/code_golf_harbor_modal/dataset_to_harbor.py b/slime/code_golf_harbor_modal/dataset_to_harbor.py new file mode 100644 index 0000000..cf9d19f --- /dev/null +++ b/slime/code_golf_harbor_modal/dataset_to_harbor.py @@ -0,0 +1,341 @@ +from __future__ import annotations + +import json +import textwrap +from dataclasses import dataclass +from pathlib import Path +from typing import Any + +import pandas as pd +from huggingface_hub import hf_hub_download + +from code_utils import code_size_bytes, extract_function_name_from_code, normalize_code + +MBPP_REPO_ID = "Muennighoff/mbpp" +MBPP_JSONL_PATH = "data/mbpp.jsonl" + + +@dataclass +class MbppRecord: + task_id: int + text: str + code: str + test_setup_code: str + test_list: list[str] + challenge_test_list: list[str] + function_name: str + + +def _load_mbpp_records( + repo_id: str = MBPP_REPO_ID, dataset_file: str = MBPP_JSONL_PATH +) -> list[MbppRecord]: + mbpp_path = hf_hub_download(repo_id=repo_id, filename=dataset_file, repo_type="dataset") + rows = [json.loads(line) for line in Path(mbpp_path).read_text(encoding="utf-8").splitlines() if line.strip()] + + records: list[MbppRecord] = [] + for row in rows: + code = normalize_code(row["code"]) + records.append( + MbppRecord( + task_id=int(row["task_id"]), + 
text=row["text"].strip(), + code=code, + test_setup_code=normalize_code(row.get("test_setup_code", "")), + test_list=list(row.get("test_list", [])), + challenge_test_list=list(row.get("challenge_test_list", [])), + function_name=extract_function_name_from_code(code), + ) + ) + records.sort(key=lambda item: item.task_id) + return records + + +def _build_instruction(record: MbppRecord) -> str: + return textwrap.dedent( + f"""\ + You are solving a Python code-golf programming task. + + Task: + {record.text} + + You must define a function named `{record.function_name}`. + Write only valid Python code to `/workspace/solution.py`. + """ + ).strip() + "\n" + + +def _build_task_toml(record: MbppRecord) -> str: + tags = json.dumps(["mbpp", "python", "code-golf"]) + return textwrap.dedent( + f"""\ + version = "1.0" + + [metadata] + author_name = "MBPP" + author_email = "unknown" + difficulty = "medium" + category = "coding" + tags = {tags} + source = "mbpp" + task_id = "{record.task_id}" + function_name = "{record.function_name}" + + [verifier] + timeout_sec = 180.0 + + [agent] + timeout_sec = 180.0 + + [environment] + build_timeout_sec = 300.0 + cpus = 1 + memory_mb = 2048 + storage_mb = 2048 + gpus = 0 + allow_internet = false + mcp_servers = [] + + [verifier.env] + + [solution.env] + """ + ) + + +def _build_test_sh() -> str: + return textwrap.dedent( + """\ + #!/bin/bash + set -euo pipefail + + mkdir -p /logs/verifier + if python3 "$(dirname "$0")/verify.py"; then + exit 0 + fi + + if [ ! 
-f /logs/verifier/reward.json ]; then + echo '{"reward": 0.0, "pass_rate": 0.0, "passed": 0, "total": 0}' > /logs/verifier/reward.json + fi + """ + ) + + +def _build_verify_py(record: MbppRecord) -> str: + all_tests = record.test_list + record.challenge_test_list + return textwrap.dedent( + f"""\ + import json + import traceback + from pathlib import Path + + TASK_ID = {record.task_id} + FUNCTION_NAME = {record.function_name!r} + TEST_SETUP_CODE = {record.test_setup_code!r} + TEST_LIST = {json.dumps(all_tests)} + + SOLUTION_PATH = Path("/workspace/solution.py") + REWARD_JSON = Path("/logs/verifier/reward.json") + DETAILS_JSON = Path("/logs/verifier/details.json") + + + def _write_outputs(reward: dict, details: dict) -> None: + REWARD_JSON.parent.mkdir(parents=True, exist_ok=True) + REWARD_JSON.write_text(json.dumps(reward), encoding="utf-8") + DETAILS_JSON.write_text(json.dumps(details, indent=2), encoding="utf-8") + + + def _load_solution() -> tuple[dict, str]: + source = SOLUTION_PATH.read_text(encoding="utf-8") + namespace: dict = {{}} + exec(compile(source, str(SOLUTION_PATH), "exec"), namespace, namespace) + return namespace, source + + + def _run_tests(namespace: dict) -> tuple[int, int, list[dict]]: + runtime = dict(namespace) + if TEST_SETUP_CODE.strip(): + exec(TEST_SETUP_CODE, runtime, runtime) + + passed = 0 + failures: list[dict] = [] + for test_expr in TEST_LIST: + try: + exec(test_expr, runtime, runtime) + passed += 1 + except Exception as exc: + failures.append({{"test": test_expr, "error": repr(exc)}}) + return passed, len(TEST_LIST), failures + + + def main() -> int: + details = {{"task_id": TASK_ID, "function_name": FUNCTION_NAME}} + try: + namespace, source = _load_solution() + passed, total, failures = _run_tests(namespace) + pass_rate = float(passed) / float(total) if total else 0.0 + reward = 1.0 if total > 0 and passed == total else 0.0 + details.update( + {{ + "source_bytes": len(source.encode("utf-8")), + "passed": passed, + "total": total, 
+ "failures": failures, + }} + ) + _write_outputs( + {{ + "reward": reward, + "pass_rate": pass_rate, + "passed": passed, + "total": total, + }}, + details, + ) + return 0 + except Exception as exc: + details["exception"] = repr(exc) + details["traceback"] = traceback.format_exc() + _write_outputs( + {{ + "reward": 0.0, + "pass_rate": 0.0, + "passed": 0, + "total": len(TEST_LIST), + }}, + details, + ) + return 0 + + + if __name__ == "__main__": + raise SystemExit(main()) + """ + ) + + +def _build_solution_sh(record: MbppRecord) -> str: + return textwrap.dedent( + f"""\ + #!/bin/bash + set -euo pipefail + + cat > /workspace/solution.py <<'PY' + {record.code} + PY + """ + ) + + +def _write_harbor_task(task_dir: Path, record: MbppRecord) -> None: + (task_dir / "environment").mkdir(parents=True, exist_ok=True) + (task_dir / "solution").mkdir(parents=True, exist_ok=True) + (task_dir / "tests").mkdir(parents=True, exist_ok=True) + + (task_dir / "instruction.md").write_text(_build_instruction(record), encoding="utf-8") + (task_dir / "task.toml").write_text(_build_task_toml(record), encoding="utf-8") + (task_dir / "environment" / "Dockerfile").write_text( + "FROM python:3.11-slim\n\nWORKDIR /workspace\n", encoding="utf-8" + ) + + solve_path = task_dir / "solution" / "solve.sh" + solve_path.write_text(_build_solution_sh(record), encoding="utf-8") + solve_path.chmod(0o755) + + test_path = task_dir / "tests" / "test.sh" + test_path.write_text(_build_test_sh(), encoding="utf-8") + test_path.chmod(0o755) + + verify_path = task_dir / "tests" / "verify.py" + verify_path.write_text(_build_verify_py(record), encoding="utf-8") + + +def _build_slime_row(record: MbppRecord, harbor_task_rel: str) -> dict[str, Any]: + prompt = textwrap.dedent( + f"""\ + Solve the following Python task. + + Task: + {record.text} + + Required function name: `{record.function_name}`. + + Output only Python code. Do not include Markdown fences. 
+ """ + ).strip() + label_payload = { + "task_id": record.task_id, + "function_name": record.function_name, + "harbor_task_rel": harbor_task_rel, + "reference_bytes": code_size_bytes(record.code), + } + return { + "task_id": record.task_id, + "messages": [ + {"role": "system", "content": "You write short Python code that passes tests."}, + {"role": "user", "content": prompt}, + ], + "label": json.dumps(label_payload, separators=(",", ":")), + } + + +def convert_mbpp_to_harbor_and_slime( + output_root: Path, + train_size: int = 900, + limit: int | None = None, +) -> dict[str, Any]: + records = _load_mbpp_records() + if limit is not None: + records = records[:limit] + + if not records: + raise ValueError("No MBPP records available for conversion.") + + tasks_root = output_root / "tasks" + slime_root = output_root / "slime" + tasks_root.mkdir(parents=True, exist_ok=True) + slime_root.mkdir(parents=True, exist_ok=True) + + rows: list[dict[str, Any]] = [] + for record in records: + task_name = f"mbpp_{record.task_id:04d}" + task_dir = tasks_root / task_name + _write_harbor_task(task_dir, record) + rows.append(_build_slime_row(record, f"tasks/{task_name}")) + + train_size = max(1, min(train_size, len(rows) - 1)) + train_rows = rows[:train_size] + eval_rows = rows[train_size:] + + pd.DataFrame(train_rows).to_parquet(slime_root / "train.parquet", index=False) + pd.DataFrame(eval_rows).to_parquet(slime_root / "test.parquet", index=False) + + manifest = { + "dataset": MBPP_REPO_ID, + "tasks": len(rows), + "train": len(train_rows), + "eval": len(eval_rows), + "output_root": str(output_root), + } + (output_root / "manifest.json").write_text(json.dumps(manifest, indent=2), encoding="utf-8") + return manifest + + +def main() -> None: + import argparse + + parser = argparse.ArgumentParser(description="Convert MBPP to Harbor + Slime dataset format.") + parser.add_argument("--output-root", type=Path, required=True) + parser.add_argument("--train-size", type=int, default=900) + 
parser.add_argument("--limit", type=int, default=None) + args = parser.parse_args() + + summary = convert_mbpp_to_harbor_and_slime( + output_root=args.output_root, + train_size=args.train_size, + limit=args.limit, + ) + print(json.dumps(summary, indent=2)) + + +if __name__ == "__main__": + main() diff --git a/slime/code_golf_harbor_modal/harbor_litellm_agent.py b/slime/code_golf_harbor_modal/harbor_litellm_agent.py new file mode 100644 index 0000000..63c33f9 --- /dev/null +++ b/slime/code_golf_harbor_modal/harbor_litellm_agent.py @@ -0,0 +1,94 @@ +from __future__ import annotations + +import asyncio +from pathlib import Path +from typing import Any + +import requests +from harbor.agents.base import BaseAgent +from harbor.environments.base import BaseEnvironment +from harbor.models.agent.context import AgentContext + +from code_utils import code_size_bytes, extract_python_code + + +class SingleShotCodeAgent(BaseAgent): + @staticmethod + def name() -> str: + return "single-shot-code-agent" + + def __init__( + self, + logs_dir: Path, + model_name: str | None = None, + api_base: str | None = None, + api_key: str = "EMPTY", + temperature: float = 0.0, + max_tokens: int = 1024, + timeout_sec: float = 90.0, + **kwargs: Any, + ): + super().__init__(logs_dir=logs_dir, model_name=model_name, **kwargs) + self.api_base = (api_base or "http://127.0.0.1:8000/v1").rstrip("/") + self.api_key = api_key + self.temperature = temperature + self.max_tokens = max_tokens + self.timeout_sec = timeout_sec + + def version(self) -> str: + return "1.0.0" + + async def setup(self, environment: BaseEnvironment) -> None: + return + + def _chat_completion(self, instruction: str) -> dict[str, Any]: + if not self.model_name: + raise ValueError("model_name is required for SingleShotCodeAgent.") + + payload = { + "model": self.model_name, + "temperature": self.temperature, + "max_tokens": self.max_tokens, + "messages": [ + {"role": "system", "content": "Return only valid Python code."}, + {"role": 
"user", "content": instruction}, + ], + } + headers = {"Content-Type": "application/json"} + if self.api_key: + headers["Authorization"] = f"Bearer {self.api_key}" + + response = requests.post( + f"{self.api_base}/chat/completions", + json=payload, + headers=headers, + timeout=self.timeout_sec, + ) + response.raise_for_status() + return response.json() + + async def run( + self, + instruction: str, + environment: BaseEnvironment, + context: AgentContext, + ) -> None: + completion = await asyncio.to_thread(self._chat_completion, instruction) + message = completion["choices"][0]["message"]["content"] + solution_code = extract_python_code(message) + + local_solution_path = self.logs_dir / "solution.py" + local_solution_path.write_text(solution_code, encoding="utf-8") + await environment.upload_file( + source_path=local_solution_path, + target_path="/workspace/solution.py", + ) + + usage = completion.get("usage", {}) + context.n_input_tokens = usage.get("prompt_tokens") + context.n_output_tokens = usage.get("completion_tokens") + context.metadata = { + "api_base": self.api_base, + "response_id": completion.get("id"), + "candidate_bytes": code_size_bytes(solution_code), + } diff --git a/slime/code_golf_harbor_modal/modal_train.py b/slime/code_golf_harbor_modal/modal_train.py new file mode 100644 index 0000000..5dfb532 --- /dev/null +++ b/slime/code_golf_harbor_modal/modal_train.py @@ -0,0 +1,419 @@ +from __future__ import annotations + +import os +import subprocess +import sys +import time +from pathlib import Path +from typing import Optional + +import modal +import modal.experimental +import requests + +CURRENT_DIR = Path(__file__).parent.resolve() +if str(CURRENT_DIR) not in sys.path: + sys.path.insert(0, str(CURRENT_DIR)) + +from configs.base import RLConfig + +EXAMPLE_REMOTE_ROOT = "/root/code_golf_harbor_modal" + +image = ( + modal.Image.from_registry("slimerl/slime:nightly-dev-20260126a") + .run_commands( + "uv pip install --system 
git+https://github.com/huggingface/transformers.git@eebf856", + "uv pip install --system harbor==0.1.44 pandas pyarrow huggingface_hub requests", + r"""sed -i 's/hf_config\.rope_theta/hf_config.rope_parameters["rope_theta"]/g' /usr/local/lib/python3.12/dist-packages/megatron/bridge/models/qwen/qwen3_bridge.py""", + ) + .entrypoint([]) + .add_local_dir( + str(CURRENT_DIR), + remote_path=EXAMPLE_REMOTE_ROOT, + copy=True, + ignore=["**/__pycache__", "**/*.pyc", "**/.git", "**/.venv"], + ) +) + +with image.imports(): + import ray + from ray.job_submission import JobSubmissionClient + +DATA_PATH = Path("/data") +HF_CACHE_PATH = Path("/root/.cache/huggingface") +CHECKPOINTS_PATH = Path("/checkpoints") + +DATA_VOLUME_NAME = "slime-code-golf-harbor-data" +CHECKPOINT_VOLUME_NAME = "slime-code-golf-harbor-checkpoints" + +data_volume = modal.Volume.from_name(DATA_VOLUME_NAME, create_if_missing=True) +hf_cache_volume = modal.Volume.from_name("huggingface-cache", create_if_missing=True) +checkpoints_volume = modal.Volume.from_name(CHECKPOINT_VOLUME_NAME, create_if_missing=True) + +RAY_PORT = 6379 +RAY_DASHBOARD_PORT = 8265 + +APP_NAME = os.environ.get("SLIME_APP_NAME", "slime-code-golf") +app = modal.App(APP_NAME) + + +def _init_ray(rank: int, main_node_addr: str, node_ip_addr: str, n_nodes: int) -> None: + os.environ["SLIME_HOST_IP"] = node_ip_addr + + if rank == 0: + subprocess.Popen( + [ + "ray", + "start", + "--head", + f"--node-ip-address={node_ip_addr}", + "--dashboard-host=0.0.0.0", + ] + ) + for _ in range(30): + try: + ray.init(address="auto") + break + except ConnectionError: + time.sleep(1) + else: + raise RuntimeError("Unable to initialize ray head node.") + + for _ in range(120): + alive_nodes = [node for node in ray.nodes() if node["Alive"]] + if len(alive_nodes) == n_nodes: + return + time.sleep(1) + raise RuntimeError("Not all ray worker nodes joined in time.") + + subprocess.Popen( + [ + "ray", + "start", + f"--node-ip-address={node_ip_addr}", + "--address", + 
f"{main_node_addr}:{RAY_PORT}", + ] + ) + + +def _build_runtime_env(config: RLConfig, master_addr: str) -> dict: + existing_pythonpath = os.environ.get("PYTHONPATH", "") + extra_paths = ["/root/Megatron-LM/", EXAMPLE_REMOTE_ROOT] + pythonpath = ":".join(extra_paths + ([existing_pythonpath] if existing_pythonpath else [])) + + return { + "env_vars": { + "CUDA_DEVICE_MAX_CONNECTIONS": "1", + "NCCL_NVLS_ENABLE": "1", + "no_proxy": master_addr, + "MASTER_ADDR": master_addr, + "PYTHONPATH": pythonpath, + "HARBOR_TASKS_ROOT": config.harbor_task_root, + "HARBOR_DATA_VOLUME_NAME": config.harbor_data_volume_name, + "HARBOR_RM_MODAL_APP": config.harbor_rm_modal_app, + "HARBOR_RM_MAX_CONCURRENCY": str(config.harbor_rm_max_concurrency), + "HARBOR_RM_TIMEOUT_SEC": str(config.harbor_rm_timeout_sec), + "HARBOR_LENGTH_BONUS_WEIGHT": str(config.harbor_length_bonus_weight), + } + } + + +def _generate_slime_cmd(config: RLConfig, master_addr: str) -> tuple[str, dict]: + from huggingface_hub import snapshot_download + + hf_model_path = snapshot_download(repo_id=config.model_id, local_files_only=True) + train_args = config.generate_train_args( + hf_model_path=hf_model_path, + checkpoints_path=CHECKPOINTS_PATH, + data_path=DATA_PATH, + ) + + wandb_key = os.environ.get("WANDB_API_KEY") + if wandb_key: + train_args += ( + f" --use-wandb --wandb-project {config.wandb_project}" + f" --wandb-group {config.wandb_run_name_prefix}" + f" --wandb-key '{wandb_key}' --disable-wandb-random-suffix" + ) + + train_script = config.train_script + return f"python3 {train_script} {train_args}", _build_runtime_env(config, master_addr) + + +async def _run_training(config: RLConfig, n_nodes: int, master_addr: str) -> None: + client = JobSubmissionClient("http://127.0.0.1:8265") + slime_cmd, runtime_env = _generate_slime_cmd(config, master_addr) + job_id = client.submit_job(entrypoint=slime_cmd, runtime_env=runtime_env) + print(f"Submitted SLIME training job: {job_id}") + async for line in 
client.tail_job_logs(job_id): + print(line, end="", flush=True) + + +def _find_latest_checkpoint(search_root: Path) -> Path: + candidates: list[Path] = [] + for config_path in search_root.rglob("config.json"): + parent = config_path.parent + if any(parent.glob("*.safetensors")) or any(parent.glob("*.bin")): + candidates.append(parent) + + if not candidates: + for candidate in search_root.rglob("*"): + if candidate.is_dir() and candidate.name.startswith(("global_step", "step", "iter")): + candidates.append(candidate) + + if not candidates: + raise FileNotFoundError(f"No checkpoints found under {search_root}") + return max(candidates, key=lambda path: path.stat().st_mtime) + + +def _wait_for_openai_server(base_url: str, timeout_sec: int = 300) -> None: + models_url = f"{base_url.rstrip('/')}/v1/models" + deadline = time.time() + timeout_sec + while time.time() < deadline: + try: + response = requests.get(models_url, timeout=2) + if response.ok: + return + except Exception: + pass + time.sleep(2) + raise TimeoutError(f"Timed out waiting for server readiness: {models_url}") + + +def _run_harbor_eval_subprocess( + server_base_url: str, + model_name: str, + n_concurrent: int, + n_tasks: Optional[int], + tasks_dir: Path, + jobs_dir: Path, +) -> None: + jobs_dir.mkdir(parents=True, exist_ok=True) + cmd = [ + "harbor", + "run", + "-p", + str(tasks_dir), + "--agent-import-path", + "harbor_litellm_agent.SingleShotCodeAgent", + "-m", + model_name, + "--ak", + f"api_base={server_base_url.rstrip('/')}/v1", + "--ak", + "temperature=0.0", + "--ak", + "max_tokens=1024", + "-e", + "modal", + "-n", + str(n_concurrent), + "--jobs-dir", + str(jobs_dir), + ] + if n_tasks is not None: + cmd.extend(["-l", str(n_tasks)]) + + env = dict(os.environ) + existing = env.get("PYTHONPATH", "") + env["PYTHONPATH"] = f"{EXAMPLE_REMOTE_ROOT}:{existing}" if existing else EXAMPLE_REMOTE_ROOT + + print("Running Harbor eval command:") + print(" ".join(cmd)) + subprocess.run(cmd, check=True, env=env) + + 
@app.function(
    image=image,
    volumes={HF_CACHE_PATH.as_posix(): hf_cache_volume},
    timeout=24 * 60 * 60,
)
def download_model(config: str = "qwen-8b-multi", revision: Optional[str] = None) -> None:
    """Fetch the base HF model for *config* into the shared HF cache volume."""
    from huggingface_hub import snapshot_download

    from configs import get_config

    resolved = get_config(config)
    local_path = snapshot_download(repo_id=resolved.model_id, revision=revision)
    print(f"Model downloaded to: {local_path}")
    hf_cache_volume.commit()


@app.function(
    image=image,
    volumes={DATA_PATH.as_posix(): data_volume},
    timeout=24 * 60 * 60,
)
def prepare_dataset(train_size: int = 900, limit: Optional[int] = None) -> None:
    """Convert MBPP into Harbor tasks + SLIME parquet inside the data volume."""
    from dataset_to_harbor import convert_mbpp_to_harbor_and_slime

    data_volume.reload()
    conversion_summary = convert_mbpp_to_harbor_and_slime(
        output_root=DATA_PATH / "mbpp_harbor",
        train_size=train_size,
        limit=limit,
    )
    data_volume.commit()
    print(conversion_summary)


@app.local_entrypoint()
def list_available_configs() -> None:
    """Print the name of every training config this example knows about."""
    from configs import list_configs

    print("Available configs:")
    for name in list_configs():
        print(f" - {name}")


@app.function(
    image=image,
    gpu="H100:8",
    volumes={
        HF_CACHE_PATH.as_posix(): hf_cache_volume,
        CHECKPOINTS_PATH.as_posix(): checkpoints_volume,
        DATA_PATH.as_posix(): data_volume,
    },
    secrets=[modal.Secret.from_name("wandb-secret")],
    timeout=24 * 60 * 60,
    experimental_options={"efa_enabled": True},
)
@modal.experimental.clustered(4, rdma=True)
async def train_multi_node(config: str = "qwen-8b-multi") -> None:
    """Run multi-node SLIME RL training for *config* on a clustered H100 job."""
    from configs import get_config

    train_config = get_config(config)

    hf_cache_volume.reload()
    data_volume.reload()

    cluster_info = modal.experimental.get_cluster_info()
    all_ips = cluster_info.container_ipv4_ips
    head_addr = all_ips[0]
    self_addr = all_ips[cluster_info.rank]

    _init_ray(cluster_info.rank, head_addr, self_addr, len(all_ips))

    if cluster_info.rank != 0:
        # Worker ranks idle forever; Modal tears them down with the cluster.
        while True:
            time.sleep(30)

    with modal.forward(RAY_DASHBOARD_PORT) as tunnel:
        print(f"Ray dashboard: {tunnel.url}")
        await _run_training(train_config, len(all_ips), head_addr)


@app.function(
    image=image,
    gpu="H100:1",
    volumes={CHECKPOINTS_PATH.as_posix(): checkpoints_volume},
    timeout=24 * 60 * 60,
)
@modal.web_server(8000, startup_timeout=30 * 60)
def serve_latest_checkpoint(
    checkpoint_subdir: str = "qwen8b_code_golf",
    tensor_parallel_size: int = 1,
) -> None:
    """Launch SGLang on port 8000 serving the newest checkpoint in the volume."""
    checkpoints_volume.reload()
    latest = _find_latest_checkpoint(CHECKPOINTS_PATH / checkpoint_subdir)
    print(f"Serving checkpoint: {latest}")
    launch_cmd = [
        "python3",
        "-m",
        "sglang.launch_server",
        "--model-path",
        str(latest),
        "--host",
        "0.0.0.0",
        "--port",
        "8000",
        "--tp",
        str(tensor_parallel_size),
    ]
    # Fire and forget: modal.web_server keeps the container alive while the
    # detached SGLang process serves requests.
    subprocess.Popen(launch_cmd)


@app.function(
    image=image,
    volumes={DATA_PATH.as_posix(): data_volume},
    timeout=24 * 60 * 60,
)
def run_harbor_eval(
    server_base_url: str,
    model_name: str = "qwen-code-golf",
    n_concurrent: int = 128,
    n_tasks: Optional[int] = 200,
) -> None:
    """Run a Harbor eval (Modal sandboxes) against an already-running server."""
    data_volume.reload()
    harbor_root = DATA_PATH / "mbpp_harbor"
    _run_harbor_eval_subprocess(
        server_base_url=server_base_url,
        model_name=model_name,
        n_concurrent=n_concurrent,
        n_tasks=n_tasks,
        tasks_dir=harbor_root / "tasks",
        jobs_dir=harbor_root / "harbor_jobs",
    )
    data_volume.commit()


@app.function(
    image=image,
    gpu="H100:1",
    volumes={
        CHECKPOINTS_PATH.as_posix(): checkpoints_volume,
        DATA_PATH.as_posix(): data_volume,
    },
    timeout=24 * 60 * 60,
)
def eval_latest_checkpoint(
    checkpoint_subdir: str = "qwen8b_code_golf",
    model_name: str = "qwen-code-golf",
    n_concurrent: int = 128,
    n_tasks: Optional[int] = 200,
) -> None:
    """One-shot eval: serve the newest checkpoint locally, run Harbor, exit."""
    checkpoints_volume.reload()
    data_volume.reload()

    latest = _find_latest_checkpoint(CHECKPOINTS_PATH / checkpoint_subdir)
    print(f"Using checkpoint for eval: {latest}")

    sglang_proc = subprocess.Popen(
        [
            "python3",
            "-m",
            "sglang.launch_server",
            "--model-path",
            str(latest),
            "--host",
            "0.0.0.0",
            "--port",
            "8000",
            "--tp",
            "1",
        ]
    )
    try:
        _wait_for_openai_server("http://127.0.0.1:8000", timeout_sec=300)
        harbor_root = DATA_PATH / "mbpp_harbor"
        _run_harbor_eval_subprocess(
            server_base_url="http://127.0.0.1:8000",
            model_name=model_name,
            n_concurrent=n_concurrent,
            n_tasks=n_tasks,
            tasks_dir=harbor_root / "tasks",
            jobs_dir=harbor_root / "harbor_jobs",
        )
    finally:
        # Graceful shutdown first; escalate to SIGKILL if the server hangs.
        if sglang_proc.poll() is None:
            sglang_proc.terminate()
            try:
                sglang_proc.wait(timeout=15)
            except subprocess.TimeoutExpired:
                sglang_proc.kill()
                sglang_proc.wait(timeout=15)

    data_volume.commit()


# ---------------------------------------------------------------------------
# NOTE(review): the tail of this mangled diff span introduces a SEPARATE new
# file, slime/code_golf_harbor_modal/run.sh, which cannot be split into its
# own edit range. Its content is preserved verbatim below:
#
#   #!/bin/bash
#   set -euo pipefail
#
#   SCRIPT="slime/code_golf_harbor_modal/modal_train.py"
#   MODE="${1:-}"
#
#   case "$MODE" in
#     prepare)
#       modal run "${SCRIPT}"::prepare_dataset
#       ;;
#     download)
#       modal run "${SCRIPT}"::download_model --config qwen-8b-multi
#       ;;
#     train)
#       modal run "${SCRIPT}"::train_multi_node --config qwen-8b-multi
#       ;;
#     serve)
#       modal serve "${SCRIPT}"::serve_latest_checkpoint
#       ;;
#     eval)
#       modal run "${SCRIPT}"::eval_latest_checkpoint --n-concurrent "${2:-256}" --n-tasks "${3:-500}"
#       ;;
#     *)
#       echo "Usage: $0 {prepare|download|train|serve|eval [n_concurrent] [n_tasks]}"
#       exit 1
#       ;;
#   esac
# ---------------------------------------------------------------------------