Skip to content

Commit e75c90b

Browse files
authored
Return GrpcStreamBroadcaster instances from the streaming methods (#110)
This makes it easier to close the streamers when they are no longer needed. Also rename the methods from `stream_*` to `*_stream`.
2 parents ce39eb9 + f64a0c9 commit e75c90b

File tree

4 files changed

+49
-57
lines changed

4 files changed

+49
-57
lines changed

RELEASE_NOTES.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
* Rename `receive_trades` to `receive_public_trades`
1111
* Rename `receive_orders` to `receive_gridpool_orders`
1212

13+
- The `stream_*` methods in the client have been renamed to `*_stream`. They no longer return `Receiver` instances, but rather `GrpcStreamBroadcaster` instances. They expose a `new_receiver()` method which can be used to get a new receiver. They also expose a `stop()` method which can be used to stop the background streaming task when the stream is no longer needed.
14+
1315
## New Features
1416

1517
* Print tags and filled (instead of open) quantity for gridpool orders in CLI tool.

src/frequenz/client/electricity_trading/_client.py

Lines changed: 27 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
from frequenz.api.electricity_trading.v1.electricity_trading_pb2_grpc import (
2525
ElectricityTradingServiceStub,
2626
)
27-
from frequenz.channels import Receiver
2827
from frequenz.client.base.client import BaseApiClient
2928
from frequenz.client.base.exception import ClientNotConnected
3029
from frequenz.client.base.streaming import GrpcStreamBroadcaster
@@ -218,7 +217,7 @@ def stub(self) -> electricity_trading_pb2_grpc.ElectricityTradingServiceAsyncStu
218217
# type-checker, so it can only be used for type hints.
219218
return self._stub # type: ignore
220219

221-
async def stream_gridpool_orders(
220+
def gridpool_orders_stream(
222221
# pylint: disable=too-many-arguments, too-many-positional-arguments
223222
self,
224223
gridpool_id: int,
@@ -227,9 +226,9 @@ async def stream_gridpool_orders(
227226
delivery_area: DeliveryArea | None = None,
228227
delivery_period: DeliveryPeriod | None = None,
229228
tag: str | None = None,
230-
max_size: int = 50,
231-
warn_on_overflow: bool = False,
232-
) -> Receiver[OrderDetail]:
229+
) -> GrpcStreamBroadcaster[
230+
electricity_trading_pb2.ReceiveGridpoolOrdersStreamResponse, OrderDetail
231+
]:
233232
"""
234233
Stream gridpool orders.
235234
@@ -240,10 +239,6 @@ async def stream_gridpool_orders(
240239
delivery_area: Delivery area to filter for.
241240
delivery_period: Delivery period to filter for.
242241
tag: Tag to filter for.
243-
max_size: The maximum number of messages to buffer.
244-
warn_on_overflow: Whether to log a warning when the receiver's
245-
buffer is full and a message is dropped.
246-
247242
248243
Returns:
249244
Async generator of orders.
@@ -263,7 +258,10 @@ async def stream_gridpool_orders(
263258

264259
stream_key = (gridpool_id, gridpool_order_filter)
265260

266-
if stream_key not in self._gridpool_orders_streams:
261+
if (
262+
stream_key not in self._gridpool_orders_streams
263+
or not self._gridpool_orders_streams[stream_key].is_running
264+
):
267265
try:
268266
self._gridpool_orders_streams[stream_key] = GrpcStreamBroadcaster(
269267
f"electricity-trading-{stream_key}",
@@ -281,11 +279,9 @@ async def stream_gridpool_orders(
281279
"Error occurred while streaming gridpool orders: %s", e
282280
)
283281
raise
284-
return self._gridpool_orders_streams[stream_key].new_receiver(
285-
warn_on_overflow=warn_on_overflow, maxsize=max_size
286-
)
282+
return self._gridpool_orders_streams[stream_key]
287283

288-
async def stream_gridpool_trades(
284+
def gridpool_trades_stream(
289285
# pylint: disable=too-many-arguments, too-many-positional-arguments
290286
self,
291287
gridpool_id: int,
@@ -294,9 +290,9 @@ async def stream_gridpool_trades(
294290
market_side: MarketSide | None = None,
295291
delivery_period: DeliveryPeriod | None = None,
296292
delivery_area: DeliveryArea | None = None,
297-
max_size: int = 50,
298-
warn_on_overflow: bool = False,
299-
) -> Receiver[Trade]:
293+
) -> GrpcStreamBroadcaster[
294+
electricity_trading_pb2.ReceiveGridpoolTradesStreamResponse, Trade
295+
]:
300296
"""
301297
Stream gridpool trades.
302298
@@ -307,9 +303,6 @@ async def stream_gridpool_trades(
307303
market_side: The market side to filter for.
308304
delivery_period: The delivery period to filter for.
309305
delivery_area: The delivery area to filter for.
310-
max_size: The maximum number of messages to buffer.
311-
warn_on_overflow: Whether to log a warning when the receiver's
312-
buffer is full and a message is dropped.
313306
314307
Returns:
315308
The gridpool trades streamer.
@@ -329,7 +322,10 @@ async def stream_gridpool_trades(
329322

330323
stream_key = (gridpool_id, gridpool_trade_filter)
331324

332-
if stream_key not in self._gridpool_trades_streams:
325+
if (
326+
stream_key not in self._gridpool_trades_streams
327+
or not self._gridpool_trades_streams[stream_key].is_running
328+
):
333329
try:
334330
self._gridpool_trades_streams[stream_key] = GrpcStreamBroadcaster(
335331
f"electricity-trading-{stream_key}",
@@ -347,20 +343,18 @@ async def stream_gridpool_trades(
347343
"Error occurred while streaming gridpool trades: %s", e
348344
)
349345
raise
350-
return self._gridpool_trades_streams[stream_key].new_receiver(
351-
warn_on_overflow=warn_on_overflow, maxsize=max_size
352-
)
346+
return self._gridpool_trades_streams[stream_key]
353347

354-
async def stream_public_trades(
348+
def public_trades_stream(
355349
# pylint: disable=too-many-arguments, too-many-positional-arguments
356350
self,
357351
states: list[TradeState] | None = None,
358352
delivery_period: DeliveryPeriod | None = None,
359353
buy_delivery_area: DeliveryArea | None = None,
360354
sell_delivery_area: DeliveryArea | None = None,
361-
max_size: int = 50,
362-
warn_on_overflow: bool = False,
363-
) -> Receiver[PublicTrade]:
355+
) -> GrpcStreamBroadcaster[
356+
electricity_trading_pb2.ReceivePublicTradesStreamResponse, PublicTrade
357+
]:
364358
"""
365359
Stream public trades.
366360
@@ -369,9 +363,6 @@ async def stream_public_trades(
369363
delivery_period: Delivery period to filter for.
370364
buy_delivery_area: Buy delivery area to filter for.
371365
sell_delivery_area: Sell delivery area to filter for.
372-
max_size: The maximum number of messages to buffer.
373-
warn_on_overflow: Whether to log a warning when the receiver's
374-
buffer is full and a message is dropped.
375366
376367
Returns:
377368
Async generator of orders.
@@ -388,7 +379,10 @@ async def stream_public_trades(
388379
sell_delivery_area=sell_delivery_area,
389380
)
390381

391-
if public_trade_filter not in self._public_trades_streams:
382+
if (
383+
public_trade_filter not in self._public_trades_streams
384+
or not self._public_trades_streams[public_trade_filter].is_running
385+
):
392386
try:
393387
self._public_trades_streams[public_trade_filter] = (
394388
GrpcStreamBroadcaster(
@@ -405,9 +399,7 @@ async def stream_public_trades(
405399
except grpc.RpcError as e:
406400
_logger.exception("Error occurred while streaming public trades: %s", e)
407401
raise
408-
return self._public_trades_streams[public_trade_filter].new_receiver(
409-
warn_on_overflow=warn_on_overflow, maxsize=max_size
410-
)
402+
return self._public_trades_streams[public_trade_filter]
411403

412404
def validate_params(
413405
# pylint: disable=too-many-arguments, too-many-positional-arguments, too-many-branches

src/frequenz/client/electricity_trading/cli/etrading.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ async def list_public_trades(url: str, key: str, *, delivery_start: datetime) ->
7373
if delivery_start <= datetime.now(timezone.utc):
7474
return
7575

76-
stream = await client.stream_public_trades(delivery_period=delivery_period)
76+
stream = client.public_trades_stream(delivery_period=delivery_period).new_receiver()
7777
async for trade in stream:
7878
print_public_trade(trade)
7979

@@ -111,7 +111,9 @@ async def list_gridpool_trades(
111111
if delivery_start and delivery_start <= datetime.now(timezone.utc):
112112
return
113113

114-
stream = await client.stream_gridpool_trades(gid, delivery_period=delivery_period)
114+
stream = client.gridpool_trades_stream(
115+
gid, delivery_period=delivery_period
116+
).new_receiver()
115117
async for trade in stream:
116118
print_trade(trade)
117119

@@ -154,7 +156,9 @@ async def list_gridpool_orders(
154156
if delivery_start and delivery_start <= datetime.now(timezone.utc):
155157
return
156158

157-
stream = await client.stream_gridpool_orders(gid, delivery_period=delivery_period)
159+
stream = client.gridpool_orders_stream(
160+
gid, delivery_period=delivery_period
161+
).new_receiver()
158162
async for order in stream:
159163
print_order(order)
160164

tests/test_client.py

Lines changed: 13 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -131,27 +131,23 @@ def set_up_order_detail_response(
131131
).to_pb()
132132

133133

134-
def test_stream_gridpool_orders(set_up: SetupParams) -> None:
134+
async def test_stream_gridpool_orders(set_up: SetupParams) -> None:
135135
"""Test the method streaming gridpool orders."""
136-
set_up.loop.run_until_complete(
137-
set_up.client.stream_gridpool_orders(set_up.gridpool_id)
138-
)
136+
set_up.client.gridpool_orders_stream(set_up.gridpool_id)
137+
await asyncio.sleep(0)
139138

140139
set_up.mock_stub.ReceiveGridpoolOrdersStream.assert_called_once()
141140
args, _ = set_up.mock_stub.ReceiveGridpoolOrdersStream.call_args
142141
assert args[0].gridpool_id == set_up.gridpool_id
143142

144143

145-
def test_stream_gridpool_orders_with_optional_inputs(set_up: SetupParams) -> None:
144+
async def test_stream_gridpool_orders_with_optional_inputs(set_up: SetupParams) -> None:
146145
"""Test the method streaming gridpool orders with some fields to filter for."""
147146
# Fields to filter for
148147
order_states = [OrderState.ACTIVE]
149148

150-
set_up.loop.run_until_complete(
151-
set_up.client.stream_gridpool_orders(
152-
set_up.gridpool_id, order_states=order_states
153-
)
154-
)
149+
set_up.client.gridpool_orders_stream(set_up.gridpool_id, order_states=order_states)
150+
await asyncio.sleep(0)
155151

156152
set_up.mock_stub.ReceiveGridpoolOrdersStream.assert_called_once()
157153
args, _ = set_up.mock_stub.ReceiveGridpoolOrdersStream.call_args
@@ -161,32 +157,30 @@ def test_stream_gridpool_orders_with_optional_inputs(set_up: SetupParams) -> Non
161157
]
162158

163159

164-
def test_stream_gridpool_trades(
160+
async def test_stream_gridpool_trades(
165161
set_up: SetupParams,
166162
) -> None:
167163
"""Test the method streaming gridpool trades."""
168-
set_up.loop.run_until_complete(
169-
set_up.client.stream_gridpool_trades(
170-
gridpool_id=set_up.gridpool_id, market_side=set_up.side
171-
)
164+
set_up.client.gridpool_trades_stream(
165+
gridpool_id=set_up.gridpool_id, market_side=set_up.side
172166
)
167+
await asyncio.sleep(0)
173168

174169
set_up.mock_stub.ReceiveGridpoolTradesStream.assert_called_once()
175170
args, _ = set_up.mock_stub.ReceiveGridpoolTradesStream.call_args
176171
assert args[0].gridpool_id == set_up.gridpool_id
177172
assert args[0].filter.side == set_up.side.to_pb()
178173

179174

180-
def test_stream_public_trades(
175+
async def test_stream_public_trades(
181176
set_up: SetupParams,
182177
) -> None:
183178
"""Test the method streaming public trades."""
184179
# Fields to filter for
185180
trade_states = [TradeState.ACTIVE]
186181

187-
set_up.loop.run_until_complete(
188-
set_up.client.stream_public_trades(states=trade_states)
189-
)
182+
set_up.client.public_trades_stream(states=trade_states)
183+
await asyncio.sleep(0)
190184

191185
set_up.mock_stub.ReceivePublicTradesStream.assert_called_once()
192186
args, _ = set_up.mock_stub.ReceivePublicTradesStream.call_args

0 commit comments

Comments
 (0)