Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
name: Unit Tests

on:
pull_request:
branches:
- main
- dev
push:
branches:
- main
- dev

jobs:
test:
runs-on: ubuntu-latest
container:
image: python:3.8

steps:
- name: Checkout code
uses: actions/checkout@v4

- name: Install dependencies
run: |
pip install --upgrade pip
pip install soco==0.24.0
pip install git+https://github.com/majamassarini/automate-home.git

- name: Run unit tests
run: |
python -m unittest discover -v

- name: Run doctests
run: |
python -m doctest soco_plugin/gateway.py -v
python -m doctest soco_plugin/message.py -v
python -m doctest soco_plugin/command/play.py -v
python -m doctest soco_plugin/command/pause.py -v
python -m doctest soco_plugin/command/stop.py -v
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
__pycache__/
*.pyc
*.pyo
63 changes: 63 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,69 @@ Trigger and command for a [sound player model](https://automate-home.readthedocs
- !home.appliance.sound.player.event.forced.Event.Not
```

## Known Issues: Command Echo Problem

### Problem Description

The SoCo library (underlying Sonos communication library) does **not** distinguish between:
- Events triggered by commands sent from automate-home
- Events triggered by user actions (Sonos app, physical speaker controls)

This creates a "command echo" problem where your own commands trigger state change events.

### Example Scenario

When pressing a button to enable Forced Circadian Rhythm mode on a Sonos speaker:

1. **Button pressed** → sends `forced.Event.CircadianRhythm` → State becomes **Forced Circadian Rhythm**
2. **Performer executes commands** in sequence:
- `mode.Command` (set shuffle mode)
- `playlist.Command` (select playlist)
- `volume.Command` (set volume)
- `play.Command` (start playback)
3. **During reconfiguration**: Sonos internally pauses → fires `pause.Trigger` → sends `forced.Event.Off` → State becomes **Off** (unforced!)
4. **After reconfiguration**: Sonos starts playing → fires `play.Trigger` → sends `forced.Event.On` → State becomes **Forced On** (WRONG! Should be Forced Circadian Rhythm)

### Why This Happens

- Sonos speakers send pause events during mode/playlist reconfiguration
- The pause trigger (lines 25-28 in example above) interprets this as user action
- This unforces the state prematurely
- Subsequent play event from Sonos is then interpreted as generic "forced on" instead of maintaining circadian rhythm mode

### Root Cause

The SoCo Event objects contain:
- `sid`: subscription ID
- `seq`: event sequence number
- `service`: subscribed service (avTransport or renderingControl)
- `timestamp`: when event was received
- `variables`: state variables like `transport_state` and `volume`

**They do NOT contain:**
- Command correlation ID
- Source identifier (app vs. external command vs. button)
- Any way to distinguish "echo of your command" from "actual user action"

This is a fundamental limitation of the Sonos UPnP event system and cannot be fully solved without changes to the SoCo library or Sonos firmware.

### Current Mitigation: Appliance-level Event Disabling

The mitigation is implemented at the automate-home appliance state machine level rather
than in this plugin. When a Sonos player enters a playing state (Fade In, Forced On,
Forced Circadian Rhythm), automate-home temporarily disables `forced.Event.Off` in the
appliance state machine so that echo events from Sonos are silently ignored:

1. **On state entry** — a `state.entering.disable_events.Trigger` immediately calls
`appliance.disable(forced.Event.Off)`. Any `stop`/`pause` echo arriving from Sonos
is ignored by the state machine.
2. **After 30 seconds** — a `state.entering.delay.enable_events.Trigger` calls
`appliance.enable(forced.Event.Off)`. Legitimate stop/pause commands from the user
are processed normally again.

This plugin passes all Sonos events through without any filtering; the suppression
window is configured entirely in the automate-home scheduler trigger YAML.

## Documentation

* [automate-home protocol commands/triggers chapter](https://automate-home.readthedocs.io/en/latest/performer.html)
Expand Down
149 changes: 41 additions & 108 deletions soco_plugin/gateway.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import time
import asyncio
import logging
import requests
Expand All @@ -14,37 +13,18 @@
class Gateway(home.protocol.Gateway):

PROTOCOL = Description.PROTOCOL
SEND_MESSAGE_TIMEOUT = 1
POLL_TIMEOUT = 0.05

def __init__(self):
self._players = {}
self._loop = asyncio.get_event_loop()
self.executor = ThreadPoolExecutor(max_workers=100)

# lock events
# generated by our commands
self._players_locked = {}

self.logger = logging.getLogger(__name__)
self._waiting_for_response = dict()
self._waiting_for_response_type = dict()


async def disconnect(self) -> None:
for (player, sub_rendering, sub_avtransport) in self._players:
for (player, sub_rendering, sub_avtransport) in self._players.values():
player.renderingControl.unsubscribe()
player.avTransport.unsubscribe()

@staticmethod
def find_player(logger, name):
while True:
try:
return soco.discovery.scan_network_get_by_name(name)
except Exception as e:
logger.error(e)
player = None
time.sleep(120)
self.executor.shutdown(wait=True)

async def _associate(
self, descriptions: Iterable["soco_plugin.Description"]
Expand All @@ -54,7 +34,7 @@ async def _associate(
if name not in self._players:
try:
player = await self._loop.run_in_executor(
self.executor, lambda: self.find_player(self.logger, name)
self.executor, lambda: soco.discovery.scan_network_get_by_name(name)
)
except TypeError as e:
self.logger.error(e)
Expand All @@ -67,11 +47,14 @@ async def _associate(
player = None
self.logger.info("Player %s: %s" % (name, str(player)))
if player:
sub_rendering = player.renderingControl.subscribe()
sub_avtransport = player.avTransport.subscribe()
# Subscribe calls are blocking and must run in executor
sub_rendering = await self._loop.run_in_executor(
self.executor, lambda: player.renderingControl.subscribe()
)
sub_avtransport = await self._loop.run_in_executor(
self.executor, lambda: player.avTransport.subscribe()
)
self._players[name] = (player, sub_rendering, sub_avtransport)
self._waiting_for_response[player] = False
self._waiting_for_response_type[player] = None

async def associate_commands(
self, descriptions: Iterable["soco_plugin.Command"]
Expand Down Expand Up @@ -125,46 +108,19 @@ def build_msgs_from_bus(
return msgs

async def _wait_for_event(self, player: soco.SoCo, channel, tasks) -> None:
self.logger.info("Waiting for events at player {}".format(player.player_name))
self.logger.info("Waiting for events from player {}".format(player.player_name))
while True:
await asyncio.sleep(0.01) # avoid potential starvation
try:
event = await self._loop.run_in_executor(
self.executor, lambda: channel.events.get(block=True)
)
self.logger.info("Soco event {}".format(event))
self.logger.debug("Sonos event from %s: %s" % (player.player_name, str(event)))
msgs = self.build_msgs_from_bus(player, event)
for task in tasks:
for msg in msgs:
if self._waiting_for_response[player]:
if self._waiting_for_response_type[player]:
if (
msg["name"]
== self._waiting_for_response_type[player]
):
self.logger.debug(
"processed msg {} of waited type".format(msg)
)
self._loop.create_task(task(msg), name="Soco process msg {}".format(msg))
else:
self.logger.debug(
"waiting for response type {} received response of type {}".format(
self._waiting_for_response_type[player],
msg["name"],
)
)
else:
self.logger.debug(
"waiting for response type {} received response of type {}".format(
self._waiting_for_response_type[player],
msg["name"],
)
)
else:
self.logger.debug(
"processed msg {} no waited response".format(msg)
)
self._loop.create_task(task(msg), name="Soco process msg {}".format(msg))
self.logger.debug("Processing Sonos event: {}".format(msg))
self._loop.create_task(task(msg))
except Exception as e:
self.logger.error(e)

Expand All @@ -174,6 +130,7 @@ def get_action(player: soco.SoCo, msg: "soco_plugin.message.Command") -> Callabl
>>> import home
>>> import soco_plugin
>>> class Player:
... player_name = "TestPlayer"
... def play(self):
... print("play")
... def pause(self):
Expand All @@ -194,7 +151,12 @@ def get_action(player: soco.SoCo, msg: "soco_plugin.message.Command") -> Callabl
... print("play mode is {}".format(mode))
... def get_sonos_playlist_by_attr(self, attr, title):
... print ("playlist title is {}".format(title))
... return {"uri": "a uri"}
... class Playlist:
... def __init__(self):
... class Resource:
... uri = "a uri"
... self.resources = [Resource()]
... return Playlist()
... def clear_queue(self):
... pass
... def add_uri_to_queue(self, uri):
Expand Down Expand Up @@ -264,71 +226,42 @@ def get_action(player: soco.SoCo, msg: "soco_plugin.message.Command") -> Callabl
async def send_msg(
self, msg: "soco_plugin.message.Command", player: soco.SoCo
) -> None:
self.logger.info("send msg {} to player {}".format(msg, player.player_name))
self.logger.info("Executing Sonos action {} on player {}".format(msg["name"], player.player_name))
if msg:
action = self.get_action(player, msg)
if action:
self.logger.info("Action found: {}".format(action))
try:
await self._loop.run_in_executor(self.executor, action)
self.logger.info("Action executed successfully")
except soco.exceptions.SoCoUPnPException as e:
self.logger.error(e)
self.logger.error("SoCo UPnP exception: {}".format(e))
except Exception as e:
self.logger.error(e)

async def _cancel_waiting_on_response(self, player, timeout):
await asyncio.sleep(timeout)
if self._waiting_for_response[player]:
self.logger.debug(
"waiting for response of type {} timed out".format(
self._waiting_for_response_type[player]
)
)
self._waiting_for_response[player] = False
self._waiting_for_response_type[player] = None

def get_response_type(self, msg: "soco_plugin.message.Command"):
res = None
name = msg["name"]
if name == command.play.Command.ACTION:
res = trigger.play.Trigger.ACTION
elif name == command.stop.Command.ACTION:
res = trigger.stop.Trigger.ACTION
elif name == command.pause.Command.ACTION:
res = trigger.pause.Trigger.ACTION
elif name in (
command.volume.relative.Command.ACTION,
command.volume.absolute.Command.ACTION,
command.volume.ramp.Command.ACTION,
):
res = trigger.volume.Trigger.ACTION
return res
self.logger.error("Exception executing action: {}".format(e))
else:
self.logger.warning("No action found for message: {}".format(msg))

async def writer(
self, msgs: Iterable["soco_plugin.message.Command"], *args
) -> None:
self.logger.debug("Writer called")
msg_count = 0
for msg in msgs:
msg_count += 1
self.logger.info("Processing message {}: {} (type: {})".format(msg_count, msg, type(msg)))
if isinstance(msg, Msg):
for address in msg["addresses"]:
self.logger.info("Checking address: {} in players: {}".format(address, list(self._players.keys())))
if address in self._players:
(player, _, _) = self._players[address]
while True:
if not self._waiting_for_response[player]:
self._waiting_for_response[player] = True
self._waiting_for_response_type[
player
] = self.get_response_type(msg)
self._loop.create_task(
self._cancel_waiting_on_response(
player, self.SEND_MESSAGE_TIMEOUT
),
name="Cancel waiting on response for {}".format(player.player_name)
)

self.logger.debug("send message {}".format(msg))
await self.send_msg(msg, player)
break
else:
await asyncio.sleep(self.POLL_TIMEOUT)
self.logger.info("Sending Sonos command {} to {}".format(msg, address))
await self.send_msg(msg, player)
else:
self.logger.warning("Address {} not found in players {}".format(address, list(self._players.keys())))
else:
self.logger.warning("Message is not a Msg instance: {} (type: {})".format(msg, type(msg)))
if msg_count == 0:
self.logger.debug("Writer called with 0 messages")

@staticmethod
def make_trigger(msg: "soco_plugin.Description") -> "soco_plugin.Trigger":
Expand Down
8 changes: 6 additions & 2 deletions soco_plugin/tests/test_gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ async def postpone_gw_running(self):

async def asyncSetUp(self):
self._gw = soco_plugin.Gateway()
# Make _players dict support iteration for logging
self._gw._players = {}
self._loop = asyncio.get_event_loop()
self._loop.create_task(
self._gw.associate_triggers(
Expand All @@ -54,12 +56,13 @@ async def test_stopped(self):

test = Test("test_stopped")
mock = unittest.mock.Mock()
mock.player_name = "Bagno"
event_mock = unittest.mock.Mock()
event_mock.variables = {"transport_state": "STOPPED"}
av_mock = unittest.mock.Mock()
av_mock.events.get.return_value = event_mock
mock.avTransport.subscribe.return_value = av_mock
with unittest.mock.patch("soco.discovery.by_name") as new_mock:
with unittest.mock.patch("soco.discovery.scan_network_get_by_name") as new_mock:
new_mock.return_value = mock
test.run()
tc.assertIn(Test.STATE_CHANGED, events)
Expand Down Expand Up @@ -118,12 +121,13 @@ async def test_stopped(self):

test = Test("test_stopped")
mock = unittest.mock.Mock()
mock.player_name = "Bagno"
event_mock = unittest.mock.Mock()
event_mock.variables = {"transport_state": "STOPPED"}
av_mock = unittest.mock.Mock()
av_mock.events.get.return_value = event_mock
mock.avTransport.subscribe.return_value = av_mock
with unittest.mock.patch("soco.discovery.by_name") as new_mock:
with unittest.mock.patch("soco.discovery.scan_network_get_by_name") as new_mock:
new_mock.return_value = mock
test.run()
tc.assertIn(Test.STATE_CHANGED, events)
Loading