Skip to content

Commit 0312fb1

Browse files
committed
Add streaming to sACN
1 parent 74bd928 commit 0312fb1

File tree

5 files changed

+102
-19
lines changed

5 files changed

+102
-19
lines changed

custom_components/dmx/protocols/sacn/protocol.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
CONF_PRIORITY_DEFAULT,
2525
CONF_RATE_LIMIT,
2626
CONF_RATE_LIMIT_DEFAULT,
27+
CONF_REFRESH_EVERY,
28+
CONF_REFRESH_EVERY_DEFAULT,
2729
CONF_SOURCE_NAME,
2830
CONF_SOURCE_NAME_DEFAULT,
2931
CONF_SYNC_ADDRESS,
@@ -60,6 +62,7 @@ async def setup(self, config: dict[str, Any]) -> None:
6062
interface_ip=config.get(CONF_INTERFACE_IP),
6163
multicast_ttl=config.get(CONF_MULTICAST_TTL, CONF_MULTICAST_TTL_DEFAULT),
6264
enable_preview_data=config.get(CONF_ENABLE_PREVIEW_DATA, False),
65+
retransmit_interval=config.get(CONF_REFRESH_EVERY, CONF_REFRESH_EVERY_DEFAULT),
6366
)
6467

6568
self._rate_limit = config.get(CONF_RATE_LIMIT, CONF_RATE_LIMIT_DEFAULT)

custom_components/dmx/server/sacn_server.py

Lines changed: 85 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import logging
33
import socket
44
import struct
5+
import time
56
import uuid
67
from collections.abc import Callable
78
from dataclasses import dataclass, field
@@ -26,6 +27,8 @@ class SacnServerConfig:
2627
enable_per_universe_sync: bool = False
2728
multicast_ttl: int = 64
2829
enable_preview_data: bool = False
30+
retransmit_interval: float = 0.8
31+
"""Interval in seconds to retransmit last frame when no new data arrives."""
2932

3033
def __post_init__(self) -> None:
3134
if self.cid is None:
@@ -36,9 +39,12 @@ def __post_init__(self) -> None:
3639
class UniverseState:
3740
sequence_number: int = 0
3841
last_data: bytearray | None = None
39-
send_task: asyncio.Task[None] | None = None
42+
stream_task: asyncio.Task[None] | None = None
4043
termination_sent: bool = False
4144
unicast_addresses: list[dict[str, Any]] = field(default_factory=list)
45+
pending_data: bytearray | None = None
46+
last_update_time: float = field(default_factory=lambda: 0.0)
47+
last_send_time: float = field(default_factory=lambda: 0.0)
4248

