Skip to content

Commit 50e08c6

Browse files
committed
Get multi-server working
1 parent 3205166 commit 50e08c6

File tree

2 files changed

+43
-11
lines changed

2 files changed

+43
-11
lines changed

test_app/tests/task/conftest.py

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import multiprocessing
33
import contextlib
44
import time
5+
from uuid import uuid4
56

67
import pytest
78

@@ -10,7 +11,7 @@
1011

1112

1213

13-
async def asyncio_target(queue_in, queue_out, config):
14+
async def asyncio_target(queue_in, queue_out, config, this_uuid):
1415
try:
1516
dispatcher = DispatcherMain(config)
1617

@@ -20,31 +21,31 @@ async def asyncio_target(queue_in, queue_out, config):
2021
queue_out.put('ready')
2122

2223

23-
print('dispatcher server listening on queue_in')
24+
print(f'{this_uuid} dispatcher server listening on queue_in')
2425
loop = asyncio.get_event_loop()
2526
message = await loop.run_in_executor(None, queue_in.get)
2627

27-
print(f'got message, will shut down: {message}')
28+
print(f'{this_uuid} got message, will shut down: {message}')
2829
finally:
2930
await dispatcher.shutdown()
3031
await dispatcher.cancel_tasks()
3132

3233

33-
def subprocess_target(queue_in, queue_out, config):
34+
def subprocess_target(queue_in, queue_out, config, this_uuid):
3435
loop = asyncio.get_event_loop()
3536
try:
36-
loop.run_until_complete(asyncio_target(queue_in, queue_out, config))
37+
loop.run_until_complete(asyncio_target(queue_in, queue_out, config, this_uuid))
3738
except Exception:
3839
import traceback
3940

4041
traceback.print_exc()
4142
# We are in a subprocess here, so even if we handle the exception
4243
# the main process will not know and still wait forever
4344
# so give them a kick on our way out
44-
print('sending error message after error')
45+
print(f'{this_uuid} sending error message after error')
4546
queue_out.put('error')
4647
finally:
47-
print('closing asyncio loop')
48+
print(f'{this_uuid} closing asyncio loop')
4849
loop.close()
4950

5051

@@ -54,21 +55,28 @@ def __init__(self):
5455
self.queue_in = multiprocessing.Queue()
5556
self.queue_out = multiprocessing.Queue()
5657

57-
def start_in_subprocess(self, config):
58-
process = multiprocessing.Process(target=subprocess_target, args=(self.queue_in, self.queue_out, config))
58+
def start_in_subprocess(self, config, server_uuid):
59+
process = multiprocessing.Process(target=subprocess_target, args=(self.queue_in, self.queue_out, config, server_uuid))
5960
process.start()
6061
return process
6162

6263
@contextlib.contextmanager
63-
def with_server(self, config):
64-
process = self.start_in_subprocess(config)
64+
def with_server(self, config, server_uuid=None):
65+
if server_uuid is None:
66+
server_uuid = str(uuid4())
67+
process = self.start_in_subprocess(config, server_uuid)
6568
msg = self.queue_out.get()
6669
if msg != 'ready':
6770
raise RuntimeError('never got ready message from subprocess')
6871
try:
6972
yield self
7073
finally:
7174
self.queue_in.put('stop')
75+
process.join(timeout=2)
76+
if not process.is_alive():
77+
return # exited because we told it to
78+
79+
# Subprocess did not behave
7280
process.terminate() # SIGTERM
7381
# Poll to close process resources, due to race condition where it is not still running
7482
for i in range(3):
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
from uuid import uuid4
2+
import time
3+
4+
import pytest
5+
6+
from ansible_base.task import get_config
7+
8+
from test_app.tasks import create_uuid_entry
9+
from test_app.models import UUIDModel
10+
11+
12+
@pytest.mark.django_db
13+
def test_run_task_two_servers(dispatcher_subprocess):
14+
with dispatcher_subprocess(get_config(), server_uuid=1) as server1:
15+
with dispatcher_subprocess(get_config(), server_uuid=2) as server2:
16+
my_uuid = str(uuid4())
17+
create_uuid_entry.delay(uuid=my_uuid)
18+
19+
for i in range(20):
20+
if UUIDModel.objects.filter(id=my_uuid).exists():
21+
break
22+
time.sleep(0.05)
23+
else:
24+
assert f'Task never inserted UUID entry with {my_uuid}'

0 commit comments

Comments
 (0)