Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ jobs:
cargo test --lib --no-default-features --features openapi-codegen
bash api/check_openapi_codegen_parity.sh

- name: Check generated API clients are up to date
run: bash api/check_client_codegen_parity.sh

- name: Install dprint
run: |
curl -fsSL https://dprint.dev/install.sh | sh
Expand Down
117 changes: 117 additions & 0 deletions api/check_client_codegen_parity.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
#!/bin/bash
set -euo pipefail

# Verify that the checked-in Python and Julia API clients match what
# openapi-generator would produce from the current api/openapi.yaml.
# Exits non-zero and prints a diff when drift is detected.

# Pin the generator image by tag AND digest so the generated output is
# reproducible even if the upstream tag is re-pushed.
OPENAPI_CLI_VERSION="${OPENAPI_CLI_VERSION:-v7.16.0}"
OPENAPI_CLI_DIGEST="sha256:e56372add5e038753fb91aa1bbb470724ef58382fdfc35082bf1b3e079ce353c"
# Container runtime; override with e.g. CONTAINER_EXEC=podman.
CONTAINER_EXEC="${CONTAINER_EXEC:-docker}"
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
REPO_ROOT="$(cd "${SCRIPT_DIR}/.." && pwd)"
SPEC_PATH="${SCRIPT_DIR}/openapi.yaml"
# The clients embed the HTTP API version; read it from the Rust source of
# truth (the first double-quoted string on the HTTP_API_VERSION const line).
API_VERSION="$(
awk -F'"' '/pub const HTTP_API_VERSION:/ { print $2; exit }' "${REPO_ROOT}/src/api_version.rs"
)"

if [[ -z "${API_VERSION}" ]]; then
echo "Failed to read HTTP API version from src/api_version.rs" >&2
exit 1
fi

# Split the spec path so the directory can be bind-mounted into the
# container and the file referenced by bare name inside it.
SPEC_DIR="$(cd "$(dirname "${SPEC_PATH}")" && pwd)"
SPEC_FILE="$(basename "${SPEC_PATH}")"

TMP_PYTHON="$(mktemp -d "${TMPDIR:-/tmp}/torc-py-check.XXXXXX")"
TMP_JULIA="$(mktemp -d "${TMPDIR:-/tmp}/torc-jl-check.XXXXXX")"

# shellcheck disable=SC2329,SC2317 # invoked indirectly via trap
cleanup_tmp() {
  # Docker may create root-owned files that the CI runner cannot delete
  # directly. Delete the tree contents from inside a container first, then
  # remove the (now-empty) temp dirs on the host.
  local d
  for d in "${TMP_PYTHON}" "${TMP_JULIA}"; do
    # NOTE: `rm -rf /tmp_clean/*` does not work here — there is no shell
    # inside the container to expand the glob (the host shell cannot expand
    # it either, /tmp_clean only exists in the container), and a glob would
    # skip dotfiles such as .openapi-generator/ anyway. `find` removes
    # every entry, hidden ones included.
    "${CONTAINER_EXEC}" run --rm -v "${d}":/tmp_clean alpine \
      find /tmp_clean -mindepth 1 -delete 2>/dev/null || true
    rm -rf "${d}" 2>/dev/null || true
  done
}
trap cleanup_tmp EXIT

docker_run() {
  # On MSYS/Cygwin, keep the MSYS runtime from rewriting /container/paths
  # passed as arguments; everywhere else, forward the arguments untouched.
  if [[ "${OSTYPE:-}" == msys* || "${OSTYPE:-}" == cygwin* ]]; then
    MSYS_NO_PATHCONV=1 "${CONTAINER_EXEC}" "$@"
  else
    "${CONTAINER_EXEC}" "$@"
  fi
}

# Generate a client into a throwaway directory with the pinned
# openapi-generator container.
#   $1 - generator name (e.g. python, julia-client)
#   $2 - host output directory (bind-mounted into the container)
#   $@ - extra generator flags (e.g. -c /data/config.json)
generate_client() {
  local generator=$1 out_dir=$2
  shift 2
  docker_run run \
    -v "${SCRIPT_DIR}":/data \
    -v "${SPEC_DIR}":/spec \
    -v "${out_dir}":/client_out \
    "docker.io/openapitools/openapi-generator-cli:${OPENAPI_CLI_VERSION}@${OPENAPI_CLI_DIGEST}" \
    generate -g "${generator}" --input-spec="/spec/${SPEC_FILE}" -o /client_out \
    --additional-properties=packageVersion="${API_VERSION}" "$@"
}

echo "Generating Python client from ${SPEC_FILE}…"
generate_client python "${TMP_PYTHON}" -c /data/config.json

echo "Generating Julia client from ${SPEC_FILE}…"
generate_client julia-client "${TMP_JULIA}"

