Skip to content

Commit f9efeda

Browse files
authored
Merge pull request #245 from NatLabRockies/fix/start-one-worker-per-node
Add start-one-worker-per-node to interactive recovery
2 parents 01097f1 + 662166b commit f9efeda

39 files changed

+1019
-78
lines changed

.github/workflows/lint.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,9 @@ jobs:
6464
cargo test --lib --no-default-features --features openapi-codegen
6565
bash api/check_openapi_codegen_parity.sh
6666
67+
- name: Check generated API clients are up to date
68+
run: bash api/check_client_codegen_parity.sh
69+
6770
- name: Install dprint
6871
run: |
6972
curl -fsSL https://dprint.dev/install.sh | sh

api/check_client_codegen_parity.sh

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
#!/bin/bash
set -euo pipefail

# Verify that the checked-in Python and Julia API clients match what
# openapi-generator would produce from the current api/openapi.yaml.
# Exits non-zero and prints a diff when drift is detected.
#
# Overridable environment variables:
#   OPENAPI_CLI_VERSION - openapi-generator-cli image tag (default v7.16.0)
#   CONTAINER_EXEC      - container runtime binary (default docker)

OPENAPI_CLI_VERSION="${OPENAPI_CLI_VERSION:-v7.16.0}"
# Pin the image by digest so results stay reproducible even if the tag moves.
OPENAPI_CLI_DIGEST="sha256:e56372add5e038753fb91aa1bbb470724ef58382fdfc35082bf1b3e079ce353c"
CONTAINER_EXEC="${CONTAINER_EXEC:-docker}"
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
REPO_ROOT="$(cd "${SCRIPT_DIR}/.." && pwd)"
SPEC_PATH="${SCRIPT_DIR}/openapi.yaml"
# The generated clients embed the HTTP API version; read it from the Rust
# source of truth so the freshly generated code carries the same version.
API_VERSION="$(
  awk -F'"' '/pub const HTTP_API_VERSION:/ { print $2; exit }' "${REPO_ROOT}/src/api_version.rs"
)"

if [[ -z "${API_VERSION}" ]]; then
  echo "Failed to read HTTP API version from src/api_version.rs" >&2
  exit 1
fi

SPEC_DIR="$(cd "$(dirname "${SPEC_PATH}")" && pwd)"
SPEC_FILE="$(basename "${SPEC_PATH}")"

TMP_PYTHON="$(mktemp -d "${TMPDIR:-/tmp}/torc-py-check.XXXXXX")"
TMP_JULIA="$(mktemp -d "${TMPDIR:-/tmp}/torc-jl-check.XXXXXX")"

# docker_run: invoke the container runtime; on MSYS/Cygwin disable automatic
# path conversion so container-side mount paths like /data survive intact.
docker_run() {
  case "${OSTYPE:-}" in
    msys*|cygwin*)
      MSYS_NO_PATHCONV=1 "${CONTAINER_EXEC}" "$@"
      ;;
    *)
      "${CONTAINER_EXEC}" "$@"
      ;;
  esac
}

# shellcheck disable=SC2329,SC2317 # invoked indirectly via trap
cleanup_tmp() {
  # Docker may create root-owned files that the CI runner cannot delete
  # directly. Remove them from inside a container, then delete the
  # (now-empty) temp dirs. Use find rather than a /tmp_clean/* glob: the
  # generator also writes dotfiles (.openapi-generator/, .gitignore, ...)
  # which the glob would miss, leaking root-owned leftovers on the host.
  for d in "${TMP_PYTHON}" "${TMP_JULIA}"; do
    docker_run run --rm -v "${d}":/tmp_clean alpine \
      find /tmp_clean -mindepth 1 -delete 2>/dev/null || true
    rm -rf "${d}" 2>/dev/null || true
  done
}
trap cleanup_tmp EXIT

echo "Generating Python client from ${SPEC_FILE}"
docker_run run \
  -v "${SCRIPT_DIR}":/data \
  -v "${SPEC_DIR}":/spec \
  -v "${TMP_PYTHON}":/python_client \
  "docker.io/openapitools/openapi-generator-cli:${OPENAPI_CLI_VERSION}@${OPENAPI_CLI_DIGEST}" \
  generate -g python --input-spec="/spec/${SPEC_FILE}" -o /python_client -c /data/config.json \
  --additional-properties=packageVersion="${API_VERSION}"

