Skip to content

Commit 6cbee9f

Browse files
Add spotify volume synchronization
1 parent 9efc33d commit 6cbee9f

File tree

7 files changed

+374
-21
lines changed

7 files changed

+374
-21
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
* System
55
* Update our spotify provider `go-librespot` to `0.7.1`
66
* Upgraded volume calculations to preserve relative positions when hitting the min or max setting via source volume bar
7+
* Added volume matching between AmpliPi and Spotify and vice-versa
78
* Web App
89
* Changed caching rules to ensure that users don't get stuck with old versions of the webapp post update
910

amplipi/streams/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@
6565
def build_stream(stream: models.Stream, mock: bool = False, validate: bool = True) -> AnyStream:
6666
""" Build a stream from the generic arguments given in stream, discriminated by stream.type
6767
68-
we are waiting on Pydantic's implemenatation of discriminators to fully integrate streams into our model definitions
68+
we are waiting on Pydantic's implementation of discriminators to fully integrate streams into our model definitions
6969
"""
7070
# pylint: disable=too-many-return-statements
7171
args = stream.dict(exclude_none=True)

amplipi/streams/base_streams.py

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import logging
66
from amplipi import models
77
from amplipi import utils
8+
from amplipi import app
89

910
logger = logging.getLogger(__name__)
1011
logger.level = logging.DEBUG
@@ -62,6 +63,24 @@ def __init__(self, stype: str, name: str, only_src=None, disabled: bool = False,
6263
if validate:
6364
self.validate_stream(name=name, mock=mock, **kwargs)
6465

66+
def get_zone_data(self):
67+
if self.src is not None:
68+
ctrl = app.get_ctrl()
69+
state = ctrl.get_state()
70+
return [zone for zone in state.zones if zone.source_id == self.src]
71+
72+
@property
73+
def connected_zones(self) -> List[int]:
74+
connected_zones = self.get_zone_data()
75+
return [zone.id for zone in connected_zones]
76+
77+
@property
78+
def volume(self) -> float:
79+
connected_zones = self.get_zone_data()
80+
if connected_zones:
81+
return sum([zone.vol_f for zone in connected_zones]) / len(connected_zones)
82+
return 0
83+
6584
def __del__(self):
6685
self.disconnect()
6786

@@ -242,7 +261,7 @@ def deactivate(self):
242261
raise Exception(f'Failed to deactivate {self.name}: {e}') from e
243262
finally:
244263
self.state = "disconnected" # make this look like a normal stream for now
245-
if 'vsrc' in self.__dir__() and self.vsrc:
264+
if 'vsrc' in self.__dir__() and self.vsrc is not None:
246265
vsrc = self.vsrc
247266
self.vsrc = None
248267
vsources.free(vsrc)

amplipi/streams/spotify_connect.py

Lines changed: 68 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2,19 +2,27 @@
22

33
import io
44
import os
5+
import threading
56
import re
67
import sys
78
import subprocess
89
import time
910
from typing import ClassVar, Optional
1011
import yaml
12+
import logging
13+
import json
1114
from amplipi import models, utils
12-
from .base_streams import PersistentStream, InvalidStreamField, logger
15+
from .base_streams import PersistentStream, InvalidStreamField
1316
from .. import tasks
1417

1518
# Our subprocesses run behind the scenes, is there a more standard way to do this?
1619
# pylint: disable=consider-using-with
1720

21+
logger = logging.getLogger(__name__)
22+
logger.level = logging.DEBUG
23+
sh = logging.StreamHandler(sys.stdout)
24+
logger.addHandler(sh)
25+
1826

1927
class SpotifyConnect(PersistentStream):
2028
""" A SpotifyConnect Stream based off librespot-go """
@@ -33,9 +41,30 @@ def __init__(self, name: str, disabled: bool = False, mock: bool = False, valida
3341
self._log_file: Optional[io.TextIOBase] = None
3442
self._api_port: int
3543
self.proc2: Optional[subprocess.Popen] = None
44+
self.volume_sync_process: Optional[subprocess.Popen] = None # Runs the actual vol sync script
45+
self.volume_watcher_process: Optional[threading.Thread] = None # Populates the fifo that the vol sync script depends on
46+
self.src_config_folder: Optional[str] = None
3647
self.meta_file: str = ''
37-
self.max_volume: int = 100 # default configuration from 'volume_steps'
38-
self.last_volume: float = 0
48+
self._volume_fifo = None
49+
50+
def watch_vol(self):
51+
"""Creates and supplies a FIFO with volume data for volume sync"""
52+
while True:
53+
try:
54+
if self.src is not None:
55+
if self._volume_fifo is None and self.src_config_folder is not None:
56+
fifo_path = f"{self.src_config_folder}/vol"
57+
if not os.path.isfile(fifo_path):
58+
os.mkfifo(fifo_path)
59+
self._volume_fifo = os.open(fifo_path, os.O_WRONLY, os.O_NONBLOCK)
60+
data = json.dumps({
61+
'zones': self.connected_zones,
62+
'volume': self.volume,
63+
})
64+
os.write(self._volume_fifo, bytearray(f"{data}\r\n", encoding="utf8"))
65+
except Exception as e:
66+
logger.error(f"{self.name} volume thread ran into exception: {e}")
67+
time.sleep(0.1)
3968

4069
def reconfig(self, **kwargs):
4170
self.validate_stream(**kwargs)
@@ -52,9 +81,9 @@ def _activate(self, vsrc: int):
5281
""" Connect to a given audio source
5382
"""
5483

55-
src_config_folder = f'{utils.get_folder("config")}/srcs/v{vsrc}'
84+
self.src_config_folder = f'{utils.get_folder("config")}/srcs/v{vsrc}'
5685
try:
57-
os.remove(f'{src_config_folder}/currentSong')
86+
os.remove(f'{self.src_config_folder}/currentSong')
5887
except FileNotFoundError:
5988
pass
6089
self._connect_time = time.time()
@@ -78,16 +107,16 @@ def _activate(self, vsrc: int):
78107
}
79108

80109
# make all of the necessary dir(s) & files
81-
os.makedirs(src_config_folder, exist_ok=True)
110+
os.makedirs(self.src_config_folder, exist_ok=True)
82111

83-
config_file = f'{src_config_folder}/config.yml'
112+
config_file = f'{self.src_config_folder}/config.yml'
84113
with open(config_file, 'w', encoding='utf8') as f:
85114
f.write(yaml.dump(config))
86115

87-
self.meta_file = f'{src_config_folder}/metadata.json'
116+
self.meta_file = f'{self.src_config_folder}/metadata.json'
88117

89-
self._log_file = open(f'{src_config_folder}/log', mode='w', encoding='utf8')
90-
player_args = f"{utils.get_folder('streams')}/go-librespot --config_dir {src_config_folder}".split(' ')
118+
self._log_file = open(f'{self.src_config_folder}/log', mode='w', encoding='utf8')
119+
player_args = f"{utils.get_folder('streams')}/go-librespot --config_dir {self.src_config_folder}".split(' ')
91120
logger.debug(f'spotify player args: {player_args}')
92121

93122
self.proc = subprocess.Popen(args=player_args, stdin=subprocess.PIPE,
@@ -99,20 +128,45 @@ def _activate(self, vsrc: int):
99128
logger.info(f'{self.name}: starting metadata reader: {meta_args}')
100129
self.proc2 = subprocess.Popen(args=meta_args, stdout=self._log_file, stderr=self._log_file)
101130

131+
vol_sync = f"{utils.get_folder('streams')}/spotify_volume_handler.py"
132+
vol_args = [sys.executable, vol_sync, str(self._api_port), self.src_config_folder, "--debug"]
133+
logger.info(f'{self.name}: starting vol synchronizer: {vol_args}')
134+
self.volume_sync_process = subprocess.Popen(args=vol_args, stdout=self._log_file, stderr=self._log_file)
135+
136+
self.volume_watcher_process = threading.Thread(target=self.watch_vol, daemon=True)
137+
self.volume_watcher_process.start()
138+
102139
def _deactivate(self):
103140
if self._is_running():
104141
self.proc.stdin.close()
105142
logger.info(f'{self.name}: stopping player')
143+
144+
# Call terminate on all processes
106145
self.proc.terminate()
107146
self.proc2.terminate()
147+
if self.volume_sync_process:
148+
self.volume_sync_process.terminate()
149+
150+
# Ensure the processes have closed, by force if necessary
108151
if self.proc.wait(1) != 0:
109152
logger.info(f'{self.name}: killing player')
110153
self.proc.kill()
154+
111155
if self.proc2.wait(1) != 0:
112156
logger.info(f'{self.name}: killing metadata reader')
113157
self.proc2.kill()
158+
159+
if self.volume_sync_process:
160+
if self.volume_sync_process.wait(1) != 0:
161+
logger.info(f'{self.name}: killing volume synchronizer')
162+
self.volume_sync_process.kill()
163+
164+
# Validate on the way out
114165
self.proc.communicate()
115166
self.proc2.communicate()
167+
if self.volume_sync_process:
168+
self.volume_sync_process.communicate()
169+
116170
if self.proc and self._log_file: # prevent checking _log_file when it may not exist, thanks validation!
117171
self._log_file.close()
118172
if self.src:
@@ -121,8 +175,12 @@ def _deactivate(self):
121175
except Exception as e:
122176
logger.exception(f'{self.name}: Error removing config files: {e}')
123177
self._disconnect()
178+
124179
self.proc = None
125180
self.proc2 = None
181+
self.volume_sync_process = None
182+
self.volume_watcher_process = None
183+
self._volume_fifo = None
126184

127185
def info(self) -> models.SourceInfo:
128186
source = models.SourceInfo(
@@ -190,10 +248,3 @@ def validate_stream(self, **kwargs):
190248
NAME = r"[a-zA-Z0-9][A-Za-z0-9\- ]*[a-zA-Z0-9]"
191249
if 'name' in kwargs and not re.fullmatch(NAME, kwargs['name']):
192250
raise InvalidStreamField("name", "Invalid stream name")
193-
194-
def sync_volume(self, volume: float) -> None:
195-
""" Set the volume of amplipi to the Spotify Connect stream"""
196-
if volume != self.last_volume:
197-
url = f"http://localhost:{self._api_port}/"
198-
self.last_volume = volume # update last_volume for future syncs
199-
tasks.post.delay(url + 'volume', data={'volume': int(volume * self.max_volume)})

streams/spotify_volume_handler.py

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
"""Script for synchronizing AmpliPi and Spotify volumes"""
2+
import argparse
3+
import json
4+
import logging
5+
import sys
6+
7+
import websockets
8+
import requests
9+
10+
from volume_synchronizer import VolSyncDispatcher, StreamWatcher, VolEvents
11+
from spot_connect_meta import Event
12+
13+
14+
class SpotifyWatcher(StreamWatcher):
15+
"""A class that watches and tracks changes to spotify-side volume"""
16+
17+
def __init__(self, api_port: int):
18+
super().__init__()
19+
20+
self.api_port: int = api_port
21+
"""What port is go-librespot running on? Typically set to 3678 + vsrc."""
22+
23+
async def watch_vol(self):
24+
"""Watch the go-librespot websocket endpoint for volume change events and update AmpliPi volume info accordingly"""
25+
try:
26+
# pylint: disable=E1101
27+
# E1101: Module 'websockets' has no 'connect' member (no-member)
28+
async with websockets.connect(f"ws://localhost:{self.api_port}/events", open_timeout=5) as websocket:
29+
while True:
30+
try:
31+
msg = await websocket.recv()
32+
event = Event.from_json(json.loads(msg))
33+
if event.event_type == "volume":
34+
last_volume = float(self.volume) if self.volume is not None else None
35+
self.volume = event.data.value / 100 # Translate spotify volume (0 - 100) to amplipi volume (0 - 1)
36+
37+
self.logger.debug(f"Spotify volume changed from {last_volume} to {self.volume}")
38+
if last_volume is not None and self.volume != last_volume:
39+
self.schedule_event(VolEvents.CHANGE_AMPLIPI)
40+
elif event.event_type == "will_play" and self.volume is None:
41+
self.schedule_event(VolEvents.CHANGE_STREAM) # Intercept the event that occurs when a song starts playing and use that as a trigger for the initial state sync
42+
43+
except Exception as e:
44+
self.logger.exception(f"Error: {e}")
45+
return
46+
except Exception as e:
47+
self.logger.exception(f"Error: {e}")
48+
return
49+
50+
def set_vol(self, new_vol: float, vol_set_point: float) -> float:
51+
"""Update Spotify's volume slider"""
52+
try:
53+
raise Exception("Fake exception that is used to test logging abilities")
54+
if new_vol is None:
55+
return vol_set_point
56+
57+
if abs(new_vol - vol_set_point) <= 0.005 and self.volume is not None:
58+
self.logger.debug("Ignored minor AmpliPi -> Spotify change")
59+
return vol_set_point
60+
61+
url = f"http://localhost:{self.api_port}/player/volume"
62+
spot_vol = int(new_vol * 100)
63+
self.logger.debug(f"Setting Spotify volume to {new_vol} from {self.volume}")
64+
requests.post(url, json={"volume": spot_vol}, timeout=5)
65+
return new_vol
66+
except Exception as e:
67+
self.logger.exception(f"Exception: {e}")
68+
69+
70+
if __name__ == "__main__":
71+
72+
parser = argparse.ArgumentParser(description="Read metadata from a given URL and write it to a file.")
73+
74+
parser.add_argument("port", help="port that go-librespot is running on", type=int)
75+
parser.add_argument("config_dir", help="The directory of the vsrc config", type=str)
76+
parser.add_argument("--debug", action="store_true", help="Change log level from WARNING to DEBUG")
77+
78+
args = parser.parse_args()
79+
80+
handler = VolSyncDispatcher(SpotifyWatcher(api_port=args.port), args.config_dir, args.debug)

0 commit comments

Comments
 (0)