# Compare a checked-in client path against the freshly generated one.
#   $1 - checked-in path (file or directory)
#   $2 - freshly generated path
#   $3 - message to print on drift
# On drift, prints the message to stderr plus a full unified diff
# (checked-in vs generated) and returns 1; returns 0 when identical.
check_parity() {
  local repo_path=$1 generated_path=$2 message=$3
  if diff -rq "${generated_path}" "${repo_path}" >/dev/null 2>&1; then
    return 0
  fi
  echo "${message}" >&2
  diff -ru "${repo_path}" "${generated_path}" || true
  return 1
}

RC=0

echo "Checking Python client parity…"
if check_parity \
  "${REPO_ROOT}/python_client/src/torc/openapi_client" \
  "${TMP_PYTHON}/torc/openapi_client" \
  "Python client is out of date with ${SPEC_FILE}"; then
  echo "Python client is up to date."
else
  RC=1
fi

echo "Checking Julia client parity…"
# The Julia client has three independently generated pieces; report drift in
# each one before deciding the overall result.
JULIA_DRIFT=0
check_parity \
  "${REPO_ROOT}/julia_client/Torc/src/api" \
  "${TMP_JULIA}/src" \
  "Julia client API sources are out of date with ${SPEC_FILE}" || JULIA_DRIFT=1
check_parity \
  "${REPO_ROOT}/julia_client/julia_client/docs" \
  "${TMP_JULIA}/docs" \
  "Julia client docs are out of date with ${SPEC_FILE}" || JULIA_DRIFT=1
check_parity \
  "${REPO_ROOT}/julia_client/julia_client/README.md" \
  "${TMP_JULIA}/README.md" \
  "Julia client README is out of date with ${SPEC_FILE}" || JULIA_DRIFT=1

if [[ "${JULIA_DRIFT}" -eq 1 ]]; then
  RC=1
else
  echo "Julia client is up to date."
fi

if [[ "${RC}" -ne 0 ]]; then
  echo "" >&2
  echo "Run 'bash api/sync_openapi.sh clients' to regenerate." >&2
fi

exit "${RC}"
4 changes: 2 additions & 2 deletions api/openapi.codegen.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4067,8 +4067,8 @@ paths:
type: integer
format: int64
- name: trigger_type
in: path
required: true
in: query
required: false
schema:
type:
- array
Expand Down
4 changes: 2 additions & 2 deletions api/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4067,8 +4067,8 @@ paths:
type: integer
format: int64
- name: trigger_type
in: path
required: true
in: query
required: false
schema:
type:
- array
Expand Down
6 changes: 5 additions & 1 deletion api/regenerate_rust_client.sh
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,11 @@ docker_run run \
-t /templates \
--additional-properties=supportAsync=false

rm -f "${REPO_ROOT}/src/client/apis/"*_api.rs
find "${REPO_ROOT}/src/client/apis" \
-maxdepth 1 \
-name '*_api.rs' \
! -name 'ro_crate_api.rs' \
-delete
cp "${TMP_RUST_CLIENT}/src/apis/"*_api.rs "${REPO_ROOT}/src/client/apis/"

cargo fmt --manifest-path "${REPO_ROOT}/Cargo.toml" -- "${REPO_ROOT}/src/client/apis/"*_api.rs
5 changes: 4 additions & 1 deletion api/sync_openapi.sh
Original file line number Diff line number Diff line change
Expand Up @@ -85,16 +85,19 @@ case "${COMMAND}" in
;;
all)
"${SCRIPT_DIR}/emit_openapi_from_rust.sh"
"${SCRIPT_DIR}/check_openapi_codegen_parity.sh"

