|
1 | | -""" |
2 | | -Xantech Multi-Zone Amplifier Control for Home Assistant |
3 | | -""" |
| 1 | +"""Xantech Multi-Zone Amplifier Control for Home Assistant.""" |
4 | 2 |
|
5 | | -from homeassistant.core import HomeAssistant |
| 3 | +from __future__ import annotations |
6 | 4 |
|
7 | | -PLATFORMS = ['media_player'] |
| 5 | +import logging |
| 6 | +from typing import TYPE_CHECKING |
8 | 7 |
|
| 8 | +from homeassistant.config_entries import ConfigEntry |
| 9 | +from homeassistant.const import ATTR_ENTITY_ID |
| 10 | +from homeassistant.core import HomeAssistant, ServiceCall |
| 11 | +from homeassistant.exceptions import ConfigEntryNotReady |
| 12 | +from homeassistant.helpers import entity_registry as er |
| 13 | +from homeassistant.helpers.typing import ConfigType |
| 14 | +from pyxantech import async_get_amp_controller |
| 15 | +from serial import SerialException |
9 | 16 |
|
10 | | -async def async_setup(hass: HomeAssistant, config: dict): |
11 | | - """Set up the Xantech Multi-Zone Amplifier component.""" |
| 17 | +from .const import ( |
| 18 | + CONF_AMP_TYPE, |
| 19 | + CONF_PORT, |
| 20 | + CONF_SCAN_INTERVAL, |
| 21 | + CONF_SOURCES, |
| 22 | + CONF_ZONES, |
| 23 | + DEFAULT_SCAN_INTERVAL, |
| 24 | + DOMAIN, |
| 25 | + PLATFORMS, |
| 26 | + SERVICE_RESTORE, |
| 27 | + SERVICE_SNAPSHOT, |
| 28 | +) |
| 29 | +from .coordinator import XantechCoordinator |
| 30 | + |
| 31 | +if TYPE_CHECKING: |
| 32 | + from pyxantech import AmpControlBase |
| 33 | + |
| 34 | +LOG = logging.getLogger(__name__) |
| 35 | + |
| 36 | +type XantechConfigEntry = ConfigEntry[XantechData] |
| 37 | + |
| 38 | + |
| 39 | +class XantechData: |
| 40 | + """Runtime data for Xantech integration.""" |
| 41 | + |
| 42 | + def __init__( |
| 43 | + self, |
| 44 | + coordinator: XantechCoordinator, |
| 45 | + amp: AmpControlBase, |
| 46 | + sources: dict[int, str], |
| 47 | + ) -> None: |
| 48 | + """Initialize runtime data.""" |
| 49 | + self.coordinator = coordinator |
| 50 | + self.amp = amp |
| 51 | + self.sources = sources |
| 52 | + |
| 53 | + |
| 54 | +async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool: |
| 55 | + """Set up the Xantech component (YAML config not supported).""" |
12 | 56 | return True |
| 57 | + |
| 58 | + |
| 59 | +async def async_setup_entry(hass: HomeAssistant, entry: XantechConfigEntry) -> bool: |
| 60 | + """Set up Xantech Multi-Zone Amplifier from a config entry.""" |
| 61 | + port = entry.data[CONF_PORT] |
| 62 | + amp_type = entry.data[CONF_AMP_TYPE] |
| 63 | + zones_config = entry.data.get(CONF_ZONES, {}) |
| 64 | + sources_config = entry.data.get(CONF_SOURCES, {}) |
| 65 | + |
| 66 | + # convert zones config to list of zone IDs |
| 67 | + zone_ids = [int(zone_id) for zone_id in zones_config] |
| 68 | + |
| 69 | + # convert sources config to simple id->name mapping |
| 70 | + sources: dict[int, str] = {} |
| 71 | + for source_id, source_data in sources_config.items(): |
| 72 | + sources[int(source_id)] = source_data.get('name', f'Source {source_id}') |
| 73 | + |
| 74 | + # get scan interval from options, with fallback to default |
| 75 | + scan_interval = entry.options.get(CONF_SCAN_INTERVAL, DEFAULT_SCAN_INTERVAL) |
| 76 | + |
| 77 | + try: |
| 78 | + amp = await async_get_amp_controller(amp_type, port, hass.loop) |
| 79 | + if not amp: |
| 80 | + raise ConfigEntryNotReady(f'Failed to connect to {amp_type} at {port}') |
| 81 | + except SerialException as err: |
| 82 | + raise ConfigEntryNotReady( |
| 83 | + f'Serial connection error to {amp_type} at {port}: {err}' |
| 84 | + ) from err |
| 85 | + except Exception as err: |
| 86 | + LOG.exception('Failed to initialize amplifier') |
| 87 | + raise ConfigEntryNotReady(f'Failed to initialize {amp_type} at {port}') from err |
| 88 | + |
| 89 | + LOG.info('Connected to %s amplifier at %s', amp_type, port) |
| 90 | + |
| 91 | + # create coordinator |
| 92 | + amp_name = f'{amp_type}_{port}'.replace('/', '_') |
| 93 | + coordinator = XantechCoordinator( |
| 94 | + hass, |
| 95 | + amp, |
| 96 | + amp_name, |
| 97 | + zone_ids, |
| 98 | + scan_interval, |
| 99 | + ) |
| 100 | + |
| 101 | + # fetch initial data |
| 102 | + await coordinator.async_config_entry_first_refresh() |
| 103 | + |
| 104 | + # store runtime data |
| 105 | + entry.runtime_data = XantechData( |
| 106 | + coordinator=coordinator, |
| 107 | + amp=amp, |
| 108 | + sources=sources, |
| 109 | + ) |
| 110 | + |
| 111 | + # register services |
| 112 | + await _async_register_services(hass) |
| 113 | + |
| 114 | + # forward setup to platforms |
| 115 | + await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS) |
| 116 | + |
| 117 | + # register update listener for options |
| 118 | + entry.async_on_unload(entry.add_update_listener(async_update_options)) |
| 119 | + |
| 120 | + return True |
| 121 | + |
| 122 | + |
| 123 | +async def async_unload_entry(hass: HomeAssistant, entry: XantechConfigEntry) -> bool: |
| 124 | + """Unload a config entry.""" |
| 125 | + unload_ok = await hass.config_entries.async_unload_platforms(entry, PLATFORMS) |
| 126 | + |
| 127 | + if unload_ok and not hass.config_entries.async_entries(DOMAIN): |
| 128 | + # cleanup services if no more entries |
| 129 | + for service in (SERVICE_SNAPSHOT, SERVICE_RESTORE): |
| 130 | + hass.services.async_remove(DOMAIN, service) |
| 131 | + |
| 132 | + return unload_ok |
| 133 | + |
| 134 | + |
| 135 | +async def async_update_options(hass: HomeAssistant, entry: XantechConfigEntry) -> None: |
| 136 | + """Handle options update.""" |
| 137 | + await hass.config_entries.async_reload(entry.entry_id) |
| 138 | + |
| 139 | + |
| 140 | +async def _async_register_services(hass: HomeAssistant) -> None: |
| 141 | + """Register integration services.""" |
| 142 | + if hass.services.has_service(DOMAIN, SERVICE_SNAPSHOT): |
| 143 | + return # services already registered |
| 144 | + |
| 145 | + async def async_snapshot_service(call: ServiceCall) -> None: |
| 146 | + """Handle snapshot service call.""" |
| 147 | + entity_ids = call.data.get(ATTR_ENTITY_ID, []) |
| 148 | + if isinstance(entity_ids, str): |
| 149 | + entity_ids = [entity_ids] |
| 150 | + |
| 151 | + entity_registry = er.async_get(hass) |
| 152 | + |
| 153 | + for entity_id in entity_ids: |
| 154 | + if hass.states.get(entity_id) and entity_registry.async_get(entity_id): |
| 155 | + LOG.debug('Snapshot requested for %s', entity_id) |
| 156 | + # dispatch to entity's snapshot method via event |
| 157 | + hass.bus.async_fire( |
| 158 | + f'{DOMAIN}_snapshot', |
| 159 | + {'entity_id': entity_id}, |
| 160 | + ) |
| 161 | + |
| 162 | + async def async_restore_service(call: ServiceCall) -> None: |
| 163 | + """Handle restore service call.""" |
| 164 | + entity_ids = call.data.get(ATTR_ENTITY_ID, []) |
| 165 | + if isinstance(entity_ids, str): |
| 166 | + entity_ids = [entity_ids] |
| 167 | + |
| 168 | + for entity_id in entity_ids: |
| 169 | + if hass.states.get(entity_id): |
| 170 | + LOG.debug('Restore requested for %s', entity_id) |
| 171 | + hass.bus.async_fire( |
| 172 | + f'{DOMAIN}_restore', |
| 173 | + {'entity_id': entity_id}, |
| 174 | + ) |
| 175 | + |
| 176 | + hass.services.async_register(DOMAIN, SERVICE_SNAPSHOT, async_snapshot_service) |
| 177 | + hass.services.async_register(DOMAIN, SERVICE_RESTORE, async_restore_service) |
0 commit comments