Skip to content

Commit 7b41571

Browse files
Add airplay volume synchronizer
1 parent 97b4be1 commit 7b41571

File tree

4 files changed

+165
-20
lines changed

4 files changed

+165
-20
lines changed

amplipi/streams/airplay.py

Lines changed: 58 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@
77
import time
88
import os
99
import io
10+
import sys
11+
import threading
12+
import json
1013

1114

1215
def write_sp_config_file(filename, config):
@@ -42,6 +45,29 @@ def __init__(self, name: str, ap2: bool, disabled: bool = False, mock: bool = Fa
4245
self._connect_time = 0.0
4346
self._coverart_dir = ''
4447
self._log_file: Optional[io.TextIOBase] = None
48+
self.src_config_folder: Optional[str] = None
49+
self.volume_watcher_process: Optional[threading.Thread] = None # Populates the fifo that the vol sync script depends on
50+
self.volume_sync_process: Optional[subprocess.Popen] = None
51+
self._volume_fifo: Optional[str] = None
52+
53+
def watch_vol(self):
54+
"""Creates and supplies a FIFO with volume data for volume sync"""
55+
while True:
56+
try:
57+
if self.src is not None:
58+
if self._volume_fifo is None and self.src_config_folder is not None:
59+
fifo_path = f"{self.src_config_folder}/vol"
60+
if not os.path.isfile(fifo_path):
61+
os.mkfifo(fifo_path)
62+
self._volume_fifo = os.open(fifo_path, os.O_WRONLY, os.O_NONBLOCK)
63+
data = json.dumps({
64+
'zones': self.connected_zones,
65+
'volume': self.volume,
66+
})
67+
os.write(self._volume_fifo, bytearray(f"{data}\r\n", encoding="utf8"))
68+
except Exception as e:
69+
logger.error(f"{self.name} volume thread ran into exception: {e}")
70+
time.sleep(0.1)
4571

4672
def reconfig(self, **kwargs):
4773
self.validate_stream(**kwargs)
@@ -71,9 +97,9 @@ def _activate(self, vsrc: int):
7197
logger.info(f'Another Airplay 2 stream is already in use, unable to start {self.name}, mocking connection')
7298
return
7399

74-
src_config_folder = f'{utils.get_folder("config")}/srcs/v{vsrc}'
100+
self.src_config_folder = f'{utils.get_folder("config")}/srcs/v{vsrc}'
75101
try:
76-
os.remove(f'{src_config_folder}/currentSong')
102+
os.remove(f'{self.src_config_folder}/currentSong')
77103
except FileNotFoundError:
78104
pass
79105
self._connect_time = time.time()
@@ -86,9 +112,9 @@ def _activate(self, vsrc: int):
86112
'name': self.name,
87113
'port': 5100 + 100 * vsrc, # Listen for service requests on this port
88114
'udp_port_base': 6101 + 100 * vsrc, # start allocating UDP ports from this port number when needed
89-
'drift': 2000, # allow this number of frames of drift away from exact synchronisation before attempting to correct it
90-
'resync_threshold': 0, # a synchronisation error greater than this will cause resynchronisation; 0 disables it
91-
'log_verbosity': 0, # "0" means no debug verbosity, "3" is most verbose.
115+
'drift_in_seconds': 2, # allow this number of frames of drift away from exact synchronisation before attempting to correct it
116+
'resync_threshold_in_seconds': 0, # a synchronisation error greater than this will cause resynchronisation; 0 disables it
117+
'log_verbosity': "diagnostics", # "none" means no debug verbosity, "diagnostics" is most verbose.
92118
'mpris_service_bus': 'Session',
93119
},
94120
'metadata': {
@@ -99,7 +125,7 @@ def _activate(self, vsrc: int):
99125
'alsa': {
100126
'output_device': utils.virtual_output_device(vsrc), # alsa output device
101127
# If set too small, buffer underflow occurs on low-powered machines. Too long and the response times with software mixer become annoying.
102-
'audio_backend_buffer_desired_length': 11025
128+
'audio_backend_buffer_desired_length': 11025,
103129
},
104130
}
105131

