Skip to content

Commit d237180

Browse files
authored
Allow re-discovery of mqtt integration config payloads (#127362)
1 parent d8b618f commit d237180

File tree

2 files changed

+166
-35
lines changed

2 files changed

+166
-35
lines changed

homeassistant/components/mqtt/discovery.py

Lines changed: 56 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,22 @@
44

55
import asyncio
66
from collections import deque
7+
from dataclasses import dataclass
78
import functools
89
from itertools import chain
910
import logging
1011
import re
1112
import time
1213
from typing import TYPE_CHECKING, Any
1314

14-
from homeassistant.config_entries import ConfigEntry
15+
from homeassistant.config_entries import (
16+
SOURCE_MQTT,
17+
ConfigEntry,
18+
signal_discovered_config_entry_removed,
19+
)
1520
from homeassistant.const import CONF_DEVICE, CONF_PLATFORM
1621
from homeassistant.core import HassJobType, HomeAssistant, callback
22+
from homeassistant.helpers import discovery_flow
1723
import homeassistant.helpers.config_validation as cv
1824
from homeassistant.helpers.dispatcher import (
1925
async_dispatcher_connect,
@@ -71,6 +77,14 @@ class MQTTDiscoveryPayload(dict[str, Any]):
7177
discovery_data: DiscoveryInfoType
7278

7379

80+
@dataclass(frozen=True)
81+
class MQTTIntegrationDiscoveryConfig:
82+
"""Class to hold an integration discovery playload."""
83+
84+
integration: str
85+
msg: ReceiveMessage
86+
87+
7488
def clear_discovery_hash(hass: HomeAssistant, discovery_hash: tuple[str, str]) -> None:
7589
"""Clear entry from already discovered list."""
7690
hass.data[DATA_MQTT].discovery_already_discovered.discard(discovery_hash)
@@ -191,7 +205,7 @@ async def async_start( # noqa: C901
191205
"""Start MQTT Discovery."""
192206
mqtt_data = hass.data[DATA_MQTT]
193207
platform_setup_lock: dict[str, asyncio.Lock] = {}
194-
integration_discovery_messages: dict[str, int] = {}
208+
integration_discovery_messages: dict[str, MQTTIntegrationDiscoveryConfig] = {}
195209

196210
@callback
197211
def _async_add_component(discovery_payload: MQTTDiscoveryPayload) -> None:
@@ -364,13 +378,39 @@ def discovery_done(_: Any) -> None:
364378
mqtt_integrations = await async_get_mqtt(hass)
365379
integration_unsubscribe = mqtt_data.integration_unsubscribe
366380

381+
async def _async_handle_config_entry_removed(entry: ConfigEntry) -> None:
382+
"""Handle integration config entry changes."""
383+
for discovery_key in entry.discovery_keys[DOMAIN]:
384+
if (
385+
discovery_key.version != 1
386+
or not isinstance(discovery_key.key, str)
387+
or discovery_key.key not in integration_discovery_messages
388+
):
389+
continue
390+
topic = discovery_key.key
391+
discovery_message = integration_discovery_messages[topic]
392+
del integration_discovery_messages[topic]
393+
_LOGGER.debug("Rediscover service on topic %s", topic)
394+
# Initiate re-discovery
395+
await async_integration_message_received(
396+
discovery_message.integration, discovery_message.msg
397+
)
398+
399+
mqtt_data.discovery_unsubscribe.append(
400+
async_dispatcher_connect(
401+
hass,
402+
signal_discovered_config_entry_removed(DOMAIN),
403+
_async_handle_config_entry_removed,
404+
)
405+
)
406+
367407
async def async_integration_message_received(
368408
integration: str, msg: ReceiveMessage
369409
) -> None:
370410
"""Process the received message."""
371411
if (
372412
msg.topic in integration_discovery_messages
373-
and integration_discovery_messages[msg.topic] == hash(msg.payload)
413+
and integration_discovery_messages[msg.topic].msg.payload == msg.payload
374414
):
375415
_LOGGER.debug(
376416
"Ignoring already processed discovery message for '%s' on topic %s: %s",
@@ -393,14 +433,23 @@ async def async_integration_message_received(
393433
subscribed_topic=msg.subscribed_topic,
394434
timestamp=msg.timestamp,
395435
)
396-
await hass.config_entries.flow.async_init(
397-
integration, context={"source": DOMAIN}, data=data
436+
discovery_key = discovery_flow.DiscoveryKey(
437+
domain=DOMAIN, key=msg.topic, version=1
438+
)
439+
discovery_flow.async_create_flow(
440+
hass,
441+
integration,
442+
{"source": SOURCE_MQTT},
443+
data,
444+
discovery_key=discovery_key,
398445
)
399446
if msg.payload:
400447
# Update the last discovered config message
401-
integration_discovery_messages[msg.topic] = hash(msg.payload)
448+
integration_discovery_messages[msg.topic] = (
449+
MQTTIntegrationDiscoveryConfig(integration=integration, msg=msg)
450+
)
402451
elif msg.topic in integration_discovery_messages:
403-
# Cleanup hash if discovery payload is empty
452+
# Cleanup cache if discovery payload is empty
404453
del integration_discovery_messages[msg.topic]
405454

406455
integration_unsubscribe.update(

tests/components/mqtt/test_discovery.py

Lines changed: 110 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
Platform,
3535
)
3636
from homeassistant.core import Event, HomeAssistant, callback
37-
from homeassistant.data_entry_flow import FlowResult
37+
from homeassistant.data_entry_flow import AbortFlow, FlowResult
3838
from homeassistant.helpers import device_registry as dr, entity_registry as er
3939
from homeassistant.helpers.dispatcher import (
4040
async_dispatcher_connect,
@@ -63,6 +63,53 @@
6363
)
6464

6565

66+
@pytest.fixture
67+
def mqtt_data_flow_calls() -> list[MqttServiceInfo]:
68+
"""Return list to capture MQTT data data flow calls."""
69+
return []
70+
71+
72+
@pytest.fixture
73+
async def mock_mqtt_flow(
74+
hass: HomeAssistant, mqtt_data_flow_calls: list[MqttServiceInfo]
75+
) -> config_entries.ConfigFlow:
76+
"""Test fixure for mqtt integration flow.
77+
78+
The topic is used as a unique ID.
79+
The component test domain used is: `comp`.
80+
81+
Creates an entry if does not exist.
82+
Updates an entry if it exists, and there is an updated payload.
83+
"""
84+
85+
class TestFlow(config_entries.ConfigFlow):
86+
"""Test flow."""
87+
88+
async def async_step_mqtt(self, discovery_info: MqttServiceInfo) -> FlowResult:
89+
"""Test mqtt step."""
90+
await asyncio.sleep(0)
91+
mqtt_data_flow_calls.append(discovery_info)
92+
# Abort a flow if there is an update for the existing entry
93+
if entry := self.hass.config_entries.async_entry_for_domain_unique_id(
94+
"comp", discovery_info.topic
95+
):
96+
hass.config_entries.async_update_entry(
97+
entry,
98+
data={
99+
"name": discovery_info.topic,
100+
"payload": discovery_info.payload,
101+
},
102+
)
103+
raise AbortFlow("already_configured")
104+
await self.async_set_unique_id(discovery_info.topic)
105+
return self.async_create_entry(
106+
title="Test",
107+
data={"name": discovery_info.topic, "payload": discovery_info.payload},
108+
)
109+
110+
return TestFlow
111+
112+
66113
@pytest.mark.parametrize(
67114
"mqtt_config_entry_data",
68115
[{mqtt.CONF_BROKER: "mock-broker", mqtt.CONF_DISCOVERY: False}],
@@ -1518,20 +1565,14 @@ async def test_mqtt_discovery_flow_starts_once(
15181565
hass: HomeAssistant,
15191566
mqtt_client_mock: MqttMockPahoClient,
15201567
caplog: pytest.LogCaptureFixture,
1568+
mock_mqtt_flow: config_entries.ConfigFlow,
1569+
mqtt_data_flow_calls: list[MqttServiceInfo],
15211570
) -> None:
1522-
"""Check MQTT integration discovery starts a flow once."""
1523-
1524-
flow_calls: list[MqttServiceInfo] = []
1525-
1526-
class TestFlow(config_entries.ConfigFlow):
1527-
"""Test flow."""
1528-
1529-
async def async_step_mqtt(self, discovery_info: MqttServiceInfo) -> FlowResult:
1530-
"""Test mqtt step."""
1531-
await asyncio.sleep(0)
1532-
flow_calls.append(discovery_info)
1533-
return self.async_create_entry(title="Test", data={})
1571+
"""Check MQTT integration discovery starts a flow once.
15341572
1573+
A flow should be started once after discovery,
1574+
and after an entry was removed, to trigger re-discovery.
1575+
"""
15351576
mock_integration(
15361577
hass, MockModule(domain="comp", async_setup_entry=AsyncMock(return_value=True))
15371578
)
@@ -1552,7 +1593,7 @@ def wait_birth(msg: ReceiveMessage) -> None:
15521593
"homeassistant.components.mqtt.discovery.async_get_mqtt",
15531594
return_value={"comp": ["comp/discovery/#"]},
15541595
),
1555-
mock_config_flow("comp", TestFlow),
1596+
mock_config_flow("comp", mock_mqtt_flow),
15561597
):
15571598
assert await hass.config_entries.async_setup(entry.entry_id)
15581599
await mqtt.async_subscribe(hass, "homeassistant/status", wait_birth)
@@ -1561,41 +1602,82 @@ def wait_birth(msg: ReceiveMessage) -> None:
15611602

15621603
assert ("comp/discovery/#", 0) in help_all_subscribe_calls(mqtt_client_mock)
15631604

1605+
# Test the initial flow
15641606
async_fire_mqtt_message(hass, "comp/discovery/bla/config1", "initial message")
15651607
await hass.async_block_till_done(wait_background_tasks=True)
1566-
assert len(flow_calls) == 1
1567-
assert flow_calls[0].topic == "comp/discovery/bla/config1"
1568-
assert flow_calls[0].payload == "initial message"
1608+
assert len(mqtt_data_flow_calls) == 1
1609+
assert mqtt_data_flow_calls[0].topic == "comp/discovery/bla/config1"
1610+
assert mqtt_data_flow_calls[0].payload == "initial message"
15691611

1612+
# Test we can ignore updates if they are the same
15701613
with caplog.at_level(logging.DEBUG):
15711614
async_fire_mqtt_message(
15721615
hass, "comp/discovery/bla/config1", "initial message"
15731616
)
15741617
await hass.async_block_till_done(wait_background_tasks=True)
15751618
assert "Ignoring already processed discovery message" in caplog.text
1576-
assert len(flow_calls) == 1
1619+
assert len(mqtt_data_flow_calls) == 1
15771620

1621+
# Test we can apply updates
1622+
async_fire_mqtt_message(hass, "comp/discovery/bla/config1", "update message")
1623+
await hass.async_block_till_done(wait_background_tasks=True)
1624+
1625+
assert len(mqtt_data_flow_calls) == 2
1626+
assert mqtt_data_flow_calls[1].topic == "comp/discovery/bla/config1"
1627+
assert mqtt_data_flow_calls[1].payload == "update message"
1628+
1629+
# Test we set up multiple entries
15781630
async_fire_mqtt_message(hass, "comp/discovery/bla/config2", "initial message")
15791631
await hass.async_block_till_done(wait_background_tasks=True)
15801632

1581-
assert len(flow_calls) == 2
1582-
assert flow_calls[1].topic == "comp/discovery/bla/config2"
1583-
assert flow_calls[1].payload == "initial message"
1633+
assert len(mqtt_data_flow_calls) == 3
1634+
assert mqtt_data_flow_calls[2].topic == "comp/discovery/bla/config2"
1635+
assert mqtt_data_flow_calls[2].payload == "initial message"
15841636

1637+
# Test we update multiple entries
15851638
async_fire_mqtt_message(hass, "comp/discovery/bla/config2", "update message")
15861639
await hass.async_block_till_done(wait_background_tasks=True)
15871640

1588-
assert len(flow_calls) == 3
1589-
assert flow_calls[2].topic == "comp/discovery/bla/config2"
1590-
assert flow_calls[2].payload == "update message"
1641+
assert len(mqtt_data_flow_calls) == 4
1642+
assert mqtt_data_flow_calls[3].topic == "comp/discovery/bla/config2"
1643+
assert mqtt_data_flow_calls[3].payload == "update message"
15911644

1592-
# An empty message triggers a flow to allow cleanup
1645+
# Test an empty message triggers a flow to allow cleanup (if needed)
15931646
async_fire_mqtt_message(hass, "comp/discovery/bla/config2", "")
15941647
await hass.async_block_till_done(wait_background_tasks=True)
15951648

1596-
assert len(flow_calls) == 4
1597-
assert flow_calls[3].topic == "comp/discovery/bla/config2"
1598-
assert flow_calls[3].payload == ""
1649+
assert len(mqtt_data_flow_calls) == 5
1650+
assert mqtt_data_flow_calls[4].topic == "comp/discovery/bla/config2"
1651+
assert mqtt_data_flow_calls[4].payload == ""
1652+
1653+
# Cleanup the the second entry
1654+
assert (
1655+
entry := hass.config_entries.async_entry_for_domain_unique_id(
1656+
"comp", "comp/discovery/bla/config2"
1657+
)
1658+
) is not None
1659+
await hass.config_entries.async_remove(entry.entry_id)
1660+
assert len(hass.config_entries.async_entries(domain="comp")) == 1
1661+
1662+
# Remove remaining entry1 and assert this triggers an
1663+
# automatic re-discovery flow with latest config
1664+
assert (
1665+
entry := hass.config_entries.async_entry_for_domain_unique_id(
1666+
"comp", "comp/discovery/bla/config1"
1667+
)
1668+
) is not None
1669+
assert entry.unique_id == "comp/discovery/bla/config1"
1670+
await hass.config_entries.async_remove(entry.entry_id)
1671+
assert len(hass.config_entries.async_entries(domain="comp")) == 0
1672+
1673+
# Wait for re-discovery flow to complete
1674+
await hass.async_block_till_done(wait_background_tasks=True)
1675+
assert len(mqtt_data_flow_calls) == 6
1676+
assert mqtt_data_flow_calls[5].topic == "comp/discovery/bla/config1"
1677+
assert mqtt_data_flow_calls[5].payload == "update message"
1678+
1679+
# Re-discovery triggered the config flow
1680+
assert len(hass.config_entries.async_entries(domain="comp")) == 1
15991681

16001682
assert not mqtt_client_mock.unsubscribe.called
16011683

0 commit comments

Comments
 (0)