forked from saltstack/salt
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_request_server.py
More file actions
60 lines (46 loc) · 1.64 KB
/
test_request_server.py
File metadata and controls
60 lines (46 loc) · 1.64 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
import asyncio
import pytest
import salt.transport
import salt.utils.process
async def test_request_server(
    io_loop, minion_opts, master_opts, transport, process_manager
):
    """
    Round-trip a single request through a request server and request client.

    A request server is created from ``master_opts`` and given a handler that
    records every incoming message and answers with a fixed reply. A request
    client created from ``minion_opts`` then sends one message, and the test
    checks that the handler saw exactly that message and that the client
    received the fixed reply.

    For the ``tcp`` transport the client's ``task`` attribute is also checked:
    it must be set after sending, and after ``close()`` it must be ``None`` or
    done, with ``_closing`` set (issue 68277).

    The server is closed from an outer ``finally`` so it is released even when
    the startup wait, client construction, the client's ``close()`` or one of
    the post-close assertions fails.
    """
    minion_opts["transport"] = master_opts["transport"] = transport

    # Needed by tcp transport's RequestClient
    minion_opts["master_uri"] = (
        f"tcp://{master_opts['interface']}:{master_opts['ret_port']}"
    )

    req_server = salt.transport.request_server(master_opts)
    req_server.pre_fork(process_manager)
    reqmsg = {"req": "test"}
    repmsg = {"result": "success"}
    requests = []

    async def handler(message):
        # Record what the server received so it can be compared afterwards.
        requests.append(message)
        return repmsg

    req_server.post_fork(handler, io_loop)
    try:
        if transport == "ws":
            # Poll the server's started flag up to 10 times, 0.5s apart
            # (5 seconds in total) before giving up.
            for _ in range(10):
                if req_server._started.is_set():
                    break
                await asyncio.sleep(0.5)
            else:
                pytest.fail("Failed to start req server after 5 seconds")

        req_client = salt.transport.request_client(minion_opts, io_loop)
        try:
            ret = await req_client.send({"req": "test"})
            if transport == "tcp":
                assert req_client.task is not None
            assert [reqmsg] == requests
            assert repmsg == ret
        finally:
            req_client.close()
            if transport == "tcp":
                # Ensure that issue 68277 is fixed
                assert req_client.task is None or req_client.task.done()
                assert req_client._closing
    finally:
        # Always release the server, even if the client cleanup or the
        # assertions above raised.
        req_server.close()

    # Yield to loop in order to allow background close methods to finish.
    await asyncio.sleep(0.3)