|
| 1 | +import random |
| 2 | +from datetime import datetime, timedelta |
| 3 | +from typing import Iterator |
| 4 | + |
| 5 | +WEEKDAYS = { |
| 6 | + "mon": 1, |
| 7 | + "tue": 2, |
| 8 | + "wed": 3, |
| 9 | + "thu": 4, |
| 10 | + "fri": 5, |
| 11 | + "sat": 6, |
| 12 | + "sun": 7, |
| 13 | +} |
| 14 | + |
| 15 | +MONTHS = { |
| 16 | + "jan": 1, |
| 17 | + "feb": 2, |
| 18 | + "mar": 3, |
| 19 | + "apr": 4, |
| 20 | + "may": 5, |
| 21 | + "jun": 6, |
| 22 | + "jul": 7, |
| 23 | + "aug": 8, |
| 24 | + "sep": 9, |
| 25 | + "oct": 10, |
| 26 | + "nov": 11, |
| 27 | + "dec": 12, |
| 28 | +} |
| 29 | + |
| 30 | + |
| 31 | +class CrontabParseError(Exception): |
| 32 | + pass |
| 33 | + |
| 34 | + |
| 35 | +class CrontabExhausted(Exception): |
| 36 | + pass |
| 37 | + |
| 38 | + |
| 39 | +class CrontabParser: |
| 40 | + def __init__( |
| 41 | + self, |
| 42 | + min_value: int, |
| 43 | + max_value: int, |
| 44 | + names: dict[str, int] | None = None, |
| 45 | + ): |
| 46 | + self.min_value = min_value |
| 47 | + self.max_value = max_value |
| 48 | + self.names = { |
| 49 | + key[:3].lower(): self._range_check(value) |
| 50 | + for key, value in (names or {}).items() |
| 51 | + } |
| 52 | + |
| 53 | + def _range_check(self, num: int) -> int: |
| 54 | + if num < self.min_value or num > self.max_value: |
| 55 | + raise CrontabParseError( |
| 56 | + f"{num} is not in the range {self.min_value}-{self.max_value}" |
| 57 | + ) |
| 58 | + return num |
| 59 | + |
| 60 | + def _get_value(self, part: str) -> int: |
| 61 | + if part.isdigit(): |
| 62 | + return self._range_check(int(part)) |
| 63 | + elif value := self.names.get(part[:3].lower()): |
| 64 | + return value |
| 65 | + raise CrontabParseError(f"Could not parse value: {part}") |
| 66 | + |
| 67 | + def parse_part(self, part: str) -> list[int]: |
| 68 | + if "/" in part: |
| 69 | + value, step = part.split("/", 1) |
| 70 | + step = self._range_check(int(step)) |
| 71 | + values = self.parse_part(value) |
| 72 | + return values[::step] |
| 73 | + elif part == "*": |
| 74 | + return list(range(self.min_value, self.max_value + 1)) |
| 75 | + elif part == "~": |
| 76 | + return [random.randint(self.min_value, self.max_value)] |
| 77 | + elif "-" in part: |
| 78 | + lo, hi = (self._get_value(p) for p in part.split("-", 1)) |
| 79 | + if lo > hi: |
| 80 | + raise CrontabParseError(f"{lo}-{hi} is not a valid range ({lo} > {hi})") |
| 81 | + return list(range(lo, hi + 1)) |
| 82 | + return [self._get_value(part)] |
| 83 | + |
| 84 | + def parse(self, spec: str) -> list[int]: |
| 85 | + values: set[int] = set() |
| 86 | + for part in spec.split(","): |
| 87 | + values.update(self.parse_part(part)) |
| 88 | + return list(sorted(values)) |
| 89 | + |
| 90 | + |
| 91 | +minute = CrontabParser(0, 59) |
| 92 | +hour = CrontabParser(0, 23) |
| 93 | +day = CrontabParser(1, 31) |
| 94 | +month = CrontabParser(1, 12, names=MONTHS) |
| 95 | +weekday = CrontabParser(0, 7, names=WEEKDAYS) |
| 96 | + |
| 97 | + |
| 98 | +class Crontab: |
| 99 | + def __init__(self, spec: str): |
| 100 | + parts = spec.split(None) |
| 101 | + if len(parts) != 5: |
| 102 | + raise CrontabParseError("Crontab specs must have 5 parts") |
| 103 | + self.spec = spec |
| 104 | + self.minutes = minute.parse(parts[0]) |
| 105 | + self.hours = hour.parse(parts[1]) |
| 106 | + self.days = day.parse(parts[2]) |
| 107 | + self.months = month.parse(parts[3]) |
| 108 | + self.weekdays = weekday.parse(parts[4]) |
| 109 | + self.specifies_day = parts[2] != "*" |
| 110 | + self.specifies_weekday = parts[4] != "*" |
| 111 | + |
| 112 | + def __repr__(self): |
| 113 | + return f"crontab({self.spec!r})" |
| 114 | + |
| 115 | + def match(self, dt: datetime) -> bool: |
| 116 | + """ |
| 117 | + Returns whether the specified datetime matches this crontab spec. |
| 118 | + """ |
| 119 | + if dt.minute not in self.minutes: |
| 120 | + return False |
| 121 | + if dt.hour not in self.hours: |
| 122 | + return False |
| 123 | + if dt.month not in self.months: |
| 124 | + return False |
| 125 | + if self.specifies_day and self.specifies_weekday: |
| 126 | + # Special case when both day and weekday are specified - by spec it matches |
| 127 | + # when *either* match. |
| 128 | + if (dt.day not in self.days) and (dt.isoweekday() not in self.weekdays): |
| 129 | + return False |
| 130 | + else: |
| 131 | + # Otherwise when one or none are specified, check them separately. |
| 132 | + if dt.day not in self.days: |
| 133 | + return False |
| 134 | + if dt.isoweekday() not in self.weekdays: |
| 135 | + return False |
| 136 | + return True |
| 137 | + |
| 138 | + def next( |
| 139 | + self, |
| 140 | + after: datetime | None = None, |
| 141 | + until: datetime | None = None, |
| 142 | + ) -> datetime: |
| 143 | + """ |
| 144 | + Returns the next matching date after the one specified (or after the current date if not specified), and before the specified `until` (or one year after the |
| 145 | + intial date if not specified). |
| 146 | + """ |
| 147 | + if after is None: |
| 148 | + after = datetime.now() |
| 149 | + if until is None: |
| 150 | + until = after.replace(year=after.year + 1) |
| 151 | + while after < until: |
| 152 | + after += timedelta(minutes=1) |
| 153 | + if self.match(after) and (after < until): |
| 154 | + return after.replace(second=0, microsecond=0) |
| 155 | + raise CrontabExhausted(f"Could not find matching date before {until}") |
| 156 | + |
| 157 | + def dates( |
| 158 | + self, |
| 159 | + after: datetime | None = None, |
| 160 | + until: datetime | None = None, |
| 161 | + ) -> Iterator[datetime]: |
| 162 | + """ |
| 163 | + Yields each date between `after` and `until`. |
| 164 | + """ |
| 165 | + d = after |
| 166 | + while True: |
| 167 | + try: |
| 168 | + d = self.next(after=d, until=until) |
| 169 | + yield d |
| 170 | + except CrontabExhausted: |
| 171 | + break |
0 commit comments