|
104 | 104 | }), |
105 | 105 | }, extra=vol.ALLOW_EXTRA) |
106 | 106 |
|
| 107 | +HEARTBEAT_INTERVAL = 20 |
| 108 | +HEARTBEAT_TIMEOUT = 5 |
| 109 | + |
107 | 110 |
|
108 | 111 | def async_yaml_to_config_entry(instance_conf): |
109 | 112 | """Convert YAML config into data and options used by a config entry.""" |
@@ -268,6 +271,7 @@ def __init__(self, hass, config_entry): |
268 | 271 | self._connection_state_entity = '{}remote_connection_{}_{}'.format(self._connection_state_entity, self._entry.data[CONF_HOST].replace('.', '_').replace('-', '_'), self._entry.data[CONF_PORT]) |
269 | 272 |
|
270 | 273 | self._connection = None |
| 274 | + self._heartbeat_task = None |
271 | 275 | self._is_stopping = False |
272 | 276 | self._entities = set() |
273 | 277 | self._all_entity_names = set() |
@@ -360,6 +364,29 @@ async def _async_instance_id_match(): |
360 | 364 | self._hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, _async_stop_handler) |
361 | 365 |
|
362 | 366 | asyncio.ensure_future(self._recv()) |
| 367 | + self._heartbeat_task = self._hass.loop.create_task(self._heartbeat_loop()) |
| 368 | + |
| 369 | + async def _heartbeat_loop(self): |
| 370 | + """Send periodic heartbeats to remote instance.""" |
| 371 | + while not self._connection.closed: |
| 372 | + await asyncio.sleep(HEARTBEAT_INTERVAL) |
| 373 | + |
| 374 | + _LOGGER.debug("Sending ping") |
| 375 | + event = asyncio.Event() |
| 376 | + def resp(message): |
| 377 | + _LOGGER.debug("Got pong: %s", message) |
| 378 | + event.set() |
| 379 | + |
| 380 | + await self._call(resp, "ping") |
| 381 | + |
| 382 | + try: |
| 383 | + await asyncio.wait_for(event.wait(), HEARTBEAT_TIMEOUT) |
| 384 | + except asyncio.TimeoutError: |
| 385 | + _LOGGER.error("heartbeat failed") |
| 386 | + |
| 387 | + # Schedule closing on event loop to avoid deadlock |
| 388 | + asyncio.ensure_future(self._connection.close()) |
| 389 | + break |
363 | 390 |
|
364 | 391 | async def async_stop(self): |
365 | 392 | """Close connection.""" |
@@ -387,10 +414,14 @@ async def _disconnected(self): |
387 | 414 | # Remove all published entries |
388 | 415 | for entity in self._entities: |
389 | 416 | self._hass.states.async_remove(entity) |
| 417 | + if self._heartbeat_task is not None: |
| 418 | + self._heartbeat_task.cancel() |
| 419 | + await self._heartbeat_task |
390 | 420 | if self._remove_listener is not None: |
391 | 421 | self._remove_listener() |
392 | 422 |
|
393 | 423 | self.set_connection_state(STATE_DISCONNECTED) |
| 424 | + self._heartbeat_task = None |
394 | 425 | self._remove_listener = None |
395 | 426 | self._entities = set() |
396 | 427 | self._all_entity_names = set() |
|
0 commit comments