Skip to content

Commit 360358c

Browse files
Add options to the streaming receivers (#106)
* Add the `warn_on_overflow` option to the streaming receivers to allow ignoring overflow warnings * Add the `max_size` option to the streaming receivers
2 parents 2e97971 + 39c3405 commit 360358c

File tree

3 files changed

+32
-5
lines changed

3 files changed

+32
-5
lines changed

RELEASE_NOTES.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212

1313
* Add helper function to support creating quantities that conform to the API expectations.
1414
* Add `receive-gridpool-trades` command to CLI tool.
15+
* Add the `warn_on_overflow` option to the streaming receivers to allow ignoring overflow warnings
16+
* Add the `max_size` option to the streaming receivers
1517

1618
<!-- Here goes the main new features and examples or instructions on how to use them -->
1719

pyproject.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,8 @@ dependencies = [
3131
"entsoe-py >= 0.6.16, < 1",
3232
"frequenz-api-common >= 0.6.3, < 0.7.0",
3333
"grpcio >= 1.66.2, < 2",
34-
"frequenz-channels >= 1.0.0, < 2",
35-
"frequenz-client-base >= 0.8.0, < 0.9.0",
34+
"frequenz-channels >= 1.6.1, < 2",
35+
"frequenz-client-base >= 0.9.0, < 0.10.0",
3636
"frequenz-client-common >= 0.1.0, < 0.3.0",
3737
"frequenz-api-electricity-trading >= 0.2.4, < 1",
3838
"protobuf >= 5.28.0, < 6",

src/frequenz/client/electricity_trading/_client.py

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33

44
"""Module to define the client class."""
55

6+
# pylint: disable=too-many-lines
7+
68
from __future__ import annotations
79

810
import asyncio
@@ -225,6 +227,8 @@ async def stream_gridpool_orders(
225227
delivery_area: DeliveryArea | None = None,
226228
delivery_period: DeliveryPeriod | None = None,
227229
tag: str | None = None,
230+
max_size: int = 50,
231+
warn_on_overflow: bool = False,
228232
) -> Receiver[OrderDetail]:
229233
"""
230234
Stream gridpool orders.
@@ -236,6 +240,10 @@ async def stream_gridpool_orders(
236240
delivery_area: Delivery area to filter for.
237241
delivery_period: Delivery period to filter for.
238242
tag: Tag to filter for.
243+
max_size: The maximum number of messages to buffer.
244+
warn_on_overflow: Whether to log a warning when the receiver's
245+
buffer is full and a message is dropped.
246+
239247
240248
Returns:
241249
Async generator of orders.
@@ -273,7 +281,9 @@ async def stream_gridpool_orders(
273281
"Error occurred while streaming gridpool orders: %s", e
274282
)
275283
raise
276-
return self._gridpool_orders_streams[stream_key].new_receiver()
284+
return self._gridpool_orders_streams[stream_key].new_receiver(
285+
warn_on_overflow=warn_on_overflow, maxsize=max_size
286+
)
277287

278288
async def stream_gridpool_trades(
279289
# pylint: disable=too-many-arguments, too-many-positional-arguments
@@ -284,6 +294,8 @@ async def stream_gridpool_trades(
284294
market_side: MarketSide | None = None,
285295
delivery_period: DeliveryPeriod | None = None,
286296
delivery_area: DeliveryArea | None = None,
297+
max_size: int = 50,
298+
warn_on_overflow: bool = False,
287299
) -> Receiver[Trade]:
288300
"""
289301
Stream gridpool trades.
@@ -295,6 +307,9 @@ async def stream_gridpool_trades(
295307
market_side: The market side to filter for.
296308
delivery_period: The delivery period to filter for.
297309
delivery_area: The delivery area to filter for.
310+
max_size: The maximum number of messages to buffer.
311+
warn_on_overflow: Whether to log a warning when the receiver's
312+
buffer is full and a message is dropped.
298313
299314
Returns:
300315
The gridpool trades streamer.
@@ -332,14 +347,19 @@ async def stream_gridpool_trades(
332347
"Error occurred while streaming gridpool trades: %s", e
333348
)
334349
raise
335-
return self._gridpool_trades_streams[stream_key].new_receiver()
350+
return self._gridpool_trades_streams[stream_key].new_receiver(
351+
warn_on_overflow=warn_on_overflow, maxsize=max_size
352+
)
336353

337354
async def stream_public_trades(
355+
# pylint: disable=too-many-arguments, too-many-positional-arguments
338356
self,
339357
states: list[TradeState] | None = None,
340358
delivery_period: DeliveryPeriod | None = None,
341359
buy_delivery_area: DeliveryArea | None = None,
342360
sell_delivery_area: DeliveryArea | None = None,
361+
max_size: int = 50,
362+
warn_on_overflow: bool = False,
343363
) -> Receiver[PublicTrade]:
344364
"""
345365
Stream public trades.
@@ -349,6 +369,9 @@ async def stream_public_trades(
349369
delivery_period: Delivery period to filter for.
350370
buy_delivery_area: Buy delivery area to filter for.
351371
sell_delivery_area: Sell delivery area to filter for.
372+
max_size: The maximum number of messages to buffer.
373+
warn_on_overflow: Whether to log a warning when the receiver's
374+
buffer is full and a message is dropped.
352375
353376
Returns:
354377
Async generator of orders.
@@ -382,7 +405,9 @@ async def stream_public_trades(
382405
except grpc.RpcError as e:
383406
_logger.exception("Error occurred while streaming public trades: %s", e)
384407
raise
385-
return self._public_trades_streams[public_trade_filter].new_receiver()
408+
return self._public_trades_streams[public_trade_filter].new_receiver(
409+
warn_on_overflow=warn_on_overflow, maxsize=max_size
410+
)
386411

387412
def validate_params(
388413
# pylint: disable=too-many-arguments, too-many-positional-arguments, too-many-branches

0 commit comments

Comments
 (0)