Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/baseline.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,11 @@ def baseline_step(baseline_state, baseline_cores_available, baseline_running_job
new_jobs_nodes, new_jobs_cores, baseline_next_empty_slot
)
metrics['baseline_jobs_submitted'] += len(new_baseline_jobs)
if 'episode_baseline_jobs_submitted' in metrics:
metrics['episode_baseline_jobs_submitted'] += len(new_baseline_jobs)
metrics['baseline_jobs_rejected_queue_full'] += (new_jobs_count - len(new_baseline_jobs))
if 'episode_baseline_jobs_rejected_queue_full' in metrics:
metrics['episode_baseline_jobs_rejected_queue_full'] += (new_jobs_count - len(new_baseline_jobs))

_, baseline_next_empty_slot, _, next_job_id = assign_jobs_to_available_nodes(
job_queue_2d, baseline_state['nodes'], baseline_cores_available,
Expand All @@ -54,6 +58,9 @@ def baseline_step(baseline_state, baseline_cores_available, baseline_running_job
# Track baseline max queue size
if num_unprocessed_jobs > metrics['baseline_max_queue_size_reached']:
metrics['baseline_max_queue_size_reached'] = num_unprocessed_jobs
if 'episode_baseline_max_queue_size_reached' in metrics:
if num_unprocessed_jobs > metrics['episode_baseline_max_queue_size_reached']:
metrics['episode_baseline_max_queue_size_reached'] = num_unprocessed_jobs

baseline_state['job_queue'] = job_queue_2d.flatten()

Expand Down
107 changes: 91 additions & 16 deletions src/environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ def __init__(self,
skip_plot_job_queue,
steps_per_iteration,
evaluation_mode=False,
workload_gen=None):
workload_gen=None,
carry_over_state=False):
super().__init__()

self.weights = weights
Expand Down Expand Up @@ -123,6 +124,7 @@ def __init__(self,
self.np_random = None
self._seed = None
self.workload_gen = workload_gen
self.carry_over_state = carry_over_state

if self.external_durations:
durations_sampler.init(self.external_durations)
Expand Down Expand Up @@ -170,6 +172,9 @@ def __init__(self,
self.min_job_age_penalty = -0.0
self.max_job_age_penalty = PENALTY_WAITING_JOB * MAX_JOB_AGE * MAX_QUEUE_SIZE

self.reset_timeline_state()
self.metrics.reset_episode_metrics()

Comment on lines +175 to +177
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

# First, find the Prices class and check __init__
rg -n "class Prices" src/ -A 20 | head -50

Repository: FairRootGroup/powersched

Length of output: 2426


🏁 Script executed:

# Also check the environment.py file at the mentioned lines
sed -n '160,190p' src/environment.py

Repository: FairRootGroup/powersched

Length of output: 1410


🏁 Script executed:

# And check lines 268-277 as mentioned
sed -n '260,285p' src/environment.py

Repository: FairRootGroup/powersched

Length of output: 1050


🏁 Script executed:

# Now let's look at the reset_timeline_state implementation
rg -n "def reset_timeline_state" src/environment.py -A 10

Repository: FairRootGroup/powersched

Length of output: 577


🏁 Script executed:

# Search for reset method in Prices class
rg -n "def reset" src/prices_deterministic.py -A 15

Repository: FairRootGroup/powersched

Length of output: 796


🏁 Script executed:

# Check the full environment.__init__ to see the complete sequence
rg -n "def __init__" src/environment.py -A 80 | head -120

Repository: FairRootGroup/powersched

Length of output: 3707


🏁 Script executed:

# Also check if Prices.reset() is called anywhere in environment.__init__
rg -n "self.prices.reset" src/environment.py

Repository: FairRootGroup/powersched

Length of output: 183


🏁 Script executed:

# Get full context around these critical lines
sed -n '115,250p' src/environment.py | cat -n

Repository: FairRootGroup/powersched

Length of output: 6999


reset_timeline_state() attempts to copy self.prices.predicted_prices while it is still None.

In __init__ at line 175, reset_timeline_state() is called before Prices.reset() is invoked. However, Prices.__init__ initializes predicted_prices = None (line 19), and it only gets populated when Prices.reset() is called. This causes AttributeError: 'NoneType' object has no attribute 'copy' at line 277 when trying to execute self.prices.predicted_prices.copy().

Call self.prices.reset(start_index=0) before self.reset_timeline_state() in __init__, matching the pattern used in the reset() method (lines 120–123).

🤖 Prompt for AI Agents
In `@src/environment.py` around lines 175-177, the constructor calls
reset_timeline_state() before Prices.reset(), so reset_timeline_state() tries to
copy self.prices.predicted_prices while it's still None; fix by invoking
self.prices.reset(start_index=0) before calling self.reset_timeline_state() in
__init__ (same order used in reset()), ensuring predicted_prices is initialized
before reset_timeline_state() uses self.prices.predicted_prices.copy().

# actions: - change number of available nodes:
# action_type: 0: decrease, 1: maintain, 2: increase
# action_magnitude: 0-MAX_CHANGE (+1ed in the action)
Expand Down Expand Up @@ -210,19 +215,57 @@ def reset(self, seed=None, options=None):
self.episode_idx += 1

# Reset metrics
self.metrics.reset_state_metrics()
if self.carry_over_state:
self.metrics.reset_episode_metrics()
else:
self.metrics.reset_state_metrics()

if not self.carry_over_state:
# Choose starting index in the external price series
if self.prices is not None and getattr(self.prices, "external_prices", None) is not None:
n_prices = len(self.prices.external_prices)
episode_span = EPISODE_HOURS

# Episode k starts at hour k * episode_span (wrapping around the year)
start_index = (self.episode_idx * episode_span) % n_prices
if options and "price_start_index" in options: # For testing Purposes. Leave out 'options' to advance episode.
start_index = int(options["price_start_index"]) % n_prices
self.prices.reset(start_index=start_index)
else:
# Synthetic prices or no external prices
self.prices.reset(start_index=0)

# Choose starting index in the external price series
if self.prices is not None and getattr(self.prices, "external_prices", None) is not None:
n_prices = len(self.prices.external_prices)
episode_span = EPISODE_HOURS
self.state = {
# Initialize all nodes to be 'online but free' (0)
'nodes': np.zeros(MAX_NODES, dtype=np.int32),
# Initialize job queue to be empty
'job_queue': np.zeros((MAX_QUEUE_SIZE * 4), dtype=np.int32),
# Initialize predicted prices array
'predicted_prices': self.prices.predicted_prices.copy(),
}

# Episode k starts at hour k * episode_span (wrapping around the year)
start_index = (self.episode_idx * episode_span) % n_prices
self.prices.reset(start_index=start_index)
else:
# Synthetic prices or no external prices
self.prices.reset(start_index=0)
self.baseline_state = {
'nodes': np.zeros(MAX_NODES, dtype=np.int32),
'job_queue': np.zeros((MAX_QUEUE_SIZE * 4), dtype=np.int32),
}

self.cores_available = np.full(MAX_NODES, CORES_PER_NODE, dtype=np.int32)
self.baseline_cores_available = np.full(MAX_NODES, CORES_PER_NODE, dtype=np.int32)

# Job tracking: { job_id: {'duration': remaining_hours, 'allocation': [(node_idx1, cores1), ...]}, ... }
self.running_jobs = {}
self.baseline_running_jobs = {}

self.next_job_id = 0 # shared between baseline and normal jobs

# Track next empty slot in job queue for O(1) insertion
self.next_empty_slot = 0
self.baseline_next_empty_slot = 0

return self.state, {}

def reset_timeline_state(self):
self.metrics.current_hour = 0

self.state = {
# Initialize all nodes to be 'online but free' (0)
Expand All @@ -240,7 +283,6 @@ def reset(self, seed=None, options=None):

self.cores_available = np.full(MAX_NODES, CORES_PER_NODE, dtype=np.int32)
self.baseline_cores_available = np.full(MAX_NODES, CORES_PER_NODE, dtype=np.int32)

# Job tracking: { job_id: {'duration': remaining_hours, 'allocation': [(node_idx1, cores1), ...]}, ... }
self.running_jobs = {}
self.baseline_running_jobs = {}
Expand All @@ -251,8 +293,6 @@ def reset(self, seed=None, options=None):
self.next_empty_slot = 0
self.baseline_next_empty_slot = 0

return self.state, {}

def step(self, action):
self.current_step += 1
self.metrics.current_hour += 1
Expand Down Expand Up @@ -288,7 +328,9 @@ def step(self, action):
new_jobs_nodes, new_jobs_cores, self.next_empty_slot
)
self.metrics.jobs_submitted += len(new_jobs)
self.metrics.episode_jobs_submitted += len(new_jobs)
self.metrics.jobs_rejected_queue_full += (new_jobs_count - len(new_jobs))
self.metrics.episode_jobs_rejected_queue_full += (new_jobs_count - len(new_jobs))

self.env_print("nodes: ", np.array2string(self.state['nodes'], separator=' ', max_line_width=np.inf))
self.env_print(f"cores_available: {np.array2string(self.cores_available, separator=' ', max_line_width=np.inf)} ({np.sum(self.cores_available)})")
Expand All @@ -314,6 +356,12 @@ def step(self, action):
'baseline_total_job_wait_time': self.metrics.baseline_total_job_wait_time,
'baseline_jobs_dropped': self.metrics.baseline_jobs_dropped,
'baseline_dropped_this_episode': self.metrics.baseline_dropped_this_episode,
'episode_jobs_completed': self.metrics.episode_jobs_completed,
'episode_total_job_wait_time': self.metrics.episode_total_job_wait_time,
'episode_jobs_dropped': self.metrics.episode_jobs_dropped,
'episode_baseline_jobs_completed': self.metrics.episode_baseline_jobs_completed,
'episode_baseline_total_job_wait_time': self.metrics.episode_baseline_total_job_wait_time,
'episode_baseline_jobs_dropped': self.metrics.episode_baseline_jobs_dropped,
}

num_launched_jobs, self.next_empty_slot, num_dropped_this_step, self.next_job_id = assign_jobs_to_available_nodes(
Expand All @@ -326,6 +374,9 @@ def step(self, action):
self.metrics.total_job_wait_time = job_metrics['total_job_wait_time']
self.metrics.jobs_dropped = job_metrics['jobs_dropped']
self.metrics.dropped_this_episode = job_metrics['dropped_this_episode']
self.metrics.episode_jobs_completed = job_metrics['episode_jobs_completed']
self.metrics.episode_total_job_wait_time = job_metrics['episode_total_job_wait_time']
self.metrics.episode_jobs_dropped = job_metrics['episode_jobs_dropped']

self.env_print(f" {num_launched_jobs} jobs launched")

Expand All @@ -347,6 +398,8 @@ def step(self, action):
# Track max queue size
if num_unprocessed_jobs > self.metrics.max_queue_size_reached:
self.metrics.max_queue_size_reached = num_unprocessed_jobs
if num_unprocessed_jobs > self.metrics.episode_max_queue_size_reached:
self.metrics.episode_max_queue_size_reached = num_unprocessed_jobs

self.env_print(f"[5] Calculating reward...")

Expand All @@ -359,6 +412,12 @@ def step(self, action):
'baseline_jobs_dropped': self.metrics.baseline_jobs_dropped,
'baseline_dropped_this_episode': self.metrics.baseline_dropped_this_episode,
'baseline_max_queue_size_reached': self.metrics.baseline_max_queue_size_reached,
'episode_baseline_jobs_submitted': self.metrics.episode_baseline_jobs_submitted,
'episode_baseline_jobs_rejected_queue_full': self.metrics.episode_baseline_jobs_rejected_queue_full,
'episode_baseline_jobs_completed': self.metrics.episode_baseline_jobs_completed,
'episode_baseline_total_job_wait_time': self.metrics.episode_baseline_total_job_wait_time,
'episode_baseline_jobs_dropped': self.metrics.episode_baseline_jobs_dropped,
'episode_baseline_max_queue_size_reached': self.metrics.episode_baseline_max_queue_size_reached,
}

baseline_cost, baseline_cost_off, self.baseline_next_empty_slot, self.next_job_id = baseline_step(
Expand All @@ -375,9 +434,17 @@ def step(self, action):
self.metrics.baseline_jobs_dropped = baseline_metrics['baseline_jobs_dropped']
self.metrics.baseline_dropped_this_episode = baseline_metrics['baseline_dropped_this_episode']
self.metrics.baseline_max_queue_size_reached = baseline_metrics['baseline_max_queue_size_reached']
self.metrics.episode_baseline_jobs_submitted = baseline_metrics['episode_baseline_jobs_submitted']
self.metrics.episode_baseline_jobs_rejected_queue_full = baseline_metrics['episode_baseline_jobs_rejected_queue_full']
self.metrics.episode_baseline_jobs_completed = baseline_metrics['episode_baseline_jobs_completed']
self.metrics.episode_baseline_total_job_wait_time = baseline_metrics['episode_baseline_total_job_wait_time']
self.metrics.episode_baseline_jobs_dropped = baseline_metrics['episode_baseline_jobs_dropped']
self.metrics.episode_baseline_max_queue_size_reached = baseline_metrics['episode_baseline_max_queue_size_reached']

self.metrics.baseline_cost += baseline_cost
self.metrics.baseline_cost_off += baseline_cost_off
self.metrics.episode_baseline_cost += baseline_cost
self.metrics.episode_baseline_cost_off += baseline_cost_off

# Calculate reward
reward_bounds = {
Expand All @@ -400,6 +467,7 @@ def step(self, action):

self.metrics.episode_reward += step_reward
self.metrics.total_cost += step_cost
self.metrics.episode_total_cost += step_cost

# Store normalized reward components for plotting
self.metrics.eff_rewards.append(eff_reward_norm * 100)
Expand Down Expand Up @@ -451,4 +519,11 @@ def step(self, action):

self.env_print(Fore.GREEN + f"]]]" + Fore.RESET)

return self.state, step_reward, terminated, truncated, {}
info = {
"step_cost": float(step_cost),
"num_unprocessed_jobs": int(num_unprocessed_jobs),
"num_on_nodes": int(num_on_nodes),
"dropped_this_episode": int(getattr(self.metrics, "dropped_this_episode", 0)),
}

return self.state, step_reward, terminated, truncated, info
10 changes: 10 additions & 0 deletions src/job_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,9 +156,15 @@ def assign_jobs_to_available_nodes(job_queue_2d, nodes, cores_available, running
if is_baseline:
metrics['baseline_jobs_completed'] += 1
metrics['baseline_total_job_wait_time'] += job_age
if 'episode_baseline_jobs_completed' in metrics:
metrics['episode_baseline_jobs_completed'] += 1
metrics['episode_baseline_total_job_wait_time'] += job_age
else:
metrics['jobs_completed'] += 1
metrics['total_job_wait_time'] += job_age
if 'episode_jobs_completed' in metrics:
metrics['episode_jobs_completed'] += 1
metrics['episode_total_job_wait_time'] += job_age

num_processed_jobs += 1
continue
Expand All @@ -178,9 +184,13 @@ def assign_jobs_to_available_nodes(job_queue_2d, nodes, cores_available, running
if is_baseline:
metrics['baseline_jobs_dropped'] += 1
metrics['baseline_dropped_this_episode'] += 1
if 'episode_baseline_jobs_dropped' in metrics:
metrics['episode_baseline_jobs_dropped'] += 1
else:
metrics['jobs_dropped'] += 1
metrics['dropped_this_episode'] += 1
if 'episode_jobs_dropped' in metrics:
metrics['episode_jobs_dropped'] += 1
else:
job_queue_2d[job_idx][1] = new_age

Expand Down
Loading