diff --git a/src/baseline.py b/src/baseline.py index 666b4ac..f3dc6d3 100644 --- a/src/baseline.py +++ b/src/baseline.py @@ -39,7 +39,11 @@ def baseline_step(baseline_state, baseline_cores_available, baseline_running_job new_jobs_nodes, new_jobs_cores, baseline_next_empty_slot ) metrics['baseline_jobs_submitted'] += len(new_baseline_jobs) + if 'episode_baseline_jobs_submitted' in metrics: + metrics['episode_baseline_jobs_submitted'] += len(new_baseline_jobs) metrics['baseline_jobs_rejected_queue_full'] += (new_jobs_count - len(new_baseline_jobs)) + if 'episode_baseline_jobs_rejected_queue_full' in metrics: + metrics['episode_baseline_jobs_rejected_queue_full'] += (new_jobs_count - len(new_baseline_jobs)) _, baseline_next_empty_slot, _, next_job_id = assign_jobs_to_available_nodes( job_queue_2d, baseline_state['nodes'], baseline_cores_available, @@ -54,6 +58,9 @@ def baseline_step(baseline_state, baseline_cores_available, baseline_running_job # Track baseline max queue size if num_unprocessed_jobs > metrics['baseline_max_queue_size_reached']: metrics['baseline_max_queue_size_reached'] = num_unprocessed_jobs + if 'episode_baseline_max_queue_size_reached' in metrics: + if num_unprocessed_jobs > metrics['episode_baseline_max_queue_size_reached']: + metrics['episode_baseline_max_queue_size_reached'] = num_unprocessed_jobs baseline_state['job_queue'] = job_queue_2d.flatten() diff --git a/src/environment.py b/src/environment.py index bd7619d..e640606 100644 --- a/src/environment.py +++ b/src/environment.py @@ -84,7 +84,8 @@ def __init__(self, skip_plot_job_queue, steps_per_iteration, evaluation_mode=False, - workload_gen=None): + workload_gen=None, + carry_over_state=False): super().__init__() self.weights = weights @@ -123,6 +124,7 @@ def __init__(self, self.np_random = None self._seed = None self.workload_gen = workload_gen + self.carry_over_state = carry_over_state if self.external_durations: durations_sampler.init(self.external_durations) @@ -170,6 +172,9 @@ def __init__(self, self.min_job_age_penalty = -0.0 self.max_job_age_penalty = PENALTY_WAITING_JOB * MAX_JOB_AGE * MAX_QUEUE_SIZE + self.reset_timeline_state() + self.metrics.reset_episode_metrics() + # actions: - change number of available nodes: # action_type: 0: decrease, 1: maintain, 2: increase # action_magnitude: 0-MAX_CHANGE (+1ed in the action) @@ -210,19 +215,57 @@ def reset(self, seed=None, options=None): self.episode_idx += 1 # Reset metrics - self.metrics.reset_state_metrics() + if self.carry_over_state: + self.metrics.reset_episode_metrics() + else: + self.metrics.reset_state_metrics() + + if not self.carry_over_state: + # Choose starting index in the external price series + if self.prices is not None and getattr(self.prices, "external_prices", None) is not None: + n_prices = len(self.prices.external_prices) + episode_span = EPISODE_HOURS + + # Episode k starts at hour k * episode_span (wrapping around the year) + start_index = (self.episode_idx * episode_span) % n_prices + if options and "price_start_index" in options: # For testing Purposes. Leave out 'options' to advance episode. + start_index = int(options["price_start_index"]) % n_prices + self.prices.reset(start_index=start_index) + else: + # Synthetic prices or no external prices + self.prices.reset(start_index=0) - # Choose starting index in the external price series - if self.prices is not None and getattr(self.prices, "external_prices", None) is not None: - n_prices = len(self.prices.external_prices) - episode_span = EPISODE_HOURS + self.state = { + # Initialize all nodes to be 'online but free' (0) + 'nodes': np.zeros(MAX_NODES, dtype=np.int32), + # Initialize job queue to be empty + 'job_queue': np.zeros((MAX_QUEUE_SIZE * 4), dtype=np.int32), + # Initialize predicted prices array + 'predicted_prices': self.prices.predicted_prices.copy(), + } - # Episode k starts at hour k * episode_span (wrapping around the year) - start_index = (self.episode_idx * episode_span) % n_prices - self.prices.reset(start_index=start_index) - else: - # Synthetic prices or no external prices - self.prices.reset(start_index=0) + self.baseline_state = { + 'nodes': np.zeros(MAX_NODES, dtype=np.int32), + 'job_queue': np.zeros((MAX_QUEUE_SIZE * 4), dtype=np.int32), + } + + self.cores_available = np.full(MAX_NODES, CORES_PER_NODE, dtype=np.int32) + self.baseline_cores_available = np.full(MAX_NODES, CORES_PER_NODE, dtype=np.int32) + + # Job tracking: { job_id: {'duration': remaining_hours, 'allocation': [(node_idx1, cores1), ...]}, ... } + self.running_jobs = {} + self.baseline_running_jobs = {} + + self.next_job_id = 0 # shared between baseline and normal jobs + + # Track next empty slot in job queue for O(1) insertion + self.next_empty_slot = 0 + self.baseline_next_empty_slot = 0 + + return self.state, {} + + def reset_timeline_state(self): + self.metrics.current_hour = 0 self.state = { # Initialize all nodes to be 'online but free' (0) @@ -240,7 +283,6 @@ def reset(self, seed=None, options=None): self.cores_available = np.full(MAX_NODES, CORES_PER_NODE, dtype=np.int32) self.baseline_cores_available = np.full(MAX_NODES, CORES_PER_NODE, dtype=np.int32) - # Job tracking: { job_id: {'duration': remaining_hours, 'allocation': [(node_idx1, cores1), ...]}, ... } self.running_jobs = {} self.baseline_running_jobs = {} @@ -251,8 +293,6 @@ def reset(self, seed=None, options=None): self.next_empty_slot = 0 self.baseline_next_empty_slot = 0 - return self.state, {} - def step(self, action): self.current_step += 1 self.metrics.current_hour += 1 @@ -288,7 +328,9 @@ def step(self, action): new_jobs_nodes, new_jobs_cores, self.next_empty_slot ) self.metrics.jobs_submitted += len(new_jobs) + self.metrics.episode_jobs_submitted += len(new_jobs) self.metrics.jobs_rejected_queue_full += (new_jobs_count - len(new_jobs)) + self.metrics.episode_jobs_rejected_queue_full += (new_jobs_count - len(new_jobs)) self.env_print("nodes: ", np.array2string(self.state['nodes'], separator=' ', max_line_width=np.inf)) self.env_print(f"cores_available: {np.array2string(self.cores_available, separator=' ', max_line_width=np.inf)} ({np.sum(self.cores_available)})") @@ -314,6 +356,12 @@ def step(self, action): 'baseline_total_job_wait_time': self.metrics.baseline_total_job_wait_time, 'baseline_jobs_dropped': self.metrics.baseline_jobs_dropped, 'baseline_dropped_this_episode': self.metrics.baseline_dropped_this_episode, + 'episode_jobs_completed': self.metrics.episode_jobs_completed, + 'episode_total_job_wait_time': self.metrics.episode_total_job_wait_time, + 'episode_jobs_dropped': self.metrics.episode_jobs_dropped, + 'episode_baseline_jobs_completed': self.metrics.episode_baseline_jobs_completed, + 'episode_baseline_total_job_wait_time': self.metrics.episode_baseline_total_job_wait_time, + 'episode_baseline_jobs_dropped': self.metrics.episode_baseline_jobs_dropped, } num_launched_jobs, self.next_empty_slot, num_dropped_this_step, self.next_job_id = assign_jobs_to_available_nodes( @@ -326,6 +374,9 @@ def step(self, action): self.metrics.total_job_wait_time = job_metrics['total_job_wait_time'] self.metrics.jobs_dropped = job_metrics['jobs_dropped'] self.metrics.dropped_this_episode = job_metrics['dropped_this_episode'] + self.metrics.episode_jobs_completed = job_metrics['episode_jobs_completed'] + self.metrics.episode_total_job_wait_time = job_metrics['episode_total_job_wait_time'] + self.metrics.episode_jobs_dropped = job_metrics['episode_jobs_dropped'] self.env_print(f" {num_launched_jobs} jobs launched") @@ -347,6 +398,8 @@ def step(self, action): # Track max queue size if num_unprocessed_jobs > self.metrics.max_queue_size_reached: self.metrics.max_queue_size_reached = num_unprocessed_jobs + if num_unprocessed_jobs > self.metrics.episode_max_queue_size_reached: + self.metrics.episode_max_queue_size_reached = num_unprocessed_jobs self.env_print(f"[5] Calculating reward...") @@ -359,6 +412,12 @@ def step(self, action): 'baseline_jobs_dropped': self.metrics.baseline_jobs_dropped, 'baseline_dropped_this_episode': self.metrics.baseline_dropped_this_episode, 'baseline_max_queue_size_reached': self.metrics.baseline_max_queue_size_reached, + 'episode_baseline_jobs_submitted': self.metrics.episode_baseline_jobs_submitted, + 'episode_baseline_jobs_rejected_queue_full': self.metrics.episode_baseline_jobs_rejected_queue_full, + 'episode_baseline_jobs_completed': self.metrics.episode_baseline_jobs_completed, + 'episode_baseline_total_job_wait_time': self.metrics.episode_baseline_total_job_wait_time, + 'episode_baseline_jobs_dropped': self.metrics.episode_baseline_jobs_dropped, + 'episode_baseline_max_queue_size_reached': self.metrics.episode_baseline_max_queue_size_reached, } baseline_cost, baseline_cost_off, self.baseline_next_empty_slot, self.next_job_id = baseline_step( @@ -375,9 +434,17 @@ def step(self, action): self.metrics.baseline_jobs_dropped = baseline_metrics['baseline_jobs_dropped'] self.metrics.baseline_dropped_this_episode = baseline_metrics['baseline_dropped_this_episode'] self.metrics.baseline_max_queue_size_reached = baseline_metrics['baseline_max_queue_size_reached'] + self.metrics.episode_baseline_jobs_submitted = baseline_metrics['episode_baseline_jobs_submitted'] + self.metrics.episode_baseline_jobs_rejected_queue_full = baseline_metrics['episode_baseline_jobs_rejected_queue_full'] + self.metrics.episode_baseline_jobs_completed = baseline_metrics['episode_baseline_jobs_completed'] + self.metrics.episode_baseline_total_job_wait_time = baseline_metrics['episode_baseline_total_job_wait_time'] + self.metrics.episode_baseline_jobs_dropped = baseline_metrics['episode_baseline_jobs_dropped'] + self.metrics.episode_baseline_max_queue_size_reached = baseline_metrics['episode_baseline_max_queue_size_reached'] self.metrics.baseline_cost += baseline_cost self.metrics.baseline_cost_off += baseline_cost_off + self.metrics.episode_baseline_cost += baseline_cost + self.metrics.episode_baseline_cost_off += baseline_cost_off # Calculate reward reward_bounds = { @@ -400,6 +467,7 @@ def step(self, action): self.metrics.episode_reward += step_reward self.metrics.total_cost += step_cost + self.metrics.episode_total_cost += step_cost # Store normalized reward components for plotting self.metrics.eff_rewards.append(eff_reward_norm * 100) @@ -451,4 +519,11 @@ def step(self, action): self.env_print(Fore.GREEN + f"]]]" + Fore.RESET) - return self.state, step_reward, terminated, truncated, {} + info = { + "step_cost": float(step_cost), + "num_unprocessed_jobs": int(num_unprocessed_jobs), + "num_on_nodes": int(num_on_nodes), + "dropped_this_episode": int(getattr(self.metrics, "dropped_this_episode", 0)), + } + + return self.state, step_reward, terminated, truncated, info diff --git a/src/job_management.py b/src/job_management.py index 0036036..47f70d0 100644 --- a/src/job_management.py +++ b/src/job_management.py @@ -156,9 +156,15 @@ def assign_jobs_to_available_nodes(job_queue_2d, nodes, cores_available, running if is_baseline: metrics['baseline_jobs_completed'] += 1 metrics['baseline_total_job_wait_time'] += job_age + if 'episode_baseline_jobs_completed' in metrics: + metrics['episode_baseline_jobs_completed'] += 1 + metrics['episode_baseline_total_job_wait_time'] += job_age else: metrics['jobs_completed'] += 1 metrics['total_job_wait_time'] += job_age + if 'episode_jobs_completed' in metrics: + metrics['episode_jobs_completed'] += 1 + metrics['episode_total_job_wait_time'] += job_age num_processed_jobs += 1 continue @@ -178,9 +184,13 @@ def assign_jobs_to_available_nodes(job_queue_2d, nodes, cores_available, running if is_baseline: metrics['baseline_jobs_dropped'] += 1 metrics['baseline_dropped_this_episode'] += 1 + if 'episode_baseline_jobs_dropped' in metrics: + metrics['episode_baseline_jobs_dropped'] += 1 else: metrics['jobs_dropped'] += 1 metrics['dropped_this_episode'] += 1 + if 'episode_jobs_dropped' in metrics: + metrics['episode_jobs_dropped'] += 1 else: job_queue_2d[job_idx][1] = new_age diff --git a/src/metrics_tracker.py b/src/metrics_tracker.py index 46ba1d1..cb9dc54 100644 --- a/src/metrics_tracker.py +++ b/src/metrics_tracker.py @@ -6,27 +6,47 @@ class MetricsTracker: def __init__(self): """Initialize all metric counters.""" - self.reset_episode_metrics() + self.reset_cumulative_metrics() self.reset_state_metrics() # Cumulative metrics across all episodes self.episode_costs = [] - def reset_episode_metrics(self): + def reset_cumulative_metrics(self): """Reset metrics that persist across episodes.""" - # Job tracking metrics for agent (cumulative across episodes) + # Cost tracking (cumulative across episodes) + self.total_cost = 0 + self.baseline_cost = 0 + self.baseline_cost_off = 0 + + # Agent job metrics (cumulative across episodes) + self.jobs_submitted = 0 + self.jobs_completed = 0 + self.total_job_wait_time = 0 + self.max_queue_size_reached = 0 self.jobs_dropped = 0 self.jobs_rejected_queue_full = 0 - # Job tracking metrics for baseline (cumulative across episodes) + # Baseline job metrics (cumulative across episodes) + self.baseline_jobs_submitted = 0 + self.baseline_jobs_completed = 0 + self.baseline_total_job_wait_time = 0 + self.baseline_max_queue_size_reached = 0 self.baseline_jobs_dropped = 0 self.baseline_jobs_rejected_queue_full = 0 def reset_state_metrics(self): + """Reset timeline-dependent metrics.""" + self.current_hour = 0 + self.reset_episode_metrics() + + def reset_episode_metrics(self): """Reset metrics at the start of each episode.""" # Episode-level metrics - self.current_hour = 0 self.episode_reward = 0 + self.episode_total_cost = 0 + self.episode_baseline_cost = 0 + self.episode_baseline_cost_off = 0 # Time series data for plotting self.on_nodes = [] @@ -39,22 +59,21 @@ def reset_state_metrics(self): self.idle_penalties = [] self.job_age_penalties = [] - # Cost tracking - self.total_cost = 0 - self.baseline_cost = 0 - self.baseline_cost_off = 0 - - # Agent job metrics - self.jobs_submitted = 0 - self.jobs_completed = 0 - self.total_job_wait_time = 0 - self.max_queue_size_reached = 0 - - # Baseline job metrics - self.baseline_jobs_submitted = 0 - self.baseline_jobs_completed = 0 - self.baseline_total_job_wait_time = 0 - self.baseline_max_queue_size_reached = 0 + # Agent job metrics (episode-level) + self.episode_jobs_submitted = 0 + self.episode_jobs_completed = 0 + self.episode_total_job_wait_time = 0 + self.episode_max_queue_size_reached = 0 + self.episode_jobs_dropped = 0 + self.episode_jobs_rejected_queue_full = 0 + + # Baseline job metrics (episode-level) + self.episode_baseline_jobs_submitted = 0 + self.episode_baseline_jobs_completed = 0 + self.episode_baseline_total_job_wait_time = 0 + self.episode_baseline_max_queue_size_reached = 0 + self.episode_baseline_jobs_dropped = 0 + self.episode_baseline_jobs_rejected_queue_full = 0 # Per-episode drop counters self.dropped_this_episode = 0 @@ -71,45 +90,45 @@ def record_episode_completion(self, current_episode): Dictionary with episode data """ # Calculate average wait times - avg_wait_time = self.total_job_wait_time / self.jobs_completed if self.jobs_completed > 0 else 0 - baseline_avg_wait_time = self.baseline_total_job_wait_time / self.baseline_jobs_completed if self.baseline_jobs_completed > 0 else 0 + avg_wait_time = self.episode_total_job_wait_time / self.episode_jobs_completed if self.episode_jobs_completed > 0 else 0 + baseline_avg_wait_time = self.episode_baseline_total_job_wait_time / self.episode_baseline_jobs_completed if self.episode_baseline_jobs_completed > 0 else 0 # Calculate completion rates - completion_rate = (self.jobs_completed / self.jobs_submitted * 100) if self.jobs_submitted > 0 else 0 - baseline_completion_rate = (self.baseline_jobs_completed / self.baseline_jobs_submitted * 100) if self.baseline_jobs_submitted > 0 else 0 + completion_rate = (self.episode_jobs_completed / self.episode_jobs_submitted * 100) if self.episode_jobs_submitted > 0 else 0 + baseline_completion_rate = (self.episode_baseline_jobs_completed / self.episode_baseline_jobs_submitted * 100) if self.episode_baseline_jobs_submitted > 0 else 0 - drop_rate = (self.jobs_dropped / self.jobs_submitted * 100) if self.jobs_submitted else 0.0 - baseline_drop_rate = (self.baseline_jobs_dropped / self.baseline_jobs_submitted * 100) if self.baseline_jobs_submitted else 0.0 + drop_rate = (self.episode_jobs_dropped / self.episode_jobs_submitted * 100) if self.episode_jobs_submitted else 0.0 + baseline_drop_rate = (self.episode_baseline_jobs_dropped / self.episode_baseline_jobs_submitted * 100) if self.episode_baseline_jobs_submitted else 0.0 episode_data = { 'episode': current_episode, - 'agent_cost': float(self.total_cost), - 'baseline_cost': float(self.baseline_cost), - 'baseline_cost_off': float(self.baseline_cost_off), - 'savings_vs_baseline': float(self.baseline_cost - self.total_cost), - 'savings_vs_baseline_off': float(self.baseline_cost_off - self.total_cost), - 'savings_pct_baseline': float(((self.baseline_cost - self.total_cost) / self.baseline_cost) * 100) if self.baseline_cost > 0 else 0, - 'savings_pct_baseline_off': float(((self.baseline_cost_off - self.total_cost) / self.baseline_cost_off) * 100) if self.baseline_cost_off > 0 else 0, + 'agent_cost': float(self.episode_total_cost), + 'baseline_cost': float(self.episode_baseline_cost), + 'baseline_cost_off': float(self.episode_baseline_cost_off), + 'savings_vs_baseline': float(self.episode_baseline_cost - self.episode_total_cost), + 'savings_vs_baseline_off': float(self.episode_baseline_cost_off - self.episode_total_cost), + 'savings_pct_baseline': float(((self.episode_baseline_cost - self.episode_total_cost) / self.episode_baseline_cost) * 100) if self.episode_baseline_cost > 0 else 0, + 'savings_pct_baseline_off': float(((self.episode_baseline_cost_off - self.episode_total_cost) / self.episode_baseline_cost_off) * 100) if self.episode_baseline_cost_off > 0 else 0, 'total_reward': float(self.episode_reward), # Agent job metrics - 'jobs_submitted': self.jobs_submitted, - 'jobs_completed': self.jobs_completed, + 'jobs_submitted': self.episode_jobs_submitted, + 'jobs_completed': self.episode_jobs_completed, 'avg_wait_time': float(avg_wait_time), 'completion_rate': float(completion_rate), - 'max_queue_size': self.max_queue_size_reached, + 'max_queue_size': self.episode_max_queue_size_reached, # Baseline job metrics - 'baseline_jobs_submitted': self.baseline_jobs_submitted, - 'baseline_jobs_completed': self.baseline_jobs_completed, + 'baseline_jobs_submitted': self.episode_baseline_jobs_submitted, + 'baseline_jobs_completed': self.episode_baseline_jobs_completed, 'baseline_avg_wait_time': float(baseline_avg_wait_time), 'baseline_completion_rate': float(baseline_completion_rate), - 'baseline_max_queue_size': self.baseline_max_queue_size_reached, + 'baseline_max_queue_size': self.episode_baseline_max_queue_size_reached, # Drop metrics - "jobs_dropped": self.jobs_dropped, + "jobs_dropped": self.episode_jobs_dropped, "drop_rate": float(drop_rate), - "jobs_rejected_queue_full": self.jobs_rejected_queue_full, - "baseline_jobs_dropped": self.baseline_jobs_dropped, + "jobs_rejected_queue_full": self.episode_jobs_rejected_queue_full, + "baseline_jobs_dropped": self.episode_baseline_jobs_dropped, "baseline_drop_rate": float(baseline_drop_rate), - "baseline_jobs_rejected_queue_full": self.baseline_jobs_rejected_queue_full, + "baseline_jobs_rejected_queue_full": self.episode_baseline_jobs_rejected_queue_full, } self.episode_costs.append(episode_data) return episode_data diff --git a/src/plotter.py b/src/plotter.py new file mode 100644 index 0000000..97c30e2 --- /dev/null +++ b/src/plotter.py @@ -0,0 +1,333 @@ +import matplotlib.pyplot as plt +from matplotlib.gridspec import GridSpec +from datetime import datetime +import numpy as np +import os + + +def _as_series(x, n): + if x is None: + return None + a = np.asarray(x, dtype=float).reshape(-1) + if a.size >= n: + return a[:n] + out = np.full(n, np.nan, dtype=float) + out[:a.size] = a + return out + + +def _compute_cumulative_savings(episode_costs): + """ + episode_costs: list of dicts with keys: + agent_cost, baseline_cost, baseline_cost_off + Returns arrays for plotting. + """ + if not episode_costs: + return None + + cum_s = [] + cum_s_off = [] + monthly_pct = [] + monthly_pct_off = [] + + total = 0.0 + total_off = 0.0 + + for i, ep in enumerate(episode_costs): + agent = float(ep["agent_cost"]) + base = float(ep["baseline_cost"]) + base_off = float(ep["baseline_cost_off"]) + + total += (base - agent) + total_off += (base_off - agent) + cum_s.append(total) + cum_s_off.append(total_off) + + # monthly % every 2 episodes (episode = 2 weeks assumption) + if i % 2 == 1: + prev = episode_costs[i - 1] + month_base = float(prev["baseline_cost"]) + base + month_base_off = float(prev["baseline_cost_off"]) + base_off + month_agent = float(prev["agent_cost"]) + agent + + pct = ((month_base - month_agent) / month_base * 100.0) if month_base > 0 else 0.0 + pct_off = ((month_base_off - month_agent) / month_base_off * 100.0) if month_base_off > 0 else 0.0 + + # duplicate for step-like visualization + monthly_pct.extend([pct, pct]) + monthly_pct_off.extend([pct_off, pct_off]) + + # x-axis in "months" (2-week steps) + n_eps = len(episode_costs) + weeks = (np.arange(1, n_eps + 1) * 2.0) + months = weeks / 4.33 + + # monthly arrays are shorter (only defined at month boundaries) -> pad/align + if len(monthly_pct) < n_eps: + last = monthly_pct[-1] if monthly_pct else 0.0 + monthly_pct = monthly_pct + [last] * (n_eps - len(monthly_pct)) + last_off = monthly_pct_off[-1] if monthly_pct_off else 0.0 + monthly_pct_off = monthly_pct_off + [last_off] * (n_eps - len(monthly_pct_off)) + + return { + "months": months, + "cum_s": np.asarray(cum_s, dtype=float), + "cum_s_off": np.asarray(cum_s_off, dtype=float), + "monthly_pct": np.asarray(monthly_pct[:n_eps], dtype=float), + "monthly_pct_off": np.asarray(monthly_pct_off[:n_eps], dtype=float), + } + + +def plot_dashboard(env, num_hours, max_nodes, episode_costs=None, save=True, show=True, suffix=""): + """ + Per-hour dashboard: price, nodes, queue, reward components, etc. + NOTE: episode_costs is accepted for backwards compatibility but NOT used here anymore. + Cumulative savings now lives in plot_cumulative_savings(). + """ + hours = np.arange(num_hours) + + # ----- header text ----- + metrics = getattr(env, "metrics", None) + completion_rate = ( + (getattr(metrics, "jobs_completed", 0) / getattr(metrics, "jobs_submitted", 0) * 100) + if getattr(metrics, "jobs_submitted", 0) > 0 + else 0.0 + ) + baseline_completion_rate = ( + (getattr(metrics, "baseline_jobs_completed", 0) / getattr(metrics, "baseline_jobs_submitted", 0) * 100) + if getattr(metrics, "baseline_jobs_submitted", 0) > 0 + else 0.0 + ) + avg_wait = ( + (getattr(metrics, "total_job_wait_time", 0) / getattr(metrics, "jobs_completed", 0)) + if getattr(metrics, "jobs_completed", 0) > 0 + else 0.0 + ) + baseline_avg_wait = ( + (getattr(metrics, "baseline_total_job_wait_time", 0) / getattr(metrics, "baseline_jobs_completed", 0)) + if getattr(metrics, "baseline_jobs_completed", 0) > 0 + else 0.0 + ) + + base_cost = float(getattr(env, "baseline_cost", 0.0)) + base_cost_off = float(getattr(env, "baseline_cost_off", 0.0)) + agent_cost = float(getattr(env, "total_cost", 0.0)) + + pct_vs_base = ((base_cost - agent_cost) / base_cost * 100.0) if base_cost > 0 else 0.0 + pct_vs_base_off = ((base_cost_off - agent_cost) / base_cost_off * 100.0) if base_cost_off > 0 else 0.0 + + header = ( + f"{env.session} | ep:{env.current_episode} step:{env.current_step} | {env.weights}\n" + f"Cost: €{agent_cost:.0f}, Base: €{base_cost:.0f} (+{base_cost - agent_cost:.0f}, {pct_vs_base:.1f}%), " + f"Base_Off: €{base_cost_off:.0f} (+{base_cost_off - agent_cost:.0f}, {pct_vs_base_off:.1f}%)\n" + f"Jobs: {env.jobs_completed}/{env.jobs_submitted} ({completion_rate:.0f}%, wait={avg_wait:.1f}h, Q={env.max_queue_size_reached}) | " + f"Base: {env.baseline_jobs_completed}/{env.baseline_jobs_submitted} ({baseline_completion_rate:.0f}%, wait={baseline_avg_wait:.1f}h, Q={env.baseline_max_queue_size_reached})" + ) + + # ----- collect per-hour panels (one / panel, optional overlay) ----- + panels = [] + + def add_panel(title, series, ylabel, ylim=None, overlay=None): + """ + overlay: optional (label, series2) + """ + s = _as_series(series, num_hours) + if s is None: + return + + ov = None + if overlay is not None: + ov_label, ov_series = overlay + s2 = _as_series(ov_series, num_hours) + if s2 is not None: + ov = (ov_label, s2) + + panels.append((title, s, ylabel, ylim, ov)) + + # Price + if not getattr(env, "skip_plot_price", False): + add_panel("Electricity price", getattr(env, "price_stats", None), "€/MWh", None) + + # Nodes + if not getattr(env, "skip_plot_online_nodes", False): + add_panel("Online nodes", getattr(env, "on_nodes", None), "count", (0, max_nodes * 1.1)) + if not getattr(env, "skip_plot_used_nodes", False): + add_panel("Used nodes", getattr(env, "used_nodes", None), "count", (0, max_nodes)) + + # Queue + running jobs (same plot) + if not getattr(env, "skip_plot_job_queue", False): + running_series = getattr(env, "running_jobs_counts", None) + add_panel( + "Job queue & running jobs", + getattr(env, "job_queue_sizes", None), + "jobs", + None, + overlay=("Running jobs", running_series), + ) + + # Reward components + if getattr(env, "plot_eff_reward", False): + add_panel("Efficiency reward (%)", getattr(env, "eff_rewards", None), "score", None) + if getattr(env, "plot_price_reward", False): + add_panel("Price reward (%)", getattr(env, "price_rewards", None), "score", None) + if getattr(env, "plot_idle_penalty", False): + add_panel("Idle penalty (%)", getattr(env, "idle_penalties", None), "score", None) + if getattr(env, "plot_job_age_penalty", False): + add_panel("Job-age penalty (%)", getattr(env, "job_age_penalties", None), "score", None) + if getattr(env, "plot_total_reward", False): + add_panel("Total reward", getattr(env, "rewards", None), "reward", None) + + if not panels: + print("plot_dashboard(): nothing to plot.") + return + + n_pan = len(panels) + ncols = 2 if n_pan <= 6 else 3 + nrows = int(np.ceil(n_pan / ncols)) + + fig = plt.figure(figsize=(14, 3.2 * nrows)) + gs = GridSpec(nrows, ncols, figure=fig) + + # Place panel axes + axs = [] + for i in range(nrows * ncols): + r = i // ncols + c = i % ncols + axs.append(fig.add_subplot(gs[r, c])) + + # Plot per-hour panels + for idx, (title, s, ylabel, ylim, overlay) in enumerate(panels): + ax = axs[idx] + # main series + ax.plot(hours, s, label=title) + # overlay series (e.g. running jobs) + if overlay is not None: + ov_label, s2 = overlay + ax.plot(hours, s2, label=ov_label, linestyle="--") + ax.legend(fontsize=7) + + ax.set_title(title, fontsize=9, pad=2) + ax.set_ylabel(ylabel, fontsize=9) + ax.grid(True, alpha=0.25) + ax.tick_params(labelsize=8) + if ylim is not None: + ax.set_ylim(*ylim) + + # Hide unused axes + for j in range(n_pan, nrows * ncols): + axs[j].axis("off") + + # Shared x-label + for ax in axs[(nrows - 1) * ncols : nrows * ncols]: + if ax.has_data(): + ax.set_xlabel("Hours", fontsize=9) + + # Header text + fig.subplots_adjust(top=0.82, left=0.06, right=0.98, bottom=0.06, hspace=0.45, wspace=0.25) + fig.text(0.01, 0.99, header, ha="left", va="top", fontsize=9, family="monospace") + + # Save/show + prefix = f"e{env.weights.efficiency_weight}_p{env.weights.price_weight}_i{env.weights.idle_weight}_d{env.weights.job_age_weight}" + if save: + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + fname = f"{prefix}_{suffix}_{timestamp}.png" + save_path = os.path.join(getattr(env, "plots_dir", "."), fname) + plt.savefig(save_path, dpi=200, bbox_inches="tight") + print(f"Dashboard figure saved as: {save_path}") + if show: + plt.show() + plt.close(fig) + + +def plot_cumulative_savings(env, episode_costs, session_dir=None, save=True, show=True, suffix=""): + """ + Separate canvas for long-term cumulative savings & monthly % savings. + """ + data = _compute_cumulative_savings(episode_costs) + if data is None: + print("plot_cumulative_savings(): no episode_costs, skipping.") + return None + + months = data["months"] + cum_s = data["cum_s"] + cum_s_off = data["cum_s_off"] + monthly_pct = data["monthly_pct"] + monthly_pct_off = data["monthly_pct_off"] + + # Basic stats + final_savings = float(cum_s[-1]) + final_savings_off = float(cum_s_off[-1]) + avg_monthly_savings = float(np.mean(monthly_pct)) if monthly_pct.size > 0 else 0.0 + avg_monthly_savings_off = float(np.mean(monthly_pct_off)) if monthly_pct_off.size > 0 else 0.0 + + fig, ax1 = plt.subplots(figsize=(14, 8)) + + # Primary axis - cumulative savings (€) + ax1.set_xlabel("Time (months)", fontsize=12) + ax1.set_ylabel("Cumulative savings (€)", fontsize=12) + line1 = ax1.plot(months, cum_s, linewidth=3, label="Savings vs baseline (with idle)") + line1b = ax1.plot(months, cum_s_off, linewidth=3, linestyle="--", label="Savings vs baseline_off (no idle)") + ax1.tick_params(axis="y") + ax1.grid(True, alpha=0.3) + + # Secondary axis - monthly savings % + ax2 = ax1.twinx() + ax2.set_ylabel("Monthly savings (%)", fontsize=12) + line2 = ax2.plot(months, monthly_pct, linewidth=2, linestyle=":", alpha=0.7, label="Monthly % (vs baseline)") + line2b = ax2.plot(months, monthly_pct_off, linewidth=2, linestyle=":", alpha=0.7, label="Monthly % (vs baseline_off)") + ax2.tick_params(axis="y") + + max_pct = max( + float(np.max(monthly_pct)) if monthly_pct.size > 0 else 0.0, + float(np.max(monthly_pct_off)) if monthly_pct_off.size > 0 else 0.0, + ) + ax2.set_ylim(0, max_pct * 1.1 if max_pct > 0 else 100) + + # Title and summary box + weights_str = str(getattr(env, "weights", "")) + plt.title( + f"PowerSched Long-Term Cost Savings Analysis\n{weights_str}\n" + f"Savings vs Baseline: €{final_savings:,.0f} ({avg_monthly_savings:.1f}% avg) | " + f"Savings vs Baseline_off: €{final_savings_off:,.0f} ({avg_monthly_savings_off:.1f}% avg)", + fontsize=14, + pad=20, + ) + + textstr = ( + f"Vs Baseline (with idle):\n" + f" €{final_savings:,.0f} | {avg_monthly_savings:.1f}%\n" + f"Vs Baseline_off (no idle):\n" + f" €{final_savings_off:,.0f} | {avg_monthly_savings_off:.1f}%" + ) + props = dict(boxstyle="round", facecolor="wheat", alpha=0.8) + ax1.text(0.02, 0.98, textstr, transform=ax1.transAxes, fontsize=10, verticalalignment="top", bbox=props) + + # Combine legends + lines = line1 + line1b + line2 + line2b + labels = [l.get_label() for l in lines] + ax1.legend(lines, labels, loc="center right", fontsize=9) + + plt.tight_layout() + + # Save/show + prefix = f"e{env.weights.efficiency_weight}_p{env.weights.price_weight}_i{env.weights.idle_weight}_d{env.weights.job_age_weight}" + if session_dir is None: + session_dir = getattr(env, "plots_dir", ".") + if save: + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + fname = f"cumulative_savings_{prefix}_{suffix}_{timestamp}.png" + save_path = os.path.join(session_dir, fname) + plt.savefig(save_path, dpi=300, bbox_inches="tight") + print(f"Cumulative savings figure saved: {save_path}") + + if show: + plt.show() + + plt.close(fig) + + return { + "total_savings": final_savings, + "avg_monthly_savings_pct": avg_monthly_savings, + "total_savings_off": final_savings_off, + "avg_monthly_savings_pct_off": avg_monthly_savings_off, + } diff --git a/test/test_sanity_env.py b/test/test_sanity_env.py new file mode 100644 index 0000000..5f25bbe --- /dev/null +++ b/test/test_sanity_env.py @@ -0,0 +1,412 @@ +#!/usr/bin/env python3 + +'''Standalone sanity check for Env, that instantiates the env the same way "train.py" does, but runs a battery of check +like API compliance, invariants, determinism.''' + +'''Use it as: +- Quick invariant run: python sanity_env.py --steps 200 +- Full belt-and-braces: python sanity_env.py --check-gym --check-determinism --steps 300 +- With external data: python sanity_env.py --prices <> --hourly-jobs <> --steps 300 --print-job-every 50 --print-job-kind both +''' + +import argparse +import copy +import numpy as np + +from gymnasium.utils.env_checker import check_env + +from src.environment import ComputeClusterEnv, Weights +import pandas as pd +from src.workloadgen import WorkloadGenerator, WorkloadGenConfig + +# Import environment variables: +from src.config import ( + MAX_JOB_DURATION, + MIN_NODES_PER_JOB, MAX_NODES_PER_JOB, + MIN_CORES_PER_JOB, + CORES_PER_NODE, EPISODE_HOURS +) + + +def load_prices(prices_file_path: str | None): + if not prices_file_path: + return None + df = pd.read_csv(prices_file_path, parse_dates=["Date"]) + prices = df["Price"].astype(float).tolist() + print(f"Loaded {len(prices)} prices from CSV: {prices_file_path}") + return prices + +# ----------------------------- +# Invariants / sanity checks +# ----------------------------- +def _extract(obs): + nodes = obs["nodes"] + q = obs["job_queue"].reshape(-1, 4) + prices = obs["predicted_prices"] + return nodes, q, prices + +def check_invariants(env, obs): + nodes, q, prices = _extract(obs) + + # ---- Shapes ---- + assert nodes.ndim == 1, nodes.shape + assert q.ndim == 2 and q.shape[1] == 4, q.shape + assert prices.shape == (24,), prices.shape + + # ---- Node bounds ---- + assert np.all(nodes >= -1), f"nodes < -1 exists, min={nodes.min()}" + assert np.all(nodes <= 170), f"nodes > MAX_JOB_DURATION exists, max={nodes.max()}" + + # ---- Predicted prices must be finite ---- + assert np.all(np.isfinite(prices)), "predicted_prices contains NaN/inf" + + # ---- cores_available invariants (from env, not obs) ---- + ca = env.cores_available + assert ca.shape == nodes.shape + assert np.all((ca >= 0) & (ca <= 96)), f"cores_available out of bounds min={ca.min()} max={ca.max()}" + + off = (nodes == -1) + idle = (nodes == 0) + + assert np.all(ca[off] == 0), "off nodes must have 0 cores_available" + assert np.all(ca[idle] == 96), "idle nodes must have full cores_available" + + # ---- Queue invariants ---- + dur = q[:, 0] + age = q[:, 1] + nn = q[:, 2] + cpn = q[:, 3] + + # slots either "all zeros" or "duration > 0" + all_zero = np.all(q == 0, axis=1) + assert np.all((dur == 0) == all_zero), "queue has partially-zero slots (corruption / holes)" + + # bounds (based on your constants; keep hard-coded here to avoid importing env constants) + assert np.all((dur >= 0) & (dur <= 170)), f"duration out of bounds min={dur.min()} max={dur.max()}" + assert np.all((age >= 0) & (age <= 168)), f"age out of bounds min={age.min()} max={age.max()}" + + # if a job exists, its nodes/cores should be positive and within limits + active = (dur > 0) + if np.any(active): + assert np.all((nn[active] >= 1) & (nn[active] <= 16)), f"job nnodes out of bounds nn={nn[active]}" + assert np.all((cpn[active] >= 1) & (cpn[active] <= 96)), f"cores_per_node out of bounds cpn={cpn[active]}" + + # ---- next_empty_slot sanity (optional) ---- + if hasattr(env, "next_empty_slot"): + ne = env.next_empty_slot + if ne < len(q): + assert q[ne, 0] == 0, "next_empty_slot does not point at an empty slot" + if ne > 0: + assert np.all(q[:ne, 0] != 0), "hole exists before next_empty_slot" + + +def check_obs_is_copy(env, obs): + # ensure obs doesn't alias env.state buffers + before = int(env.state["nodes"][0]) + obs["nodes"][0] = 12345 + after = int(env.state["nodes"][0]) + assert after == before, "obs['nodes'] aliases env.state['nodes'] (not a copy)" + + +def determinism_test(make_env, seed, n_steps=200): + # fixed action sequence + env0 = make_env() + env0.reset(seed=seed, options={"price_start_index": 0}) + actions = [env0.action_space.sample() for _ in range(n_steps)] + env0.close() + + def rollout(): + env = make_env() + # Pin external price window so determinism doesn't vary by episode. + _obs, _ = env.reset(seed=seed, options={"price_start_index": 0}) + traj = [] + done = False + i = 0 + while not done and i < n_steps: + _obs, r, term, trunc, info = env.step(actions[i]) + # record a small fingerprint + traj.append(( + float(r), + float(info.get("step_cost", 0.0)), + int(info.get("num_unprocessed_jobs", -1)), + int(info.get("num_on_nodes", -1)), + int(info.get("dropped_this_episode", -1)), + )) + done = term or trunc + i += 1 + env.close() + return traj + + a = rollout() + b = rollout() + assert a == b, "Determinism failed: same seed + same actions produced different trajectories" + +def carry_over_test(make_env, seed, n_steps=1): + env = make_env() + obs, _ = env.reset(seed=seed) + env.action_space.seed(seed) + actions = [env.action_space.sample() for _ in range(n_steps)] + for action in actions: + obs, r, term, trunc, info = env.step(action) + if term or trunc: + break + + snapshot = { + "nodes": env.state["nodes"].copy(), + "job_queue": env.state["job_queue"].copy(), + "predicted_prices": env.state["predicted_prices"].copy(), + "cores_available": env.cores_available.copy(), + "running_jobs": copy.deepcopy(env.running_jobs), + "price_index": env.prices.price_index, + "next_job_id": env.next_job_id, + "next_empty_slot": env.next_empty_slot, + "current_hour": env.metrics.current_hour, + } + + env.reset(seed=seed) + + assert np.array_equal(env.state["nodes"], snapshot["nodes"]), "carry-over failed: nodes reset" + assert np.array_equal(env.state["job_queue"], snapshot["job_queue"]), "carry-over failed: job_queue reset" + assert np.array_equal(env.state["predicted_prices"], snapshot["predicted_prices"]), "carry-over failed: predicted_prices reset" + assert np.array_equal(env.cores_available, snapshot["cores_available"]), "carry-over failed: cores_available reset" + assert env.running_jobs == snapshot["running_jobs"], "carry-over failed: running_jobs reset" + assert env.prices.price_index == snapshot["price_index"], "carry-over failed: price_index reset" + assert env.next_job_id == snapshot["next_job_id"], "carry-over failed: next_job_id reset" + assert env.next_empty_slot == snapshot["next_empty_slot"], "carry-over failed: next_empty_slot reset" + assert env.metrics.current_hour == snapshot["current_hour"], "carry-over failed: current_hour reset" + env.close() + + +# ----------------------------- +# CLI + env construction +# ----------------------------- +def parse_args(): + p = argparse.ArgumentParser(description="Sanity checks for ComputeClusterEnv (no training).") + p.add_argument("--seed", type=int, default=123) + p.add_argument("--steps", type=int, default=500) + p.add_argument("--episodes", type=int, default=1) + p.add_argument("--check-gym", action="store_true", help="Run gymnasium check_env") + p.add_argument("--check-determinism", action="store_true") + # mirror train.py-ish knobs (mostly optional) + p.add_argument("--session", default="sanity") + p.add_argument("--render", type=str, default="none", choices=["human", "none"]) + p.add_argument("--prices", default="") + p.add_argument("--job-durations", default="") + p.add_argument("--jobs", default="") + p.add_argument("--hourly-jobs", default="") + p.add_argument("--efficiency-weight", type=float, default=0.5) + p.add_argument("--price-weight", type=float, default=0.2) + p.add_argument("--idle-weight", type=float, default=0.1) + p.add_argument("--job-age-weight", type=float, default=0.1) + p.add_argument("--drop-weight", type=float, default=0.1) + p.add_argument("--workload-gen",type=str,default="",choices=["", "flat", "poisson"],help="Enable workload generator (default: disabled).",) + p.add_argument("--wg-poisson-lambda", type=float, default=200.0, help="Poisson lambda for jobs/hour.") + p.add_argument("--wg-max-jobs-hour", type=int, default=1500, help="Cap jobs/hour for generator.") + p.add_argument("--print-job-every", type=int, default=0, + help="Print one sample job every N steps (0 disables).") + p.add_argument("--print-job-kind", choices=["queue", "running", "both"], default="queue", + help="Where to sample the job from.") + p.add_argument("--print-job-index", type=int, default=-1, + help="Queue index to print (>=0), or -1 to print first active job.") + p.add_argument("--carry-over-state", action="store_true", + help="Carry over nodes/jobs/prices across episodes (timeline mode).") + + + return p.parse_args() + + +def make_env_from_args(args, env_cls=ComputeClusterEnv): + weights = Weights( + efficiency_weight=args.efficiency_weight, + price_weight=args.price_weight, + idle_weight=args.idle_weight, + job_age_weight=args.job_age_weight, + drop_weight=args.drop_weight + ) + + workload_gen = None + if args.workload_gen: + cfg = WorkloadGenConfig( + arrivals=args.workload_gen, + poisson_lambda=args.wg_poisson_lambda, + max_new_jobs_per_hour=args.wg_max_jobs_hour, + min_duration=1, + max_duration=MAX_JOB_DURATION, + min_nodes=MIN_NODES_PER_JOB, + max_nodes=MAX_NODES_PER_JOB, + min_cores=MIN_CORES_PER_JOB, + max_cores=CORES_PER_NODE, + ) + workload_gen = WorkloadGenerator(cfg) + + + # Train.py passes strings; the env treats "" as falsy in some places and truthy in others. + # To be safe: normalize "" -> None here. + def norm_path(x): + return None if (x is None or str(x).strip() == "") else x + + return env_cls( + weights=weights, + session=args.session, + render_mode=args.render, + quick_plot=False, + external_prices=load_prices(args.prices), + external_durations=norm_path(args.job_durations), + external_jobs=norm_path(args.jobs), + external_hourly_jobs=norm_path(args.hourly_jobs), + plot_rewards=False, + plots_dir="plots", + plot_once=False, + plot_eff_reward=False, + plot_price_reward=False, + plot_idle_penalty=False, + plot_job_age_penalty=False, + skip_plot_price=True, + skip_plot_online_nodes=True, + skip_plot_used_nodes=True, + skip_plot_job_queue=True, + steps_per_iteration=EPISODE_HOURS, # prevent plot cadence surprises + evaluation_mode=False, + # plot_total_reward=False, + workload_gen=workload_gen, + carry_over_state=args.carry_over_state + ) + +def maybe_print_job(env, obs, step_idx, every, kind="queue", job_index=-1): + if not every or every <= 0: + return + if step_idx % every != 0: + return + + nodes, q, prices = _extract(obs) + + def print_queue_job(): + active = np.flatnonzero(q[:, 0] > 0) + if active.size == 0: + print(f"[job@step {step_idx}] queue empty") + return + idx = int(active[0]) if job_index < 0 else int(job_index) + if idx < 0 or idx >= q.shape[0]: + print(f"[job@step {step_idx}] invalid queue index {idx}") + return + d, a, nn, cpn = map(int, q[idx]) + print(f"[job@step {step_idx}] QUEUE idx={idx}: dur_h={d} age_h={a} nodes={nn} cores_per_node={cpn}") + + def print_running_job(): + if not getattr(env, "running_jobs", None): + print(f"[job@step {step_idx}] running_jobs empty") + return + # deterministic-ish: smallest job_id + job_id = sorted(env.running_jobs.keys())[0] + jd = env.running_jobs[job_id] + dur = int(jd["duration"]) + alloc = jd.get("allocation", []) + nn = len(alloc) + cpn = int(alloc[0][1]) if alloc else 0 + node_ids = [int(x[0]) for x in alloc[:8]] + more = "" if len(alloc) <= 8 else f" (+{len(alloc)-8} more)" + print(f"[job@step {step_idx}] RUNNING job_id={job_id}: rem_h={dur} nodes={nn} cores_per_node={cpn} node_idxs={node_ids}{more}") + + if kind in ("queue", "both"): + print_queue_job() + if kind in ("running", "both"): + print_running_job() + + + +def main(): + args = parse_args() + + class DeterministicPriceEnv(ComputeClusterEnv): + def reset(self, seed=None, options=None): + if options is None: + options = {} + if seed is not None and "price_start_index" not in options: + options = dict(options) + options["price_start_index"] = 0 + return super().reset(seed=seed, options=options) + + def make_env_with_carry(carry_over_state, env_cls=ComputeClusterEnv): + local_args = argparse.Namespace(**vars(args)) + local_args.carry_over_state = carry_over_state + return make_env_from_args(local_args, env_cls=env_cls) + +# ------------------------------------- + + seed = 123 + action = np.array([1, 0], dtype=np.int64) # "maintain, magnitude 1" effectively + + env = make_env_with_carry(False, env_cls=DeterministicPriceEnv) + + o1, _ = env.reset(seed=seed) + o1s, r1, t1, tr1, i1 = env.step(action) + + o2, _ = env.reset(seed=seed) + o2s, r2, t2, tr2, i2 = env.step(action) + + def cmp(name, a, b): + eq = np.array_equal(a, b) + print(name, "equal:", eq) + if not eq: + # show first mismatch + idx = np.flatnonzero(a.flatten() != b.flatten())[0] + print(" first mismatch idx:", idx, "a:", a.flatten()[idx], "b:", b.flatten()[idx]) + + cmp("nodes", o1s["nodes"], o2s["nodes"]) + cmp("job_queue", o1s["job_queue"], o2s["job_queue"]) + cmp("predicted_prices", o1s["predicted_prices"], o2s["predicted_prices"]) + print("reward", r1, r2) + print("info.current_price", i1.get("current_price"), i2.get("current_price")) + +#---------------------------------------- + + # 1) Gym API compliance (optional) + if args.check_gym: + # Pin external price window so gym's determinism check is meaningful. + env = make_env_with_carry(False, env_cls=DeterministicPriceEnv) + check_env(env, skip_render_check=True) + env.close() + print("[OK] gymnasium check_env passed") + + # 2) Invariants + copy checks during random rollout + env = make_env_from_args(args) + for ep in range(args.episodes): + obs, info = env.reset(seed=args.seed + ep) + env.action_space.seed(args.seed + ep) # IMPORTANT: deterministic action sampling + check_invariants(env, obs) + #check_obs_is_copy(env, obs) + + done = False + steps = 0 + while not done and steps < args.steps: + action = env.action_space.sample() + obs, r, term, trunc, info = env.step(action) + maybe_print_job( + env, obs, + step_idx=steps, + every=args.print_job_every, + kind=args.print_job_kind, + job_index=args.print_job_index, + ) + check_invariants(env, obs) + done = term or trunc + steps += 1 + + print(f"[OK] episode={ep} steps={steps} done={done}") + env.close() + + # 3) Determinism (optional) + if args.check_determinism: + determinism_test(lambda: make_env_with_carry(False), seed=args.seed, n_steps=min(args.steps, 500)) + print("[OK] determinism test passed") + + # 4) Carry-over continuity (optional) + if args.carry_over_state: + carry_over_test(lambda: make_env_with_carry(True), seed=args.seed, n_steps=min(args.steps, 10)) + print("[OK] carry-over continuity test passed") + + print("done") + + +if __name__ == "__main__": + main() diff --git a/train.py b/train.py index e934009..7aefeeb 100644 --- a/train.py +++ b/train.py @@ -2,13 +2,13 @@ import os from src.environment import ComputeClusterEnv, Weights, PlottingComplete from src.callbacks import ComputeClusterCallback -from src.plot import plot_cumulative_savings +from src.plotter import plot_dashboard, plot_cumulative_savings import re import glob import argparse import pandas as pd from src.workloadgen import WorkloadGenerator, WorkloadGenConfig - +import time # Import environment constants from config module: from src.config import ( MAX_JOB_DURATION, @@ -39,6 +39,7 @@ def main(): parser.add_argument('--plot-price-reward', action='store_true', help='Include price reward in the plot (dashed line).') parser.add_argument('--plot-idle-penalty', action='store_true', help='Include idle penalty in the plot (dashed line).') parser.add_argument('--plot-job-age-penalty', action='store_true', help='Include job age penalty in the plot (dashed line).') + parser.add_argument('--plot-total-reward', action='store_true', help='Include total reward per step in the dashboard (raw values).') parser.add_argument('--skip-plot-price', action='store_true', help='Skip electricity price in the plot (blue line).') parser.add_argument('--skip-plot-online-nodes', action='store_true', help='Skip online nodes in the plot (blue line).') parser.add_argument('--skip-plot-used-nodes', action='store_true', help='Skip used nodes in the plot (blue line).') @@ -56,6 +57,9 @@ def main(): parser.add_argument("--workload-gen", type=str, default="", choices=["", "flat", "poisson", "uniform"], help="Enable workload generator (default: disabled).",) parser.add_argument("--wg-poisson-lambda", type=float, default=200.0, help="Poisson lambda for jobs/hour.") parser.add_argument("--wg-max-jobs-hour", type=int, default=1500, help="Cap jobs/hour for generator.") + parser.add_argument("--plot-dashboard", action="store_true", help="Generate dashboard plot (per-hour panels + cumulative savings).") + parser.add_argument("--dashboard-hours", type=int, default=24*14, help="Hours to show in dashboard time-series panels (default: 336).") + args = parser.parse_args() @@ -145,10 +149,14 @@ def main(): model_files.sort(key=lambda filename: int(re.match(r"(\d+)", os.path.basename(filename)).group())) latest_model_file = model_files[-1] # Get the last file after sorting, which should be the one with the most timesteps print(f"Found a saved model: {latest_model_file}") - model = PPO.load(latest_model_file, env=env, tensorboard_log=log_dir) + model = PPO.load(latest_model_file, env=env, tensorboard_log=log_dir + , n_steps=64, batch_size=64 + ) else: print(f"Starting a new model training...") - model = PPO('MultiInputPolicy', env, verbose=1, tensorboard_log=log_dir, ent_coef=args.ent_coef) + model = PPO('MultiInputPolicy', env, verbose=1, tensorboard_log=log_dir, ent_coef=args.ent_coef + , n_steps=64, batch_size=64 + ) iters = 0 @@ -185,7 +193,8 @@ def main(): obs, reward, terminated, truncated, _ = env.step(action) episode_reward += reward step_count += 1 - # print(f"Episode {episode + 1}, Step {step_count}, Action: {action}, Reward: {reward:.2f}, Total Reward: {episode_reward:.2f}, Total Cost: €{env.total_cost:.2f}") + if step_count%1000==0: + print(f"Episode {episode + 1}, Step {step_count}, Action: {action}, Reward: {reward:.2f}, Total Reward: {episode_reward:.2f}, Total Cost: €{env.total_cost:.2f}") done = terminated or truncated savings_vs_baseline = env.baseline_cost - env.total_cost @@ -193,7 +202,10 @@ def main(): completion_rate = (env.jobs_completed / env.jobs_submitted * 100) if env.jobs_submitted > 0 else 0 avg_wait = env.total_job_wait_time / env.jobs_completed if env.jobs_completed > 0 else 0 print(f" Episode {episode + 1}: " - f"Cost=€{env.total_cost:.0f}, " + f"Agent Cost=€{env.total_cost:.0f}, " + + f"Baseline Cost=€{env.baseline_cost:.0f} | Baseline Off=€{env.baseline_cost_off:.0f}, " + f"Savings=€{savings_vs_baseline:.0f}/€{savings_vs_baseline_off:.0f}, " f"Jobs={env.jobs_completed}/{env.jobs_submitted} ({completion_rate:.0f}%), " f"AvgWait={avg_wait:.1f}h, " @@ -204,7 +216,7 @@ def main(): # Generate cumulative savings plot session_dir = f"sessions/{args.session}" try: - results = plot_cumulative_savings(env, env.episode_costs, session_dir, months=args.eval_months, save=True, show=args.render == 'human') + results = plot_cumulative_savings(env, env.episode_costs, session_dir, save=True, show=args.render == 'human') if results: print(f"\n=== CUMULATIVE SAVINGS ANALYSIS ===") print(f"\nVs Baseline (with idle nodes):") @@ -240,6 +252,30 @@ def main(): except Exception as e: print(f"Could not generate cumulative savings plot: {e}") + # Optional: single dashboard plot combining the per-hour traces from the LAST episode + # and cumulative savings across all evaluated episodes. + if args.plot_dashboard: + try: + plot_dashboard( + env, + num_hours=args.dashboard_hours, + # max_nodes=env.max_nodes if hasattr(env, "max_nodes") else env.num_nodes if hasattr(env, "num_nodes") else 0, + max_nodes=335, + episode_costs=[ # adapt to what your plot_dashboard expects + { + "agent_cost": ep["agent_cost"], + "baseline_cost": ep["baseline_cost"], + "baseline_cost_off": ep["baseline_cost_off"], + } + for ep in env.episode_costs + ], + save=True, + show=(args.render == "human"), + suffix=f"eval_{args.eval_months}m", + ) + except Exception as e: + print(f"Could not generate dashboard plot: {e}") + print("\nEvaluation complete!") env.close() return @@ -248,12 +284,40 @@ def main(): while True: print(f"Training iteration {iters + 1} ({STEPS_PER_ITERATION * (iters + 1)} steps)...") iters += 1 + t0 = time.time() + if (iters+1)%10==0: + print(f"Running... at {iters + 1} of {STEPS_PER_ITERATION * (iters + 1)} steps") if args.iter_limit > 0 and iters > args.iter_limit: print(f"iterations limit ({args.iter_limit}) reached: {iters}.") break try: model.learn(total_timesteps=STEPS_PER_ITERATION, reset_num_timesteps=False, tb_log_name=f"PPO", callback=ComputeClusterCallback()) + print(f"Iteration {iters} finished in {time.time()-t0:.2f}s") model.save(f"{models_dir}/{STEPS_PER_ITERATION * iters}.zip") + + if args.plot_dashboard: + try: + plot_dashboard( + env, + num_hours=args.dashboard_hours, + # max_nodes=env.max_nodes if hasattr(env, "max_nodes") else env.num_nodes if hasattr(env, "num_nodes") else 0, + max_nodes=335, + episode_costs=[ + { + "agent_cost": ep["agent_cost"], + "baseline_cost": ep["baseline_cost"], + "baseline_cost_off": ep["baseline_cost_off"], + } + for ep in getattr(env, "episode_costs", []) + ], + save=True, + show=False, + suffix=STEPS_PER_ITERATION * iters, + ) + except Exception as e: + print(f"Dashboard plot failed (non-fatal): {e}") + + except PlottingComplete: print("Plotting complete, terminating training...") break diff --git a/train_iter.py b/train_iter.py index 3f205c8..d9a0fc7 100644 --- a/train_iter.py +++ b/train_iter.py @@ -81,7 +81,21 @@ def generate_weight_combinations(step=0.1, fixed_weights=None): return combinations -def run(efficiency_weight, price_weight, idle_weight, job_age_weight, drop_weight, iter_limit_per_step, session, prices, job_durations, jobs, hourly_jobs): +def run( + efficiency_weight, + price_weight, + idle_weight, + job_age_weight, + drop_weight, + iter_limit_per_step, + session, + prices, + job_durations, + jobs, + hourly_jobs, + plot_dashboard=False, + dashboard_hours=24 * 14, +): python_executable = sys.executable command = [ python_executable, "train.py", @@ -97,11 +111,14 @@ def run(efficiency_weight, price_weight, idle_weight, job_age_weight, drop_weigh "--hourly-jobs", f"{hourly_jobs}", "--session", f"{session}" ] + if plot_dashboard: + command += ["--plot-dashboard", "--dashboard-hours", str(dashboard_hours)] + print(f"executing: {command}") current_env = os.environ.copy() - result = subprocess.run(command, capture_output=False, text=True, env=current_env) + result = subprocess.run(command, text=True, env=current_env) if result.returncode != 0: - print(f"Error occurred: {result.stderr}") + print("Error occurred: train.py returned a non-zero exit code.") return result.stdout def parse_fixed_weights(fix_weights_str, fix_values_str): @@ -132,6 +149,9 @@ def main(): parser.add_argument("--fix-weights", type=str, help="Comma-separated list of weights to fix (efficiency,price,idle,job-age,drop)") parser.add_argument("--fix-values", type=str, help="Comma-separated list of values for fixed weights") parser.add_argument("--iter-limit-per-step", type=int, help="Max number of training iterations per step (1 iteration = {TIMESTEPS} steps)") + parser.add_argument("--plot-dashboard", action="store_true", help="Forward to train.py to generate dashboard plots.") + parser.add_argument("--dashboard-hours", type=int, default=24*14, help="Forward to train.py.") + parser.add_argument("--session", help="Session ID") args = parser.parse_args() @@ -155,7 +175,7 @@ def main(): for combo in combinations: efficiency_weight, price_weight, idle_weight, job_age_weight, drop_weight = combo print(f"Running with weights: efficiency={efficiency_weight}, price={price_weight}, idle={idle_weight}, job_age={job_age_weight}, drop={drop_weight}") - run(efficiency_weight, price_weight, idle_weight, job_age_weight, drop_weight, args.iter_limit_per_step, args.session, args.prices, args.job_durations, args.jobs, args.hourly_jobs) + run(efficiency_weight, price_weight, idle_weight, job_age_weight, drop_weight, args.iter_limit_per_step, args.session, args.prices, args.job_durations, args.jobs, args.hourly_jobs,plot_dashboard=args.plot_dashboard,dashboard_hours=args.dashboard_hours) if __name__ == "__main__": main()