Skip to content

Commit 6ab9a3f

Browse files
committed
Fix linting issues
1 parent 91cc137 commit 6ab9a3f

File tree

12 files changed

+44
-60
lines changed

12 files changed

+44
-60
lines changed

ansible_base/task/admin.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +0,0 @@
1-
from django.contrib import admin
2-
3-
# Register your models here.

ansible_base/task/apps.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
1-
from django.apps import AppConfig
2-
31
from dispatcher.config import setup
2+
from django.apps import AppConfig
43

54
from ansible_base.task.config import get_config
65

ansible_base/task/config.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,14 @@ def get_config() -> dict:
1616
"channels": settings.DAB_TASK_LISTEN_QUEUES,
1717
# The default publish channel allows using .delay without more arguments
1818
# this may still have task-specific overrides
19-
"default_publish_channel": settings.DAB_TASK_ADMIN_QUEUE
19+
"default_publish_channel": settings.DAB_TASK_ADMIN_QUEUE,
2020
}
2121
},
2222
"producers": {
2323
"ScheduledProducer": {
2424
"task_schedule": {
2525
"ansible_base.task.tasks.run_task_from_queue": {"schedule": 60},
26-
"ansible_base.task.tasks.manage_lost_tasks": {"schedule": 60*10}
26+
"ansible_base.task.tasks.manage_lost_tasks": {"schedule": 60 * 10},
2727
}
2828
},
2929
},

ansible_base/task/management/commands/run_dispatcher.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
11
import logging
22

33
from dispatcher import run_service
4-
4+
from django.core.cache import cache
55
from django.core.management.base import BaseCommand
66
from django.db import connection
7-
from django.core.cache import cache
87

98
logger = logging.getLogger(__name__)
109

