Skip to content

Commit 4ac54aa

Browse files
authored
Merge pull request #166 from isofinly/main
feat: Implement window constraints with new command-line options.
2 parents 0a05cd1 + e29825c commit 4ac54aa

File tree

4 files changed

+748
-19
lines changed

4 files changed

+748
-19
lines changed

cats/CI_api_interface.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ def ciuk_parse_response_data(response: dict):
4747
and is set up to cache data from call to call even accross different
4848
processes within the same half hour window. The returned prediction data
4949
is in half hour blocks starting from the half hour containing the current
50-
time and extending for 48 hours into the future.
50+
time and extending up to 48 hours into the future.
5151
5252
:param response:
5353
:return:

cats/__init__.py

Lines changed: 123 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,10 @@
1515
from .configure import get_runtime_config
1616
from .constants import CATS_ASCII_BANNER_COLOUR, CATS_ASCII_BANNER_NO_COLOUR
1717
from .plotting import plotplan
18-
from .forecast import CarbonIntensityAverageEstimate, WindowedForecast
18+
from .forecast import (
19+
CarbonIntensityAverageEstimate,
20+
WindowedForecast,
21+
)
1922

2023
__version__ = "1.1.0"
2124

@@ -28,6 +31,68 @@ def indent_lines(lines, spaces):
2831
return "\n".join(" " * spaces + line for line in lines.split("\n"))
2932

3033

