|
5 | 5 | import hashlib
|
6 | 6 | import os.path
|
7 | 7 | import re
|
8 |
| -from select import select |
| 8 | +import time |
| 9 | +from pathlib import Path |
9 | 10 | from socket import AF_UNIX, SOCK_STREAM, socket
|
10 |
| -from threading import Event, Thread |
11 |
| - |
12 |
| -from framework import utils |
| 11 | +from subprocess import Popen |
| 12 | +from threading import Thread |
13 | 13 |
|
14 | 14 | ECHO_SERVER_PORT = 5252
|
15 | 15 | SERVER_ACCEPT_BACKLOG = 128
|
|
19 | 19 | VSOCK_UDS_PATH = "v.sock"
|
20 | 20 |
|
21 | 21 |
|
22 |
| -class HostEchoServer(Thread): |
23 |
| - """A simple "echo" server for vsock. |
24 |
| -
|
25 |
| - This server will accept incoming connections (initiated by the guest vm), |
26 |
| - and, for each connection, it will read any incoming data and then echo it |
27 |
| - right back. |
28 |
| - """ |
29 |
| - |
30 |
| - def __init__(self, vm, path): |
31 |
| - """.""" |
32 |
| - super().__init__() |
33 |
| - self.vm = vm |
34 |
| - self.path = path |
35 |
| - self.sock = socket(AF_UNIX, SOCK_STREAM) |
36 |
| - self.sock.bind(path) |
37 |
| - self.sock.listen(SERVER_ACCEPT_BACKLOG) |
38 |
| - self.error = None |
39 |
| - self.clients = [] |
40 |
| - self.exit_evt = Event() |
41 |
| - |
42 |
| - # Link the listening Unix socket into the VM's jail, so that |
43 |
| - # Firecracker can connect to it. |
44 |
| - vm.create_jailed_resource(path) |
45 |
| - |
46 |
| - def run(self): |
47 |
| - """Thread code payload. |
48 |
| -
|
49 |
| - Wrap up the real "run" into a catch-all block, because Python cannot |
50 |
| - into threads - if this thread were to raise an unhandled exception, |
51 |
| - the whole process would lock. |
52 |
| - """ |
53 |
| - try: |
54 |
| - self._run() |
55 |
| - # pylint: disable=broad-except |
56 |
| - except Exception as err: |
57 |
| - self.error = err |
58 |
| - |
59 |
| - def _run(self): |
60 |
| - while not self.exit_evt.is_set(): |
61 |
| - watch_list = self.clients + [self.sock] |
62 |
| - rd_list, _, _ = select(watch_list, [], [], 1) |
63 |
| - for rdo in rd_list: |
64 |
| - if rdo == self.sock: |
65 |
| - # Read event on the listening socket: a new client |
66 |
| - # wants to connect. |
67 |
| - (client, _) = self.sock.accept() |
68 |
| - self.clients.append(client) |
69 |
| - continue |
70 |
| - # Read event on a connected socket: new data is |
71 |
| - # available from some client. |
72 |
| - buf = rdo.recv(BUF_SIZE) |
73 |
| - if not buf: |
74 |
| - # Zero-length read: connection reset by peer. |
75 |
| - self.clients.remove(rdo) |
76 |
| - continue |
77 |
| - sent = 0 |
78 |
| - while sent < len(buf): |
79 |
| - # Send back everything we just read. |
80 |
| - sent += rdo.send(buf[sent:]) |
81 |
| - |
82 |
| - def exit(self): |
83 |
| - """Shut down the echo server and wait for it to exit. |
84 |
| -
|
85 |
| - This method can be called from any thread. Upon returning, the |
86 |
| - echo server will have shut down. |
87 |
| - """ |
88 |
| - self.exit_evt.set() |
89 |
| - self.join() |
90 |
| - self.sock.close() |
91 |
| - utils.run_cmd("rm -f {}".format(self.path)) |
92 |
| - |
93 |
| - |
94 | 22 | class HostEchoWorker(Thread):
|
95 | 23 | """A vsock echo worker, connecting to a guest echo server.
|
96 | 24 |
|
@@ -199,8 +127,19 @@ def check_guest_connections(vm, server_port_path, blob_path, blob_hash):
|
199 | 127 | start `TEST_CONNECTION_COUNT` workers inside the guest VM, all
|
200 | 128 | communicating with the echo server.
|
201 | 129 | """
|
202 |
| - echo_server = HostEchoServer(vm, server_port_path) |
203 |
| - echo_server.start() |
| 130 | + |
| 131 | + echo_server = Popen( |
| 132 | + ["socat", f"UNIX-LISTEN:{server_port_path},fork,backlog=5", "exec:'/bin/cat'"] |
| 133 | + ) |
| 134 | + |
| 135 | + # Link the listening Unix socket into the VM's jail, so that |
| 136 | + # Firecracker can connect to it. |
| 137 | + attempt = 0 |
| 138 | + # But 1st, give socat a bit of time to create the socket |
| 139 | + while not Path(server_port_path).exists() and attempt < 3: |
| 140 | + time.sleep(0.2) |
| 141 | + attempt += 1 |
| 142 | + vm.create_jailed_resource(server_port_path) |
204 | 143 |
|
205 | 144 | # Increase maximum process count for the ssh service.
|
206 | 145 | # Avoids: "bash: fork: retry: Resource temporarily unavailable"
|
@@ -236,9 +175,10 @@ def check_guest_connections(vm, server_port_path, blob_path, blob_hash):
|
236 | 175 | cmd += "for w in $workers; do wait $w || exit -1; done"
|
237 | 176 |
|
238 | 177 | ecode, stdout, stderr = vm.ssh.run(cmd)
|
239 |
| - |
240 |
| - echo_server.exit() |
241 |
| - assert echo_server.error is None |
| 178 | + echo_server.terminate() |
| 179 | + rc = echo_server.wait() |
| 180 | + # socat exits with 128 + 15 (SIGTERM) |
| 181 | + assert rc == 143 |
242 | 182 |
|
243 | 183 | print(stdout.read())
|
244 | 184 | assert ecode == 0, stderr.read()
|
|
0 commit comments