ansible_base/task/models.py

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,9 @@ class Task(models.Model):
2727
uuid = models.UUIDField(primary_key=True, default=uuid4, editable=False, help_text=_('Original dispatcher UUID generated at time of task publishing'))
2828
# TODO: change back to UUIDField, workaround dispatcher using non-UUID uuids for now
2929
# null=True
30-
wrapper_uuid = models.CharField(blank=True, default='', max_length=40, editable=False, help_text=_('Adopted UUID of DAB task app wrapper, attached when background service starts task'))
30+
wrapper_uuid = models.CharField(
31+
blank=True, default='', max_length=40, editable=False, help_text=_('Adopted UUID of DAB task app wrapper, attached when background service starts task')
32+
)
3133
state = models.CharField(
3234
choices=[(s, s.title()) for s in sorted(vars(TASK_STATES).values())],
3335
default=TASK_STATES.WAITING,
@@ -37,12 +39,8 @@ class Task(models.Model):
3739
name = models.TextField(help_text=_('Importable path for class or method'))
3840
queue = models.TextField(default='', help_text=_('The work queue this task is submitted to, only nodes listening to this queue may process the task'))
3941

40-
args = models.JSONField(
41-
default=dict, null=False, blank=True, help_text=_("The arguments for the task run.")
42-
)
43-
kwargs = models.JSONField(
44-
default=dict, null=False, blank=True, help_text=_("The keyword arguments for the task run.")
45-
)
42+
args = models.JSONField(default=dict, null=False, blank=True, help_text=_("The arguments for the task run."))
43+
kwargs = models.JSONField(default=dict, null=False, blank=True, help_text=_("The keyword arguments for the task run."))
4644

4745
created = models.DateTimeField(
4846
auto_now_add=True, help_text=_('Time the publisher (submitter) of this task call created it, approximately the time of submission as well')

ansible_base/task/publish.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
from dispatcher.utils import serialize_task
2-
from django.db import transaction
32
from django.conf import settings
3+
from django.db import transaction
44

55
from ansible_base.task.models import Task
66
from ansible_base.task.tasks import run_task_from_queue

ansible_base/task/tasks.py

Lines changed: 24 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,26 @@
11
import logging
22

3-
from dispatcher.utils import resolve_callable
4-
from dispatcher.publish import task
53
from dispatcher.factories import get_control_from_settings
6-
7-
from django.db import transaction
4+
from dispatcher.publish import task as dispatcher_task
5+
from dispatcher.utils import resolve_callable
86
from django.conf import settings
7+
from django.db import transaction
98
from django.utils.timezone import now, timedelta
109

1110
from ansible_base.task.models import TASK_STATES, Task
1211

1312
logger = logging.getLogger(__name__)
1413

1514

16-
@task(queue=settings.DAB_TASK_ADMIN_QUEUE, bind=True)
15+
@dispatcher_task(queue=settings.DAB_TASK_ADMIN_QUEUE, bind=True)
1716
def run_task_from_queue(dispatcher):
1817
with transaction.atomic():
19-
task = Task.objects.filter(state=TASK_STATES.WAITING, queue__in=settings.DAB_TASK_LISTEN_QUEUES).select_for_update().first()
20-
if task:
21-
task.state = TASK_STATES.RUNNING
22-
task.started_at = now()
23-
task.wrapper_uuid = str(dispatcher.uuid)
24-
task.save(update_fields=['state', 'started_at', 'wrapper_uuid'])
18+
db_task = Task.objects.filter(state=TASK_STATES.WAITING, queue__in=settings.DAB_TASK_LISTEN_QUEUES).select_for_update().first()
19+
if db_task:
20+
db_task.state = TASK_STATES.RUNNING
21+
db_task.started_at = now()
22+
db_task.wrapper_uuid = str(dispatcher.uuid)
23+
db_task.save(update_fields=['state', 'started_at', 'wrapper_uuid'])
2524
else:
2625
return
2726

@@ -30,26 +29,26 @@ def run_task_from_queue(dispatcher):
3029
run_task_from_queue.delay()
3130

3231
try:
33-
task_callable = resolve_callable(task.name)
34-
task_callable(*task.args, **task.kwargs)
32+
task_callable = resolve_callable(db_task.name)
33+
task_callable(*db_task.args, **db_task.kwargs)
3534
except Exception:
36-
logger.exception(f'Failed to run and complete {task.name}')
35+
logger.exception(f'Failed to run and complete {db_task.name}')
3736

38-
task.delete()
37+
db_task.delete()
3938

4039

41-
@task(queue=settings.DAB_TASK_ADMIN_QUEUE)
40+
@dispatcher_task(queue=settings.DAB_TASK_ADMIN_QUEUE)
4241
def manage_lost_tasks(grace_period: int = 10):
4342
cutoff_time = now() - timedelta(minutes=grace_period)
44-
for task in Task.objects.filter(state=TASK_STATES.RUNNING, started_at__lt=cutoff_time, queue__in=settings.DAB_TASK_LISTEN_QUEUES).iterator():
43+
for db_task in Task.objects.filter(state=TASK_STATES.RUNNING, started_at__lt=cutoff_time, queue__in=settings.DAB_TASK_LISTEN_QUEUES).iterator():
4544
ctl = get_control_from_settings(default_publish_channel=settings.DAB_TASK_ADMIN_QUEUE)
4645

47-
running_tasks = ctl.control_with_reply('running', data={'uuid': str(task.wrapper_uuid)})
46+
running_tasks = ctl.control_with_reply('running', data={'uuid': str(db_task.wrapper_uuid)})
4847

4948
found = False
5049
for server_reply in running_tasks:
5150
for worker_id, task_data in server_reply:
52-
if task_data.get('uuid') == str(task.wrapper_uuid):
51+
if task_data.get('uuid') == str(db_task.wrapper_uuid):
5352
found = True
5453
break
5554
if found:
@@ -59,11 +58,11 @@ def manage_lost_tasks(grace_period: int = 10):
5958
# TODO: feature of retry policy
6059
try:
6160
with transaction.atomic():
62-
task = Task.objects.select_for_update().get(uuid=task.uuid)
63-
logger.warning(f'Could not find task {task.name} {task.wrapper_uuid}, deleting entry')
64-
task.delete()
61+
db_task = Task.objects.select_for_update().get(uuid=db_task.uuid)
62+
logger.warning(f'Could not find task {db_task.name} {db_task.wrapper_uuid}, deleting entry')
63+
db_task.delete()
6564
except Task.DoesNotExist:
66-
logger.debug(f'task {task.name} uuid={task.uuid} already deleted, doing nothing')
65+
logger.debug(f'task {db_task.name} uuid={db_task.uuid} already deleted, doing nothing')
6766
else:
68-
delta = now() - task.started_at
69-
logger.info(f'Noticed {task.name} {task.wrapper_uuid} running for {delta} seconds, seems to be fine')
67+
delta = now() - db_task.started_at
68+
logger.info(f'Noticed {db_task.name} {db_task.wrapper_uuid} running for {delta} seconds, seems to be fine')

test_app/tasks.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
11
import time
22
from uuid import UUID
33

4-
from test_app.models import UUIDModel
5-
64
from ansible_base.task.publish import durable_task
5+
from test_app.models import UUIDModel
76

87

98
@durable_task()

test_app/tests/task/conftest.py

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,13 @@
11
import asyncio
2-
import multiprocessing
32
import contextlib
3+
import multiprocessing
44
import time
55
from uuid import uuid4
66

77
import pytest
8-
9-
108
from dispatcher.main import DispatcherMain
119

1210

13-
1411
async def asyncio_target(queue_in, queue_out, config, this_uuid):
1512
try:
1613
dispatcher = DispatcherMain(config)
@@ -20,7 +17,6 @@ async def asyncio_target(queue_in, queue_out, config, this_uuid):
2017
await dispatcher.wait_for_producers_ready()
2118
queue_out.put('ready')
2219

23-
2420
print(f'{this_uuid} dispatcher server listening on queue_in')
2521
loop = asyncio.get_event_loop()
2622
message = await loop.run_in_executor(None, queue_in.get)

test_app/tests/task/test_basic.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,16 @@
1-
from uuid import uuid4
21
import time
2+
from uuid import uuid4
33

44
import pytest
55

66
from ansible_base.task.config import get_config
7-
8-
from test_app.tasks import create_uuid_entry
97
from test_app.models import UUIDModel
8+
from test_app.tasks import create_uuid_entry
109

1110

1211
@pytest.mark.django_db
1312
def test_run_task(dispatcher_subprocess):
14-
with dispatcher_subprocess(get_config()) as server:
13+
with dispatcher_subprocess(get_config()):
1514
my_uuid = str(uuid4())
1615
create_uuid_entry.delay(uuid=my_uuid)
1716

0 commit comments

Comments
 (0)