Skip to content

Commit 3205166

Browse files
committed
Get subprocess test to run
1 parent ad97eee commit 3205166

File tree

5 files changed

+139
-16
lines changed

5 files changed

+139
-16
lines changed

ansible_base/task/__init__.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
from ansible_base.lib.utils.db import get_pg_notify_params
2+
3+
4+
def get_config():
5+
psycopg_params = get_pg_notify_params()
6+
psycopg_params.pop('autocommit') # dispatcher automatically adds this, causes error, TODO: need pre-check
7+
psycopg_params.pop('cursor_factory')
8+
psycopg_params.pop('context') # TODO: remove in inner method, makes non-async, not good
9+
10+
return {
11+
"producers": {
12+
"brokers": {
13+
"pg_notify": psycopg_params,
14+
"channels": ["dab_broadcast"],
15+
},
16+
"scheduled": {},
17+
},
18+
"pool": {"max_workers": 4}, # TODO: to settings
19+
}

ansible_base/task/management/commands/run_dispatcher.py

Lines changed: 2 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
from dispatcher.main import DispatcherMain
55

6-
from ansible_base.lib.utils.db import get_pg_notify_params
6+
from ansible_base.task import get_config
77

88
from django.core.management.base import BaseCommand
99
from django.db import connection
@@ -16,21 +16,7 @@ class Command(BaseCommand):
1616
help = "Runs bug checking sanity checks, gets scale metrics, and recommendations for Role Based Access Control"
1717

1818
def handle(self, *args, **options):
19-
psycopg_params = get_pg_notify_params()
20-
psycopg_params.pop('autocommit') # dispatcher automatically adds this, causes error, TODO: need pre-check
21-
psycopg_params.pop('cursor_factory')
22-
psycopg_params.pop('context') # TODO: remove in inner method, makes non-async, not good
23-
24-
dispatcher_config = {
25-
"producers": {
26-
"brokers": {
27-
"pg_notify": psycopg_params,
28-
"channels": ["dab_broadcast"],
29-
},
30-
"scheduled": {},
31-
},
32-
"pool": {"max_workers": 4}, # TODO: to settings
33-
}
19+
dispatcher_config = get_config()
3420

3521
loop = asyncio.get_event_loop()
3622
dispatcher = DispatcherMain(dispatcher_config)

test_app/tasks.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
11
import time
2+
from uuid import UUID
3+
4+
from test_app.models import UUIDModel
25

36
from ansible_base.task.publish import durable_task
47

@@ -8,6 +11,11 @@ def hello_world():
811
print('hello world')
912

1013

14+
@durable_task()
15+
def create_uuid_entry(uuid: UUID):
16+
UUIDModel.objects.create(id=uuid)
17+
18+
1119
@durable_task()
1220
def sleep(seconds=2):
1321
print(f'about to sleep for {seconds} seconds')

test_app/tests/task/conftest.py

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
import asyncio
2+
import multiprocessing
3+
import contextlib
4+
import time
5+
6+
import pytest
7+
8+
9+
from dispatcher.main import DispatcherMain
10+
11+
12+
13+
async def asyncio_target(queue_in, queue_out, config):
14+
try:
15+
dispatcher = DispatcherMain(config)
16+
17+
await dispatcher.connect_signals()
18+
await dispatcher.start_working()
19+
await dispatcher.wait_for_producers_ready()
20+
queue_out.put('ready')
21+
22+
23+
print('dispatcher server listening on queue_in')
24+
loop = asyncio.get_event_loop()
25+
message = await loop.run_in_executor(None, queue_in.get)
26+
27+
print(f'got message, will shut down: {message}')
28+
finally:
29+
await dispatcher.shutdown()
30+
await dispatcher.cancel_tasks()
31+
32+
33+
def subprocess_target(queue_in, queue_out, config):
34+
loop = asyncio.get_event_loop()
35+
try:
36+
loop.run_until_complete(asyncio_target(queue_in, queue_out, config))
37+
except Exception:
38+
import traceback
39+
40+
traceback.print_exc()
41+
# We are in a subprocess here, so even if we handle the exception
42+
# the main process will not know and still wait forever
43+
# so give them a kick on our way out
44+
print('sending error message after error')
45+
queue_out.put('error')
46+
finally:
47+
print('closing asyncio loop')
48+
loop.close()
49+
50+
51+
class SubprocessRunner:
52+
53+
def __init__(self):
54+
self.queue_in = multiprocessing.Queue()
55+
self.queue_out = multiprocessing.Queue()
56+
57+
def start_in_subprocess(self, config):
58+
process = multiprocessing.Process(target=subprocess_target, args=(self.queue_in, self.queue_out, config))
59+
process.start()
60+
return process
61+
62+
@contextlib.contextmanager
63+
def with_server(self, config):
64+
process = self.start_in_subprocess(config)
65+
msg = self.queue_out.get()
66+
if msg != 'ready':
67+
raise RuntimeError('never got ready message from subprocess')
68+
try:
69+
yield self
70+
finally:
71+
self.queue_in.put('stop')
72+
process.terminate() # SIGTERM
73+
# Poll to close process resources, due to race condition where it is not still running
74+
for i in range(3):
75+
time.sleep(0.1)
76+
try:
77+
process.close()
78+
break
79+
except Exception:
80+
if i == 2:
81+
raise
82+
83+
84+
@pytest.fixture
85+
def dispatcher_subprocess():
86+
server = SubprocessRunner()
87+
return server.with_server

test_app/tests/task/test_basic.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
from uuid import uuid4
2+
import time
3+
4+
import pytest
5+
6+
from ansible_base.task import get_config
7+
8+
from test_app.tasks import create_uuid_entry
9+
from test_app.models import UUIDModel
10+
11+
12+
@pytest.mark.django_db
13+
def test_run_task(dispatcher_subprocess):
14+
with dispatcher_subprocess(get_config()) as server:
15+
my_uuid = str(uuid4())
16+
create_uuid_entry.delay(uuid=my_uuid)
17+
18+
for i in range(20):
19+
if UUIDModel.objects.filter(id=my_uuid).exists():
20+
break
21+
time.sleep(0.05)
22+
else:
23+
assert f'Task never inserted UUID entry with {my_uuid}'

0 commit comments

Comments
 (0)