34+
def parse_time_constraint(
35+
time_str: str, timezone_info=None
36+
) -> Optional[datetime.datetime]:
37+
"""
38+
Parse a time constraint string into a datetime object.
39+
40+
:param time_str: Time string in various formats (HH:MM, YYYY-MM-DDTHH:MM, etc.)
41+
:param timezone_info: Default timezone if not specified in the string
42+
:return: Parsed datetime object
43+
:raises ValueError: If the time string cannot be parsed
44+
"""
45+
if not time_str:
46+
return None
47+
48+
# If timezone_info is not provided, use system local timezone
49+
if timezone_info is None:
50+
timezone_info = datetime.datetime.now().astimezone().tzinfo
51+
52+
# Try to parse as full ISO format first
53+
try:
54+
if "T" in time_str:
55+
# Full datetime string
56+
if time_str.endswith("Z"):
57+
time_str = time_str[:-1] + "+00:00"
58+
elif time_str[-6] not in ["+", "-"] and time_str[-3] != ":":
59+
# No timezone info, add default
60+
dt = datetime.datetime.fromisoformat(time_str)
61+
return dt.replace(tzinfo=timezone_info)
62+
return datetime.datetime.fromisoformat(time_str)
63+
else:
64+
# Time only (HH:MM or HH:MM:SS)
65+
time_part = datetime.time.fromisoformat(time_str)
66+
today = datetime.datetime.now().date()
67+
return datetime.datetime.combine(today, time_part, tzinfo=timezone_info)
68+
except ValueError as e:
69+
raise ValueError(f"Unable to parse time constraint '{time_str}': {e}")
70+
71+
72+
def validate_window_constraints(
73+
start_window: Optional[datetime.datetime],
74+
end_window: Optional[datetime.datetime],
75+
window_minutes: int,
76+
) -> tuple[Optional[datetime.datetime], Optional[datetime.datetime], int]:
77+
"""
78+
Validate window constraints.
79+
80+
:param start_window: Start window constraint datetime
81+
:param end_window: End window constraint datetime
82+
:param window_minutes: Maximum window duration in minutes
83+
:return: Tuple of (start_datetime, end_datetime, validated_window_minutes)
84+
:raises ValueError: If constraints are invalid
85+
"""
86+
if window_minutes < 1 or window_minutes > 2820:
87+
raise ValueError("Window must be between 1 and 2820 minutes (47 hours)")
88+
89+
if start_window and end_window:
90+
if start_window >= end_window:
91+
raise ValueError("Start window must be before end window")
92+
93+
return start_window, end_window, window_minutes
94+
95+
3196
def parse_arguments():
3297
"""
3398
Parse command line arguments
@@ -201,6 +266,27 @@ def positive_integer(string):
201266
"\"pip install 'climate-aware-task-scheduler[plots]'\"",
202267
action="store_true",
203268
)
269+
parser.add_argument(
270+
"--window",
271+
type=positive_integer,
272+
help="Maximum time window to search for optimal start time, in minutes. "
273+
"Must be between 1 and 2820 (47 hours). Default: 2820 minutes (47 hours).",
274+
default=2820,
275+
)
276+
parser.add_argument(
277+
"--start-window",
278+
type=parse_time_constraint,
279+
help="Earliest time the job is allowed to start, in ISO format (e.g., '2024-01-15T09:00'). "
280+
"If only time is provided (e.g., '09:00'), today's date is assumed. "
281+
"Timezone info is optional and defaults to system timezone.",
282+
)
283+
parser.add_argument(
284+
"--end-window",
285+
type=parse_time_constraint,
286+
help="Latest time the job is allowed to start, in ISO format (e.g., '2024-01-15T17:00'). "
287+
"If only time is provided (e.g., '17:00'), today's date is assumed. "
288+
"Timezone info is optional and defaults to system timezone.",
289+
)
204290

205291
return parser
206292

@@ -324,9 +410,6 @@ def main(arguments=None) -> int:
324410
args = parser.parse_args(arguments)
325411
colour_output = args.no_colour or args.no_color
326412

327-
# Print CATS ASCII art banner, before any output from printing or logging
328-
print_banner(colour_output)
329-
330413
if args.command and not args.scheduler:
331414
print(
332415
"cats: To run a command or sbatch script with the -c or --command option, you must\n"
@@ -335,11 +418,27 @@ def main(arguments=None) -> int:
335418
return 1
336419

337420
CI_API_interface, location, duration, jobinfo, PUE = get_runtime_config(args)
338-
if duration > CI_API_interface.max_duration:
339-
print(
340-
f"""API allows a maximum job duration of {CI_API_interface.max_duration} minutes.
341-
This is usually due to forecast limitations."""
421+
422+
# Validate and parse window constraints
423+
try:
424+
start_constraint, end_constraint, max_window = validate_window_constraints(
425+
args.start_window, args.end_window, args.window
342426
)
427+
except ValueError as e:
428+
print(f"Error in window constraints: {e}")
429+
return 1
430+
# Check against both API limit and user-specified window
431+
effective_max_duration = min(CI_API_interface.max_duration, max_window)
432+
if duration > effective_max_duration:
433+
if max_window < CI_API_interface.max_duration:
434+
print(
435+
f"""Job duration ({duration} minutes) exceeds specified window ({max_window} minutes)."""
436+
)
437+
else:
438+
print(
439+
f"""API allows a maximum job duration of {CI_API_interface.max_duration} minutes.
440+
This is usually due to forecast limitations."""
441+
)
343442
return 1
344443

345444
########################
@@ -362,8 +461,21 @@ def main(arguments=None) -> int:
362461

363462
# Find best possible average carbon intensity, along
364463
# with corresponding job start time.
464+
search_start = datetime.datetime.now().astimezone()
465+
466+
# Apply start window constraint if provided
467+
if start_constraint:
468+
# Ensure start constraint is in the same timezone as search_start
469+
if start_constraint.tzinfo != search_start.tzinfo:
470+
start_constraint = start_constraint.astimezone(search_start.tzinfo)
471+
search_start = max(search_start, start_constraint)
472+
365473
wf = WindowedForecast(
366-
CI_forecast, duration, start=datetime.datetime.now().astimezone()
474+
CI_forecast,
475+
duration,
476+
start=search_start,
477+
max_window_minutes=max_window,
478+
end_constraint=end_constraint,
367479
)
368480
now_avg, best_avg = wf[0], min(wf)
369481
output = CATSOutput(now_avg, best_avg, location, "GBR", colour=not colour_output)
@@ -390,6 +502,8 @@ def main(arguments=None) -> int:
390502
dateformat = args.dateformat or ""
391503
print(output.to_json(dateformat, sort_keys=True, indent=2))
392504
else:
505+
# Print CATS ASCII art banner, before any output from printing or logging
506+
print_banner(colour_output)
393507
print(output)
394508
if args.plot:
395509
plotplan(CI_forecast, output)

cats/forecast.py

Lines changed: 93 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from dataclasses import dataclass, InitVar
1+
from dataclasses import dataclass
22
from typing import Optional
33
from datetime import datetime, timedelta
44

@@ -29,8 +29,8 @@ class CarbonIntensityAverageEstimate:
2929
value: float
3030
start: datetime # Start of the time-integration window
3131
end: datetime # End of the time-integration window
32-
start_value: float CI point estimate at start time
33-
end_value: float CI point estimate at end time
32+
start_value: float # CI point estimate at start time
33+
end_value: float # CI point estimate at end time
3434

3535

3636
class WindowedForecast:
@@ -39,10 +39,23 @@ def __init__(
3939
data: list[CarbonIntensityPointEstimate],
4040
duration: int, # in minutes
4141
start: datetime,
42+
max_window_minutes: Optional[int] = None,
43+
end_constraint: Optional[datetime] = None,
4244
):
43-
self.data_stepsize = data[1].datetime - data[0].datetime
45+
self.duration = duration
46+
self.max_window_minutes = max_window_minutes
47+
self.end_constraint = end_constraint
48+
49+
# Filter data based on constraints if any are specified
50+
if max_window_minutes is not None or end_constraint is not None:
51+
filtered_data = self._filter_data_by_constraints(
52+
data, start, duration, max_window_minutes or 2820, end_constraint
53+
)
54+
else:
55+
filtered_data = data
56+
57+
self.data_stepsize = filtered_data[1].datetime - filtered_data[0].datetime
4458
self.start = start
45-
# TODO: Expect duration as a timedelta directly
4659
self.end = start + timedelta(minutes=duration)
4760

4861
# Restrict data points so that start time falls within the
@@ -57,7 +70,7 @@ def bisect_right(data, t):
5770
# bisect_right(data, start) returns the index of the first
5871
# data point with datetime value immediately preceding the job
5972
# start time
60-
self.data = data[bisect_right(data, start) :]
73+
self.data = filtered_data[bisect_right(filtered_data, start) :]
6174

6275
# Find number of data points in a window, by finding the index
6376
# of the closest data point past the job end time. Could be
@@ -74,6 +87,46 @@ def bisect_left(data, t):
7487

7588
self.ndata = bisect_left(self.data, self.end) # window size
7689

90+
def _filter_data_by_constraints(
91+
self,
92+
data: list[CarbonIntensityPointEstimate],
93+
start: datetime,
94+
duration: int,
95+
max_window_minutes: int,
96+
end_constraint: Optional[datetime],
97+
) -> list[CarbonIntensityPointEstimate]:
98+
"""Filter forecast data based on time constraints."""
99+
100+
# Calculate the maximum time we need data for
101+
search_window_end = start + timedelta(minutes=max_window_minutes)
102+
103+
if end_constraint:
104+
# Ensure timezone compatibility
105+
if end_constraint.tzinfo != start.tzinfo:
106+
end_constraint = end_constraint.astimezone(start.tzinfo)
107+
# Jobs must start before end_constraint
108+
search_window_end = min(search_window_end, end_constraint)
109+
110+
# We need data points to cover jobs starting up to search_window_end
111+
# plus the duration of those jobs
112+
max_data_time = search_window_end + timedelta(minutes=duration)
113+
114+
# Filter data to respect the constraints
115+
filtered_data = []
116+
for d in data:
117+
if d.datetime <= max_data_time:
118+
filtered_data.append(d)
119+
else:
120+
break
121+
122+
if len(filtered_data) < 2:
123+
raise ValueError(
124+
"Insufficient forecast data for the specified time window constraints. "
125+
"Try increasing --window or adjusting --end-window."
126+
)
127+
128+
return filtered_data
129+
77130
def __getitem__(self, index: int) -> CarbonIntensityAverageEstimate:
78131
"""Return the average of timeseries data from index over the
79132
window size. Data points are integrated using the trapeziodal
@@ -86,6 +139,9 @@ def __getitem__(self, index: int) -> CarbonIntensityAverageEstimate:
86139
dividing the total integral value by the number of intervals.
87140
"""
88141

142+
if index >= len(self):
143+
raise IndexError("Window index out of range")
144+
89145
# Account for the fact that the start and end of each window
90146
# might not fall exactly on data points. The starting
91147
# intensity is interpolated between the first (index) and
@@ -149,8 +205,36 @@ def interp(
149205
)
150206

151207
def __iter__(self):
152-
for index in range(self.__len__()):
153-
yield self.__getitem__(index)
208+
for index in range(len(self)):
209+
yield self[index]
154210

155211
def __len__(self):
156-
return len(self.data) - self.ndata
212+
"""Return number of valid forecast windows respecting all constraints."""
213+
base_length = len(self.data) - self.ndata
214+
215+
if base_length <= 0:
216+
return 0
217+
218+
max_valid_index = base_length - 1
219+
220+
# Check max window constraint only if specified
221+
if self.max_window_minutes is not None:
222+
data_stepsize_minutes = self.data_stepsize.total_seconds() / 60
223+
max_index_by_window = int(self.max_window_minutes / data_stepsize_minutes)
224+
max_valid_index = min(max_valid_index, max_index_by_window)
225+
226+
# Check end constraint
227+
if self.end_constraint:
228+
if self.end_constraint.tzinfo != self.start.tzinfo:
229+
end_constraint = self.end_constraint.astimezone(self.start.tzinfo)
230+
else:
231+
end_constraint = self.end_constraint
232+
233+
# Find the maximum index where job start time is before end_constraint
234+
for i in range(min(base_length, max_valid_index + 1)):
235+
window_start = self.start + i * self.data_stepsize
236+
if window_start >= end_constraint:
237+
max_valid_index = i - 1
238+
break
239+
240+
return max(0, max_valid_index + 1)

0 commit comments

Comments
 (0)