diff --git a/mopidy_alsamixer/__init__.py b/mopidy_alsamixer/__init__.py index edf070b..ab19e06 100644 --- a/mopidy_alsamixer/__init__.py +++ b/mopidy_alsamixer/__init__.py @@ -1,7 +1,6 @@ import pathlib import pkg_resources - from mopidy import config, ext __version__ = pkg_resources.get_distribution("Mopidy-ALSAMixer").version diff --git a/mopidy_alsamixer/mixer.py b/mopidy_alsamixer/mixer.py index ecbe1cf..e85eeab 100644 --- a/mopidy_alsamixer/mixer.py +++ b/mopidy_alsamixer/mixer.py @@ -1,7 +1,9 @@ import logging import math +import random import select -import threading +import struct +import time import alsaaudio import gi @@ -12,6 +14,7 @@ from mopidy import exceptions, mixer # noqa isort:skip +from .polling_actor import PollingActor # noqa logger = logging.getLogger(__name__) @@ -55,6 +58,7 @@ def __init__(self, config): self._last_volume = None self._last_mute = None + self._observer = None logger.info( f"Mixing using ALSA, {self.device_title}, " @@ -62,12 +66,25 @@ def __init__(self, config): ) def on_start(self): - self._observer = AlsaMixerObserver( - device=self.device, - control=self.control, - callback=self.actor_ref.proxy().trigger_events_for_changed_values, + self._observer = AlsaMixerObserver.start( + self._await_mixer(), self.actor_ref.proxy() ) - self._observer.start() + + def on_stop(self): + self._stop_observer() + + def on_failure(self, exception_type, exception_value, traceback): + self._stop_observer() + + def restart_observer(self, exc=None): + self._stop_observer() + self._observer = AlsaMixerObserver.start( + self._await_mixer(exc), self.actor_ref.proxy() + ) + + def _stop_observer(self): + if self._observer is not None and self._observer.is_alive(): + self._observer.stop() @property def _mixer(self): @@ -78,18 +95,50 @@ def _mixer(self): control=self.control, ) + def _await_mixer(self, exc_0=None, sleep=True): + while True: + try: + if exc_0 is not None: + exc, exc_0 = exc_0, None + raise exc + + return self._mixer + + except (alsaaudio.ALSAAudioError, OSError) as exc: + logger.info( + f"Could not open ALSA {self.device_title}. " + "Retrying in a few seconds... " + f"Error: {exc}" + ) + + if sleep: + time.sleep(random.uniform(7, 10)) + def get_volume(self): - channels = self._mixer.getvolume() + try: + channels = self._mixer.getvolume() + except alsaaudio.ALSAAudioError as exc: + logger.debug(f"Could not get ALSA mixer volume: {exc}") + return None + if not channels: return None elif channels.count(channels[0]) == len(channels): return self.mixer_volume_to_volume(channels[0]) else: - # Not all channels have the same volume + logger.debug( + "Could not determine single ALSA mixer volume " + "because channels have different volumes" + ) return None def set_volume(self, volume): - self._mixer.setvolume(self.volume_to_mixer_volume(volume)) + try: + self._mixer.setvolume(self.volume_to_mixer_volume(volume)) + except alsaaudio.ALSAAudioError as exc: + logger.debug(f"Could not set ALSA mixer volume: {exc}") + return False + return True def mixer_volume_to_volume(self, mixer_volume): @@ -144,14 +193,18 @@ def get_mute(self): try: channels_muted = self._mixer.getmute() except alsaaudio.ALSAAudioError as exc: - logger.debug(f"Getting mute state failed: {exc}") + logger.debug(f"Could not get ALSA mixer mute state: {exc}") return None + if all(channels_muted): return True elif not any(channels_muted): return False else: - # Not all channels have the same mute state + logger.debug( + "Could not determine single ALSA mixer mute state " + "because channels have different mute states" + ) return None def set_mute(self, mute): @@ -159,7 +212,7 @@ def set_mute(self, mute): self._mixer.setmute(int(mute)) return True except alsaaudio.ALSAAudioError as exc: - logger.debug(f"Setting mute state failed: {exc}") + logger.debug(f"Could not set ALSA mixer mute state: {exc}") return False def trigger_events_for_changed_values(self): @@ -173,36 +226,44 @@ def trigger_events_for_changed_values(self): self.trigger_mute_changed(self._last_mute) -class AlsaMixerObserver(threading.Thread): - daemon = True - name = "AlsaMixerObserver" +class AlsaMixerObserver(PollingActor): - def __init__(self, device, control, callback=None): - super().__init__() - self.running = True + name = "alsamixer-observer" - # Keep the mixer instance alive for the descriptors to work - self.mixer = alsaaudio.Mixer(device=device, control=control) + combine_events = True - descriptors = self.mixer.polldescriptors() - assert len(descriptors) == 1 - self.fd = descriptors[0][0] - self.event_mask = descriptors[0][1] + def __init__(self, mixer, parent): + # Note: ALSA mixer instance must be kept alive + # to keep poll descriptors open + self._mixer = mixer + self._parent = parent - self.callback = callback + # TODO: When a yet unreleased version of pyalsaaudio is used (> 0.9.0) + # this function with its call below can be safely removed. + # See https://github.com/larsimmisch/pyalsaaudio/pull/108 + def mitigate_invalid_fd_conversion(fd): + return fd != struct.unpack("I", b"\xFF\xFF\xFF\xFF")[0] - def stop(self): - self.running = False + super().__init__( + fds=tuple( + (fd, event_mask | select.EPOLLET) + for (fd, event_mask) in self._mixer.polldescriptors() + if fd != -1 and mitigate_invalid_fd_conversion(fd) + ) + ) - def run(self): - poller = select.epoll() - poller.register(self.fd, self.event_mask | select.EPOLLET) - while self.running: - try: - events = poller.poll(timeout=1) - if events and self.callback is not None: - self.callback() - except OSError as exc: - # poller.poll() will raise an IOError because of the - # interrupted system call when suspending the machine. - logger.debug(f"Ignored IO error: {exc}") + def on_start(self): + logger.debug( + f"Starting AlsaMixerObserver with {len(self._fds)} valid poll descriptors" + ) + + def on_failure(self, exception_type, exception_value, traceback): + if exception_type is OSError: + # OSError can normally occur after suspend/resume or device disconnection + self._parent.restart_observer(exception_value) + + def on_poll(self, fd, event): + if event & (select.EPOLLHUP | select.EPOLLERR): + self._parent.restart_observer() + else: + self._parent.trigger_events_for_changed_values().get() diff --git a/mopidy_alsamixer/polling_actor.py b/mopidy_alsamixer/polling_actor.py new file mode 100644 index 0000000..f3355df --- /dev/null +++ b/mopidy_alsamixer/polling_actor.py @@ -0,0 +1,174 @@ +import logging +import os +import queue +import select +import sys +from typing import Any, NamedTuple, Tuple + +import pykka +import pykka._envelope +import pykka.messages + +logger = logging.getLogger(__name__) + + +class PollingActor(pykka.ThreadingActor): + + combine_events = False + + def __init__(self, fds=tuple()): + super().__init__() + + self._fds = fds + self._poll = None + + def _start_actor_loop(self): + try: + self._wake_fd_read, self._wake_fd_write = os.pipe() + logging.debug( + f"Wake channel for {self} is opened with " + f"rfd={self._wake_fd_read:d} and wfd={self._wake_fd_write:d}" + ) + + self._poll = select.epoll() + self._poll.register( + self._wake_fd_read, select.EPOLLIN | select.EPOLLET + ) + + for fd, event_mask in self._fds: + self._poll.register(fd, event_mask) + + self.actor_inbox._actor = self + except Exception: + self._handle_failure(*sys.exc_info()) + return + + super()._start_actor_loop() + + def _stop(self): + super()._stop() + + os.close(self._wake_fd_write) + os.close(self._wake_fd_read) + + def _listen(self, timeout): + assert ( + self._poll is not None + ), "Must not request events before poll initialization" + + logging.debug( + f"Actor {self} is entering poll sleep with timeout = {timeout!r}" + ) + events = self._poll.poll(timeout) + logging.debug(f"Actor {self} has been woken with events {events!r}") + + # Don't handle any events if + # actor has been woken during stopping, + # so it can quickly finish its lifecycle + if not self.actor_ref.is_alive(): + return tuple() + + return ( + (fd, event) for (fd, event) in events if fd != self._wake_fd_read + ) + + def _wake(self): + logging.debug(f"Waking actor {self}") + os.write(self._wake_fd_write, b"\xFF") + + def _handle_receive(self, message): + if isinstance(message, ActorError): + self._handle_failure(*message.exc_info) + try: + self.on_failure(*message.exc_info) + except Exception: + self._handle_failure(*sys.exc_info()) + return + + if isinstance(message, PollEvent): + return self.on_poll(message.fd, message.event) + + return super()._handle_receive(message) + + def on_poll(self, fd, event): + raise NotImplementedError("Use a subclass of PollingActor") + + @classmethod + def _create_actor_inbox(cls): + return PollingActorInbox(cls.combine_events) + + +class PollingActorInbox(queue.Queue): + def __init__(self, combine_events=False): + super().__init__() + + self._actor = None + self._combine_events = combine_events + + def put(self, item, block=True, timeout=None): + if self._actor is not None: + self._actor._wake() + + super().put(item, block, timeout) + + def get(self, block=True, timeout=None): + assert ( + self._actor is not None + ), "Actor must be set before starting polling" + + while True: + if not self.empty(): + return super().get(False) + + try: + # If a non-blocking call is requested simulate + # it with the minimal timeout of 1 millisecond + if not block: + events = self._actor._listen(1) + else: + # TODO: Since this can be called more than once + # we need to properly update timeout if it isn't None + events = self._actor._listen( + timeout * 1000 if timeout is not None else None + ) + except Exception: + return pykka._envelope.Envelope( + ActorError(exc_info=sys.exc_info()) + ) + + if self._combine_events: + events = filter(PollingActorInbox._combine_filter(), events) + + for event in events: + super().put(pykka._envelope.Envelope(PollEvent(*event))) + + if not block and self.empty(): + raise queue.Empty + + def _combine_filter(): + trigger = False + + def combiner(event): + nonlocal trigger + + if event[1] & ~(select.EPOLLIN | select.EPOLLOUT | select.EPOLLPRI): + return True + + if trigger: + return False + + trigger = True + return True + + return combiner + + +class PollEvent(NamedTuple): + + fd: int + + event: int + + +class ActorError(NamedTuple): + exc_info: Tuple[Any] diff --git a/tests/test_mixer.py b/tests/test_mixer.py index bcf4aa2..b2ea1b5 100644 --- a/tests/test_mixer.py +++ b/tests/test_mixer.py @@ -1,11 +1,18 @@ +import contextlib import copy +import errno +import os +import select +import threading +import time import unittest from unittest import mock import alsaaudio - from mopidy import exceptions -from mopidy_alsamixer.mixer import AlsaMixer + +from mopidy_alsamixer.mixer import AlsaMixer, AlsaMixerObserver +from mopidy_alsamixer.polling_actor import PollingActorInbox @mock.patch( @@ -37,6 +44,7 @@ def get_mixer(self, alsa_mock=None, config=None, apply_default_config=True): actual_config["alsamixer"].update(config["alsamixer"]) else: actual_config = config + return AlsaMixer(config=actual_config) def test_has_config(self, alsa_mock): @@ -197,6 +205,12 @@ def test_get_volume_when_no_channels(self, alsa_mock): mixer_mock.getvolume.assert_called_once_with() + def test_get_volume_when_unavailable(self, alsa_mock): + mixer = self.get_mixer(alsa_mock) + alsa_mock.Mixer.side_effect = alsaaudio.ALSAAudioError + + self.assertIsNone(mixer.get_volume()) + def test_set_volume(self, alsa_mock): config = {"alsamixer": {"volume_scale": "linear"}} mixer = self.get_mixer(alsa_mock, config=config) @@ -224,6 +238,12 @@ def test_set_volume_log(self, alsa_mock): mixer_mock.setvolume.assert_called_once_with(93) + def test_set_volume_when_unavailable(self, alsa_mock): + mixer = self.get_mixer(alsa_mock) + alsa_mock.Mixer.side_effect = alsaaudio.ALSAAudioError + + self.assertIs(mixer.set_volume(74), False) + def test_get_mute_when_muted(self, alsa_mock): mixer = self.get_mixer(alsa_mock) mixer_mock = alsa_mock.Mixer.return_value @@ -320,3 +340,137 @@ def test_trigger_events_for_changed_values_when_changes(self, alsa_mock): mixer_mock.getmute.assert_called_once_with() mixer.trigger_volume_changed.assert_called_once_with(75) mixer.trigger_mute_changed.assert_called_once_with(True) + + def test_await_mixer(self, alsa_mock): + mixer_mock = mock.Mock() + mixer = self.get_mixer(alsa_mock) + + alsa_mock.Mixer.side_effect = (alsa_mock.ALSAAudioError(""), mixer_mock) + self.assertEqual(mixer._await_mixer(sleep=False), mixer_mock) + self.assertEqual(alsa_mock.Mixer.call_count, 2) + alsa_mock.Mixer.reset_mock() + + alsa_mock.Mixer.side_effect = (alsa_mock.ALSAAudioError(""), mixer_mock) + self.assertEqual( + mixer._await_mixer(OSError(errno.EBADF), sleep=False), + mixer_mock, + ) + self.assertEqual(alsa_mock.Mixer.call_count, 2) + + +@mock.patch( + "mopidy_alsamixer.mixer.AlsaMixer", + spec=AlsaMixer, +) +class ObserverTest(unittest.TestCase): + @contextlib.contextmanager + def running_observer(self, mixer_mock, parent_mock): + event = threading.Event() + + def side_effect(*args): + event.set() + return mock.Mock()(*args) + + parent_mock.trigger_events_for_changed_values.side_effect = side_effect + parent_mock.restart_observer.side_effect = side_effect + + observer = AlsaMixerObserver.start(mixer_mock, parent_mock) + yield observer + + event.wait() + observer.stop() + + @contextlib.contextmanager + def pipes(self, n, close=True): + fds = tuple(os.pipe() for i in range(n)) + yield fds + + if close: + for rfd, wfd in fds: + os.close(rfd) + os.close(wfd) + + def test_multiple_fd_multiple_events(self, parent_mock): + mixer_mock = mock.Mock() + + with self.pipes(3) as fds: + mixer_mock.polldescriptors.return_value = ( + (fds[0][0], select.EPOLLOUT), + (fds[1][0], select.EPOLLIN), + (fds[2][0], select.EPOLLIN | select.EPOLLOUT | select.EPOLLPRI), + ) + + with self.running_observer(mixer_mock, parent_mock): + os.write(fds[1][1], b"\xFF") + time.sleep(1) + os.write(fds[2][1], b"\xFF") + time.sleep(1) + os.write(fds[0][1], b"\xFF") # Should be ignored + time.sleep(1) + os.write(fds[1][1], b"\xFF") + time.sleep(1) + + mixer_mock.polldescriptors.assert_called_once_with() + self.assertEqual( + parent_mock.trigger_events_for_changed_values.call_count, 3 + ) + parent_mock.restart_observer.assert_not_called() + + def test_mixer_disconnect(self, parent_mock): + mixer_mock = mock.Mock() + + with self.pipes(1, False) as fds: + mixer_mock.polldescriptors.return_value = ( + ( + fds[0][1], + select.EPOLLIN, + ), # Note that we pass write end of the pipe + ) + + with self.running_observer(mixer_mock, parent_mock): + os.close( + fds[0][0] + ) # Closing read end of the pipe triggers EPOLLERR + + mixer_mock.polldescriptors.assert_called_once_with() + parent_mock.restart_observer.assert_called_once() + parent_mock.trigger_events_for_changed_values.assert_not_called() + + def test_poll_exception(self, parent_mock): + mixer_mock = mock.Mock() + mixer_mock.polldescriptors.return_value = tuple() + + with self.running_observer(mixer_mock, parent_mock) as observer: + observer._actor._poll = mock.Mock() + observer._actor._poll.poll.side_effect = OSError + + observer._actor._wake() + time.sleep(1) + + parent_mock.restart_observer.assert_called_once() + parent_mock.trigger_events_for_changed_values.assert_not_called() + + def test_combine_filter(self, parent_mock): + combiner = PollingActorInbox._combine_filter() + + x = ( + (0, select.EPOLLERR), + (1, select.EPOLLERR), + (0, select.EPOLLIN), + (1, select.EPOLLOUT), + (0, select.EPOLLERR), + (1, select.EPOLLIN | select.EPOLLERR), + (0, select.EPOLLIN), + (1, select.EPOLLHUP), + ) + + y = ( + (0, select.EPOLLERR), + (1, select.EPOLLERR), + (0, select.EPOLLIN), + (0, select.EPOLLERR), + (1, select.EPOLLIN | select.EPOLLERR), + (1, select.EPOLLHUP), + ) + + self.assertEqual(tuple(filter(combiner, x)), y)