diff --git a/src/environment.py b/src/environment.py index bd7619d..6fbc2cc 100644 --- a/src/environment.py +++ b/src/environment.py @@ -219,6 +219,8 @@ def reset(self, seed=None, options=None): # Episode k starts at hour k * episode_span (wrapping around the year) start_index = (self.episode_idx * episode_span) % n_prices + if options and "price_start_index" in options: # For testing Purposes. Leave out 'options' to advance episode. + start_index = int(options["price_start_index"]) % n_prices self.prices.reset(start_index=start_index) else: # Synthetic prices or no external prices diff --git a/src/plotter.py b/src/plotter.py new file mode 100644 index 0000000..11674e7 --- /dev/null +++ b/src/plotter.py @@ -0,0 +1,316 @@ +import matplotlib.pyplot as plt +from matplotlib.gridspec import GridSpec +from datetime import datetime +import numpy as np +import os + + +def _as_series(x, n): + if x is None: + return None + a = np.asarray(x, dtype=float).reshape(-1) + if a.size >= n: + return a[:n] + out = np.full(n, np.nan, dtype=float) + out[:a.size] = a + return out + + +def _compute_cumulative_savings(episode_costs): + """ + episode_costs: list of dicts with keys: + agent_cost, baseline_cost, baseline_cost_off + Returns arrays for plotting. + """ + if not episode_costs: + return None + + cum_s = [] + cum_s_off = [] + monthly_pct = [] + monthly_pct_off = [] + + total = 0.0 + total_off = 0.0 + + for i, ep in enumerate(episode_costs): + agent = float(ep["agent_cost"]) + base = float(ep["baseline_cost"]) + base_off = float(ep["baseline_cost_off"]) + + total += (base - agent) + total_off += (base_off - agent) + cum_s.append(total) + cum_s_off.append(total_off) + + # monthly % every 2 episodes (episode = 2 weeks assumption) + if i % 2 == 1: + prev = episode_costs[i - 1] + month_base = float(prev["baseline_cost"]) + base + month_base_off = float(prev["baseline_cost_off"]) + base_off + month_agent = float(prev["agent_cost"]) + agent + + pct = ((month_base - month_agent) / month_base * 100.0) if month_base > 0 else 0.0 + pct_off = ((month_base_off - month_agent) / month_base_off * 100.0) if month_base_off > 0 else 0.0 + + # duplicate for step-like visualization + monthly_pct.extend([pct, pct]) + monthly_pct_off.extend([pct_off, pct_off]) + + # x-axis in "months" (2-week steps) + n_eps = len(episode_costs) + weeks = (np.arange(1, n_eps + 1) * 2.0) + months = weeks / 4.33 + + # monthly arrays are shorter (only defined at month boundaries) -> pad/align + if len(monthly_pct) < n_eps: + last = monthly_pct[-1] if monthly_pct else 0.0 + monthly_pct = monthly_pct + [last] * (n_eps - len(monthly_pct)) + last_off = monthly_pct_off[-1] if monthly_pct_off else 0.0 + monthly_pct_off = monthly_pct_off + [last_off] * (n_eps - len(monthly_pct_off)) + + return { + "months": months, + "cum_s": np.asarray(cum_s, dtype=float), + "cum_s_off": np.asarray(cum_s_off, dtype=float), + "monthly_pct": np.asarray(monthly_pct[:n_eps], dtype=float), + "monthly_pct_off": np.asarray(monthly_pct_off[:n_eps], dtype=float), + } + + +def plot_dashboard(env, num_hours, max_nodes, episode_costs=None, save=True, show=True, suffix=""): + """ + Per-hour dashboard: price, nodes, queue, reward components, etc. + NOTE: episode_costs is accepted for backwards compatibility but NOT used here anymore. + Cumulative savings now lives in plot_cumulative_savings(). + """ + hours = np.arange(num_hours) + + # ----- header text ----- + completion_rate = (env.jobs_completed / env.jobs_submitted * 100) if getattr(env, "jobs_submitted", 0) > 0 else 0.0 + baseline_completion_rate = (env.baseline_jobs_completed / env.baseline_jobs_submitted * 100) if getattr(env, "baseline_jobs_submitted", 0) > 0 else 0.0 + avg_wait = (env.total_job_wait_time / env.jobs_completed) if getattr(env, "jobs_completed", 0) > 0 else 0.0 + baseline_avg_wait = (env.baseline_total_job_wait_time / env.baseline_jobs_completed) if getattr(env, "baseline_jobs_completed", 0) > 0 else 0.0 + + base_cost = float(getattr(env, "baseline_cost", 0.0)) + base_cost_off = float(getattr(env, "baseline_cost_off", 0.0)) + agent_cost = float(getattr(env, "total_cost", 0.0)) + + pct_vs_base = ((base_cost - agent_cost) / base_cost * 100.0) if base_cost > 0 else 0.0 + pct_vs_base_off = ((base_cost_off - agent_cost) / base_cost_off * 100.0) if base_cost_off > 0 else 0.0 + + header = ( + f"{env.session} | ep:{env.current_episode} step:{env.current_step} | {env.weights}\n" + f"Cost: €{agent_cost:.0f}, Base: €{base_cost:.0f} (+{base_cost - agent_cost:.0f}, {pct_vs_base:.1f}%), " + f"Base_Off: €{base_cost_off:.0f} (+{base_cost_off - agent_cost:.0f}, {pct_vs_base_off:.1f}%)\n" + f"Jobs: {env.jobs_completed}/{env.jobs_submitted} ({completion_rate:.0f}%, wait={avg_wait:.1f}h, Q={env.max_queue_size_reached}) | " + f"Base: {env.baseline_jobs_completed}/{env.baseline_jobs_submitted} ({baseline_completion_rate:.0f}%, wait={baseline_avg_wait:.1f}h, Q={env.baseline_max_queue_size_reached})" + ) + + # ----- collect per-hour panels (one / panel, optional overlay) ----- + panels = [] + + def add_panel(title, series, ylabel, ylim=None, overlay=None): + """ + overlay: optional (label, series2) + """ + s = _as_series(series, num_hours) + if s is None: + return + + ov = None + if overlay is not None: + ov_label, ov_series = overlay + s2 = _as_series(ov_series, num_hours) + if s2 is not None: + ov = (ov_label, s2) + + panels.append((title, s, ylabel, ylim, ov)) + + # Price + if not getattr(env, "skip_plot_price", False): + add_panel("Electricity price", getattr(env, "price_stats", None), "€/MWh", None) + + # Nodes + if not getattr(env, "skip_plot_online_nodes", False): + add_panel("Online nodes", getattr(env, "on_nodes", None), "count", (0, max_nodes * 1.1)) + if not getattr(env, "skip_plot_used_nodes", False): + add_panel("Used nodes", getattr(env, "used_nodes", None), "count", (0, max_nodes)) + + # Queue + running jobs (same plot) + if not getattr(env, "skip_plot_job_queue", False): + running_series = getattr(env, "running_jobs_counts", None) + add_panel( + "Job queue & running jobs", + getattr(env, "job_queue_sizes", None), + "jobs", + None, + overlay=("Running jobs", running_series), + ) + + # Reward components + if getattr(env, "plot_eff_reward", False): + add_panel("Efficiency reward (%)", getattr(env, "eff_rewards", None), "score", None) + if getattr(env, "plot_price_reward", False): + add_panel("Price reward (%)", getattr(env, "price_rewards", None), "score", None) + if getattr(env, "plot_idle_penalty", False): + add_panel("Idle penalty (%)", getattr(env, "idle_penalties", None), "score", None) + if getattr(env, "plot_job_age_penalty", False): + add_panel("Job-age penalty (%)", getattr(env, "job_age_penalties", None), "score", None) + if getattr(env, "plot_total_reward", False): + add_panel("Total reward", getattr(env, "rewards", None), "reward", None) + + if not panels: + print("plot_dashboard(): nothing to plot.") + return + + n_pan = len(panels) + ncols = 2 if n_pan <= 6 else 3 + nrows = int(np.ceil(n_pan / ncols)) + + fig = plt.figure(figsize=(14, 3.2 * nrows)) + gs = GridSpec(nrows, ncols, figure=fig) + + # Place panel axes + axs = [] + for i in range(nrows * ncols): + r = i // ncols + c = i % ncols + axs.append(fig.add_subplot(gs[r, c])) + + # Plot per-hour panels + for idx, (title, s, ylabel, ylim, overlay) in enumerate(panels): + ax = axs[idx] + # main series + ax.plot(hours, s, label=title) + # overlay series (e.g. running jobs) + if overlay is not None: + ov_label, s2 = overlay + ax.plot(hours, s2, label=ov_label, linestyle="--") + ax.legend(fontsize=7) + + ax.set_title(title, fontsize=9, pad=2) + ax.set_ylabel(ylabel, fontsize=9) + ax.grid(True, alpha=0.25) + ax.tick_params(labelsize=8) + if ylim is not None: + ax.set_ylim(*ylim) + + # Hide unused axes + for j in range(n_pan, nrows * ncols): + axs[j].axis("off") + + # Shared x-label + for ax in axs[(nrows - 1) * ncols : nrows * ncols]: + if ax.has_data(): + ax.set_xlabel("Hours", fontsize=9) + + # Header text + fig.subplots_adjust(top=0.82, left=0.06, right=0.98, bottom=0.06, hspace=0.45, wspace=0.25) + fig.text(0.01, 0.99, header, ha="left", va="top", fontsize=9, family="monospace") + + # Save/show + prefix = f"e{env.weights.efficiency_weight}_p{env.weights.price_weight}_i{env.weights.idle_weight}_d{env.weights.job_age_weight}" + if save: + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + fname = f"{prefix}_{suffix}_{timestamp}.png" + save_path = os.path.join(getattr(env, "plots_dir", "."), fname) + plt.savefig(save_path, dpi=200, bbox_inches="tight") + print(f"Dashboard figure saved as: {save_path}") + if show: + plt.show() + plt.close(fig) + + +def plot_cumulative_savings(env, episode_costs, session_dir=None, save=True, show=True, suffix=""): + """ + Separate canvas for long-term cumulative savings & monthly % savings. + """ + data = _compute_cumulative_savings(episode_costs) + if data is None: + print("plot_cumulative_savings(): no episode_costs, skipping.") + return None + + months = data["months"] + cum_s = data["cum_s"] + cum_s_off = data["cum_s_off"] + monthly_pct = data["monthly_pct"] + monthly_pct_off = data["monthly_pct_off"] + + # Basic stats + final_savings = float(cum_s[-1]) + final_savings_off = float(cum_s_off[-1]) + avg_monthly_savings = float(np.mean(monthly_pct)) if monthly_pct.size > 0 else 0.0 + avg_monthly_savings_off = float(np.mean(monthly_pct_off)) if monthly_pct_off.size > 0 else 0.0 + + fig, ax1 = plt.subplots(figsize=(14, 8)) + + # Primary axis - cumulative savings (€) + ax1.set_xlabel("Time (months)", fontsize=12) + ax1.set_ylabel("Cumulative savings (€)", fontsize=12) + line1 = ax1.plot(months, cum_s, linewidth=3, label="Savings vs baseline (with idle)") + line1b = ax1.plot(months, cum_s_off, linewidth=3, linestyle="--", label="Savings vs baseline_off (no idle)") + ax1.tick_params(axis="y") + ax1.grid(True, alpha=0.3) + + # Secondary axis - monthly savings % + ax2 = ax1.twinx() + ax2.set_ylabel("Monthly savings (%)", fontsize=12) + line2 = ax2.plot(months, monthly_pct, linewidth=2, linestyle=":", alpha=0.7, label="Monthly % (vs baseline)") + line2b = ax2.plot(months, monthly_pct_off, linewidth=2, linestyle=":", alpha=0.7, label="Monthly % (vs baseline_off)") + ax2.tick_params(axis="y") + + max_pct = max( + float(np.max(monthly_pct)) if monthly_pct.size > 0 else 0.0, + float(np.max(monthly_pct_off)) if monthly_pct_off.size > 0 else 0.0, + ) + ax2.set_ylim(0, max_pct * 1.1 if max_pct > 0 else 100) + + # Title and summary box + weights_str = str(getattr(env, "weights", "")) + plt.title( + f"PowerSched Long-Term Cost Savings Analysis\n{weights_str}\n" + f"Savings vs Baseline: €{final_savings:,.0f} ({avg_monthly_savings:.1f}% avg) | " + f"Savings vs Baseline_off: €{final_savings_off:,.0f} ({avg_monthly_savings_off:.1f}% avg)", + fontsize=14, + pad=20, + ) + + textstr = ( + f"Vs Baseline (with idle):\n" + f" €{final_savings:,.0f} | {avg_monthly_savings:.1f}%\n" + f"Vs Baseline_off (no idle):\n" + f" €{final_savings_off:,.0f} | {avg_monthly_savings_off:.1f}%" + ) + props = dict(boxstyle="round", facecolor="wheat", alpha=0.8) + ax1.text(0.02, 0.98, textstr, transform=ax1.transAxes, fontsize=10, verticalalignment="top", bbox=props) + + # Combine legends + lines = line1 + line1b + line2 + line2b + labels = [l.get_label() for l in lines] + ax1.legend(lines, labels, loc="center right", fontsize=9) + + plt.tight_layout() + + # Save/show + prefix = f"e{env.weights.efficiency_weight}_p{env.weights.price_weight}_i{env.weights.idle_weight}_d{env.weights.job_age_weight}" + if session_dir is None: + session_dir = getattr(env, "plots_dir", ".") + if save: + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + fname = f"cumulative_savings_{prefix}_{suffix}_{timestamp}.png" + save_path = os.path.join(session_dir, fname) + plt.savefig(save_path, dpi=300, bbox_inches="tight") + print(f"Cumulative savings figure saved: {save_path}") + + if show: + plt.show() + + plt.close(fig) + + return { + "total_savings": final_savings, + "avg_monthly_savings_pct": avg_monthly_savings, + "total_savings_off": final_savings_off, + "avg_monthly_savings_pct_off": avg_monthly_savings_off, + } diff --git a/test/test_sanity_env.py b/test/test_sanity_env.py new file mode 100644 index 0000000..79b7a3a --- /dev/null +++ b/test/test_sanity_env.py @@ -0,0 +1,364 @@ +#!/usr/bin/env python3 + +'''Standalone sanity check for Env, that instantiates the env the same way "train.py" does, but runs a battery of check +like API compliance, invariants, determinism.''' + +'''Use it as: +- Quick invariant run: python sanity_env.py --steps 200 +- Full belt-and-braces: python sanity_env.py --check-gym --check-determinism --steps 300 +- With external data: python sanity_env.py --prices <> --hourly-jobs <> --steps 300 --print-job-every 50 --print-job-kind both +''' + +import argparse +import numpy as np + +from gymnasium.utils.env_checker import check_env + +from src.environment import ComputeClusterEnv, Weights +import pandas as pd +from src.workloadgen import WorkloadGenerator, WorkloadGenConfig + +# Import environment variables: +from src.config import ( + MAX_JOB_DURATION, + MIN_NODES_PER_JOB, MAX_NODES_PER_JOB, + MIN_CORES_PER_JOB, + CORES_PER_NODE, EPISODE_HOURS +) + + +def load_prices(prices_file_path: str | None): + if not prices_file_path: + return None + df = pd.read_csv(prices_file_path, parse_dates=["Date"]) + prices = df["Price"].astype(float).tolist() + print(f"Loaded {len(prices)} prices from CSV: {prices_file_path}") + return prices + +# ----------------------------- +# Invariants / sanity checks +# ----------------------------- +def _extract(obs): + nodes = obs["nodes"] + q = obs["job_queue"].reshape(-1, 4) + prices = obs["predicted_prices"] + return nodes, q, prices + +def check_invariants(env, obs): + nodes, q, prices = _extract(obs) + + # ---- Shapes ---- + assert nodes.ndim == 1, nodes.shape + assert q.ndim == 2 and q.shape[1] == 4, q.shape + assert prices.shape == (24,), prices.shape + + # ---- Node bounds ---- + assert np.all(nodes >= -1), f"nodes < -1 exists, min={nodes.min()}" + assert np.all(nodes <= 170), f"nodes > MAX_JOB_DURATION exists, max={nodes.max()}" + + # ---- Predicted prices must be finite ---- + assert np.all(np.isfinite(prices)), "predicted_prices contains NaN/inf" + + # ---- cores_available invariants (from env, not obs) ---- + ca = env.cores_available + assert ca.shape == nodes.shape + assert np.all((ca >= 0) & (ca <= 96)), f"cores_available out of bounds min={ca.min()} max={ca.max()}" + + off = (nodes == -1) + idle = (nodes == 0) + + assert np.all(ca[off] == 0), "off nodes must have 0 cores_available" + assert np.all(ca[idle] == 96), "idle nodes must have full cores_available" + + # ---- Queue invariants ---- + dur = q[:, 0] + age = q[:, 1] + nn = q[:, 2] + cpn = q[:, 3] + + # slots either "all zeros" or "duration > 0" + all_zero = np.all(q == 0, axis=1) + assert np.all((dur == 0) == all_zero), "queue has partially-zero slots (corruption / holes)" + + # bounds (based on your constants; keep hard-coded here to avoid importing env constants) + assert np.all((dur >= 0) & (dur <= 170)), f"duration out of bounds min={dur.min()} max={dur.max()}" + assert np.all((age >= 0) & (age <= 168)), f"age out of bounds min={age.min()} max={age.max()}" + + # if a job exists, its nodes/cores should be positive and within limits + active = (dur > 0) + if np.any(active): + assert np.all((nn[active] >= 1) & (nn[active] <= 16)), f"job nnodes out of bounds nn={nn[active]}" + assert np.all((cpn[active] >= 1) & (cpn[active] <= 96)), f"cores_per_node out of bounds cpn={cpn[active]}" + + # ---- next_empty_slot sanity (optional) ---- + if hasattr(env, "next_empty_slot"): + ne = env.next_empty_slot + if ne < len(q): + assert q[ne, 0] == 0, "next_empty_slot does not point at an empty slot" + if ne > 0: + assert np.all(q[:ne, 0] != 0), "hole exists before next_empty_slot" + + +def check_obs_is_copy(env, obs): + # ensure obs doesn't alias env.state buffers + before = int(env.state["nodes"][0]) + obs["nodes"][0] = 12345 + after = int(env.state["nodes"][0]) + assert after == before, "obs['nodes'] aliases env.state['nodes'] (not a copy)" + + +def determinism_test(make_env, seed, n_steps=200): + # fixed action sequence + env0 = make_env() + env0.reset(seed=seed, options={"price_start_index": 0}) + actions = [env0.action_space.sample() for _ in range(n_steps)] + env0.close() + + def rollout(): + env = make_env() + # Pin external price window so determinism doesn't vary by episode. + obs, _ = env.reset(seed=seed, options={"price_start_index": 0}) + traj = [] + done = False + i = 0 + while not done and i < n_steps: + obs, r, term, trunc, info = env.step(actions[i]) + # record a small fingerprint + traj.append(( + float(r), + float(info.get("step_cost", 0.0)), + int(info.get("num_unprocessed_jobs", -1)), + int(info.get("num_on_nodes", -1)), + int(info.get("dropped_this_episode", -1)), + )) + done = term or trunc + i += 1 + env.close() + return traj + + a = rollout() + b = rollout() + assert a == b, "Determinism failed: same seed + same actions produced different trajectories" + + +# ----------------------------- +# CLI + env construction +# ----------------------------- +def parse_args(): + p = argparse.ArgumentParser(description="Sanity checks for ComputeClusterEnv (no training).") + p.add_argument("--seed", type=int, default=123) + p.add_argument("--steps", type=int, default=500) + p.add_argument("--episodes", type=int, default=1) + p.add_argument("--check-gym", action="store_true", help="Run gymnasium check_env") + p.add_argument("--check-determinism", action="store_true") + # mirror train.py-ish knobs (mostly optional) + p.add_argument("--session", default="sanity") + p.add_argument("--render", type=str, default="none", choices=["human", "none"]) + p.add_argument("--prices", default="") + p.add_argument("--job-durations", default="") + p.add_argument("--jobs", default="") + p.add_argument("--hourly-jobs", default="") + p.add_argument("--efficiency-weight", type=float, default=0.5) + p.add_argument("--price-weight", type=float, default=0.2) + p.add_argument("--idle-weight", type=float, default=0.1) + p.add_argument("--job-age-weight", type=float, default=0.1) + p.add_argument("--drop-weight", type=float, default=0.1) + p.add_argument("--workload-gen",type=str,default="",choices=["", "flat", "poisson"],help="Enable workload generator (default: disabled).",) + p.add_argument("--wg-poisson-lambda", type=float, default=200.0, help="Poisson lambda for jobs/hour.") + p.add_argument("--wg-max-jobs-hour", type=int, default=1500, help="Cap jobs/hour for generator.") + p.add_argument("--print-job-every", type=int, default=0, + help="Print one sample job every N steps (0 disables).") + p.add_argument("--print-job-kind", choices=["queue", "running", "both"], default="queue", + help="Where to sample the job from.") + p.add_argument("--print-job-index", type=int, default=-1, + help="Queue index to print (>=0), or -1 to print first active job.") + + + return p.parse_args() + + +def make_env_from_args(args, env_cls=ComputeClusterEnv): + weights = Weights( + efficiency_weight=args.efficiency_weight, + price_weight=args.price_weight, + idle_weight=args.idle_weight, + job_age_weight=args.job_age_weight, + drop_weight=args.drop_weight + ) + + workload_gen = None + if args.workload_gen: + cfg = WorkloadGenConfig( + arrivals=args.workload_gen, + poisson_lambda=args.wg_poisson_lambda, + max_new_jobs_per_hour=args.wg_max_jobs_hour, + min_duration=1, + max_duration=MAX_JOB_DURATION, + min_nodes=MIN_NODES_PER_JOB, + max_nodes=MAX_NODES_PER_JOB, + min_cores=MIN_CORES_PER_JOB, + max_cores=CORES_PER_NODE, + ) + workload_gen = WorkloadGenerator(cfg) + + + # Train.py passes strings; the env treats "" as falsy in some places and truthy in others. + # To be safe: normalize "" -> None here. + def norm_path(x): + return None if (x is None or str(x).strip() == "") else x + + return env_cls( + weights=weights, + session=args.session, + render_mode=args.render, + quick_plot=False, + external_prices=load_prices(args.prices), + external_durations=norm_path(args.job_durations), + external_jobs=norm_path(args.jobs), + external_hourly_jobs=norm_path(args.hourly_jobs), + plot_rewards=False, + plots_dir="plots", + plot_once=False, + plot_eff_reward=False, + plot_price_reward=False, + plot_idle_penalty=False, + plot_job_age_penalty=False, + skip_plot_price=True, + skip_plot_online_nodes=True, + skip_plot_used_nodes=True, + skip_plot_job_queue=True, + steps_per_iteration=EPISODE_HOURS, # prevent plot cadence surprises + evaluation_mode=False, + # plot_total_reward=False, + workload_gen=workload_gen + ) + +def maybe_print_job(env, obs, step_idx, every, kind="queue", job_index=-1): + if not every or every <= 0: + return + if step_idx % every != 0: + return + + nodes, q, prices = _extract(obs) + + def print_queue_job(): + active = np.flatnonzero(q[:, 0] > 0) + if active.size == 0: + print(f"[job@step {step_idx}] queue empty") + return + idx = int(active[0]) if job_index < 0 else int(job_index) + if idx < 0 or idx >= q.shape[0]: + print(f"[job@step {step_idx}] invalid queue index {idx}") + return + d, a, nn, cpn = map(int, q[idx]) + print(f"[job@step {step_idx}] QUEUE idx={idx}: dur_h={d} age_h={a} nodes={nn} cores_per_node={cpn}") + + def print_running_job(): + if not getattr(env, "running_jobs", None): + print(f"[job@step {step_idx}] running_jobs empty") + return + # deterministic-ish: smallest job_id + job_id = sorted(env.running_jobs.keys())[0] + jd = env.running_jobs[job_id] + dur = int(jd["duration"]) + alloc = jd.get("allocation", []) + nn = len(alloc) + cpn = int(alloc[0][1]) if alloc else 0 + node_ids = [int(x[0]) for x in alloc[:8]] + more = "" if len(alloc) <= 8 else f" (+{len(alloc)-8} more)" + print(f"[job@step {step_idx}] RUNNING job_id={job_id}: rem_h={dur} nodes={nn} cores_per_node={cpn} node_idxs={node_ids}{more}") + + if kind in ("queue", "both"): + print_queue_job() + if kind in ("running", "both"): + print_running_job() + + + +def main(): + args = parse_args() + + class DeterministicPriceEnv(ComputeClusterEnv): + def reset(self, seed=None, options=None): + if options is None: + options = {} + if seed is not None and "price_start_index" not in options: + options = dict(options) + options["price_start_index"] = 0 + return super().reset(seed=seed, options=options) + + +# ------------------------------------- + + seed = 123 + action = np.array([1, 0], dtype=np.int64) # "maintain, magnitude 1" effectively + + env = make_env_from_args(args, env_cls=DeterministicPriceEnv) + + o1, _ = env.reset(seed=seed) + o1s, r1, t1, tr1, i1 = env.step(action) + + o2, _ = env.reset(seed=seed) + o2s, r2, t2, tr2, i2 = env.step(action) + + def cmp(name, a, b): + eq = np.array_equal(a, b) + print(name, "equal:", eq) + if not eq: + # show first mismatch + idx = np.flatnonzero(a.flatten() != b.flatten())[0] + print(" first mismatch idx:", idx, "a:", a.flatten()[idx], "b:", b.flatten()[idx]) + + cmp("nodes", o1s["nodes"], o2s["nodes"]) + cmp("job_queue", o1s["job_queue"], o2s["job_queue"]) + cmp("predicted_prices", o1s["predicted_prices"], o2s["predicted_prices"]) + print("reward", r1, r2) + print("info.current_price", i1.get("current_price"), i2.get("current_price")) + +#---------------------------------------- + + # 1) Gym API compliance (optional) + if args.check_gym: + # Pin external price window so gym's determinism check is meaningful. + env = make_env_from_args(args, env_cls=DeterministicPriceEnv) + check_env(env, skip_render_check=True) + env.close() + print("[OK] gymnasium check_env passed") + + # 2) Invariants + copy checks during random rollout + env = make_env_from_args(args) + for ep in range(args.episodes): + obs, info = env.reset(seed=args.seed + ep) + env.action_space.seed(args.seed + ep) # IMPORTANT: deterministic action sampling + check_invariants(env, obs) + #check_obs_is_copy(env, obs) + + done = False + steps = 0 + while not done and steps < args.steps: + action = env.action_space.sample() + obs, r, term, trunc, info = env.step(action) + maybe_print_job( + env, obs, + step_idx=steps, + every=args.print_job_every, + kind=args.print_job_kind, + job_index=args.print_job_index, + ) + check_invariants(env, obs) + done = term or trunc + steps += 1 + + print(f"[OK] episode={ep} steps={steps} done={done}") + env.close() + + # 3) Determinism (optional) + if args.check_determinism: + determinism_test(lambda: make_env_from_args(args), seed=args.seed, n_steps=min(args.steps, 500)) + print("[OK] determinism test passed") + + print("done") + + +if __name__ == "__main__": + main() diff --git a/train.py b/train.py index e934009..7aefeeb 100644 --- a/train.py +++ b/train.py @@ -2,13 +2,13 @@ import os from src.environment import ComputeClusterEnv, Weights, PlottingComplete from src.callbacks import ComputeClusterCallback -from src.plot import plot_cumulative_savings +from src.plotter import plot_dashboard, plot_cumulative_savings import re import glob import argparse import pandas as pd from src.workloadgen import WorkloadGenerator, WorkloadGenConfig - +import time # Import environment constants from config module: from src.config import ( MAX_JOB_DURATION, @@ -39,6 +39,7 @@ def main(): parser.add_argument('--plot-price-reward', action='store_true', help='Include price reward in the plot (dashed line).') parser.add_argument('--plot-idle-penalty', action='store_true', help='Include idle penalty in the plot (dashed line).') parser.add_argument('--plot-job-age-penalty', action='store_true', help='Include job age penalty in the plot (dashed line).') + parser.add_argument('--plot-total-reward', action='store_true', help='Include total reward per step in the dashboard (raw values).') parser.add_argument('--skip-plot-price', action='store_true', help='Skip electricity price in the plot (blue line).') parser.add_argument('--skip-plot-online-nodes', action='store_true', help='Skip online nodes in the plot (blue line).') parser.add_argument('--skip-plot-used-nodes', action='store_true', help='Skip used nodes in the plot (blue line).') @@ -56,6 +57,9 @@ def main(): parser.add_argument("--workload-gen", type=str, default="", choices=["", "flat", "poisson", "uniform"], help="Enable workload generator (default: disabled).",) parser.add_argument("--wg-poisson-lambda", type=float, default=200.0, help="Poisson lambda for jobs/hour.") parser.add_argument("--wg-max-jobs-hour", type=int, default=1500, help="Cap jobs/hour for generator.") + parser.add_argument("--plot-dashboard", action="store_true", help="Generate dashboard plot (per-hour panels + cumulative savings).") + parser.add_argument("--dashboard-hours", type=int, default=24*14, help="Hours to show in dashboard time-series panels (default: 336).") + args = parser.parse_args() @@ -145,10 +149,14 @@ def main(): model_files.sort(key=lambda filename: int(re.match(r"(\d+)", os.path.basename(filename)).group())) latest_model_file = model_files[-1] # Get the last file after sorting, which should be the one with the most timesteps print(f"Found a saved model: {latest_model_file}") - model = PPO.load(latest_model_file, env=env, tensorboard_log=log_dir) + model = PPO.load(latest_model_file, env=env, tensorboard_log=log_dir + , n_steps=64, batch_size=64 + ) else: print(f"Starting a new model training...") - model = PPO('MultiInputPolicy', env, verbose=1, tensorboard_log=log_dir, ent_coef=args.ent_coef) + model = PPO('MultiInputPolicy', env, verbose=1, tensorboard_log=log_dir, ent_coef=args.ent_coef + , n_steps=64, batch_size=64 + ) iters = 0 @@ -185,7 +193,8 @@ def main(): obs, reward, terminated, truncated, _ = env.step(action) episode_reward += reward step_count += 1 - # print(f"Episode {episode + 1}, Step {step_count}, Action: {action}, Reward: {reward:.2f}, Total Reward: {episode_reward:.2f}, Total Cost: €{env.total_cost:.2f}") + if step_count%1000==0: + print(f"Episode {episode + 1}, Step {step_count}, Action: {action}, Reward: {reward:.2f}, Total Reward: {episode_reward:.2f}, Total Cost: €{env.total_cost:.2f}") done = terminated or truncated savings_vs_baseline = env.baseline_cost - env.total_cost @@ -193,7 +202,10 @@ def main(): completion_rate = (env.jobs_completed / env.jobs_submitted * 100) if env.jobs_submitted > 0 else 0 avg_wait = env.total_job_wait_time / env.jobs_completed if env.jobs_completed > 0 else 0 print(f" Episode {episode + 1}: " - f"Cost=€{env.total_cost:.0f}, " + f"Agent Cost=€{env.total_cost:.0f}, " + + f"Baseline Cost=€{env.baseline_cost:.0f} | Baseline Off=€{env.baseline_cost_off:.0f}, " + f"Savings=€{savings_vs_baseline:.0f}/€{savings_vs_baseline_off:.0f}, " f"Jobs={env.jobs_completed}/{env.jobs_submitted} ({completion_rate:.0f}%), " f"AvgWait={avg_wait:.1f}h, " @@ -204,7 +216,7 @@ def main(): # Generate cumulative savings plot session_dir = f"sessions/{args.session}" try: - results = plot_cumulative_savings(env, env.episode_costs, session_dir, months=args.eval_months, save=True, show=args.render == 'human') + results = plot_cumulative_savings(env, env.episode_costs, session_dir, save=True, show=args.render == 'human') if results: print(f"\n=== CUMULATIVE SAVINGS ANALYSIS ===") print(f"\nVs Baseline (with idle nodes):") @@ -240,6 +252,30 @@ def main(): except Exception as e: print(f"Could not generate cumulative savings plot: {e}") + # Optional: single dashboard plot combining the per-hour traces from the LAST episode + # and cumulative savings across all evaluated episodes. + if args.plot_dashboard: + try: + plot_dashboard( + env, + num_hours=args.dashboard_hours, + # max_nodes=env.max_nodes if hasattr(env, "max_nodes") else env.num_nodes if hasattr(env, "num_nodes") else 0, + max_nodes=335, + episode_costs=[ # adapt to what your plot_dashboard expects + { + "agent_cost": ep["agent_cost"], + "baseline_cost": ep["baseline_cost"], + "baseline_cost_off": ep["baseline_cost_off"], + } + for ep in env.episode_costs + ], + save=True, + show=(args.render == "human"), + suffix=f"eval_{args.eval_months}m", + ) + except Exception as e: + print(f"Could not generate dashboard plot: {e}") + print("\nEvaluation complete!") env.close() return @@ -248,12 +284,40 @@ def main(): while True: print(f"Training iteration {iters + 1} ({STEPS_PER_ITERATION * (iters + 1)} steps)...") iters += 1 + t0 = time.time() + if (iters+1)%10==0: + print(f"Running... at {iters + 1} of {STEPS_PER_ITERATION * (iters + 1)} steps") if args.iter_limit > 0 and iters > args.iter_limit: print(f"iterations limit ({args.iter_limit}) reached: {iters}.") break try: model.learn(total_timesteps=STEPS_PER_ITERATION, reset_num_timesteps=False, tb_log_name=f"PPO", callback=ComputeClusterCallback()) + print(f"Iteration {iters} finished in {time.time()-t0:.2f}s") model.save(f"{models_dir}/{STEPS_PER_ITERATION * iters}.zip") + + if args.plot_dashboard: + try: + plot_dashboard( + env, + num_hours=args.dashboard_hours, + # max_nodes=env.max_nodes if hasattr(env, "max_nodes") else env.num_nodes if hasattr(env, "num_nodes") else 0, + max_nodes=335, + episode_costs=[ + { + "agent_cost": ep["agent_cost"], + "baseline_cost": ep["baseline_cost"], + "baseline_cost_off": ep["baseline_cost_off"], + } + for ep in getattr(env, "episode_costs", []) + ], + save=True, + show=False, + suffix=STEPS_PER_ITERATION * iters, + ) + except Exception as e: + print(f"Dashboard plot failed (non-fatal): {e}") + + except PlottingComplete: print("Plotting complete, terminating training...") break diff --git a/train_iter.py b/train_iter.py index 3f205c8..d9a0fc7 100644 --- a/train_iter.py +++ b/train_iter.py @@ -81,7 +81,21 @@ def generate_weight_combinations(step=0.1, fixed_weights=None): return combinations -def run(efficiency_weight, price_weight, idle_weight, job_age_weight, drop_weight, iter_limit_per_step, session, prices, job_durations, jobs, hourly_jobs): +def run( + efficiency_weight, + price_weight, + idle_weight, + job_age_weight, + drop_weight, + iter_limit_per_step, + session, + prices, + job_durations, + jobs, + hourly_jobs, + plot_dashboard=False, + dashboard_hours=24 * 14, +): python_executable = sys.executable command = [ python_executable, "train.py", @@ -97,11 +111,14 @@ def run(efficiency_weight, price_weight, idle_weight, job_age_weight, drop_weigh "--hourly-jobs", f"{hourly_jobs}", "--session", f"{session}" ] + if plot_dashboard: + command += ["--plot-dashboard", "--dashboard-hours", str(dashboard_hours)] + print(f"executing: {command}") current_env = os.environ.copy() - result = subprocess.run(command, capture_output=False, text=True, env=current_env) + result = subprocess.run(command, text=True, env=current_env) if result.returncode != 0: - print(f"Error occurred: {result.stderr}") + print("Error occurred: train.py returned a non-zero exit code.") return result.stdout def parse_fixed_weights(fix_weights_str, fix_values_str): @@ -132,6 +149,9 @@ def main(): parser.add_argument("--fix-weights", type=str, help="Comma-separated list of weights to fix (efficiency,price,idle,job-age,drop)") parser.add_argument("--fix-values", type=str, help="Comma-separated list of values for fixed weights") parser.add_argument("--iter-limit-per-step", type=int, help="Max number of training iterations per step (1 iteration = {TIMESTEPS} steps)") + parser.add_argument("--plot-dashboard", action="store_true", help="Forward to train.py to generate dashboard plots.") + parser.add_argument("--dashboard-hours", type=int, default=24*14, help="Forward to train.py.") + parser.add_argument("--session", help="Session ID") args = parser.parse_args() @@ -155,7 +175,7 @@ def main(): for combo in combinations: efficiency_weight, price_weight, idle_weight, job_age_weight, drop_weight = combo print(f"Running with weights: efficiency={efficiency_weight}, price={price_weight}, idle={idle_weight}, job_age={job_age_weight}, drop={drop_weight}") - run(efficiency_weight, price_weight, idle_weight, job_age_weight, drop_weight, args.iter_limit_per_step, args.session, args.prices, args.job_durations, args.jobs, args.hourly_jobs) + run(efficiency_weight, price_weight, idle_weight, job_age_weight, drop_weight, args.iter_limit_per_step, args.session, args.prices, args.job_durations, args.jobs, args.hourly_jobs,plot_dashboard=args.plot_dashboard,dashboard_hours=args.dashboard_hours) if __name__ == "__main__": main()