Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
* System
* Update our spotify provider `go-librespot` to `0.7.1`
* Upgraded volume calculations to preserve relative positions when hitting the min or max setting via source volume bar
* Added volume matching between AmpliPi and Spotify and vice-versa
* Web App
* Changed caching rules to ensure that users don't get stuck with old versions of the webapp post update

Expand Down
2 changes: 1 addition & 1 deletion amplipi/streams/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@
def build_stream(stream: models.Stream, mock: bool = False, validate: bool = True) -> AnyStream:
""" Build a stream from the generic arguments given in stream, discriminated by stream.type

we are waiting on Pydantic's implemenatation of discriminators to fully integrate streams into our model definitions
we are waiting on Pydantic's implementation of discriminators to fully integrate streams into our model definitions
"""
# pylint: disable=too-many-return-statements
args = stream.dict(exclude_none=True)
Expand Down
21 changes: 20 additions & 1 deletion amplipi/streams/base_streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import logging
from amplipi import models
from amplipi import utils
from amplipi import app

logger = logging.getLogger(__name__)
logger.level = logging.DEBUG
Expand Down Expand Up @@ -62,6 +63,24 @@ def __init__(self, stype: str, name: str, only_src=None, disabled: bool = False,
if validate:
self.validate_stream(name=name, mock=mock, **kwargs)

def get_zone_data(self):
if self.src is not None:
ctrl = app.get_ctrl()
state = ctrl.get_state()
return [zone for zone in state.zones if zone.source_id == self.src]

@property
def connected_zones(self) -> List[int]:
connected_zones = self.get_zone_data()
return [zone.id for zone in connected_zones]

@property
def volume(self) -> float:
connected_zones = self.get_zone_data()
if connected_zones:
return sum([zone.vol_f for zone in connected_zones]) / len(connected_zones)
return 0

def __del__(self):
self.disconnect()

Expand Down Expand Up @@ -242,7 +261,7 @@ def deactivate(self):
raise Exception(f'Failed to deactivate {self.name}: {e}') from e
finally:
self.state = "disconnected" # make this look like a normal stream for now
if 'vsrc' in self.__dir__() and self.vsrc:
if 'vsrc' in self.__dir__() and self.vsrc is not None:
vsrc = self.vsrc
self.vsrc = None
vsources.free(vsrc)
Expand Down
85 changes: 68 additions & 17 deletions amplipi/streams/spotify_connect.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,27 @@

import io
import os
import threading
import re
import sys
import subprocess
import time
from typing import ClassVar, Optional
import yaml
import logging
import json
from amplipi import models, utils
from .base_streams import PersistentStream, InvalidStreamField, logger
from .base_streams import PersistentStream, InvalidStreamField
from .. import tasks

# Our subprocesses run behind the scenes, is there a more standard way to do this?
# pylint: disable=consider-using-with

logger = logging.getLogger(__name__)
logger.level = logging.DEBUG
sh = logging.StreamHandler(sys.stdout)
logger.addHandler(sh)


class SpotifyConnect(PersistentStream):
""" A SpotifyConnect Stream based off librespot-go """
Expand All @@ -33,9 +41,30 @@ def __init__(self, name: str, disabled: bool = False, mock: bool = False, valida
self._log_file: Optional[io.TextIOBase] = None
self._api_port: int
self.proc2: Optional[subprocess.Popen] = None
self.volume_sync_process: Optional[subprocess.Popen] = None # Runs the actual vol sync script
self.volume_watcher_process: Optional[threading.Thread] = None # Populates the fifo that the vol sync script depends on
self.src_config_folder: Optional[str] = None
self.meta_file: str = ''
self.max_volume: int = 100 # default configuration from 'volume_steps'
self.last_volume: float = 0
self._volume_fifo = None

def watch_vol(self):
"""Creates and supplies a FIFO with volume data for volume sync"""
while True:
try:
if self.src is not None:
if self._volume_fifo is None and self.src_config_folder is not None:
fifo_path = f"{self.src_config_folder}/vol"
if not os.path.isfile(fifo_path):
os.mkfifo(fifo_path)
self._volume_fifo = os.open(fifo_path, os.O_WRONLY, os.O_NONBLOCK)
data = json.dumps({
'zones': self.connected_zones,
'volume': self.volume,
})
os.write(self._volume_fifo, bytearray(f"{data}\r\n", encoding="utf8"))
except Exception as e:
logger.error(f"{self.name} volume thread ran into exception: {e}")
time.sleep(0.1)

def reconfig(self, **kwargs):
self.validate_stream(**kwargs)
Expand All @@ -52,9 +81,9 @@ def _activate(self, vsrc: int):
""" Connect to a given audio source
"""

src_config_folder = f'{utils.get_folder("config")}/srcs/v{vsrc}'
self.src_config_folder = f'{utils.get_folder("config")}/srcs/v{vsrc}'
try:
os.remove(f'{src_config_folder}/currentSong')
os.remove(f'{self.src_config_folder}/currentSong')
except FileNotFoundError:
pass
self._connect_time = time.time()
Expand All @@ -78,16 +107,16 @@ def _activate(self, vsrc: int):
}

# make all of the necessary dir(s) & files
os.makedirs(src_config_folder, exist_ok=True)
os.makedirs(self.src_config_folder, exist_ok=True)

config_file = f'{src_config_folder}/config.yml'
config_file = f'{self.src_config_folder}/config.yml'
with open(config_file, 'w', encoding='utf8') as f:
f.write(yaml.dump(config))

self.meta_file = f'{src_config_folder}/metadata.json'
self.meta_file = f'{self.src_config_folder}/metadata.json'

self._log_file = open(f'{src_config_folder}/log', mode='w', encoding='utf8')
player_args = f"{utils.get_folder('streams')}/go-librespot --config_dir {src_config_folder}".split(' ')
self._log_file = open(f'{self.src_config_folder}/log', mode='w', encoding='utf8')
player_args = f"{utils.get_folder('streams')}/go-librespot --config_dir {self.src_config_folder}".split(' ')
logger.debug(f'spotify player args: {player_args}')

self.proc = subprocess.Popen(args=player_args, stdin=subprocess.PIPE,
Expand All @@ -99,20 +128,45 @@ def _activate(self, vsrc: int):
logger.info(f'{self.name}: starting metadata reader: {meta_args}')
self.proc2 = subprocess.Popen(args=meta_args, stdout=self._log_file, stderr=self._log_file)

vol_sync = f"{utils.get_folder('streams')}/spotify_volume_handler.py"
vol_args = [sys.executable, vol_sync, str(self._api_port), self.src_config_folder, "--debug"]
logger.info(f'{self.name}: starting vol synchronizer: {vol_args}')
self.volume_sync_process = subprocess.Popen(args=vol_args, stdout=self._log_file, stderr=self._log_file)

self.volume_watcher_process = threading.Thread(target=self.watch_vol, daemon=True)
self.volume_watcher_process.start()

def _deactivate(self):
if self._is_running():
self.proc.stdin.close()
logger.info(f'{self.name}: stopping player')

# Call terminate on all processes
self.proc.terminate()
self.proc2.terminate()
if self.volume_sync_process:
self.volume_sync_process.terminate()

# Ensure the processes have closed, by force if necessary
if self.proc.wait(1) != 0:
logger.info(f'{self.name}: killing player')
self.proc.kill()

if self.proc2.wait(1) != 0:
logger.info(f'{self.name}: killing metadata reader')
self.proc2.kill()

if self.volume_sync_process:
if self.volume_sync_process.wait(1) != 0:
logger.info(f'{self.name}: killing volume synchronizer')
self.volume_sync_process.kill()

# Validate on the way out
self.proc.communicate()
self.proc2.communicate()
if self.volume_sync_process:
self.volume_sync_process.communicate()

if self.proc and self._log_file: # prevent checking _log_file when it may not exist, thanks validation!
self._log_file.close()
if self.src:
Expand All @@ -121,8 +175,12 @@ def _deactivate(self):
except Exception as e:
logger.exception(f'{self.name}: Error removing config files: {e}')
self._disconnect()

self.proc = None
self.proc2 = None
self.volume_sync_process = None
self.volume_watcher_process = None
self._volume_fifo = None

def info(self) -> models.SourceInfo:
source = models.SourceInfo(
Expand Down Expand Up @@ -190,10 +248,3 @@ def validate_stream(self, **kwargs):
NAME = r"[a-zA-Z0-9][A-Za-z0-9\- ]*[a-zA-Z0-9]"
if 'name' in kwargs and not re.fullmatch(NAME, kwargs['name']):
raise InvalidStreamField("name", "Invalid stream name")

def sync_volume(self, volume: float) -> None:
""" Set the volume of amplipi to the Spotify Connect stream"""
if volume != self.last_volume:
url = f"http://localhost:{self._api_port}/"
self.last_volume = volume # update last_volume for future syncs
tasks.post.delay(url + 'volume', data={'volume': int(volume * self.max_volume)})
80 changes: 80 additions & 0 deletions streams/spotify_volume_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
"""Script for synchronizing AmpliPi and Spotify volumes"""
import argparse
import json
import logging
import sys

import websockets
import requests

from volume_synchronizer import VolSyncDispatcher, StreamWatcher, VolEvents
from spot_connect_meta import Event


class SpotifyWatcher(StreamWatcher):
"""A class that watches and tracks changes to spotify-side volume"""

def __init__(self, api_port: int):
super().__init__()

self.api_port: int = api_port
"""What port is go-librespot running on? Typically set to 3678 + vsrc."""

async def watch_vol(self):
"""Watch the go-librespot websocket endpoint for volume change events and update AmpliPi volume info accordingly"""
try:
# pylint: disable=E1101
# E1101: Module 'websockets' has no 'connect' member (no-member)
async with websockets.connect(f"ws://localhost:{self.api_port}/events", open_timeout=5) as websocket:
while True:
try:
msg = await websocket.recv()
event = Event.from_json(json.loads(msg))
if event.event_type == "volume":
last_volume = float(self.volume) if self.volume is not None else None
self.volume = event.data.value / 100 # Translate spotify volume (0 - 100) to amplipi volume (0 - 1)

self.logger.debug(f"Spotify volume changed from {last_volume} to {self.volume}")
if last_volume is not None and self.volume != last_volume:
self.schedule_event(VolEvents.CHANGE_AMPLIPI)
elif event.event_type == "will_play" and self.volume is None:
self.schedule_event(VolEvents.CHANGE_STREAM) # Intercept the event that occurs when a song starts playing and use that as a trigger for the initial state sync

except Exception as e:
self.logger.exception(f"Error: {e}")
return
except Exception as e:
self.logger.exception(f"Error: {e}")
return

def set_vol(self, new_vol: float, vol_set_point: float) -> float:
"""Update Spotify's volume slider"""
try:
raise Exception("Fake exception that is used to test logging abilities")
if new_vol is None:
return vol_set_point

if abs(new_vol - vol_set_point) <= 0.005 and self.volume is not None:
self.logger.debug("Ignored minor AmpliPi -> Spotify change")
return vol_set_point

url = f"http://localhost:{self.api_port}/player/volume"
spot_vol = int(new_vol * 100)
self.logger.debug(f"Setting Spotify volume to {new_vol} from {self.volume}")
requests.post(url, json={"volume": spot_vol}, timeout=5)
return new_vol
except Exception as e:
self.logger.exception(f"Exception: {e}")


if __name__ == "__main__":

parser = argparse.ArgumentParser(description="Read metadata from a given URL and write it to a file.")

parser.add_argument("port", help="port that go-librespot is running on", type=int)
parser.add_argument("config_dir", help="The directory of the vsrc config", type=str)
parser.add_argument("--debug", action="store_true", help="Change log level from WARNING to DEBUG")

args = parser.parse_args()

handler = VolSyncDispatcher(SpotifyWatcher(api_port=args.port), args.config_dir, args.debug)
Loading
Loading