Skip to content

Commit e308e7b

Browse files
committed
Restart streams if they are closed
Now that we are exposing the `GrpcStreamBroadcaster` instances which can be closed by external parties, we need to check that existing instances are active before reusing them. Signed-off-by: Sahas Subramanian <[email protected]>
1 parent e14024d commit e308e7b

File tree

1 file changed

+12
-3
lines changed

1 file changed

+12
-3
lines changed

src/frequenz/client/electricity_trading/_client.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,10 @@ def gridpool_orders_stream(
258258

259259
stream_key = (gridpool_id, gridpool_order_filter)
260260

261-
if stream_key not in self._gridpool_orders_streams:
261+
if (
262+
stream_key not in self._gridpool_orders_streams
263+
or not self._gridpool_orders_streams[stream_key].is_running
264+
):
262265
try:
263266
self._gridpool_orders_streams[stream_key] = GrpcStreamBroadcaster(
264267
f"electricity-trading-{stream_key}",
@@ -319,7 +322,10 @@ def gridpool_trades_stream(
319322

320323
stream_key = (gridpool_id, gridpool_trade_filter)
321324

322-
if stream_key not in self._gridpool_trades_streams:
325+
if (
326+
stream_key not in self._gridpool_trades_streams
327+
or not self._gridpool_trades_streams[stream_key].is_running
328+
):
323329
try:
324330
self._gridpool_trades_streams[stream_key] = GrpcStreamBroadcaster(
325331
f"electricity-trading-{stream_key}",
@@ -373,7 +379,10 @@ def public_trades_stream(
373379
sell_delivery_area=sell_delivery_area,
374380
)
375381

376-
if public_trade_filter not in self._public_trades_streams:
382+
if (
383+
public_trade_filter not in self._public_trades_streams
384+
or not self._public_trades_streams[public_trade_filter].is_running
385+
):
377386
try:
378387
self._public_trades_streams[public_trade_filter] = (
379388
GrpcStreamBroadcaster(

0 commit comments

Comments
 (0)