Skip to content

Commit 6ab566c

Browse files
committed
Opened the gates to Multi-threading for the AP2 server instances.
Makes things smoother for multiple HomeKit connections. Makes chasing bugs more fun/challenging.
1 parent 5acba16 commit 6ab566c

File tree

4 files changed

+86
-38
lines changed

4 files changed

+86
-38
lines changed

README.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,17 @@ Next steps:
3939
- FairPlay v2 Support
4040
---
4141

42+
## Multiple Connections
43+
44+
Since multithreading is now enabled, this allows multiple concurrent connections. There are no safeguards
45+
built to prevent you playing multiple streams. Python multiprocessing makes this "DJ" mode a
46+
possibility but makes stream management and session management (global state data) nigh impossible. So
47+
threading is the right approach in the receiver.
48+
49+
HomeKit and other AP senders can now connect concurrently to the receiver and perform operations. This
50+
opens the path to Remote Control functionality.
51+
52+
4253
## mDNS/ZeroConf
4354

4455
If you encounter strange errors like NonUniqueNameException, or Address already in use,

ap2-receiver.py

Lines changed: 63 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
import argparse
88
import tempfile
99
import multiprocessing
10+
import threading
11+
from threading import current_thread
1012

1113
import pprint
1214

@@ -111,6 +113,7 @@ def get_hex_bitmask(in_features):
111113

112114