echo "Generating Julia client from ${SPEC_FILE}"
docker_run run \
  -v "${SCRIPT_DIR}":/data \
  -v "${SPEC_DIR}":/spec \
  -v "${TMP_JULIA}":/julia_client \
  "docker.io/openapitools/openapi-generator-cli:${OPENAPI_CLI_VERSION}@${OPENAPI_CLI_DIGEST}" \
  generate -g julia-client --input-spec="/spec/${SPEC_FILE}" -o /julia_client \
  --additional-properties=packageVersion="${API_VERSION}"

RC=0

# For each comparison: a quiet diff detects drift, then a verbose diff is
# printed for the CI log (|| true because diff exits non-zero on differences
# and we want to report every drifting tree, not stop at the first).
echo "Checking Python client parity…"
if ! diff -rq "${TMP_PYTHON}/torc/openapi_client" \
    "${REPO_ROOT}/python_client/src/torc/openapi_client" >/dev/null 2>&1; then
  echo "Python client is out of date with ${SPEC_FILE}" >&2
  diff -ru "${REPO_ROOT}/python_client/src/torc/openapi_client" \
    "${TMP_PYTHON}/torc/openapi_client" || true
  RC=1
else
  echo "Python client is up to date."
fi

echo "Checking Julia client parity…"
JULIA_DRIFT=0
if ! diff -rq "${TMP_JULIA}/src" \
    "${REPO_ROOT}/julia_client/Torc/src/api" >/dev/null 2>&1; then
  echo "Julia client API sources are out of date with ${SPEC_FILE}" >&2
  diff -ru "${REPO_ROOT}/julia_client/Torc/src/api" \
    "${TMP_JULIA}/src" || true
  JULIA_DRIFT=1
fi
if ! diff -rq "${TMP_JULIA}/docs" \
    "${REPO_ROOT}/julia_client/julia_client/docs" >/dev/null 2>&1; then
  echo "Julia client docs are out of date with ${SPEC_FILE}" >&2
  diff -ru "${REPO_ROOT}/julia_client/julia_client/docs" \
    "${TMP_JULIA}/docs" || true
  JULIA_DRIFT=1
fi
if ! diff -q "${TMP_JULIA}/README.md" \
    "${REPO_ROOT}/julia_client/julia_client/README.md" >/dev/null 2>&1; then
  echo "Julia client README is out of date with ${SPEC_FILE}" >&2
  diff -u "${REPO_ROOT}/julia_client/julia_client/README.md" \
    "${TMP_JULIA}/README.md" || true
  JULIA_DRIFT=1
fi

if [[ "${JULIA_DRIFT}" -eq 1 ]]; then
  RC=1
else
  echo "Julia client is up to date."
fi

if [[ "${RC}" -ne 0 ]]; then
  echo "" >&2
  echo "Run 'bash api/sync_openapi.sh clients' to regenerate." >&2
fi

exit "${RC}"

api/openapi.codegen.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4067,8 +4067,8 @@ paths:
40674067
type: integer
40684068
format: int64
40694069
- name: trigger_type
4070-
in: path
4071-
required: true
4070+
in: query
4071+
required: false
40724072
schema:
40734073
type:
40744074
- array

api/openapi.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4067,8 +4067,8 @@ paths:
40674067
type: integer
40684068
format: int64
40694069
- name: trigger_type
4070-
in: path
4071-
required: true
4070+
in: query
4071+
required: false
40724072
schema:
40734073
type:
40744074
- array

api/regenerate_rust_client.sh

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,11 @@ docker_run run \
6969
-t /templates \
7070
--additional-properties=supportAsync=false
7171

72-
rm -f "${REPO_ROOT}/src/client/apis/"*_api.rs
72+
find "${REPO_ROOT}/src/client/apis" \
73+
-maxdepth 1 \
74+
-name '*_api.rs' \
75+
! -name 'ro_crate_api.rs' \
76+
-delete
7377
cp "${TMP_RUST_CLIENT}/src/apis/"*_api.rs "${REPO_ROOT}/src/client/apis/"
7478

7579
cargo fmt --manifest-path "${REPO_ROOT}/Cargo.toml" -- "${REPO_ROOT}/src/client/apis/"*_api.rs

api/sync_openapi.sh

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,16 +85,19 @@ case "${COMMAND}" in
8585
;;
8686
all)
8787
"${SCRIPT_DIR}/emit_openapi_from_rust.sh"
88-
"${SCRIPT_DIR}/check_openapi_codegen_parity.sh"
8988

