|
| 1 | +"""The `pydantic_extra_types.cron` module provides the [`CronStr`][pydantic_extra_types.cron.CronStr] data type.""" |
| 2 | + |
| 3 | +from __future__ import annotations |
| 4 | + |
| 5 | +from datetime import datetime |
| 6 | +from typing import TYPE_CHECKING, Any, ClassVar, Protocol, cast |
| 7 | + |
| 8 | +try: |
| 9 | + from cron_converter import Cron # type: ignore[import-untyped] |
| 10 | +except ModuleNotFoundError as e: # pragma: no cover |
| 11 | + raise RuntimeError( |
| 12 | + 'The `cron` module requires "cron-converter" to be installed. You can install it with "pip install cron-converter".' |
| 13 | + ) from e |
| 14 | +from pydantic import GetCoreSchemaHandler |
| 15 | +from pydantic_core import PydanticCustomError, core_schema |
| 16 | + |
| 17 | +if TYPE_CHECKING: |
| 18 | + from cron_converter.sub_modules.seeker import Seeker as CronSeeker # type: ignore[import-untyped] |
| 19 | +else: |
| 20 | + |
| 21 | + class CronSeeker(Protocol): |
| 22 | + def next(self) -> datetime: ... |
| 23 | + |
| 24 | + |
| 25 | +class CronStr(str): |
| 26 | + """A cron expression validated via [`cron-converter`](https://pypi.org/project/cron-converter/).""" |
| 27 | + |
| 28 | + strip_whitespace: ClassVar[bool] = True |
| 29 | + """Whether to strip surrounding whitespace from the input value.""" |
| 30 | + _component_names: ClassVar[tuple[str, ...]] = ( |
| 31 | + 'minute', |
| 32 | + 'hour', |
| 33 | + 'day_of_the_month', |
| 34 | + 'month', |
| 35 | + 'day_of_the_week', |
| 36 | + ) |
| 37 | + """Expected cron expression components in the order enforced by `cron-converter`.""" |
| 38 | + |
| 39 | + minute: str |
| 40 | + hour: str |
| 41 | + day_of_the_month: str |
| 42 | + month: str |
| 43 | + day_of_the_week: str |
| 44 | + cron_obj: Cron |
| 45 | + |
| 46 | + def __new__(cls, cron_expression: str, *, _cron: Cron | None = None) -> CronStr: |
| 47 | + if _cron is None: |
| 48 | + cron_expression, cron_obj = cls._validate(cron_expression) |
| 49 | + else: |
| 50 | + cron_obj = _cron |
| 51 | + cron_expression = cron_obj.to_string() |
| 52 | + |
| 53 | + obj = super().__new__(cls, cron_expression) |
| 54 | + obj._apply_cron(cron_obj) |
| 55 | + return obj |
| 56 | + |
| 57 | + def _apply_cron(self, cron_obj: Cron) -> None: |
| 58 | + self.cron_obj = cron_obj |
| 59 | + self.minute, self.hour, self.day_of_the_month, self.month, self.day_of_the_week = str(self).split() |
| 60 | + |
| 61 | + @classmethod |
| 62 | + def _validate(cls, value: Any) -> tuple[str, Cron]: |
| 63 | + if not isinstance(value, str): |
| 64 | + raise PydanticCustomError('cron_str_type', 'Cron expression must be a string') |
| 65 | + |
| 66 | + cron_expression = value.strip() |
| 67 | + if not cron_expression: |
| 68 | + raise PydanticCustomError('cron_str_empty', 'Cron expression must not be empty') |
| 69 | + |
| 70 | + parts = cron_expression.split() |
| 71 | + if len(parts) != len(cls._component_names): |
| 72 | + parts_list = ', '.join(cls._component_names) |
| 73 | + raise PydanticCustomError( |
| 74 | + 'cron_str_components', |
| 75 | + f'Cron expression must contain {len(cls._component_names)} space separated components: {parts_list}', |
| 76 | + ) |
| 77 | + |
| 78 | + try: |
| 79 | + cron_obj = Cron(cron_expression) |
| 80 | + except (TypeError, ValueError) as exc: |
| 81 | + raise PydanticCustomError('cron_str_invalid', str(exc)) from exc |
| 82 | + |
| 83 | + # `cron-converter` may normalise components (e.g. remove duplicate spaces), |
| 84 | + # so we reuse its canonical representation. |
| 85 | + return cron_obj.to_string(), cron_obj |
| 86 | + |
| 87 | + @classmethod |
| 88 | + def validate(cls, __input_value: Any, _: core_schema.ValidationInfo) -> CronStr: |
| 89 | + cron_expression, cron_obj = cls._validate(__input_value) |
| 90 | + return cls(cron_expression, _cron=cron_obj) |
| 91 | + |
| 92 | + @classmethod |
| 93 | + def __get_pydantic_core_schema__(cls, source_type: Any, handler: GetCoreSchemaHandler) -> core_schema.CoreSchema: |
| 94 | + return core_schema.with_info_after_validator_function( |
| 95 | + cls.validate, |
| 96 | + core_schema.str_schema(strip_whitespace=cls.strip_whitespace), |
| 97 | + ) |
| 98 | + |
| 99 | + @classmethod |
| 100 | + def __get_pydantic_json_schema__( |
| 101 | + cls, schema: core_schema.CoreSchema, handler: GetCoreSchemaHandler |
| 102 | + ) -> dict[str, Any]: |
| 103 | + return dict(handler(schema)) |
| 104 | + |
| 105 | + def schedule(self, start_date: datetime | None = None, timezone_str: str | None = None) -> CronSeeker: |
| 106 | + """Return the iterator produced by `cron-converter` for this expression.""" |
| 107 | + return cast(CronSeeker, self.cron_obj.schedule(start_date=start_date, timezone_str=timezone_str)) |
| 108 | + |
| 109 | + def next_after(self, start_date: datetime | None = None, timezone_str: str | None = None) -> datetime: |
| 110 | + """Return the first run datetime after `start_date` (or now if omitted).""" |
| 111 | + seeker = self.schedule(start_date=start_date, timezone_str=timezone_str) |
| 112 | + return cast(datetime, seeker.next()) |
| 113 | + |
| 114 | + @property |
| 115 | + def next_run(self) -> str: |
| 116 | + """Return the next run as an ISO formatted string (shortcut for backwards compatibility).""" |
| 117 | + return self.next_after().isoformat() |
0 commit comments