diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index e1b70e3d..dd802cb7 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -12,6 +12,8 @@ * Add helper function to support creating quantities that conform to the API expectations. * Add `receive-gridpool-trades` command to CLI tool. +* Add the `warn_on_overflow` option to the streaming receivers to allow ignoring overflow warnings +* Add the `max_size` option to the streaming receivers diff --git a/pyproject.toml b/pyproject.toml index e82e7813..44d6538d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -31,8 +31,8 @@ dependencies = [ "entsoe-py >= 0.6.16, < 1", "frequenz-api-common >= 0.6.3, < 0.7.0", "grpcio >= 1.66.2, < 2", - "frequenz-channels >= 1.0.0, < 2", - "frequenz-client-base >= 0.8.0, < 0.9.0", + "frequenz-channels >= 1.6.1, < 2", + "frequenz-client-base >= 0.9.0, < 0.10.0", "frequenz-client-common >= 0.1.0, < 0.3.0", "frequenz-api-electricity-trading >= 0.2.4, < 1", "protobuf >= 5.28.0, < 6", diff --git a/src/frequenz/client/electricity_trading/_client.py b/src/frequenz/client/electricity_trading/_client.py index 59dffa2a..6da36483 100644 --- a/src/frequenz/client/electricity_trading/_client.py +++ b/src/frequenz/client/electricity_trading/_client.py @@ -3,6 +3,8 @@ """Module to define the client class.""" +# pylint: disable=too-many-lines + from __future__ import annotations import asyncio @@ -225,6 +227,8 @@ async def stream_gridpool_orders( delivery_area: DeliveryArea | None = None, delivery_period: DeliveryPeriod | None = None, tag: str | None = None, + max_size: int = 50, + warn_on_overflow: bool = False, ) -> Receiver[OrderDetail]: """ Stream gridpool orders. @@ -236,6 +240,10 @@ async def stream_gridpool_orders( delivery_area: Delivery area to filter for. delivery_period: Delivery period to filter for. tag: Tag to filter for. + max_size: The maximum number of messages to buffer. + warn_on_overflow: Whether to log a warning when the receiver's + buffer is full and a message is dropped. + Returns: Async generator of orders. @@ -273,7 +281,9 @@ async def stream_gridpool_orders( "Error occurred while streaming gridpool orders: %s", e ) raise - return self._gridpool_orders_streams[stream_key].new_receiver() + return self._gridpool_orders_streams[stream_key].new_receiver( + warn_on_overflow=warn_on_overflow, maxsize=max_size + ) async def stream_gridpool_trades( # pylint: disable=too-many-arguments, too-many-positional-arguments @@ -284,6 +294,8 @@ async def stream_gridpool_trades( market_side: MarketSide | None = None, delivery_period: DeliveryPeriod | None = None, delivery_area: DeliveryArea | None = None, + max_size: int = 50, + warn_on_overflow: bool = False, ) -> Receiver[Trade]: """ Stream gridpool trades. @@ -295,6 +307,9 @@ async def stream_gridpool_trades( market_side: The market side to filter for. delivery_period: The delivery period to filter for. delivery_area: The delivery area to filter for. + max_size: The maximum number of messages to buffer. + warn_on_overflow: Whether to log a warning when the receiver's + buffer is full and a message is dropped. Returns: The gridpool trades streamer. @@ -332,14 +347,19 @@ async def stream_gridpool_trades( "Error occurred while streaming gridpool trades: %s", e ) raise - return self._gridpool_trades_streams[stream_key].new_receiver() + return self._gridpool_trades_streams[stream_key].new_receiver( + warn_on_overflow=warn_on_overflow, maxsize=max_size + ) async def stream_public_trades( + # pylint: disable=too-many-arguments, too-many-positional-arguments self, states: list[TradeState] | None = None, delivery_period: DeliveryPeriod | None = None, buy_delivery_area: DeliveryArea | None = None, sell_delivery_area: DeliveryArea | None = None, + max_size: int = 50, + warn_on_overflow: bool = False, ) -> Receiver[PublicTrade]: """ Stream public trades. @@ -349,6 +369,9 @@ async def stream_public_trades( delivery_period: Delivery period to filter for. buy_delivery_area: Buy delivery area to filter for. sell_delivery_area: Sell delivery area to filter for. + max_size: The maximum number of messages to buffer. + warn_on_overflow: Whether to log a warning when the receiver's + buffer is full and a message is dropped. Returns: Async generator of orders. @@ -382,7 +405,9 @@ async def stream_public_trades( except grpc.RpcError as e: _logger.exception("Error occurred while streaming public trades: %s", e) raise - return self._public_trades_streams[public_trade_filter].new_receiver() + return self._public_trades_streams[public_trade_filter].new_receiver( + warn_on_overflow=warn_on_overflow, maxsize=max_size + ) def validate_params( # pylint: disable=too-many-arguments, too-many-positional-arguments, too-many-branches