if [[ "${PROMOTE}" -eq 1 ]]; then
"${SCRIPT_DIR}/promote_openapi_from_rust.sh"
"${SCRIPT_DIR}/check_openapi_codegen_parity.sh"
"${SCRIPT_DIR}/regenerate_clients.sh" --spec "${SCRIPT_DIR}/openapi.yaml"
elif [[ -n "${SPEC_PATH}" ]]; then
"${SCRIPT_DIR}/check_openapi_codegen_parity.sh"
"${SCRIPT_DIR}/regenerate_clients.sh" --spec "${SPEC_PATH}"
elif [[ "${USE_RUST_SPEC}" -eq 1 ]]; then
"${SCRIPT_DIR}/check_openapi_codegen_parity.sh"
"${SCRIPT_DIR}/regenerate_clients.sh" --spec "${SCRIPT_DIR}/openapi.codegen.yaml"
else
"${SCRIPT_DIR}/check_openapi_codegen_parity.sh"
"${SCRIPT_DIR}/regenerate_clients.sh" --spec "${SCRIPT_DIR}/openapi.yaml"
fi
;;
Expand Down
24 changes: 11 additions & 13 deletions build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,18 @@ fn main() {

println!("cargo:rustc-env=GIT_HASH={}", git_hash);

// Check if working directory is dirty
let is_dirty = Command::new("git")
.args(["status", "--porcelain"])
.output()
.ok()
.map(|output| !output.stdout.is_empty())
.unwrap_or(false);

let dirty_suffix = if is_dirty { "-dirty" } else { "" };
println!("cargo:rustc-env=GIT_DIRTY={}", dirty_suffix);

// Rerun if git HEAD changes or if any tracked files change
// Rerun when the checked-out commit changes (new commit, branch switch).
// NOTE: Do NOT watch .git/index — it is modified by nearly every git
// operation (stage, stash, status) and causes constant rebuilds of all
// test targets that depend on the env vars emitted above.
println!("cargo:rerun-if-changed=.git/HEAD");
println!("cargo:rerun-if-changed=.git/index");
if let Ok(head) = fs::read_to_string(".git/HEAD") {
// HEAD usually contains "ref: refs/heads/<branch>"; watch that file
// so we rebuild when the branch tip moves (i.e., a new commit).
if let Some(refpath) = head.trim().strip_prefix("ref: ") {
println!("cargo:rerun-if-changed=.git/{}", refpath);
}
}

// Ensure binaries embedding SQLx migrations rebuild whenever migrations change.
emit_rerun_if_changed_for_dir(Path::new("torc-server/migrations"));
Expand Down
17 changes: 17 additions & 0 deletions examples/scripts/checkpointing_evaluate.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#!/bin/bash
# Evaluate all trained models and report the one with the lowest final loss.
# Reads:  /workspace/models/model_*.pt.npy (written by checkpointing_train.py)
# Writes: /workspace/results/evaluation.json
set -euo pipefail

# Quoted heredoc delimiter: the Python source is passed verbatim with no
# shell expansion.
python3 - <<'PY'
import glob
import json

import numpy as np

# Sort for a deterministic scan order (glob order is filesystem-dependent).
models = sorted(glob.glob('/workspace/models/model_*.pt.npy'))
if not models:
    # Without this guard, best_loss stays inf and json.dump would emit
    # non-standard JSON ("Infinity") with best_model set to null.
    raise SystemExit('No models found under /workspace/models')

best_loss = float('inf')
best_model = None
for model_path in models:
    data = np.load(model_path, allow_pickle=True).item()
    if data['final_loss'] < best_loss:
        best_loss = data['final_loss']
        best_model = model_path

report = {'best_model': best_model, 'best_loss': best_loss, 'num_models': len(models)}
with open('/workspace/results/evaluation.json', 'w') as f:
    json.dump(report, f, indent=2)
print(json.dumps(report, indent=2))
PY
10 changes: 10 additions & 0 deletions examples/scripts/checkpointing_prepare_data.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
#!/bin/bash
# Prepare the synthetic dataset and directory layout used by the
# checkpointing example (data, checkpoints, models, results).
set -euo pipefail

mkdir -p /workspace/data /workspace/checkpoints /workspace/models /workspace/results

# Quoted heredoc delimiter: the Python source is passed verbatim with no
# shell expansion.
python3 - <<'PY'
import pickle

import numpy as np

# 50k random samples, 128 features each, integer labels in [0, 10).
data = {'X': np.random.rand(50000, 128), 'y': np.random.randint(0, 10, 50000)}
with open('/workspace/data/dataset.pkl', 'wb') as f:
    pickle.dump(data, f)
print('Dataset prepared: 50000 samples, 128 features')
PY
88 changes: 88 additions & 0 deletions examples/scripts/checkpointing_train.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
#!/usr/bin/env python3
"""Training script with SIGTERM-aware checkpointing.

Saves periodic checkpoints during training. On SIGTERM (e.g., torc approaching
a time limit), saves an emergency checkpoint and exits cleanly. On restart,
resumes from the latest checkpoint automatically.

Expected environment variables (set by torc or the calling shell):
    TORC_JOB_NAME - used to create a per-job checkpoint directory
    MODEL_INDEX   - index suffix for the output model file
"""

import os
import pickle
import signal
import sys
import time

import numpy as np

# ── Configuration ──────────────────────────────────────────────────
ckpt_dir = f"/workspace/checkpoints/{os.environ['TORC_JOB_NAME']}"
model_out = f"/workspace/models/model_{os.environ['MODEL_INDEX']}.pt"
os.makedirs(ckpt_dir, exist_ok=True)

total_epochs = 100

# ── SIGTERM handling ───────────────────────────────────────────────
# The handler only flips a flag; the training loop checks it at the end of
# each epoch so a checkpoint is never written mid-update.
terminated = False


def handle_sigterm(_signum, _frame):
    global terminated
    terminated = True
    print("SIGTERM received — will save checkpoint and exit after current epoch")


signal.signal(signal.SIGTERM, handle_sigterm)

# ── Resume from checkpoint if available ────────────────────────────
# Checkpoints are named checkpoint_NNNN.npy with a zero-padded epoch, so a
# reverse lexicographic sort puts the most recent one first.
checkpoints = sorted(
    [f for f in os.listdir(ckpt_dir) if f.startswith("checkpoint_")],
    reverse=True,
)
start_epoch = 0
loss = float("inf")
weights = np.random.rand(128, 10) * 0.01

if checkpoints:
    latest = os.path.join(ckpt_dir, checkpoints[0])
    data = np.load(latest, allow_pickle=True).item()
    weights = data["weights"]
    start_epoch = data["epoch"] + 1
    # Restore the recorded loss as well; otherwise a resume with no epochs
    # left to run would save the final model with final_loss=inf.
    loss = data.get("loss", loss)
    print(f"Resuming from checkpoint at epoch {start_epoch}")
else:
    print("Starting fresh training")

# ── Load dataset ───────────────────────────────────────────────────
with open("/workspace/data/dataset.pkl", "rb") as f:
    dataset = pickle.load(f)

# ── Training loop ──────────────────────────────────────────────────
for epoch in range(start_epoch, total_epochs):
    # Simulate a training step: apply a small random gradient.
    grad = np.random.randn(*weights.shape) * 0.001
    weights -= grad
    loss = float(np.linalg.norm(grad))

    # Periodic checkpoint every 10 epochs.
    if (epoch + 1) % 10 == 0:
        ckpt_path = os.path.join(ckpt_dir, f"checkpoint_{epoch:04d}.npy")
        np.save(ckpt_path, {"weights": weights, "epoch": epoch, "loss": loss})
        print(f"Epoch {epoch+1}/{total_epochs} loss={loss:.6f} [checkpoint saved]")
    else:
        print(f"Epoch {epoch+1}/{total_epochs} loss={loss:.6f}")

    # Check if we received SIGTERM — save and exit gracefully.
    if terminated:
        ckpt_path = os.path.join(ckpt_dir, f"checkpoint_{epoch:04d}.npy")
        np.save(ckpt_path, {"weights": weights, "epoch": epoch, "loss": loss})
        print(f"Emergency checkpoint saved at epoch {epoch+1}. Exiting.")
        sys.exit(0)

    time.sleep(1)  # Simulate compute time

# Save final model. np.save appends ".npy", producing model_<i>.pt.npy —
# the pattern checkpointing_evaluate.sh globs for.
np.save(model_out, {"weights": weights, "final_loss": loss})
print(f"Training complete. Model saved to {model_out}")
23 changes: 23 additions & 0 deletions examples/scripts/fan_in_global_report.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#!/usr/bin/env python3
"""Aggregate all regional summaries into a global report.

Usage: fan_in_global_report.py
"""

import glob
import json
import sys


def build_report(summaries):
    """Combine per-region summary dicts into one global report dict.

    Each summary must carry "region", "mean", and "num_simulations" keys;
    summaries must be non-empty (the caller guards against that).
    """
    return {
        "regions": {s["region"]: s for s in summaries},
        "global_mean": sum(s["mean"] for s in summaries) / len(summaries),
        "total_simulations": sum(s["num_simulations"] for s in summaries),
    }


def main():
    summaries = []
    for path in sorted(glob.glob("output/summary_*.json")):
        with open(path) as fh:
            summaries.append(json.load(fh))

    if not summaries:
        # Without this guard the global_mean computation divides by zero.
        sys.exit("No output/summary_*.json files found; run the regional jobs first.")

    report = build_report(summaries)
    with open("output/global_report.json", "w") as f:
        json.dump(report, f, indent=2)
    print(json.dumps(report, indent=2))


if __name__ == "__main__":
    main()
37 changes: 37 additions & 0 deletions examples/scripts/fan_in_regional_summary.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
#!/usr/bin/env python3
"""Aggregate simulation outputs for a single region.

Usage: fan_in_regional_summary.py <region>
"""

import csv
import glob
import json
import sys

import numpy as np

region = sys.argv[1]

files = sorted(glob.glob(f"output/sim_{region}_*.csv"))
all_values = []
for f in files:
with open(f) as fh:
reader = csv.DictReader(fh)
all_values.extend(float(row["value"]) for row in reader)

arr = np.array(all_values)
summary = {
"region": region,
"num_simulations": len(files),
"total_samples": len(arr),
"mean": float(arr.mean()),
"std": float(arr.std()),
"min": float(arr.min()),
"max": float(arr.max()),
}

outpath = f"output/summary_{region}.json"
with open(outpath, "w") as f:
json.dump(summary, f, indent=2)
print(json.dumps(summary, indent=2))
Loading
Loading