From e534476ab60ac9173f300936c80509e01198caa9 Mon Sep 17 00:00:00 2001 From: camille-bouvy-frequenz Date: Thu, 13 Mar 2025 17:04:02 +0100 Subject: [PATCH 1/5] Upgrade dependencies for updated gRPC streaming API - Bumped `frequenz-api-electricity-trading` to `>=0.5.0` - Updated `protobuf` to `>=5.29.2` Signed-off-by: camille-bouvy-frequenz --- pyproject.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 44d6538..2e0b481 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -34,8 +34,8 @@ dependencies = [ "frequenz-channels >= 1.6.1, < 2", "frequenz-client-base >= 0.9.0, < 0.10.0", "frequenz-client-common >= 0.1.0, < 0.3.0", - "frequenz-api-electricity-trading >= 0.2.4, < 1", - "protobuf >= 5.28.0, < 6", + "frequenz-api-electricity-trading >= 0.5.0, < 0.6.0", + "protobuf >= 5.29.2, < 7", ] dynamic = ["version"] From e33bd3ff5aaa7e51bb7e8cc93bec1480e91bf304 Mon Sep 17 00:00:00 2001 From: camille-bouvy-frequenz Date: Thu, 13 Mar 2025 17:18:46 +0100 Subject: [PATCH 2/5] Unify Public Trades streaming and listing - Removed `list_public_trades` - Replaced `public_trades_stream` with `receive_public_trades` - Updated the method to support streaming with optional time range (`start_time`, `end_time`) - Update the unit tests with the new function name Signed-off-by: camille-bouvy-frequenz --- .../client/electricity_trading/_client.py | 150 ++++++------------ tests/test_client.py | 6 +- 2 files changed, 53 insertions(+), 103 deletions(-) diff --git a/src/frequenz/client/electricity_trading/_client.py b/src/frequenz/client/electricity_trading/_client.py index 57674e1..2164eee 100644 --- a/src/frequenz/client/electricity_trading/_client.py +++ b/src/frequenz/client/electricity_trading/_client.py @@ -29,6 +29,7 @@ from frequenz.client.base.streaming import GrpcStreamBroadcaster from frequenz.client.common.pagination import Params from google.protobuf import field_mask_pb2, struct_pb2 +from google.protobuf.timestamp_pb2 import Timestamp from ._types import ( DeliveryArea, @@ -345,62 +346,6 @@ def gridpool_trades_stream( raise return self._gridpool_trades_streams[stream_key] - def public_trades_stream( - # pylint: disable=too-many-arguments, too-many-positional-arguments - self, - states: list[TradeState] | None = None, - delivery_period: DeliveryPeriod | None = None, - buy_delivery_area: DeliveryArea | None = None, - sell_delivery_area: DeliveryArea | None = None, - ) -> GrpcStreamBroadcaster[ - electricity_trading_pb2.ReceivePublicTradesStreamResponse, PublicTrade - ]: - """ - Stream public trades. - - Args: - states: List of order states to filter for. - delivery_period: Delivery period to filter for. - buy_delivery_area: Buy delivery area to filter for. - sell_delivery_area: Sell delivery area to filter for. - - Returns: - Async generator of orders. - - Raises: - grpc.RpcError: If an error occurs while streaming public trades. - """ - self.validate_params(delivery_period=delivery_period) - - public_trade_filter = PublicTradeFilter( - states=states, - delivery_period=delivery_period, - buy_delivery_area=buy_delivery_area, - sell_delivery_area=sell_delivery_area, - ) - - if ( - public_trade_filter not in self._public_trades_streams - or not self._public_trades_streams[public_trade_filter].is_running - ): - try: - self._public_trades_streams[public_trade_filter] = ( - GrpcStreamBroadcaster( - f"electricity-trading-{public_trade_filter}", - lambda: self.stub.ReceivePublicTradesStream( - electricity_trading_pb2.ReceivePublicTradesStreamRequest( - filter=public_trade_filter.to_pb(), - ), - metadata=self._metadata, - ), - lambda response: PublicTrade.from_pb(response.public_trade), - ) - ) - except grpc.RpcError as e: - _logger.exception("Error occurred while streaming public trades: %s", e) - raise - return self._public_trades_streams[public_trade_filter] - def validate_params( # pylint: disable=too-many-arguments, too-many-positional-arguments, too-many-branches self, @@ -943,33 +888,43 @@ async def list_gridpool_trades( _logger.exception("Error occurred while listing gridpool trades: %s", e) raise - async def list_public_trades( + def receive_public_trades( # pylint: disable=too-many-arguments, too-many-positional-arguments self, states: list[TradeState] | None = None, delivery_period: DeliveryPeriod | None = None, buy_delivery_area: DeliveryArea | None = None, sell_delivery_area: DeliveryArea | None = None, - page_size: int | None = None, - timeout: timedelta | None = None, - ) -> AsyncIterator[PublicTrade]: + start_time: datetime | None = None, + end_time: datetime | None = None, + ) -> GrpcStreamBroadcaster[ + electricity_trading_pb2.ReceivePublicTradesStreamResponse, PublicTrade + ]: """ - List all executed public orders with optional filters and pagination. + Stream public trades with optional filters and time range. Args: - states: List of order states to filter by. - delivery_period: The delivery period to filter by. - buy_delivery_area: The buy delivery area to filter by. - sell_delivery_area: The sell delivery area to filter by. - page_size: The number of public trades to return per page. - timeout: Timeout duration, defaults to None. + states: List of order states to filter for. + delivery_period: Delivery period to filter for. + buy_delivery_area: Buy delivery area to filter for. + sell_delivery_area: Sell delivery area to filter for. + start_time: The starting timestamp to stream trades from. If None, streams from now. + end_time: The ending timestamp to stop streaming trades. If None, streams indefinitely. - Yields: - The list of public trades for each page. + Returns: + Async generator of orders. Raises: - grpc.RpcError: If an error occurs while listing public trades. + grpc.RpcError: If an error occurs while streaming public trades. """ + + def dt_to_pb_timestamp(dt: datetime) -> Timestamp: + ts = Timestamp() + ts.FromDatetime(dt) + return ts + + self.validate_params(delivery_period=delivery_period) + public_trade_filter = PublicTradeFilter( states=states, delivery_period=delivery_period, @@ -977,37 +932,32 @@ async def list_public_trades( sell_delivery_area=sell_delivery_area, ) - request = electricity_trading_pb2.ListPublicTradesRequest( - filter=public_trade_filter.to_pb(), - pagination_params=( - Params(page_size=page_size).to_proto() if page_size else None - ), - ) - - while True: + if ( + public_trade_filter not in self._public_trades_streams + or not self._public_trades_streams[public_trade_filter].is_running + ): try: - response = await cast( - Awaitable[electricity_trading_pb2.ListPublicTradesResponse], - grpc_call_with_timeout( - self.stub.ListPublicTrades, - request, - metadata=self._metadata, - timeout=timeout, - ), - ) - - for public_trade in response.public_trades: - yield PublicTrade.from_pb(public_trade) - - if response.pagination_info.next_page_token: - request.pagination_params.CopyFrom( - PaginationParams( - page_token=response.pagination_info.next_page_token - ) + self._public_trades_streams[public_trade_filter] = ( + GrpcStreamBroadcaster( + f"electricity-trading-{public_trade_filter}", + lambda: self.stub.ReceivePublicTradesStream( + electricity_trading_pb2.ReceivePublicTradesStreamRequest( + filter=public_trade_filter.to_pb(), + start_time=( + dt_to_pb_timestamp(start_time) + if start_time + else None + ), + end_time=( + dt_to_pb_timestamp(end_time) if end_time else None + ), + ), + metadata=self._metadata, + ), + lambda response: PublicTrade.from_pb(response.public_trade), ) - else: - break - + ) except grpc.RpcError as e: - _logger.exception("Error occurred while listing public trades: %s", e) + _logger.exception("Error occurred while streaming public trades: %s", e) raise + return self._public_trades_streams[public_trade_filter] diff --git a/tests/test_client.py b/tests/test_client.py index c562bc7..787aaed 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -172,14 +172,14 @@ async def test_stream_gridpool_trades( assert args[0].filter.side == set_up.side.to_pb() -async def test_stream_public_trades( +async def test_receive_public_trades( set_up: SetupParams, ) -> None: - """Test the method streaming public trades.""" + """Test the method receiving public trades.""" # Fields to filter for trade_states = [TradeState.ACTIVE] - set_up.client.public_trades_stream(states=trade_states) + set_up.client.receive_public_trades(states=trade_states) await asyncio.sleep(0) set_up.mock_stub.ReceivePublicTradesStream.assert_called_once() From 64911b84bd71236abd48ac6d909bea0cfca8312d Mon Sep 17 00:00:00 2001 From: camille-bouvy-frequenz Date: Thu, 13 Mar 2025 17:25:18 +0100 Subject: [PATCH 3/5] Update release notes Signed-off-by: camille-bouvy-frequenz --- RELEASE_NOTES.md | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 40e493f..0f27e8b 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -6,11 +6,13 @@ ## Upgrading - +* Unify Public Trades streaming and listing (to align with the proto changes in v0.5.0) + * Removed `list_public_trades` + * Replaced `public_trades_stream` with `receive_public_trades` + * `receive_public_trades` now supports an optional time range (`start_time`, `end_time`) ## New Features - ## Bug Fixes From 8a75e9e9e5fb3c40ff35ce379c096c13e6ccff74 Mon Sep 17 00:00:00 2001 From: cwasicki <126617870+cwasicki@users.noreply.github.com> Date: Wed, 19 Mar 2025 16:33:00 +0100 Subject: [PATCH 4/5] Fix receive_public_trades method The delivery period does not need to be in the past. Moreover the protobuf dependency is tightened again. --- pyproject.toml | 2 +- src/frequenz/client/electricity_trading/_client.py | 2 -- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 2e0b481..4423577 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -35,7 +35,7 @@ dependencies = [ "frequenz-client-base >= 0.9.0, < 0.10.0", "frequenz-client-common >= 0.1.0, < 0.3.0", "frequenz-api-electricity-trading >= 0.5.0, < 0.6.0", - "protobuf >= 5.29.2, < 7", + "protobuf >= 5.29.2, < 6", ] dynamic = ["version"] diff --git a/src/frequenz/client/electricity_trading/_client.py b/src/frequenz/client/electricity_trading/_client.py index 2164eee..24dc1fd 100644 --- a/src/frequenz/client/electricity_trading/_client.py +++ b/src/frequenz/client/electricity_trading/_client.py @@ -923,8 +923,6 @@ def dt_to_pb_timestamp(dt: datetime) -> Timestamp: ts.FromDatetime(dt) return ts - self.validate_params(delivery_period=delivery_period) - public_trade_filter = PublicTradeFilter( states=states, delivery_period=delivery_period, From 4cd0051db29ed0ea724a94304f28e8a3b24d2484 Mon Sep 17 00:00:00 2001 From: cwasicki <126617870+cwasicki@users.noreply.github.com> Date: Tue, 18 Mar 2025 21:15:19 +0100 Subject: [PATCH 5/5] Switch CLI tool to use public trade streams --- .../electricity_trading/cli/__main__.py | 14 +++++++-- .../electricity_trading/cli/etrading.py | 31 ++++++++++--------- 2 files changed, 27 insertions(+), 18 deletions(-) diff --git a/src/frequenz/client/electricity_trading/cli/__main__.py b/src/frequenz/client/electricity_trading/cli/__main__.py index 65cb893..f544b56 100644 --- a/src/frequenz/client/electricity_trading/cli/__main__.py +++ b/src/frequenz/client/electricity_trading/cli/__main__.py @@ -23,7 +23,7 @@ list_gridpool_trades as run_list_gridpool_trades, ) from frequenz.client.electricity_trading.cli.etrading import ( - list_public_trades as run_list_public_trades, + receive_public_trades as run_receive_public_trades, ) TZ = ZoneInfo("Europe/Berlin") @@ -47,10 +47,18 @@ def cli() -> None: @cli.command() @click.option("--url", required=True, type=str) @click.option("--key", required=True, type=str) +@click.option("--delivery-start", default=None, type=iso) @click.option("--start", default=None, type=iso) -def receive_public_trades(url: str, key: str, *, start: datetime) -> None: +@click.option("--end", default=None, type=iso) +def receive_public_trades( + url: str, key: str, *, start: datetime, end: datetime, delivery_start: datetime +) -> None: """List and/or stream public trades.""" - asyncio.run(run_list_public_trades(url=url, key=key, delivery_start=start)) + asyncio.run( + run_receive_public_trades( + url=url, key=key, delivery_start=delivery_start, start=start, end=end + ) + ) @cli.command() diff --git a/src/frequenz/client/electricity_trading/cli/etrading.py b/src/frequenz/client/electricity_trading/cli/etrading.py index af42a43..34cf055 100644 --- a/src/frequenz/client/electricity_trading/cli/etrading.py +++ b/src/frequenz/client/electricity_trading/cli/etrading.py @@ -41,17 +41,22 @@ def check_delivery_start( raise ValueError("Delivery period must be a multiple of `duration`.") -async def list_public_trades(url: str, key: str, *, delivery_start: datetime) -> None: +async def receive_public_trades( + url: str, + key: str, + *, + delivery_start: datetime | None = None, + start: datetime | None = None, + end: datetime | None = None, +) -> None: """List trades and stream new public trades. - If delivery_start is provided, list historical trades and stream new trades - for the 15 minute delivery period starting at delivery_start. - If no delivery_start is provided, stream new trades for any delivery period. - Args: url: URL of the trading API. key: API key. delivery_start: Start of the delivery period or None. + start: First execution time to list trades from. + end: Last execution time to list trades until. """ client = Client(server_url=url, auth_key=key) @@ -65,16 +70,12 @@ async def list_public_trades(url: str, key: str, *, delivery_start: datetime) -> start=delivery_start, duration=timedelta(minutes=15), ) - lst = client.list_public_trades(delivery_period=delivery_period) - - async for trade in lst: - print_public_trade(trade) - - if delivery_start <= datetime.now(timezone.utc): - return - - stream = client.public_trades_stream(delivery_period=delivery_period).new_receiver() - async for trade in stream: + stream = client.receive_public_trades( + delivery_period=delivery_period, + start_time=start, + end_time=end, + ) + async for trade in stream.new_receiver(): print_public_trade(trade)