9089
if [[ "${PROMOTE}" -eq 1 ]]; then
9190
"${SCRIPT_DIR}/promote_openapi_from_rust.sh"
91+
"${SCRIPT_DIR}/check_openapi_codegen_parity.sh"
9292
"${SCRIPT_DIR}/regenerate_clients.sh" --spec "${SCRIPT_DIR}/openapi.yaml"
9393
elif [[ -n "${SPEC_PATH}" ]]; then
94+
"${SCRIPT_DIR}/check_openapi_codegen_parity.sh"
9495
"${SCRIPT_DIR}/regenerate_clients.sh" --spec "${SPEC_PATH}"
9596
elif [[ "${USE_RUST_SPEC}" -eq 1 ]]; then
97+
"${SCRIPT_DIR}/check_openapi_codegen_parity.sh"
9698
"${SCRIPT_DIR}/regenerate_clients.sh" --spec "${SCRIPT_DIR}/openapi.codegen.yaml"
9799
else
100+
"${SCRIPT_DIR}/check_openapi_codegen_parity.sh"
98101
"${SCRIPT_DIR}/regenerate_clients.sh" --spec "${SCRIPT_DIR}/openapi.yaml"
99102
fi
100103
;;

build.rs

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -22,20 +22,18 @@ fn main() {
2222

2323
println!("cargo:rustc-env=GIT_HASH={}", git_hash);
2424

25-
// Check if working directory is dirty
26-
let is_dirty = Command::new("git")
27-
.args(["status", "--porcelain"])
28-
.output()
29-
.ok()
30-
.map(|output| !output.stdout.is_empty())
31-
.unwrap_or(false);
32-
33-
let dirty_suffix = if is_dirty { "-dirty" } else { "" };
34-
println!("cargo:rustc-env=GIT_DIRTY={}", dirty_suffix);
35-
36-
// Rerun if git HEAD changes or if any tracked files change
25+
// Rerun when the checked-out commit changes (new commit, branch switch).
26+
// NOTE: Do NOT watch .git/index — it is modified by nearly every git
27+
// operation (stage, stash, status) and causes constant rebuilds of all
28+
// test targets that depend on the env vars emitted above.
3729
println!("cargo:rerun-if-changed=.git/HEAD");
38-
println!("cargo:rerun-if-changed=.git/index");
30+
if let Ok(head) = fs::read_to_string(".git/HEAD") {
31+
// HEAD usually contains "ref: refs/heads/<branch>"; watch that file
32+
// so we rebuild when the branch tip moves (i.e., a new commit).
33+
if let Some(refpath) = head.trim().strip_prefix("ref: ") {
34+
println!("cargo:rerun-if-changed=.git/{}", refpath);
35+
}
36+
}
3937

4038
// Ensure binaries embedding SQLx migrations rebuild whenever migrations change.
4139
emit_rerun_if_changed_for_dir(Path::new("torc-server/migrations"));
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
#!/bin/bash
set -euo pipefail

# Compare all trained models under /workspace/models and record the one
# with the lowest final loss in /workspace/results/evaluation.json.
#
# The Python program is fed via a quoted heredoc so its text is passed
# verbatim to python3 with no shell expansion.
python3 - <<'PY'
import glob
import json
import sys

import numpy as np

models = glob.glob('/workspace/models/model_*.pt.npy')
if not models:
    # Without this guard best_loss stays inf, and json.dump would emit the
    # non-standard token `Infinity`, producing invalid JSON for downstream
    # consumers. Fail loudly instead.
    sys.exit('No trained models found in /workspace/models')

best_loss = float('inf')
best_model = None
for m in models:
    # Each model file is a pickled dict saved with np.save (hence .item()).
    data = np.load(m, allow_pickle=True).item()
    if data['final_loss'] < best_loss:
        best_loss = data['final_loss']
        best_model = m

report = {'best_model': best_model, 'best_loss': best_loss, 'num_models': len(models)}
with open('/workspace/results/evaluation.json', 'w') as f:
    json.dump(report, f, indent=2)
print(json.dumps(report, indent=2))
PY
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
#!/bin/bash
set -e

# Lay out the workspace directories, then write a synthetic training
# dataset (50000 samples x 128 features, integer labels 0..9) to
# /workspace/data/dataset.pkl for the downstream training jobs.
for subdir in data checkpoints models results; do
  mkdir -p "/workspace/${subdir}"
done

# Quoted heredoc: the Python source reaches the interpreter verbatim.
python3 - <<'PY'
import pickle
import numpy as np

data = {'X': np.random.rand(50000, 128), 'y': np.random.randint(0, 10, 50000)}
with open('/workspace/data/dataset.pkl', 'wb') as f:
    pickle.dump(data, f)
print('Dataset prepared: 50000 samples, 128 features')
PY
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
#!/usr/bin/env python3
"""Training script with SIGTERM-aware checkpointing.

Saves periodic checkpoints during training. On SIGTERM (e.g., torc approaching
a time limit), saves an emergency checkpoint and exits cleanly. On restart,
resumes from the latest checkpoint automatically.

Expected environment variables (set by torc or the calling shell):
    TORC_JOB_NAME - used to create a per-job checkpoint directory
    MODEL_INDEX - index suffix for the output model file
"""

import os
import pickle
import signal
import sys
import time

import numpy as np

# ── Configuration ──────────────────────────────────────────────────
ckpt_dir = f"/workspace/checkpoints/{os.environ['TORC_JOB_NAME']}"
# np.save appends ".npy", so the model lands as model_<i>.pt.npy — the
# evaluation step globs for exactly that pattern.
model_out = f"/workspace/models/model_{os.environ['MODEL_INDEX']}.pt"
os.makedirs(ckpt_dir, exist_ok=True)

total_epochs = 100

# ── SIGTERM handling ───────────────────────────────────────────────
terminated = False


def handle_sigterm(_signum, _frame):
    """Record the termination request; the training loop checks the flag
    once per epoch so the current step finishes before checkpointing."""
    global terminated
    terminated = True
    print("SIGTERM received — will save checkpoint and exit after current epoch")


signal.signal(signal.SIGTERM, handle_sigterm)


def save_checkpoint(epoch, weights, loss):
    """Persist training state. Zero-padded filenames keep lexicographic
    order equal to epoch order, which the resume logic relies on."""
    path = os.path.join(ckpt_dir, f"checkpoint_{epoch:04d}.npy")
    np.save(path, {"weights": weights, "epoch": epoch, "loss": loss})


# ── Resume from checkpoint if available ────────────────────────────
# reverse-sorted + zero-padded names => index 0 is the latest checkpoint.
checkpoints = sorted(
    [f for f in os.listdir(ckpt_dir) if f.startswith("checkpoint_")],
    reverse=True,
)
start_epoch = 0
weights = np.random.rand(128, 10) * 0.01

if checkpoints:
    latest = os.path.join(ckpt_dir, checkpoints[0])
    data = np.load(latest, allow_pickle=True).item()
    weights = data["weights"]
    start_epoch = data["epoch"] + 1
    print(f"Resuming from checkpoint at epoch {start_epoch}")
else:
    print("Starting fresh training")

# ── Load dataset ───────────────────────────────────────────────────
# Loaded to mirror the real pipeline; the simulated step below does not
# consume it.
with open("/workspace/data/dataset.pkl", "rb") as f:
    dataset = pickle.load(f)

# ── Training loop ──────────────────────────────────────────────────
loss = float("inf")
for epoch in range(start_epoch, total_epochs):
    # Simulate a training step.
    grad = np.random.randn(*weights.shape) * 0.001
    weights -= grad
    loss = float(np.linalg.norm(grad))

    # Periodic checkpoint every 10 epochs.
    saved_this_epoch = (epoch + 1) % 10 == 0
    if saved_this_epoch:
        save_checkpoint(epoch, weights, loss)
        print(f"Epoch {epoch+1}/{total_epochs} loss={loss:.6f} [checkpoint saved]")
    else:
        print(f"Epoch {epoch+1}/{total_epochs} loss={loss:.6f}")

    # On SIGTERM, checkpoint (unless this epoch was already checkpointed)
    # and exit gracefully so the job can resume after restart.
    if terminated:
        if not saved_this_epoch:
            save_checkpoint(epoch, weights, loss)
        print(f"Emergency checkpoint saved at epoch {epoch+1}. Exiting.")
        sys.exit(0)

    time.sleep(1)  # Simulate compute time

# Save final model (np.save appends .npy to model_out).
np.save(model_out, {"weights": weights, "final_loss": loss})
print(f"Training complete. Model saved to {model_out}")

0 commit comments

Comments
 (0)