113115
def update_status_flags(flag=None, on=False, push=True):
116+
global MDNS_OBJ
114117
""" Use this to check for and send out updated status flags
115118
if flag is None, skip changing the flags (e.g. updates already queued)
116119
if on is true, add the flag in, otherwise remove it.
@@ -298,6 +301,7 @@ class AP2Handler(http.server.BaseHTTPRequestHandler):
298301
fairplay_keymsg = None
299302
ecdh_shared_key = None
300303
session = None
304+
hap = None
301305

302306
# Maps paths to methods a la HAP-python
303307
HANDLERS = {
@@ -353,7 +357,7 @@ def dispatch(self):
353357
404,
354358
f": Method {self.command} Path {path} endpoint not implemented"
355359
)
356-
self.server.hap = None
360+
self.hap = None
357361

358362
def parse_request(self):
359363
self.raw_requestline = self.raw_requestline.replace(b"RTSP/1.0", b"HTTP/1.1")
@@ -507,10 +511,12 @@ def do_SETUP(self):
507511

508512
self.server.streams.append(streamobj)
509513

510-
event_port, self.event_proc = EventGeneric.spawn(
511-
self.server.server_address, name='events', shared_key=self.ecdh_shared_key, isDebug=DEBUG)
512-
timing_port, self.timing_proc = EventGeneric.spawn(
513-
self.server.server_address, name='ntp', shared_key=self.ecdh_shared_key, isDebug=DEBUG)
514+
if not self.server.event_proc:
515+
self.server.event_port, self.server.event_proc = EventGeneric.spawn(
516+
self.server.server_address, name='events', shared_key=self.ecdh_shared_key, isDebug=DEBUG)
517+
if not self.server.timing_proc:
518+
self.server.timing_port, self.server.timing_proc = EventGeneric.spawn(
519+
self.server.server_address, name='ntp', shared_key=self.ecdh_shared_key, isDebug=DEBUG)
514520
transport = self.headers["Transport"].split(';')
515521
res = []
516522
res.append("RTP/AVP/UDP")
@@ -551,10 +557,10 @@ def do_SETUP(self):
551557

552558
if "streams" not in plist:
553559
self.logger.debug("Sending EVENT:")
554-
event_port, self.event_proc = EventGeneric.spawn(
560+
self.server.event_port, self.server.event_proc = EventGeneric.spawn(
555561
self.server.server_address, name='events', shared_key=self.ecdh_shared_key, isDebug=DEBUG)
556-
device_setup["eventPort"] = event_port
557-
self.logger.debug(f"[+] eventPort={event_port}")
562+
device_setup["eventPort"] = self.server.event_port
563+
self.logger.debug(f"[+] eventPort={self.server.event_port}")
558564

559565
self.logger.debug(self.pp.pformat(device_setup))
560566
res = writePlistToString(device_setup)
@@ -756,11 +762,10 @@ def do_TEARDOWN(self):
756762
# Send flag that we're no longer active
757763
update_status_flags(StatusFlags.getRecvSessActive(StatusFlags))
758764

759-
# Erase the hap() instance, otherwise reconnects fail
760-
self.server.hap = None
761-
762-
if(self.timing_proc):
763-
self.timing_proc.terminate()
765+
try:
766+
self.server.timing_proc.terminate()
767+
except AttributeError:
768+
pass
764769

765770
if len(self.server.streams) == 0:
766771
session = None
@@ -938,12 +943,12 @@ def handle_pair_verify(self):
938943
def handle_pair_SV(self, op):
939944
body = self.rfile.read(int(self.headers["Content-Length"]))
940945

941-
if not self.server.hap:
942-
self.server.hap = Hap(PI, DEBUG)
946+
if not self.hap:
947+
self.hap = Hap(PI, DEBUG)
943948
if op == 'verify':
944-
res = self.server.hap.pair_verify(body)
949+
res = self.hap.pair_verify(body)
945950
elif op == 'setup':
946-
res = self.server.hap.pair_setup(body)
951+
res = self.hap.pair_setup(body)
947952

948953
self.send_response(200)
949954
self.send_header("Content-Length", len(res))
@@ -953,12 +958,12 @@ def handle_pair_SV(self, op):
953958
self.end_headers()
954959
self.wfile.write(res)
955960

956-
if self.server.hap.encrypted and self.server.hap.mfi_setup:
961+
if self.hap.encrypted and self.hap.mfi_setup:
957962
self.logger.warning('MFi setup not yet possible. Disable feature bit 51.')
958-
elif self.server.hap.encrypted:
959-
hexdump(self.server.hap.accessory_shared_key) if DEBUG else ''
960-
self.ecdh_shared_key = self.server.hap.accessory_shared_key
961-
self.upgrade_to_encrypted(self.server.hap.accessory_shared_key)
963+
elif self.hap.encrypted:
964+
hexdump(self.hap.accessory_shared_key) if DEBUG else ''
965+
self.ecdh_shared_key = self.hap.accessory_shared_key
966+
self.upgrade_to_encrypted(self.hap.accessory_shared_key)
962967

963968
def handle_pair_add(self):
964969
self.handle_pair_ARL('add')
@@ -976,11 +981,11 @@ def handle_pair_ARL(self, op):
976981
if content_len > 0:
977982
body = self.rfile.read(content_len)
978983
if op == 'add':
979-
res = self.server.hap.pair_add(body)
984+
res = self.hap.pair_add(body)
980985
elif op == 'remove':
981-
res = self.server.hap.pair_remove(body)
986+
res = self.hap.pair_remove(body)
982987
elif op == 'list':
983-
res = self.server.hap.pair_list(body)
988+
res = self.hap.pair_list(body)
984989
hexdump(res) if DEBUG else ''
985990
self.send_response(200)
986991
self.send_header("Content-Type", self.headers["Content-Type"])
@@ -1049,7 +1054,7 @@ def handle_configure(self):
10491054
HK_PW = pw
10501055
DEV_PROPS.setDevicePassword(pw)
10511056

1052-
accessory_id, accessory_ltpk = self.server.hap.configure()
1057+
accessory_id, accessory_ltpk = self.hap.configure()
10531058
configure_info = {
10541059
'Identifier': accessory_id.decode('utf-8'),
10551060
hkac_s: hkac,
@@ -1166,7 +1171,7 @@ def unregister_mdns(zeroconf, info):
11661171
zeroconf.close()
11671172

11681173

1169-
class AP2Server(socketserver.TCPServer):
1174+
class AP2Server(socketserver.ThreadingTCPServer):
11701175
# Fixes 99% of scenarios on restart after we terminate uncleanly/crash
11711176
# and port was not closed before crash (is still open).
11721177
# AP2 client connects from random port.
@@ -1176,24 +1181,46 @@ class AP2Server(socketserver.TCPServer):
11761181
def __init__(self, addr_port, handler):
11771182
super().__init__(addr_port, handler)
11781183
self.connections = {}
1179-
self.hap = None
1184+
""" Handle the HAP object here: it's not a fact that the HAP
1185+
connection is torn down, when the sessions and streams are. This means
1186+
HomeKit, RemoteControl and other niceties continue to work.
1187+
"""
1188+
self.serv_addr, self.serv_port = addr_port
1189+
# self.hap = None # thread local, not global.
1190+
self.event_proc = None
1191+
self.event_port = None
1192+
self.timing_proc = None
1193+
self.timing_port = None
11801194
self.enc_layer = False
11811195
self.streams = []
1196+
self.sessions = []
1197+
log_string = f'{self.__class__.__name__}: {self.serv_addr}:{self.serv_port}'
1198+
level = 'DEBUG' if DEBUG else 'INFO'
1199+
self.logger = get_screen_logger(log_string, level=level)
11821200

11831201
# Override
11841202
def get_request(self):
1185-
# Quick clean-up in case anything from before is still around.
1186-
self.hap = None
11871203
client_socket, client_addr = super().get_request()
1188-
SCR_LOG.info(f"Opened connection from {client_addr[0]}:{client_addr[1]}")
1204+
self.logger.info(f"Opened connection from {client_addr[0]}:{client_addr[1]}")
11891205
self.connections[client_addr] = client_socket
11901206
return (client_socket, client_addr)
11911207

11921208
def upgrade_to_encrypted(self, client_address, shared_key):
11931209
client_socket = self.connections[client_address]
1194-
hap_socket = HAPSocket(client_socket, shared_key)
1195-
self.connections[client_address] = hap_socket
1196-
return hap_socket
1210+
self.hap_socket = HAPSocket(client_socket, shared_key)
1211+
self.logger.info(f"{current_thread().name}: Opened HAPSocket from {client_address[0]}:{client_address[1]}")
1212+
self.connections[client_address] = self.hap_socket
1213+
return self.hap_socket
1214+
1215+
# Override
1216+
def server_close(self):
1217+
if self.logger:
1218+
self.logger.debug('Removing AP2Server object.')
1219+
self.hap = None
1220+
self.hap_socket = None
1221+
self.streams.clear()
1222+
self.logger = None
1223+
self.shutdown()
11971224

11981225

11991226
def list_network_interfaces():
@@ -1362,15 +1389,15 @@ def list_available_flags():
13621389
PORT = 7000
13631390
if IPV6 and not IPV4:
13641391
with AP2Server((IPV6, PORT), AP2Handler) as httpd:
1365-
SCR_LOG.info(f"serving at port {PORT}")
13661392
IPADDR_BIN = IP6ADDR_BIN
13671393
IPADDR = IPV6
1394+
SCR_LOG.info(f"serving on {IPADDR}:{PORT}")
13681395
httpd.serve_forever()
13691396
else: # i.e. (IPV4 and not IPV6) or (IPV6 and IPV4)
13701397
with AP2Server((IPV4, PORT), AP2Handler) as httpd:
1371-
SCR_LOG.info(F"serving at port {PORT}")
13721398
IPADDR_BIN = IP4ADDR_BIN
13731399
IPADDR = IPV4
1400+
SCR_LOG.info(f"serving on {IPADDR}:{PORT}")
13741401
httpd.serve_forever()
13751402

13761403
except KeyboardInterrupt:

ap2/connections/event.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import socket
22
import multiprocessing
33
import os
4+
import time
45

56
from ..utils import get_file_logger, get_free_port
67

@@ -35,10 +36,14 @@ def serve(self):
3536
event_file = open(self.file, "wb")
3637
try:
3738
while True:
38-
data = conn.recv(1)
39+
data = conn.recv(1, socket.MSG_WAITALL)
3940
if data:
4041
event_file.write(data)
4142
pass
43+
else:
44+
# This while loop can run away.
45+
break
46+
raise KeyboardInterrupt
4247
except KeyboardInterrupt:
4348
pass
4449
finally:

ap2/utils.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,11 @@
8080
'level': 'DEBUG',
8181
'propagate': False,
8282
},
83+
'Receiver*': {
84+
'handlers': ['console'],
85+
'level': 'DEBUG',
86+
'propagate': False,
87+
},
8388
'RTPBuffer': {
8489
'handlers': ['console'],
8590
'level': 'DEBUG',
@@ -123,7 +128,7 @@ def get_file_logger(name, level="INFO"):
123128
def get_screen_logger(name, level="INFO"):
124129
logger = logging.getLogger(name)
125130
logger.setLevel(level)
126-
logger.propagate = False
131+
# logger.propagate = False
127132
if level == 'DEBUG':
128133
print(f'[{name}] logging level: {level}')
129134
return logger

0 commit comments

Comments
 (0)