4349
def increment_sequence(self) -> None:
4450
self.sequence_number = (self.sequence_number + 1) % 256
@@ -53,6 +59,7 @@ class SacnServer:
5359
- Universe synchronization
5460
- Configurable priorities and options
5561
- Integration with existing DMX universe system
62+
- Continuous streaming at sACN standard frame rate (44 fps)
5663
"""
5764

5865
def __init__(self, hass: HomeAssistant, config: SacnServerConfig | None = None) -> None:
@@ -165,14 +172,71 @@ def send_dmx_data(self, universe_id: int, dmx_data: bytearray) -> bool:
165172

166173
universe_state = self.universes[universe_id]
167174

168-
if universe_state.send_task and not universe_state.send_task.done():
169-
universe_state.send_task.cancel()
175+
universe_state.pending_data = dmx_data.copy()
176+
universe_state.last_update_time = time.time()
177+
log.debug(f"Universe {universe_id}: received new DMX data")
170178

171-
universe_state.send_task = self.hass.async_create_task(self._send_universe_data(universe_id, dmx_data))
179+
# Start streaming task if not already running
180+
if universe_state.stream_task is None or universe_state.stream_task.done():
181+
log.debug(f"Universe {universe_id}: creating new streaming task")
182+
universe_state.stream_task = self.hass.async_create_task(self._stream_universe_data(universe_id))
183+
184+
self.hass.async_create_task(self._send_pending_frame(universe_id))
172185

173186
return True
174187

175-
async def _send_universe_data(self, universe_id: int, dmx_data: bytearray) -> None:
188+
async def _stream_universe_data(self, universe_id: int) -> None:
189+
"""Continuously retransmit last frame on keep-alive timeout.
190+
191+
New frames are sent immediately via _send_pending_frame when they arrive.
192+
This loop only handles retransmission if no new data for retransmit_interval seconds.
193+
"""
194+
195+
try:
196+
universe_state = self.universes[universe_id]
197+
log.debug(f"Started streaming task for universe {universe_id}")
198+
199+
while self.running and universe_id in self.universes and not universe_state.termination_sent:
200+
time_since_send = time.time() - universe_state.last_send_time
201+
202+
if universe_state.last_data is not None and time_since_send >= self.config.retransmit_interval:
203+
log.debug(f"Universe {universe_id}: retransmitting frame (seq: {universe_state.sequence_number})")
204+
await self._send_packet(universe_id, universe_state.last_data)
205+
universe_state.increment_sequence()
206+
universe_state.last_send_time = time.time()
207+
else:
208+
sleep_duration = self.config.retransmit_interval - time_since_send
209+
await asyncio.sleep(sleep_duration)
210+
211+
except asyncio.CancelledError:
212+
log.debug(f"Streaming task for universe {universe_id} cancelled")
213+
except Exception as e:
214+
log.error(f"Error in streaming loop for universe {universe_id}: {e}")
215+
finally:
216+
log.debug(f"Stopped streaming for universe {universe_id}")
217+
218+
async def _send_pending_frame(self, universe_id: int) -> None:
219+
"""Hot path: send pending frame immediately (called from send_dmx_data)."""
220+
import time
221+
222+
try:
223+
universe_state = self.universes[universe_id]
224+
225+
if universe_state.pending_data is not None:
226+
log.debug(f"Universe {universe_id}: sending immediate frame (seq: {universe_state.sequence_number})")
227+
await self._send_packet(universe_id, universe_state.pending_data)
228+
universe_state.increment_sequence()
229+
universe_state.last_data = universe_state.pending_data.copy()
230+
universe_state.pending_data = None
231+
universe_state.last_send_time = time.time()
232+
universe_state.last_update_time = universe_state.last_send_time
233+
universe_state.termination_sent = False
234+
235+
except Exception as e:
236+
log.error(f"Error sending pending frame for universe {universe_id}: {e}")
237+
238+
async def _send_packet(self, universe_id: int, dmx_data: bytearray) -> None:
239+
"""Send a single sACN packet."""
176240
try:
177241
universe_state = self.universes[universe_id]
178242

@@ -201,25 +265,26 @@ async def _send_universe_data(self, universe_id: int, dmx_data: bytearray) -> No
201265
packet_bytes = packet.serialize()
202266

203267
if self.socket is not None:
268+
log.debug(
269+
f"Sending sACN data to universe {universe_id} (multicast {multicast_addr},"
270+
f" seq: {universe_state.sequence_number})"
271+
)
204272
await self.hass.async_add_executor_job(self.socket.sendto, packet_bytes, (multicast_addr, SACN_PORT))
205273

206274
for unicast_addr in universe_state.unicast_addresses:
207275
if self.socket is not None:
276+
log.debug(
277+
f"Sending sACN data to universe {universe_id} (unicast {unicast_addr['host']}:"
278+
f"{unicast_addr['port']}, seq: {universe_state.sequence_number})"
279+
)
208280
await self.hass.async_add_executor_job(
209-
self.socket.sendto, packet_bytes, (unicast_addr["host"], unicast_addr["port"])
281+
self.socket.sendto,
282+
packet_bytes,
283+
(unicast_addr["host"], unicast_addr["port"]),
210284
)
211285

212-
unicast_info = (
213-
f" + {len(universe_state.unicast_addresses)} unicast" if universe_state.unicast_addresses else ""
214-
)
215-
log.debug(f"Sent sACN data to universe {universe_id} ({multicast_addr}){unicast_info}")
216-
217-
universe_state.increment_sequence()
218-
universe_state.last_data = dmx_data.copy()
219-
universe_state.termination_sent = False
220-
221286
except Exception as e:
222-
log.error(f"Error sending sACN data to universe {universe_id}: {e}")
287+
log.error(f"Error sending sACN packet for universe {universe_id}: {e}")
223288

224289
def terminate_universe(self, universe_id: int) -> None:
225290
if universe_id not in self.universes:
@@ -229,6 +294,9 @@ def terminate_universe(self, universe_id: int) -> None:
229294
if universe_state.termination_sent:
230295
return
231296

297+
if universe_state.stream_task and not universe_state.stream_task.done():
298+
universe_state.stream_task.cancel()
299+
232300
try:
233301
options = SacnOptions(preview_data=False, stream_terminated=True, force_synchronization=False)
234302

@@ -324,7 +392,6 @@ def connection_lost(self, exc: Exception | None) -> None:
324392

325393
def datagram_received(self, data: bytes, addr: tuple[str, int]) -> None:
326394
try:
327-
log.debug(f"Received {len(data)} bytes from {addr[0]}:{addr[1]}")
328395
packet = SacnPacket.deserialize(data)
329396

330397
if packet.universe in self.subscribed_universes:
@@ -336,7 +403,6 @@ def datagram_received(self, data: bytes, addr: tuple[str, int]) -> None:
336403

337404
# Ignore packets from our own sACN server to prevent feedback loops
338405
if self.own_source_name and packet.source_name == self.own_source_name:
339-
log.debug(f"Ignoring sACN packet from own source '{packet.source_name}' to prevent feedback")
340406
return
341407

342408
if self.data_callback:
@@ -377,8 +443,8 @@ def unsubscribe_universe(self, universe_id: int) -> None:
377443

378444
def _manage_multicast_group(self, universe_id: int, join: bool) -> None:
379445
"""Join or leave multicast group for a universe"""
446+
multicast_addr = f"239.255.{universe_id >> 8}.{universe_id & 0xFF}"
380447
try:
381-
multicast_addr = f"239.255.{universe_id >> 8}.{universe_id & 0xFF}"
382448

383449
sock = self.transport.get_extra_info("socket") if self.transport else None
384450
if sock:

custom_components/dmx/setup/config_processor.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,7 @@ async def process_fixtures(hass: HomeAssistant, fixture_folder: str) -> dict[str
200200
vol.Coerce(int), vol.Range(min=1, max=255)
201201
),
202202
vol.Optional(CONF_ENABLE_PREVIEW_DATA, default=False): cv.boolean,
203+
vol.Optional(CONF_REFRESH_EVERY, default=CONF_REFRESH_EVERY_DEFAULT): cv.positive_float,
203204
vol.Optional(CONF_RATE_LIMIT, default=CONF_RATE_LIMIT_DEFAULT): cv.positive_float,
204205
vol.Required(CONF_UNIVERSES): vol.Schema(
205206
[

docs/config.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,10 @@ dmx:
7676
priority: 100
7777
multicast_ttl: 64
7878
enable_preview_data: false
79+
interface_ip: "10.101.97.101"
80+
sync_address: 7000
81+
refresh_every: 0.8
82+
rate_limit: 0.5
7983

8084
universes:
8185
- 1:
@@ -160,6 +164,14 @@ Universe definitions. Each universe can be specified as:
160164
Time-to-live for multicast packets [1-255]
161165
- **`enable_preview_data`** *(optional, default: `false`)*
162166
Mark transmitted data as preview data (non-live)
167+
- **`interface_ip`** *(optional, default: `0.0.0.0`)*
168+
IP address of the interface to use for sACN transmission
169+
- **`sync_address`** *(optional, default: `7000`)*
170+
Sync address for sACN transmission
171+
- **`refresh_every`** *(optional, default: `0.8`)*
172+
The interval in seconds in which universe data is retransmitted. This is useful when there are external controllers sending to the same universes.
173+
- **`rate_limit`** *(optional, default: `0.5`)*
174+
The rate limit in seconds between each entity update when received from an external controller. Increase this value if HomeAssistant slows down too much when receiving updates.
163175

164176
#### `dmx.sacn.universes`
165177
Universe definitions for sACN. Each universe number [1-63999] maps to multicast address `239.255.X.Y` where X.Y represents the universe number.

docs/sacn-communication.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,7 @@ dmx:
203203
priority: 85 # Lower than main console (100)
204204
sync_address: 7000 # Receive universe sync from external controllers
205205
interface_ip: "10.101.97.101" # Listen on a specified interface
206+
refresh_every: 0.5 # Lower retransmit interval to reduce jitter
206207

207208
universes:
208209
- 1: # Receives from external lighting console

0 commit comments

Comments
 (0)