Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@

* Add helper function to support creating quantities that conform to the API expectations.
* Add `receive-gridpool-trades` command to CLI tool.
* Add the `warn_on_overflow` option to the streaming receivers to allow ignoring overflow warnings
* Add the `max_size` option to the streaming receivers

<!-- Here goes the main new features and examples or instructions on how to use them -->

Expand Down
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ dependencies = [
"entsoe-py >= 0.6.16, < 1",
"frequenz-api-common >= 0.6.3, < 0.7.0",
"grpcio >= 1.66.2, < 2",
"frequenz-channels >= 1.0.0, < 2",
"frequenz-client-base >= 0.8.0, < 0.9.0",
"frequenz-channels >= 1.6.1, < 2",
"frequenz-client-base >= 0.9.0, < 0.10.0",
"frequenz-client-common >= 0.1.0, < 0.3.0",
"frequenz-api-electricity-trading >= 0.2.4, < 1",
"protobuf >= 5.28.0, < 6",
Expand Down
31 changes: 28 additions & 3 deletions src/frequenz/client/electricity_trading/_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

"""Module to define the client class."""

# pylint: disable=too-many-lines

from __future__ import annotations

import asyncio
Expand Down Expand Up @@ -225,6 +227,8 @@ async def stream_gridpool_orders(
delivery_area: DeliveryArea | None = None,
delivery_period: DeliveryPeriod | None = None,
tag: str | None = None,
max_size: int = 50,
warn_on_overflow: bool = False,
) -> Receiver[OrderDetail]:
"""
Stream gridpool orders.
Expand All @@ -236,6 +240,10 @@ async def stream_gridpool_orders(
delivery_area: Delivery area to filter for.
delivery_period: Delivery period to filter for.
tag: Tag to filter for.
max_size: The maximum number of messages to buffer.
warn_on_overflow: Whether to log a warning when the receiver's
buffer is full and a message is dropped.


Returns:
Async generator of orders.
Expand Down Expand Up @@ -273,7 +281,9 @@ async def stream_gridpool_orders(
"Error occurred while streaming gridpool orders: %s", e
)
raise
return self._gridpool_orders_streams[stream_key].new_receiver()
return self._gridpool_orders_streams[stream_key].new_receiver(
warn_on_overflow=warn_on_overflow, maxsize=max_size
)

async def stream_gridpool_trades(
# pylint: disable=too-many-arguments, too-many-positional-arguments
Expand All @@ -284,6 +294,8 @@ async def stream_gridpool_trades(
market_side: MarketSide | None = None,
delivery_period: DeliveryPeriod | None = None,
delivery_area: DeliveryArea | None = None,
max_size: int = 50,
warn_on_overflow: bool = False,
) -> Receiver[Trade]:
"""
Stream gridpool trades.
Expand All @@ -295,6 +307,9 @@ async def stream_gridpool_trades(
market_side: The market side to filter for.
delivery_period: The delivery period to filter for.
delivery_area: The delivery area to filter for.
max_size: The maximum number of messages to buffer.
warn_on_overflow: Whether to log a warning when the receiver's
buffer is full and a message is dropped.

Returns:
The gridpool trades streamer.
Expand Down Expand Up @@ -332,14 +347,19 @@ async def stream_gridpool_trades(
"Error occurred while streaming gridpool trades: %s", e
)
raise
return self._gridpool_trades_streams[stream_key].new_receiver()
return self._gridpool_trades_streams[stream_key].new_receiver(
warn_on_overflow=warn_on_overflow, maxsize=max_size
)

async def stream_public_trades(
# pylint: disable=too-many-arguments, too-many-positional-arguments
self,
states: list[TradeState] | None = None,
delivery_period: DeliveryPeriod | None = None,
buy_delivery_area: DeliveryArea | None = None,
sell_delivery_area: DeliveryArea | None = None,
max_size: int = 50,
warn_on_overflow: bool = False,
) -> Receiver[PublicTrade]:
"""
Stream public trades.
Expand All @@ -349,6 +369,9 @@ async def stream_public_trades(
delivery_period: Delivery period to filter for.
buy_delivery_area: Buy delivery area to filter for.
sell_delivery_area: Sell delivery area to filter for.
max_size: The maximum number of messages to buffer.
warn_on_overflow: Whether to log a warning when the receiver's
buffer is full and a message is dropped.

Returns:
Async generator of orders.
Expand Down Expand Up @@ -382,7 +405,9 @@ async def stream_public_trades(
except grpc.RpcError as e:
_logger.exception("Error occurred while streaming public trades: %s", e)
raise
return self._public_trades_streams[public_trade_filter].new_receiver()
return self._public_trades_streams[public_trade_filter].new_receiver(
warn_on_overflow=warn_on_overflow, maxsize=max_size
)

def validate_params(
# pylint: disable=too-many-arguments, too-many-positional-arguments, too-many-branches
Expand Down
Loading