Skip to content

Commit e808736

Browse files
authored
fix bug with workflow scheduling not getting the db time (#36)
1 parent f2e3351 commit e808736

File tree

4 files changed

+51
-5
lines changed

4 files changed

+51
-5
lines changed

yawn/utilities/database.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,3 +26,11 @@ def wrapper(*args, **kwargs):
2626
'was used inside an transaction.atomic() block.'
2727

2828
return wrapper
29+
30+
31+
def current_time():
32+
"""Return the database time"""
33+
with db.connection.cursor() as cursor:
34+
cursor.execute("SELECT STATEMENT_TIMESTAMP()")
35+
row = cursor.fetchone()
36+
return row[0]

yawn/utilities/tests/test_database.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
1+
import datetime
2+
13
import pytest
24
from django.db import connection
35

6+
from yawn.utilities import database
47
from yawn.utilities.database import close_on_exception
58

69

@@ -24,3 +27,7 @@ def example_disconnect():
2427
# but the exception is caught, and on retry the database reconnects:
2528
with connection.cursor() as cursor:
2629
cursor.execute('select 1')
30+
31+
32+
def test_current_time():
33+
assert isinstance(database.current_time(), datetime.datetime)

yawn/worker/tests/test_main.py

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
1+
import datetime
2+
from unittest import mock
3+
4+
from django.utils import timezone
5+
16
from yawn.worker.models import Queue
2-
from yawn.worker.main import Main
7+
from yawn.worker.main import Main, State
8+
from yawn.workflow.models import WorkflowName, Workflow
39

410

511
def test_init_default_queue():
@@ -21,3 +27,27 @@ def test_init_custom_queues():
2127
queue_names = Queue.objects.filter(
2228
id__in=worker1.queue_ids).values_list('name', flat=True)
2329
assert sorted(queue_names) == queues
30+
31+
32+
@mock.patch('yawn.worker.main.time.sleep')
33+
def test_run(mock_time):
34+
# so the worker exits immediately
35+
worker = Main(1, 'test name', ['default'])
36+
37+
def set_shutdown(_):
38+
# stop the worker after one run
39+
worker.state = State.shutdown
40+
41+
mock_time.side_effect = set_shutdown
42+
worker.run()
43+
44+
45+
def test_schedule_workflows():
46+
name = WorkflowName.objects.create(name='workflow1')
47+
next_run = timezone.now() - datetime.timedelta(hours=1)
48+
workflow = Workflow.objects.create(
49+
name=name, version=1, schedule_active=True, schedule='0 0 *', next_run=next_run)
50+
worker = Main(1, 'test name', ['default'])
51+
worker.schedule_workflows()
52+
workflow.refresh_from_db()
53+
assert workflow.next_run > next_run

yawn/workflow/models.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
11
from django.db import models
22
from django.contrib.postgres import fields
33
from django.db.models import functions
4-
from django.utils import timezone
54

6-
from yawn.utilities import cron
5+
from yawn.utilities import cron, database
76
from yawn.utilities.cron import Crontab
87

98

@@ -41,8 +40,7 @@ class Meta:
4140
def save(self, **kwargs):
4241
if self.schedule_active:
4342
if not self.next_run:
44-
# this call uses the server time instead of the db time...
45-
self.next_run = Crontab(self.schedule).next_run(timezone.now())
43+
self.next_run = Crontab(self.schedule).next_run(database.current_time())
4644
else:
4745
self.next_run = None
4846
super().save(**kwargs)
@@ -78,6 +76,9 @@ def submit_run(self, parameters=None, scheduled_time=None):
7876
)
7977
if not template.upstream.exists():
8078
task.enqueue()
79+
80+
# refresh to get the actual DB submitted time
81+
run.refresh_from_db()
8182
return run
8283

8384
def __str__(self):

0 commit comments

Comments
 (0)