@@ -109,10 +135,10 @@ def _activate(self, vsrc: int):
109135
except FileNotFoundError:
110136
pass
111137
os.makedirs(self._coverart_dir, exist_ok=True)
112-
os.makedirs(src_config_folder, exist_ok=True)
113-
config_file = f'{src_config_folder}/shairport.conf'
138+
os.makedirs(self.src_config_folder, exist_ok=True)
139+
config_file = f'{self.src_config_folder}/shairport.conf'
114140
write_sp_config_file(config_file, config)
115-
self._log_file = open(f'{src_config_folder}/log', mode='w')
141+
self._log_file = open(f'{self.src_config_folder}/log', mode='w')
116142
shairport_args = f"{utils.get_folder('streams')}/shairport-sync{'-ap2' if self.ap2 else ''} -c {config_file}".split(' ')
117143
logger.info(f'shairport_args: {shairport_args}')
118144

@@ -125,7 +151,15 @@ def _activate(self, vsrc: int):
125151
# shairport sync only adds the pid to the mpris name if it cannot use the default name
126152
if len(os.popen("pgrep shairport-sync").read().strip().splitlines()) > 1:
127153
mpris_name += f".i{self.proc.pid}"
128-
self.mpris = MPRIS(mpris_name, f'{src_config_folder}/metadata.txt')
154+
self.mpris = MPRIS(mpris_name, f'{self.src_config_folder}/metadata.txt')
155+
156+
vol_sync = f"{utils.get_folder('streams')}/shairport_volume_handler.py"
157+
vol_args = [sys.executable, vol_sync, mpris_name, f"{utils.get_folder('config')}/srcs/v{self.vsrc}"]
158+
159+
logger.info(f'{self.name}: starting vol synchronizer: {vol_args}')
160+
self.volume_watcher_process = threading.Thread(target=self.watch_vol, daemon=True)
161+
self.volume_watcher_process.start()
162+
self.volume_sync_process = subprocess.Popen(args=vol_args, stdout=self._log_file, stderr=self._log_file)
129163
except Exception as exc:
130164
logger.exception(f'Error starting airplay MPRIS reader: {exc}')
131165

@@ -135,12 +169,22 @@ def _deactivate(self):
135169
self.mpris = None
136170
if self._is_running():
137171
self.proc.stdin.close()
172+
138173
logger.info('stopping shairport-sync')
139174
self.proc.terminate()
175+
if self.volume_sync_process is not None:
176+
self.volume_sync_process.terminate()
177+
140178
if self.proc.wait(1) != 0:
141179
logger.info('killing shairport-sync')
142180
self.proc.kill()
143181
self.proc.communicate()
182+
183+
if self.volume_sync_process is not None:
184+
if self.volume_sync_process.wait(1) != 0:
185+
logger.info('killing shairport vol sync')
186+
self.volume_sync_process.kill()
187+
144188
if '_log_file' in self.__dir__() and self._log_file:
145189
self._log_file.close()
146190
if self.src:
@@ -149,7 +193,11 @@ def _deactivate(self):
149193
except Exception as e:
150194
logger.exception(f'Error removing airplay config files: {e}')
151195
self._disconnect()
196+
152197
self.proc = None
198+
self.volume_sync_process = None
199+
self.volume_watcher_process = None
200+
self._volume_fifo = None
153201

