From e92212cc7c08192db8adc9aaf50085a23696e391 Mon Sep 17 00:00:00 2001 From: camille-bouvy-frequenz Date: Thu, 13 Mar 2025 17:04:02 +0100 Subject: [PATCH 1/4] Upgrade dependencies for updated gRPC streaming API - Bumped `frequenz-api-electricity-trading` to `>=0.5.0` Signed-off-by: camille-bouvy-frequenz --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 16c8c9c..6852b65 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -34,7 +34,7 @@ dependencies = [ "frequenz-channels >= 1.6.1, < 2", "frequenz-client-base >= 0.9.0, < 0.10.0", "frequenz-client-common >= 0.1.0, < 0.4.0", - "frequenz-api-electricity-trading >= 0.2.4, < 0.5", + "frequenz-api-electricity-trading >= 0.5.0, < 0.6.0", "protobuf >= 5.29.2, < 6", ] dynamic = ["version"] From 9cf5ae1628c3ab96f747e778354d227f18f2b08e Mon Sep 17 00:00:00 2001 From: camille-bouvy-frequenz Date: Thu, 13 Mar 2025 17:18:46 +0100 Subject: [PATCH 2/4] Unify Public Trades streaming and listing - Removed `list_public_trades` - Replaced `public_trades_stream` with `receive_public_trades` - Updated the method to support streaming with optional time range (`start_time`, `end_time`) - Update the unit tests with the new function name Signed-off-by: camille-bouvy-frequenz --- .../client/electricity_trading/_client.py | 150 ++++++------------ tests/test_client.py | 6 +- 2 files changed, 51 insertions(+), 105 deletions(-) diff --git a/src/frequenz/client/electricity_trading/_client.py b/src/frequenz/client/electricity_trading/_client.py index 9cbd441..29dbca0 100644 --- a/src/frequenz/client/electricity_trading/_client.py +++ b/src/frequenz/client/electricity_trading/_client.py @@ -29,6 +29,7 @@ from frequenz.client.base.streaming import GrpcStreamBroadcaster from frequenz.client.common.pagination import Params from google.protobuf import field_mask_pb2, struct_pb2 +from google.protobuf.timestamp_pb2 import Timestamp from ._types import ( DeliveryArea, @@ -345,62 +346,6 @@ def gridpool_trades_stream( raise return self._gridpool_trades_streams[stream_key] - def public_trades_stream( - # pylint: disable=too-many-arguments, too-many-positional-arguments - self, - states: list[TradeState] | None = None, - delivery_period: DeliveryPeriod | None = None, - buy_delivery_area: DeliveryArea | None = None, - sell_delivery_area: DeliveryArea | None = None, - ) -> GrpcStreamBroadcaster[ - electricity_trading_pb2.ReceivePublicTradesStreamResponse, PublicTrade - ]: - """ - Stream public trades. - - Args: - states: List of order states to filter for. - delivery_period: Delivery period to filter for. - buy_delivery_area: Buy delivery area to filter for. - sell_delivery_area: Sell delivery area to filter for. - - Returns: - Async generator of orders. - - Raises: - grpc.RpcError: If an error occurs while streaming public trades. - """ - self.validate_params(delivery_period=delivery_period) - - public_trade_filter = PublicTradeFilter( - states=states, - delivery_period=delivery_period, - buy_delivery_area=buy_delivery_area, - sell_delivery_area=sell_delivery_area, - ) - - if ( - public_trade_filter not in self._public_trades_streams - or not self._public_trades_streams[public_trade_filter].is_running - ): - try: - self._public_trades_streams[public_trade_filter] = ( - GrpcStreamBroadcaster( - f"electricity-trading-{public_trade_filter}", - lambda: self.stub.ReceivePublicTradesStream( - electricity_trading_pb2.ReceivePublicTradesStreamRequest( - filter=public_trade_filter.to_pb(), - ), - metadata=self._metadata, - ), - lambda response: PublicTrade.from_pb(response.public_trade), - ) - ) - except grpc.RpcError as e: - _logger.exception("Error occurred while streaming public trades: %s", e) - raise - return self._public_trades_streams[public_trade_filter] - def validate_params( # pylint: disable=too-many-arguments, too-many-positional-arguments, too-many-branches self, @@ -947,33 +892,41 @@ async def list_gridpool_trades( _logger.exception("Error occurred while listing gridpool trades: %s", e) raise - async def list_public_trades( + def receive_public_trades( # pylint: disable=too-many-arguments, too-many-positional-arguments self, states: list[TradeState] | None = None, delivery_period: DeliveryPeriod | None = None, buy_delivery_area: DeliveryArea | None = None, sell_delivery_area: DeliveryArea | None = None, - page_size: int | None = None, - timeout: timedelta | None = None, - ) -> AsyncIterator[PublicTrade]: + start_time: datetime | None = None, + end_time: datetime | None = None, + ) -> GrpcStreamBroadcaster[ + electricity_trading_pb2.ReceivePublicTradesStreamResponse, PublicTrade + ]: """ - List all executed public orders with optional filters and pagination. + Stream public trades with optional filters and time range. Args: - states: List of order states to filter by. - delivery_period: The delivery period to filter by. - buy_delivery_area: The buy delivery area to filter by. - sell_delivery_area: The sell delivery area to filter by. - page_size: The number of public trades to return per page. - timeout: Timeout duration, defaults to None. + states: List of order states to filter for. + delivery_period: Delivery period to filter for. + buy_delivery_area: Buy delivery area to filter for. + sell_delivery_area: Sell delivery area to filter for. + start_time: The starting timestamp to stream trades from. If None, streams from now. + end_time: The ending timestamp to stop streaming trades. If None, streams indefinitely. - Yields: - The list of public trades for each page. + Returns: + Async generator of orders. Raises: - grpc.RpcError: If an error occurs while listing public trades. + grpc.RpcError: If an error occurs while streaming public trades. """ + + def dt_to_pb_timestamp(dt: datetime) -> Timestamp: + ts = Timestamp() + ts.FromDatetime(dt) + return ts + public_trade_filter = PublicTradeFilter( states=states, delivery_period=delivery_period, @@ -981,39 +934,32 @@ async def list_public_trades( sell_delivery_area=sell_delivery_area, ) - request = electricity_trading_pb2.ListPublicTradesRequest( - filter=public_trade_filter.to_pb(), - pagination_params=( - Params(page_size=page_size, page_token="").to_proto() - if page_size - else None - ), - ) - - while True: + if ( + public_trade_filter not in self._public_trades_streams + or not self._public_trades_streams[public_trade_filter].is_running + ): try: - response = await cast( - Awaitable[electricity_trading_pb2.ListPublicTradesResponse], - grpc_call_with_timeout( - self.stub.ListPublicTrades, - request, - metadata=self._metadata, - timeout=timeout, - ), - ) - - for public_trade in response.public_trades: - yield PublicTrade.from_pb(public_trade) - - if response.pagination_info.next_page_token: - request.pagination_params.CopyFrom( - PaginationParams( - page_token=response.pagination_info.next_page_token - ) + self._public_trades_streams[public_trade_filter] = ( + GrpcStreamBroadcaster( + f"electricity-trading-{public_trade_filter}", + lambda: self.stub.ReceivePublicTradesStream( + electricity_trading_pb2.ReceivePublicTradesStreamRequest( + filter=public_trade_filter.to_pb(), + start_time=( + dt_to_pb_timestamp(start_time) + if start_time + else None + ), + end_time=( + dt_to_pb_timestamp(end_time) if end_time else None + ), + ), + metadata=self._metadata, + ), + lambda response: PublicTrade.from_pb(response.public_trade), ) - else: - break - + ) except grpc.RpcError as e: - _logger.exception("Error occurred while listing public trades: %s", e) + _logger.exception("Error occurred while streaming public trades: %s", e) raise + return self._public_trades_streams[public_trade_filter] diff --git a/tests/test_client.py b/tests/test_client.py index c562bc7..787aaed 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -172,14 +172,14 @@ async def test_stream_gridpool_trades( assert args[0].filter.side == set_up.side.to_pb() -async def test_stream_public_trades( +async def test_receive_public_trades( set_up: SetupParams, ) -> None: - """Test the method streaming public trades.""" + """Test the method receiving public trades.""" # Fields to filter for trade_states = [TradeState.ACTIVE] - set_up.client.public_trades_stream(states=trade_states) + set_up.client.receive_public_trades(states=trade_states) await asyncio.sleep(0) set_up.mock_stub.ReceivePublicTradesStream.assert_called_once() From 8ff21ccdf303ed893038b281574a9d614ac57633 Mon Sep 17 00:00:00 2001 From: camille-bouvy-frequenz Date: Thu, 13 Mar 2025 17:25:18 +0100 Subject: [PATCH 3/4] Update release notes Signed-off-by: camille-bouvy-frequenz --- RELEASE_NOTES.md | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index e7ad0c3..b5aa8ee 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -8,10 +8,13 @@ * Updated frequenz-client-common version range to >=0.1.0, <0.4.0 * Upgraded grpcio to >=1.68.1, <2 and protobuf to >=5.29.2, <6 to resolve compatibility issues +* Unify Public Trades streaming and listing (to align with the proto changes in v0.5.0) + * Removed `list_public_trades` + * Replaced `public_trades_stream` with `receive_public_trades` + * `receive_public_trades` now supports an optional time range (`start_time`, `end_time`) ## New Features - ## Bug Fixes From 86069558bd4f8ce3dba21b40809a5a102dc02b95 Mon Sep 17 00:00:00 2001 From: cwasicki <126617870+cwasicki@users.noreply.github.com> Date: Tue, 18 Mar 2025 21:15:19 +0100 Subject: [PATCH 4/4] Switch CLI tool to use public trade streams Signed-off-by: cwasicki <126617870+cwasicki@users.noreply.github.com> --- .../electricity_trading/cli/__main__.py | 14 +++++++-- .../electricity_trading/cli/etrading.py | 31 ++++++++++--------- 2 files changed, 27 insertions(+), 18 deletions(-) diff --git a/src/frequenz/client/electricity_trading/cli/__main__.py b/src/frequenz/client/electricity_trading/cli/__main__.py index 65cb893..f544b56 100644 --- a/src/frequenz/client/electricity_trading/cli/__main__.py +++ b/src/frequenz/client/electricity_trading/cli/__main__.py @@ -23,7 +23,7 @@ list_gridpool_trades as run_list_gridpool_trades, ) from frequenz.client.electricity_trading.cli.etrading import ( - list_public_trades as run_list_public_trades, + receive_public_trades as run_receive_public_trades, ) TZ = ZoneInfo("Europe/Berlin") @@ -47,10 +47,18 @@ def cli() -> None: @cli.command() @click.option("--url", required=True, type=str) @click.option("--key", required=True, type=str) +@click.option("--delivery-start", default=None, type=iso) @click.option("--start", default=None, type=iso) -def receive_public_trades(url: str, key: str, *, start: datetime) -> None: +@click.option("--end", default=None, type=iso) +def receive_public_trades( + url: str, key: str, *, start: datetime, end: datetime, delivery_start: datetime +) -> None: """List and/or stream public trades.""" - asyncio.run(run_list_public_trades(url=url, key=key, delivery_start=start)) + asyncio.run( + run_receive_public_trades( + url=url, key=key, delivery_start=delivery_start, start=start, end=end + ) + ) @cli.command() diff --git a/src/frequenz/client/electricity_trading/cli/etrading.py b/src/frequenz/client/electricity_trading/cli/etrading.py index af42a43..34cf055 100644 --- a/src/frequenz/client/electricity_trading/cli/etrading.py +++ b/src/frequenz/client/electricity_trading/cli/etrading.py @@ -41,17 +41,22 @@ def check_delivery_start( raise ValueError("Delivery period must be a multiple of `duration`.") -async def list_public_trades(url: str, key: str, *, delivery_start: datetime) -> None: +async def receive_public_trades( + url: str, + key: str, + *, + delivery_start: datetime | None = None, + start: datetime | None = None, + end: datetime | None = None, +) -> None: """List trades and stream new public trades. - If delivery_start is provided, list historical trades and stream new trades - for the 15 minute delivery period starting at delivery_start. - If no delivery_start is provided, stream new trades for any delivery period. - Args: url: URL of the trading API. key: API key. delivery_start: Start of the delivery period or None. + start: First execution time to list trades from. + end: Last execution time to list trades until. """ client = Client(server_url=url, auth_key=key) @@ -65,16 +70,12 @@ async def list_public_trades(url: str, key: str, *, delivery_start: datetime) -> start=delivery_start, duration=timedelta(minutes=15), ) - lst = client.list_public_trades(delivery_period=delivery_period) - - async for trade in lst: - print_public_trade(trade) - - if delivery_start <= datetime.now(timezone.utc): - return - - stream = client.public_trades_stream(delivery_period=delivery_period).new_receiver() - async for trade in stream: + stream = client.receive_public_trades( + delivery_period=delivery_period, + start_time=start, + end_time=end, + ) + async for trade in stream.new_receiver(): print_public_trade(trade)