Skip to content

Commit 7caa3ed

Browse files
committed
Add threading to host-side Python comms server
This enables multiple layers to connect to the server and make use of host-side services.
1 parent 45cdc31 commit 7caa3ed

File tree

3 files changed

+75
-59
lines changed

3 files changed

+75
-59
lines changed

lgl_android_install.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -606,19 +606,23 @@ def configure_server(conn: ADBConnect,
606606
profile_dir: The desired output directory path for timeline. Existing
607607
files in the directory may be overwritten.
608608
'''
609+
verbose = False
610+
609611
# Create a server instance
610-
instance = server.CommsServer(0)
612+
instance = server.CommsServer(0, verbose)
611613

612614
if timeline_file:
613615
# Import late to avoid pulling in transitive deps when unused
614616
from lglpy.comms import service_gpu_timeline
615-
service_tl = service_gpu_timeline.GPUTimelineService(timeline_file)
617+
service_tl = service_gpu_timeline.GPUTimelineService(
618+
timeline_file, verbose)
616619
instance.register_endpoint(service_tl)
617620

618621
if profile_dir:
619622
# Import late to avoid pulling in transitive deps when unused
620623
from lglpy.comms import service_gpu_profile
621-
service_prof = service_gpu_profile.GPUProfileService(profile_dir)
624+
service_prof = service_gpu_profile.GPUProfileService(
625+
profile_dir, verbose)
622626
instance.register_endpoint(service_prof)
623627

624628
# Start it running

lglpy/comms/server.py

Lines changed: 63 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -22,19 +22,23 @@
2222
# -----------------------------------------------------------------------------
2323

2424
'''
25-
This module implements the server-side communications module that can accept
26-
client connections from a layer driver, and dispatch messages to registered
27-
service handler in the server.
28-
29-
This module currently only accepts a single connection at a time and message
30-
handling is synchronous inside the server. It is therefore not possible to
31-
implement pseudo-host-driven event loops if the layer is using multiple
32-
services concurrently - this needs threads per service.
25+
This module implements the server-side of the communications module that can
26+
accept connections from client layer drivers running on the device. The
27+
protocol is service-based, and the server will dispatch messages to the
28+
registered service handler for each message channel.
29+
30+
The server is multi-threaded, allowing multiple layers to concurrently access
31+
networked services provided by host-side implementations. However, within each
32+
client connection messages are handled synchronously by a single worker thread.
33+
It is therefore not possible to implement pseudo-host-driven event loops if a
34+
layer is using multiple services concurrently - this needs threads per service
35+
endpoint which is not yet implemented.
3336
'''
3437

3538
import enum
3639
import socket
3740
import struct
41+
import threading
3842
from typing import Any, Optional
3943

4044

@@ -143,7 +147,7 @@ class CommsServer:
143147
Class listening for client connection from a layer and handling messages.
144148
145149
This implementation is designed to run in a thread, so has a run method
146-
that will setup and listen on the server socket.q
150+
that will setup and listen on the server socket.
147151
148152
This implementation only handles a single layer connection at a time, but
149153
can handle multiple connections serially without restarting.
@@ -173,7 +177,6 @@ def __init__(self, port: int, verbose: bool = False):
173177
self.register_endpoint(self)
174178

175179
self.shutdown = False
176-
self.sockd = None # type: Optional[socket.socket]
177180

178181
self.sockl = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
179182
self.sockl.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
@@ -185,6 +188,9 @@ def __init__(self, port: int, verbose: bool = False):
185188
# Work out which port was assigned if not user-defined
186189
self.port = self.sockl.getsockname()[1]
187190

191+
# Pool of worker threads
192+
self.workers: list[threading.Thread] = []
193+
188194
def register_endpoint(self, endpoint: Any) -> int:
189195
'''
190196
Register a new service endpoint with the server.
@@ -235,55 +241,60 @@ def run(self) -> None:
235241
if self.verbose:
236242
print('Waiting for client connection')
237243
try:
238-
self.sockd, _ = self.sockl.accept()
244+
# Wait for a new client connection
245+
sockd, _ = self.sockl.accept()
239246
if self.verbose:
240247
print(' + Client connected')
241248

242-
self.run_client()
249+
# Spawn a worker thread for this client
250+
thread = threading.Thread(target=self.run_client, args=(sockd,))
251+
self.workers.append(thread)
252+
thread.start()
243253

244-
if self.verbose:
245-
print(' + Client disconnected')
246-
self.sockd.close()
247-
self.sockd = None
248-
249-
except ClientDropped:
250-
if self.verbose:
251-
print(' + Client disconnected')
252-
if self.sockd:
253-
self.sockd.close()
254-
self.sockd = None
254+
# Release old worker resources if they have completed
255+
self.workers = [x for x in self.workers if x.is_alive()]
255256

256257
except OSError:
257258
continue
258259

259260
self.sockl.close()
260261

261-
def run_client(self) -> None:
262+
def run_client(self, sockd: socket.socket) -> None:
262263
'''
263264
Enter client message handler run loop.
264265
265266
Raises:
266267
ClientDropped: The client disconnected from the socket.
267268
'''
268-
while not self.shutdown:
269-
# Read the header
270-
data = self.receive_data(Message.HEADER_LEN)
271-
message = Message(data)
272-
273-
# Read the payload if there is one
274-
if message.payload_size:
275-
data = self.receive_data(message.payload_size)
276-
message.add_payload(data)
277-
278-
# Dispatch to a service handler
279-
endpoint = self.endpoints[message.endpoint_id]
280-
response = endpoint.handle_message(message)
269+
try:
270+
while not self.shutdown:
271+
# Read the header
272+
data = self.receive_data(sockd, Message.HEADER_LEN)
273+
message = Message(data)
274+
275+
# Read the payload if there is one
276+
if message.payload_size:
277+
data = self.receive_data(sockd, message.payload_size)
278+
message.add_payload(data)
279+
280+
# Dispatch to a service handler
281+
endpoint = self.endpoints[message.endpoint_id]
282+
response = endpoint.handle_message(message)
283+
284+
# Send a response for all TX_RX messages
285+
if message.message_type == MessageType.TX_RX:
286+
header = Response(message, response)
287+
self.send_data(sockd, header.get_header())
288+
self.send_data(sockd, response)
289+
290+
except ClientDropped:
291+
pass
292+
293+
finally:
294+
if self.verbose:
295+
print(' + Client disconnected')
281296

282-
# Send a response for all TX_RX messages
283-
if message.message_type == MessageType.TX_RX:
284-
header = Response(message, response)
285-
self.send_data(header.get_header())
286-
self.send_data(response)
297+
sockd.close()
287298

288299
def stop(self) -> None:
289300
'''
@@ -294,14 +305,16 @@ def stop(self) -> None:
294305
if self.sockl is not None:
295306
self.sockl.close()
296307

297-
if self.sockd is not None:
298-
self.sockd.shutdown(socket.SHUT_RDWR)
308+
for worker in self.workers:
309+
worker.join()
299310

300-
def receive_data(self, size: int) -> bytes:
311+
@staticmethod
312+
def receive_data(sockd: socket.socket, size: int) -> bytes:
301313
'''
302314
Fetch a fixed size packet from the socket.
303315
304316
Args:
317+
sockd: The data socket.
305318
size: The length of the packet in bytes.
306319
307320
Returns:
@@ -310,31 +323,29 @@ def receive_data(self, size: int) -> bytes:
310323
Raises:
311324
ClientDropped: The client disconnected from the socket.
312325
'''
313-
assert self.sockd is not None
314-
315326
data = b''
316327
while len(data) < size:
317-
new_data = self.sockd.recv(size - len(data))
328+
new_data = sockd.recv(size - len(data))
318329
if not new_data:
319330
raise ClientDropped()
320331
data = data + new_data
321332

322333
return data
323334

324-
def send_data(self, data: bytes) -> None:
335+
@staticmethod
336+
def send_data(sockd: socket.socket, data: bytes) -> None:
325337
'''
326338
Send a fixed size packet to the socket.
327339
328340
Args:
341+
sockd: The data socket.
329342
data: The binary data to send.
330343
331344
Raises:
332345
ClientDropped: The client disconnected from the socket.
333346
'''
334-
assert self.sockd is not None
335-
336347
while len(data):
337-
sent_bytes = self.sockd.send(data)
348+
sent_bytes = sockd.send(data)
338349
if not sent_bytes:
339350
raise ClientDropped()
340351
data = data[sent_bytes:]

lglpy/comms/service_gpu_profile.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
import enum
3131
import json
3232
import os
33-
from typing import Any, Optional, TypedDict, Union
33+
from typing import Optional, TypedDict, Union
3434

3535
from lglpy.comms.server import Message
3636

@@ -89,6 +89,7 @@ def __init__(self, dir_path: str, verbose: bool = False):
8989
dir_path: Directory to write on the filesystem
9090
verbose: Should this use verbose logging?
9191
'''
92+
del verbose
9293
self.base_dir = dir_path
9394

9495
# Sample mode is detected on the fly when we get our first data
@@ -137,7 +138,7 @@ def handle_end_frame(self, message: EndFrameMessage):
137138
# Emit the CSV file
138139
print(f'Generating CSV for frame {self.frame_id}')
139140
path = os.path.join(self.base_dir, f'frame_{self.frame_id:05d}.csv')
140-
with open(path, 'w', newline='') as handle:
141+
with open(path, 'w', newline='', encoding='utf-8') as handle:
141142
writer = csv.writer(handle)
142143
writer.writerow(self.table_header)
143144
writer.writerows(self.table_data)
@@ -249,8 +250,8 @@ def handle_frame_sample(self, message: FrameMessage):
249250
self.create_frame_data(message)
250251

251252
print(f'Updating CSV for frame {self.frame_id}')
252-
path = os.path.join(self.base_dir, f'capture.csv')
253-
with open(path, 'w', newline='') as handle:
253+
path = os.path.join(self.base_dir, 'capture.csv')
254+
with open(path, 'w', newline='', encoding='utf-8') as handle:
254255
writer = csv.writer(handle)
255256
writer.writerow(self.table_header)
256257
writer.writerows(self.table_data)

0 commit comments

Comments
 (0)