Skip to content

Commit 4f4a924

Browse files
fix(jobs): use MySQL server time consistently for all scheduling
- Use MySQL NOW() instead of Python datetime.now() for consistent timing - Jobs refresh: use CURRENT_TIMESTAMP default for scheduled_time (delay=0) - Jobs reserve: use MySQL NOW() for scheduled_time comparison and reserved_time - Jobs complete/error: use MySQL NOW() for completed_time - Stale/orphan timeout checks: use MySQL NOW() - INTERVAL syntax This fixes timezone mismatches between Python and MySQL that caused jobs to not be found in CI environments. Co-Authored-By: Claude Opus 4.5 <[email protected]>
1 parent d778e4f commit 4f4a924

File tree

1 file changed

+27
-22
lines changed

1 file changed

+27
-22
lines changed

src/datajoint/jobs.py

Lines changed: 27 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -348,7 +348,7 @@ def refresh(
348348
3. Remove stale jobs: jobs older than stale_timeout whose keys not in key_source
349349
4. Remove orphaned jobs: reserved jobs older than orphan_timeout (if specified)
350350
"""
351-
from datetime import datetime, timedelta
351+
from datetime import timedelta
352352

353353
from .settings import config
354354

@@ -363,7 +363,6 @@ def refresh(
363363
stale_timeout = config.jobs.stale_timeout
364364

365365
result = {"added": 0, "removed": 0, "orphaned": 0, "re_pended": 0}
366-
now = datetime.now()
367366

368367
# 1. Add new jobs
369368
key_source = self._target.key_source
@@ -378,14 +377,21 @@ def refresh(
378377
new_key_list = new_keys.keys()
379378

380379
if new_key_list:
381-
scheduled_time = now + timedelta(seconds=delay) if delay > 0 else now
380+
# Get MySQL server time for delayed scheduling (delay > 0 only)
381+
# When delay == 0, omit scheduled_time to use MySQL's CURRENT_TIMESTAMP default
382+
scheduled_time = None
383+
if delay > 0:
384+
server_now = self.connection.query("SELECT NOW()").fetchone()[0]
385+
scheduled_time = server_now + timedelta(seconds=delay)
386+
382387
for key in new_key_list:
383388
job_entry = {
384389
**key,
385390
"status": "pending",
386391
"priority": priority,
387-
"scheduled_time": scheduled_time,
388392
}
393+
if scheduled_time is not None:
394+
job_entry["scheduled_time"] = scheduled_time
389395
try:
390396
self.insert1(job_entry, ignore_extra_fields=True)
391397
result["added"] += 1
@@ -403,21 +409,19 @@ def refresh(
403409
self.insert1({**key, "status": "pending", "priority": priority})
404410
result["re_pended"] += 1
405411

406-
# 3. Remove stale jobs (not ignore status)
412+
# 3. Remove stale jobs (not ignore status) - use MySQL NOW() for consistent timing
407413
if stale_timeout > 0:
408-
stale_cutoff = now - timedelta(seconds=stale_timeout)
409-
old_jobs = self & f'created_time < "{stale_cutoff}"' & 'status != "ignore"'
414+
old_jobs = self & f"created_time < NOW() - INTERVAL {stale_timeout} SECOND" & 'status != "ignore"'
410415

411416
for key in old_jobs.keys():
412417
# Check if key still in key_source
413418
if not (key_source & key):
414419
(self & key).delete_quick()
415420
result["removed"] += 1
416421

417-
# 4. Handle orphaned reserved jobs
422+
# 4. Handle orphaned reserved jobs - use MySQL NOW() for consistent timing
418423
if orphan_timeout is not None and orphan_timeout > 0:
419-
orphan_cutoff = now - timedelta(seconds=orphan_timeout)
420-
orphaned_jobs = self.reserved & f'reserved_time < "{orphan_cutoff}"'
424+
orphaned_jobs = self.reserved & f"reserved_time < NOW() - INTERVAL {orphan_timeout} SECOND"
421425

422426
for key in orphaned_jobs.keys():
423427
(self & key).delete_quick()
@@ -443,21 +447,21 @@ def reserve(self, key: dict) -> bool:
443447
bool
444448
True if reservation successful, False if job not available.
445449
"""
446-
from datetime import datetime
447-
448-
# Check if job is pending and scheduled
449-
now = datetime.now()
450-
job = (self & key & 'status="pending"' & f'scheduled_time <= "{now}"').to_dicts()
450+
# Check if job is pending and scheduled (use MySQL NOW() for consistent timing)
451+
job = (self & key & 'status="pending"' & "scheduled_time <= NOW()").to_dicts()
451452

452453
if not job:
453454
return False
454455

456+
# Get MySQL server time for reserved_time
457+
server_now = self.connection.query("SELECT NOW()").fetchone()[0]
458+
455459
# Build update row with primary key and new values
456460
pk = self._get_pk(key)
457461
update_row = {
458462
**pk,
459463
"status": "reserved",
460-
"reserved_time": now,
464+
"reserved_time": server_now,
461465
"host": platform.node(),
462466
"pid": os.getpid(),
463467
"connection_id": self.connection.connection_id,
@@ -489,16 +493,16 @@ def complete(self, key: dict, duration: float | None = None) -> None:
489493
- If True: updates status to ``'success'`` with completion time and duration
490494
- If False: deletes the job entry
491495
"""
492-
from datetime import datetime
493-
494496
from .settings import config
495497

496498
if config.jobs.keep_completed:
499+
# Use MySQL server time for completed_time
500+
server_now = self.connection.query("SELECT NOW()").fetchone()[0]
497501
pk = self._get_pk(key)
498502
update_row = {
499503
**pk,
500504
"status": "success",
501-
"completed_time": datetime.now(),
505+
"completed_time": server_now,
502506
}
503507
if duration is not None:
504508
update_row["duration"] = duration
@@ -519,16 +523,17 @@ def error(self, key: dict, error_message: str, error_stack: str | None = None) -
519523
error_stack : str, optional
520524
Full stack trace.
521525
"""
522-
from datetime import datetime
523-
524526
if len(error_message) > ERROR_MESSAGE_LENGTH:
525527
error_message = error_message[: ERROR_MESSAGE_LENGTH - len(TRUNCATION_APPENDIX)] + TRUNCATION_APPENDIX
526528

529+
# Use MySQL server time for completed_time
530+
server_now = self.connection.query("SELECT NOW()").fetchone()[0]
531+
527532
pk = self._get_pk(key)
528533
update_row = {
529534
**pk,
530535
"status": "error",
531-
"completed_time": datetime.now(),
536+
"completed_time": server_now,
532537
"error_message": error_message,
533538
}
534539
if error_stack is not None:

0 commit comments

Comments
 (0)