154202
def info(self) -> models.SourceInfo:
155203
source = models.SourceInfo(
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
"""Script for synchronizing AmpliPi and Spotify volumes"""
2+
import argparse
3+
from time import sleep
4+
from dasbus.connection import SessionMessageBus
5+
from dasbus.typing import Variant
6+
from volume_synchronizer import VolSyncDispatcher, StreamWatcher, VolEvents
7+
8+
9+
class ShairportWatcher(StreamWatcher):
10+
"""A class that watches and tracks changes to airplay-side volume"""
11+
12+
def __init__(self, service_suffix: str):
13+
super().__init__()
14+
self.mpris = SessionMessageBus().get_proxy(
15+
service_name=f"org.mpris.MediaPlayer2.{service_suffix}",
16+
object_path="/org/mpris/MediaPlayer2",
17+
interface_name="org.mpris.MediaPlayer2.Player"
18+
)
19+
20+
self.dbus = SessionMessageBus().get_proxy(
21+
service_name=f"org.mpris.MediaPlayer2.{service_suffix}",
22+
object_path="/org/mpris/MediaPlayer2",
23+
interface_name="org.freedesktop.DBus.Properties"
24+
)
25+
26+
async def watch_vol(self):
27+
"""Watch the shairport mpris stream for volume changes and update amplipi volume info accordingly"""
28+
while True:
29+
try:
30+
if self.volume != self.mpris.Volume:
31+
self.logger.debug(f"Airplay volume changed from {self.volume} to {self.mpris.Volume}")
32+
self.volume = float(self.mpris.Volume)
33+
self.schedule_event(VolEvents.CHANGE_AMPLIPI)
34+
# self.delta = self.mpris.Volume - self.volume
35+
36+
except Exception as e:
37+
self.logger.exception(f"Error: {e}")
38+
return
39+
sleep(0.1)
40+
41+
def set_vol(self, amplipi_volume: float, vol_set_point: float) -> float: # This has unused variable vol_set_point to keep up with the underlying StreamData.set_vol function schema
42+
"""Update Airplay's volume slider to match AmpliPi"""
43+
try:
44+
# Airplay does not allow external devices to set the volume of a users system
45+
46+
# Airplay is a fully authoritative volume source, meaning it forces amplipi volume to equal its volume now. If that ever changes, this will be relevant:
47+
# There are two values this could realistically be returned and become the new vol_set_point, and they each have their own drawbacks:
48+
49+
# amplipi_volume: If amplipi_volume is the new set point, any changes to airplay volume will send the volume to an odd
50+
# spot as it just sets the vol average of amplipi to be the same as the value of airplay's vol
51+
52+
# vol_set_point: if vol_set_point is retained as the set point, any changes to amplipi will reflect for 1-2 seconds at most and then
53+
# bounce back to where it had been, resulting in a glitchy front end interface
54+
55+
# In any future MPRIS based volume synchronizers, you can check if self.mpris.CanControl is true and then potentially directly set self.mpris.Volume
56+
# Note that we cannot do this due to this line: <property name='Volume' type='d' access='read'/>
57+
# That exists in the MPRIS config xml at https://github.com/mikebrady/shairport-sync/blob/master/org.mpris.MediaPlayer2.xml
58+
59+
# self.dbus.Set(
60+
# 'org.mpris.MediaPlayer2',
61+
# 'Volume',
62+
# Variant("d", amplipi_volume)
63+
# )
64+
65+
return amplipi_volume
66+
except Exception as e:
67+
self.logger.exception(f"Exception: {e}")
68+
69+
70+
if __name__ == "__main__":
71+
72+
parser = argparse.ArgumentParser(description="Read metadata from a given URL and write it to a file.")
73+
74+
parser.add_argument("service_suffix", help="Name of mpris instance", type=str)
75+
parser.add_argument("config_dir", help="The directory of the vsrc config", type=str)
76+
parser.add_argument("--debug", action="store_true", help="Change log level from WARNING to DEBUG")
77+
78+
args = parser.parse_args()
79+
80+
handler = VolSyncDispatcher(ShairportWatcher(service_suffix=args.service_suffix), args.config_dir, args.debug)

streams/spotify_volume_handler.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,13 @@ class SpotifyWatcher(StreamWatcher):
2222

2323
def __init__(self, api_port: int):
2424
super().__init__()
25-
2625
self.api_port: int = api_port
2726
"""What port is go-librespot running on? Typically set to 3678 + vsrc."""
2827

2928
async def watch_vol(self):
3029
"""Watch the go-librespot websocket endpoint for volume change events and update AmpliPi volume info accordingly"""
3130
try:
31+
# Connect to the websocket and listen for state changes
3232
# pylint: disable=E1101
3333
# E1101: Module 'websockets' has no 'connect' member (no-member)
3434
async with websockets.connect(f"ws://localhost:{self.api_port}/events", open_timeout=5) as websocket:

streams/volume_synchronizer.py

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ def __init__(self):
3232
self._volume: float = None
3333
"""Value between 0 and 1, or None if not yet initialized by the upstream"""
3434

35+
self.delta: Optional[float] = None
3536
self.logger: logging.Logger
3637
"""logging.Logger instance provided by VolSyncDispatcher"""
3738

@@ -73,7 +74,7 @@ def __init__(self, config_dir: str, schedule_event: Callable, logger: logging.Lo
7374
"""Event scheduler function provided by VolSyncDispatcher, has limited valid inputs that can be seen in the VolEvents enum"""
7475

7576
self.logger: logging.Logger = logger
76-
self.volume: float = None
77+
self.volume: Optional[float] = None
7778
self.config_dir: str = config_dir
7879

7980
self.connected_zones: List[int] = []
@@ -87,13 +88,16 @@ def get_vol(self):
8788
Read the volume FIFO from .config/amplipi/srcs/v{vsrc}/vol to load the currently connected zones and the averaged volume of them
8889
If the read volume is different than the previous volume, send a volume change event to the stream
8990
"""
90-
with open(f'{self.config_dir}/vol', 'r') as fifo:
91-
while True:
92-
data = json.loads(fifo.readline().strip())
93-
if self.volume != data["volume"]:
94-
self.volume = data["volume"]
95-
self.schedule_event(VolEvents.CHANGE_STREAM)
96-
self.connected_zones = data["zones"]
91+
try:
92+
with open(f'{self.config_dir}/vol', 'r') as fifo:
93+
while True:
94+
data = json.loads(fifo.readline().strip())
95+
if self.volume != data["volume"]:
96+
self.volume = data["volume"]
97+
self.schedule_event(VolEvents.CHANGE_STREAM)
98+
self.connected_zones = data["zones"]
99+
except Exception as e:
100+
self.logger.exception(f"Error while getting writing to {self.config_dir}/vol fifo: {e}")
97101

98102
def set_vol(self, stream_volume: float, vol_set_point: float):
99103
"""Update AmpliPi's volume to match the stream volume"""
@@ -106,6 +110,13 @@ def set_vol(self, stream_volume: float, vol_set_point: float):
106110
return vol_set_point
107111

108112
delta = float(stream_volume - self.volume)
113+
return self.set_vol_delta(delta)
114+
except Exception as e:
115+
self.logger.exception(f"Exception: {e}")
116+
117+
def set_vol_delta(self, delta: float):
118+
"""Update AmpliPi's volume by delta"""
119+
try:
109120
expected_volume = self.volume + delta
110121
self.logger.debug(f"Setting AmpliPi volume to {expected_volume} from {self.volume}")
111122
requests.patch(
@@ -185,7 +196,13 @@ def event_loop(self):
185196

186197
event = self.event_queue.get()
187198
if event == VolEvents.CHANGE_AMPLIPI:
188-
self.vol_set_point = self.amplipi.set_vol(self.stream.volume, self.vol_set_point)
199+
if self.stream.delta is not None:
200+
# Reduce race condition potential by decoupling the value from the variable
201+
delta = float(self.stream.delta)
202+
self.vol_set_point = self.amplipi.set_vol_delta(delta)
203+
self.stream.delta -= delta
204+
else:
205+
self.vol_set_point = self.amplipi.set_vol(self.stream.volume, self.vol_set_point)
189206
elif event == VolEvents.CHANGE_STREAM:
190207
self.vol_set_point = self.stream.set_vol(self.amplipi.volume, self.vol_set_point)
191208
except queue.Empty:

0 commit comments

Comments
 (0)