Skip to content

Commit 1222f0c

Browse files
authored
Improved Scheduler (#611)
- Track `last_fired_at` for schedules.
- Enable automatic backfill at startup.
- Configurable timezones.
1 parent e2d420b commit 1222f0c

File tree

8 files changed

+364
-11
lines changed

8 files changed

+364
-11
lines changed

dbos/_client.py

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
TypeVar,
1717
Union,
1818
)
19+
from zoneinfo import ZoneInfo
1920

2021
import sqlalchemy as sa
2122

@@ -715,6 +716,8 @@ def create_schedule(
715716
schedule: str,
716717
context: Any = None,
717718
workflow_class_name: Optional[str] = None,
719+
automatic_backfill: bool = False,
720+
cron_timezone: Optional[str] = None,
718721
) -> None:
719722
"""
720723
Create a cron schedule that periodically invokes a workflow.
@@ -725,12 +728,19 @@ def create_schedule(
725728
schedule: A cron expression (supports seconds with 6 fields).
726729
context: A context object passed as the second argument to every invocation. Defaults to ``None``.
727730
workflow_class_name: Class name for static class method workflows. Defaults to ``None``.
731+
automatic_backfill: If ``True``, on startup the scheduler will automatically backfill missed executions since the last time the schedule fired. Defaults to ``False``.
732+
cron_timezone: IANA timezone name (e.g. ``"America/New_York"``) in which to evaluate the cron expression. Defaults to ``None`` (UTC).
728733
729734
Raises:
730735
DBOSException: If the cron expression is invalid or a schedule with the same name already exists.
731736
"""
732737
if not croniter.is_valid(schedule, second_at_beginning=True):
733738
raise DBOSException(f"Invalid cron schedule: '{schedule}'")
739+
if cron_timezone is not None:
740+
try:
741+
ZoneInfo(cron_timezone)
742+
except (KeyError, Exception):
743+
raise DBOSException(f"Invalid timezone: '{cron_timezone}'")
734744
self._sys_db.create_schedule(
735745
WorkflowSchedule(
736746
schedule_id=generate_uuid(),
@@ -740,6 +750,9 @@ def create_schedule(
740750
schedule=schedule,
741751
status="ACTIVE",
742752
context=self._sys_db.serializer.serialize(context),
753+
last_fired_at=None,
754+
automatic_backfill=automatic_backfill,
755+
cron_timezone=cron_timezone,
743756
)
744757
)
745758

@@ -788,6 +801,8 @@ async def create_schedule_async(
788801
schedule: str,
789802
context: Any = None,
790803
workflow_class_name: Optional[str] = None,
804+
automatic_backfill: bool = False,
805+
cron_timezone: Optional[str] = None,
791806
) -> None:
792807
"""Async version of :meth:`create_schedule`."""
793808
await asyncio.to_thread(
@@ -797,6 +812,8 @@ async def create_schedule_async(
797812
schedule=schedule,
798813
context=context,
799814
workflow_class_name=workflow_class_name,
815+
automatic_backfill=automatic_backfill,
816+
cron_timezone=cron_timezone,
800817
)
801818

802819
async def list_schedules_async(
@@ -844,7 +861,19 @@ def apply_schedules(
844861
DBOSException: If a cron expression is invalid
845862
"""
846863
to_apply: List[WorkflowSchedule] = []
847-
for entry in schedules:
864+
for i, entry in enumerate(schedules):
865+
if "schedule_name" not in entry:
866+
raise DBOSException(
867+
f"Schedule entry {i} is missing required field 'schedule_name'"
868+
)
869+
if "workflow_name" not in entry:
870+
raise DBOSException(
871+
f"Schedule entry {i} is missing required field 'workflow_name'"
872+
)
873+
if "schedule" not in entry:
874+
raise DBOSException(
875+
f"Schedule entry {i} is missing required field 'schedule'"
876+
)
848877
cron = entry["schedule"]
849878
if not croniter.is_valid(cron, second_at_beginning=True):
850879
raise DBOSException(f"Invalid cron schedule: '{cron}'")
@@ -857,6 +886,9 @@ def apply_schedules(
857886
schedule=cron,
858887
status="ACTIVE",
859888
context=self._sys_db.serializer.serialize(entry["context"]),
889+
last_fired_at=None,
890+
automatic_backfill=entry.get("automatic_backfill", False),
891+
cron_timezone=entry.get("cron_timezone"),
860892
)
861893
)
862894
with self._sys_db.engine.begin() as c:

dbos/_conductor/protocol.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -468,6 +468,9 @@ class ScheduleOutput:
468468
schedule: str
469469
status: str
470470
context: str
471+
last_fired_at: Optional[str]
472+
automatic_backfill: bool
473+
cron_timezone: Optional[str]
471474

472475
@classmethod
473476
def from_schedule(
@@ -482,6 +485,9 @@ def from_schedule(
482485
schedule=s["schedule"],
483486
status=s["status"],
484487
context=context_str,
488+
last_fired_at=s.get("last_fired_at"),
489+
automatic_backfill=s.get("automatic_backfill", False),
490+
cron_timezone=s.get("cron_timezone"),
485491
)
486492

487493

dbos/_dbos.py

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
Union,
3333
overload,
3434
)
35+
from zoneinfo import ZoneInfo
3536

3637
from dbos._conductor.conductor import ConductorWebsocket
3738
from dbos._debouncer import debouncer_workflow
@@ -288,11 +289,13 @@ def get_internal_queue(self) -> Queue:
288289
return Queue(INTERNAL_QUEUE_NAME)
289290

290291

291-
class ScheduleInput(TypedDict):
292+
class ScheduleInput(TypedDict, total=False):
292293
schedule_name: str
293294
workflow_fn: Callable[[datetime, Any], None]
294295
schedule: str
295296
context: Any
297+
automatic_backfill: bool
298+
cron_timezone: Optional[str]
296299

297300

298301
class DBOS:
@@ -2037,6 +2040,8 @@ def create_schedule(
20372040
workflow_fn: ScheduledWorkflow,
20382041
schedule: str,
20392042
context: Any = None,
2043+
automatic_backfill: bool = False,
2044+
cron_timezone: Optional[str] = None,
20402045
) -> None:
20412046
"""
20422047
Create a cron schedule that periodically invokes a workflow function.
@@ -2050,6 +2055,8 @@ def create_schedule(
20502055
workflow_fn: The workflow function to invoke. Must accept ``(datetime, context)``.
20512056
schedule: A cron expression (supports seconds with 6 fields).
20522057
context: A context object passed as the second argument to every invocation. Defaults to ``None``.
2058+
automatic_backfill: If ``True``, on startup the scheduler will automatically backfill missed executions since the last time the schedule fired. Defaults to ``False``.
2059+
cron_timezone: IANA timezone name (e.g. ``"America/New_York"``) in which to evaluate the cron expression. Defaults to ``None`` (UTC).
20532060
20542061
Raises:
20552062
DBOSException: If the cron expression is invalid, the workflow is not registered, or a schedule with the same name already exists
@@ -2072,6 +2079,11 @@ def create_schedule(
20722079
if fi and fi.class_info and fi.func_type == DBOSFuncType.Class
20732080
else None
20742081
)
2082+
if cron_timezone is not None:
2083+
try:
2084+
ZoneInfo(cron_timezone)
2085+
except (KeyError, Exception):
2086+
raise DBOSException(f"Invalid timezone: '{cron_timezone}'")
20752087
sched = WorkflowSchedule(
20762088
schedule_id=generate_uuid(),
20772089
schedule_name=schedule_name,
@@ -2080,6 +2092,9 @@ def create_schedule(
20802092
schedule=schedule,
20812093
status="ACTIVE",
20822094
context=dbos._sys_db.serializer.serialize(context),
2095+
last_fired_at=None,
2096+
automatic_backfill=automatic_backfill,
2097+
cron_timezone=cron_timezone,
20832098
)
20842099
ctx = snapshot_step_context(reserve_sleep_id=False)
20852100
if ctx and ctx.is_workflow():
@@ -2179,6 +2194,8 @@ async def create_schedule_async(
21792194
workflow_fn: ScheduledWorkflow,
21802195
schedule: str,
21812196
context: Any = None,
2197+
automatic_backfill: bool = False,
2198+
cron_timezone: Optional[str] = None,
21822199
) -> None:
21832200
"""Async version of :meth:`create_schedule`."""
21842201
await cls._configure_asyncio_thread_pool()
@@ -2188,6 +2205,8 @@ async def create_schedule_async(
21882205
workflow_fn=workflow_fn,
21892206
schedule=schedule,
21902207
context=context,
2208+
automatic_backfill=automatic_backfill,
2209+
cron_timezone=cron_timezone,
21912210
)
21922211

21932212
@classmethod
@@ -2272,7 +2291,19 @@ def apply_schedules(
22722291
)
22732292
dbos = _get_dbos_instance()
22742293
to_apply: List[WorkflowSchedule] = []
2275-
for entry in schedules:
2294+
for i, entry in enumerate(schedules):
2295+
if "schedule_name" not in entry:
2296+
raise DBOSException(
2297+
f"Schedule entry {i} is missing required field 'schedule_name'"
2298+
)
2299+
if "workflow_fn" not in entry:
2300+
raise DBOSException(
2301+
f"Schedule entry {i} is missing required field 'workflow_fn'"
2302+
)
2303+
if "schedule" not in entry:
2304+
raise DBOSException(
2305+
f"Schedule entry {i} is missing required field 'schedule'"
2306+
)
22762307
cron = entry["schedule"]
22772308
if not croniter.is_valid(cron, second_at_beginning=True):
22782309
raise DBOSException(f"Invalid cron schedule: '{cron}'")
@@ -2301,6 +2332,9 @@ def apply_schedules(
23012332
schedule=cron,
23022333
status="ACTIVE",
23032334
context=dbos._sys_db.serializer.serialize(entry["context"]),
2335+
last_fired_at=None,
2336+
automatic_backfill=entry.get("automatic_backfill", False),
2337+
cron_timezone=entry.get("cron_timezone"),
23042338
)
23052339
)
23062340
with dbos._sys_db.engine.begin() as c:

dbos/_migration.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -326,6 +326,7 @@ def get_dbos_migration_thirteen(schema: str) -> str:
326326
);
327327
"""
328328

329+
329330
def get_dbos_migration_fourteen(schema: str) -> str:
330331
return f"""
331332
CREATE FUNCTION "{schema}".enqueue_workflow(
@@ -431,6 +432,15 @@ def get_dbos_migration_fourteen(schema: str) -> str:
431432
$$ LANGUAGE plpgsql;
432433
"""
433434

435+
436+
def get_dbos_migration_fifteen(schema: str) -> str:
437+
return f"""
438+
ALTER TABLE "{schema}".workflow_schedules ADD COLUMN "last_fired_at" TEXT DEFAULT NULL;
439+
ALTER TABLE "{schema}".workflow_schedules ADD COLUMN "automatic_backfill" BOOLEAN NOT NULL DEFAULT FALSE;
440+
ALTER TABLE "{schema}".workflow_schedules ADD COLUMN "cron_timezone" TEXT DEFAULT NULL;
441+
"""
442+
443+
434444
def get_dbos_migrations(schema: str, use_listen_notify: bool) -> list[str]:
435445
return [
436446
get_dbos_migration_one(schema, use_listen_notify),
@@ -447,6 +457,7 @@ def get_dbos_migrations(schema: str, use_listen_notify: bool) -> list[str]:
447457
get_dbos_migration_twelve(schema),
448458
get_dbos_migration_thirteen(schema),
449459
get_dbos_migration_fourteen(schema),
460+
get_dbos_migration_fifteen(schema),
450461
]
451462

452463

@@ -613,6 +624,12 @@ def get_sqlite_timestamp_expr() -> str:
613624
created_at INTEGER NOT NULL DEFAULT {get_sqlite_timestamp_expr()}
614625
);
615626
"""
627+
sqlite_migration_fifteen = """
628+
ALTER TABLE workflow_schedules ADD COLUMN "last_fired_at" TEXT DEFAULT NULL;
629+
ALTER TABLE workflow_schedules ADD COLUMN "automatic_backfill" BOOLEAN NOT NULL DEFAULT FALSE;
630+
ALTER TABLE workflow_schedules ADD COLUMN "cron_timezone" TEXT DEFAULT NULL;
631+
"""
632+
616633
sqlite_migrations = [
617634
sqlite_migration_one,
618635
sqlite_migration_two,
@@ -627,4 +644,5 @@ def get_sqlite_timestamp_expr() -> str:
627644
sqlite_migration_twelve,
628645
sqlite_migration_thirteen,
629646
# Note, there is no sqlite version of migration fourteen
647+
sqlite_migration_fifteen,
630648
]

dbos/_scheduler.py

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import traceback
44
from datetime import datetime, timezone
55
from typing import TYPE_CHECKING, Any, Callable, Coroutine, Optional
6+
from zoneinfo import ZoneInfo
67

78
from ._croniter import croniter # type: ignore
89
from ._error import DBOSException
@@ -30,6 +31,8 @@ def __init__(self, schedule: WorkflowSchedule, serializer: Serializer):
3031
self.cron: str = schedule["schedule"]
3132
self.serialized_context: str = schedule["context"]
3233
self.context: Any = serializer.deserialize(self.serialized_context)
34+
tz_name = schedule.get("cron_timezone")
35+
self.tzinfo = ZoneInfo(tz_name) if tz_name else timezone.utc
3336
self._stop_event = threading.Event()
3437
self._thread = threading.Thread(target=self._loop, daemon=True)
3538
self._thread.start()
@@ -40,7 +43,7 @@ def _loop(self) -> None:
4043
dbos = _get_dbos_instance()
4144
try:
4245
it = croniter(
43-
self.cron, datetime.now(timezone.utc), second_at_beginning=True
46+
self.cron, datetime.now(self.tzinfo), second_at_beginning=True
4447
)
4548
except Exception:
4649
dbos_logger.error(
@@ -69,6 +72,9 @@ def _loop(self) -> None:
6972
self.context,
7073
self.class_name,
7174
)
75+
dbos._sys_db.update_last_fired_at(
76+
self.schedule_name, next_exec_time.isoformat()
77+
)
7278
except Exception:
7379
dbos_logger.warning(
7480
f"Exception in schedule '{self.schedule_name}': "
@@ -148,7 +154,10 @@ def backfill_schedule(
148154
raise DBOSException(f"Schedule '{schedule_name}' does not exist")
149155
context = sys_db.serializer.deserialize(schedule["context"])
150156
class_name = schedule["workflow_class_name"]
151-
it = croniter(schedule["schedule"], start, second_at_beginning=True)
157+
tz_name = schedule.get("cron_timezone")
158+
tz = ZoneInfo(tz_name) if tz_name else timezone.utc
159+
start_in_tz = start.astimezone(tz)
160+
it = croniter(schedule["schedule"], start_in_tz, second_at_beginning=True)
152161
workflow_ids: list[str] = []
153162
while True:
154163
next_time = it.get_next(datetime)
@@ -222,6 +231,26 @@ def dynamic_scheduler_loop(
222231
schedule_thread.stop()
223232
del schedule_threads[schedule_id]
224233
elif schedule_thread is None:
234+
# Automatic backfill: if enabled and last_fired_at is set,
235+
# backfill missed executions before starting the thread.
236+
if schedule.get("automatic_backfill") and schedule.get("last_fired_at"):
237+
try:
238+
assert schedule["last_fired_at"]
239+
last_fired = datetime.fromisoformat(schedule["last_fired_at"])
240+
now = datetime.now(timezone.utc)
241+
if last_fired < now:
242+
backfill_schedule(
243+
dbos._sys_db,
244+
schedule["schedule_name"],
245+
last_fired,
246+
now,
247+
)
248+
except Exception:
249+
dbos_logger.warning(
250+
f"Exception during automatic backfill for "
251+
f"schedule '{schedule['schedule_name']}': "
252+
f"{traceback.format_exc()}"
253+
)
225254
schedule_threads[schedule_id] = _ScheduleThread(
226255
schedule, dbos._sys_db.serializer
227256
)

dbos/_schemas/system_database.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,9 @@ def set_schema(cls, schema_name: Optional[str]) -> None:
213213
Column("schedule", Text, nullable=False),
214214
Column("status", Text, nullable=False, server_default="ACTIVE"),
215215
Column("context", Text, nullable=False),
216+
Column("last_fired_at", Text, nullable=True),
217+
Column("automatic_backfill", Boolean, nullable=False, server_default="false"),
218+
Column("cron_timezone", Text, nullable=True),
216219
)
217220

218221
application_versions = Table(

0 commit comments

Comments
 (0)