22
33import io
44import os
5+ import threading
56import re
67import sys
78import subprocess
89import time
910from typing import ClassVar , Optional
1011import yaml
12+ import logging
13+ import json
1114from amplipi import models , utils
12- from .base_streams import PersistentStream , InvalidStreamField , logger
15+ from .base_streams import PersistentStream , InvalidStreamField
1316from .. import tasks
1417
1518# Our subprocesses run behind the scenes, is there a more standard way to do this?
1619# pylint: disable=consider-using-with
1720
21+ logger = logging .getLogger (__name__ )
22+ logger .level = logging .DEBUG
23+ sh = logging .StreamHandler (sys .stdout )
24+ logger .addHandler (sh )
25+
1826
1927class SpotifyConnect (PersistentStream ):
2028 """ A SpotifyConnect Stream based off librespot-go """
@@ -33,9 +41,30 @@ def __init__(self, name: str, disabled: bool = False, mock: bool = False, valida
3341 self ._log_file : Optional [io .TextIOBase ] = None
3442 self ._api_port : int
3543 self .proc2 : Optional [subprocess .Popen ] = None
44+ self .volume_sync_process : Optional [subprocess .Popen ] = None # Runs the actual vol sync script
45+ self .volume_watcher_process : Optional [threading .Thread ] = None # Populates the fifo that the vol sync script depends on
46+ self .src_config_folder : Optional [str ] = None
3647 self .meta_file : str = ''
37- self .max_volume : int = 100 # default configuration from 'volume_steps'
38- self .last_volume : float = 0
48+ self ._volume_fifo = None
49+
50+ def watch_vol (self ):
51+ """Creates and supplies a FIFO with volume data for volume sync"""
52+ while True :
53+ try :
54+ if self .src is not None :
55+ if self ._volume_fifo is None and self .src_config_folder is not None :
56+ fifo_path = f"{ self .src_config_folder } /vol"
57+ if not os .path .isfile (fifo_path ):
58+ os .mkfifo (fifo_path )
59+ self ._volume_fifo = os .open (fifo_path , os .O_WRONLY , os .O_NONBLOCK )
60+ data = json .dumps ({
61+ 'zones' : self .connected_zones ,
62+ 'volume' : self .volume ,
63+ })
64+ os .write (self ._volume_fifo , bytearray (f"{ data } \r \n " , encoding = "utf8" ))
65+ except Exception as e :
66+ logger .error (f"{ self .name } volume thread ran into exception: { e } " )
67+ time .sleep (0.1 )
3968
4069 def reconfig (self , ** kwargs ):
4170 self .validate_stream (** kwargs )
@@ -52,9 +81,9 @@ def _activate(self, vsrc: int):
5281 """ Connect to a given audio source
5382 """
5483
55- src_config_folder = f'{ utils .get_folder ("config" )} /srcs/v{ vsrc } '
84+ self . src_config_folder = f'{ utils .get_folder ("config" )} /srcs/v{ vsrc } '
5685 try :
57- os .remove (f'{ src_config_folder } /currentSong' )
86+ os .remove (f'{ self . src_config_folder } /currentSong' )
5887 except FileNotFoundError :
5988 pass
6089 self ._connect_time = time .time ()
@@ -78,16 +107,16 @@ def _activate(self, vsrc: int):
78107 }
79108
80109 # make all of the necessary dir(s) & files
81- os .makedirs (src_config_folder , exist_ok = True )
110+ os .makedirs (self . src_config_folder , exist_ok = True )
82111
83- config_file = f'{ src_config_folder } /config.yml'
112+ config_file = f'{ self . src_config_folder } /config.yml'
84113 with open (config_file , 'w' , encoding = 'utf8' ) as f :
85114 f .write (yaml .dump (config ))
86115
87- self .meta_file = f'{ src_config_folder } /metadata.json'
116+ self .meta_file = f'{ self . src_config_folder } /metadata.json'
88117
89- self ._log_file = open (f'{ src_config_folder } /log' , mode = 'w' , encoding = 'utf8' )
90- player_args = f"{ utils .get_folder ('streams' )} /go-librespot --config_dir { src_config_folder } " .split (' ' )
118+ self ._log_file = open (f'{ self . src_config_folder } /log' , mode = 'w' , encoding = 'utf8' )
119+ player_args = f"{ utils .get_folder ('streams' )} /go-librespot --config_dir { self . src_config_folder } " .split (' ' )
91120 logger .debug (f'spotify player args: { player_args } ' )
92121
93122 self .proc = subprocess .Popen (args = player_args , stdin = subprocess .PIPE ,
@@ -99,20 +128,45 @@ def _activate(self, vsrc: int):
99128 logger .info (f'{ self .name } : starting metadata reader: { meta_args } ' )
100129 self .proc2 = subprocess .Popen (args = meta_args , stdout = self ._log_file , stderr = self ._log_file )
101130
131+ vol_sync = f"{ utils .get_folder ('streams' )} /spotify_volume_handler.py"
132+ vol_args = [sys .executable , vol_sync , str (self ._api_port ), self .src_config_folder , "--debug" ]
133+ logger .info (f'{ self .name } : starting vol synchronizer: { vol_args } ' )
134+ self .volume_sync_process = subprocess .Popen (args = vol_args , stdout = self ._log_file , stderr = self ._log_file )
135+
136+ self .volume_watcher_process = threading .Thread (target = self .watch_vol , daemon = True )
137+ self .volume_watcher_process .start ()
138+
102139 def _deactivate (self ):
103140 if self ._is_running ():
104141 self .proc .stdin .close ()
105142 logger .info (f'{ self .name } : stopping player' )
143+
144+ # Call terminate on all processes
106145 self .proc .terminate ()
107146 self .proc2 .terminate ()
147+ if self .volume_sync_process :
148+ self .volume_sync_process .terminate ()
149+
150+ # Ensure the processes have closed, by force if necessary
108151 if self .proc .wait (1 ) != 0 :
109152 logger .info (f'{ self .name } : killing player' )
110153 self .proc .kill ()
154+
111155 if self .proc2 .wait (1 ) != 0 :
112156 logger .info (f'{ self .name } : killing metadata reader' )
113157 self .proc2 .kill ()
158+
159+ if self .volume_sync_process :
160+ if self .volume_sync_process .wait (1 ) != 0 :
161+ logger .info (f'{ self .name } : killing volume synchronizer' )
162+ self .volume_sync_process .kill ()
163+
164+ # Validate on the way out
114165 self .proc .communicate ()
115166 self .proc2 .communicate ()
167+ if self .volume_sync_process :
168+ self .volume_sync_process .communicate ()
169+
116170 if self .proc and self ._log_file : # prevent checking _log_file when it may not exist, thanks validation!
117171 self ._log_file .close ()
118172 if self .src :
@@ -121,8 +175,12 @@ def _deactivate(self):
121175 except Exception as e :
122176 logger .exception (f'{ self .name } : Error removing config files: { e } ' )
123177 self ._disconnect ()
178+
124179 self .proc = None
125180 self .proc2 = None
181+ self .volume_sync_process = None
182+ self .volume_watcher_process = None
183+ self ._volume_fifo = None
126184
127185 def info (self ) -> models .SourceInfo :
128186 source = models .SourceInfo (
@@ -190,10 +248,3 @@ def validate_stream(self, **kwargs):
190248 NAME = r"[a-zA-Z0-9][A-Za-z0-9\- ]*[a-zA-Z0-9]"
191249 if 'name' in kwargs and not re .fullmatch (NAME , kwargs ['name' ]):
192250 raise InvalidStreamField ("name" , "Invalid stream name" )
193-
194- def sync_volume (self , volume : float ) -> None :
195- """ Set the volume of amplipi to the Spotify Connect stream"""
196- if volume != self .last_volume :
197- url = f"http://localhost:{ self ._api_port } /"
198- self .last_volume = volume # update last_volume for future syncs
199- tasks .post .delay (url + 'volume' , data = {'volume' : int (volume * self .max_volume )})
0 commit comments