From e3ccfec539a6066fd76725b216a4302f98baf163 Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Wed, 5 Mar 2025 11:20:54 +0100 Subject: [PATCH 1/4] Change `stream_*` methods to sync They just start a background task. There's no async until we're actually waiting for messages. Signed-off-by: Sahas Subramanian --- .../client/electricity_trading/_client.py | 6 ++-- .../electricity_trading/cli/etrading.py | 6 ++-- tests/test_client.py | 32 ++++++++----------- 3 files changed, 19 insertions(+), 25 deletions(-) diff --git a/src/frequenz/client/electricity_trading/_client.py b/src/frequenz/client/electricity_trading/_client.py index 6da36483..c8ceacb4 100644 --- a/src/frequenz/client/electricity_trading/_client.py +++ b/src/frequenz/client/electricity_trading/_client.py @@ -218,7 +218,7 @@ def stub(self) -> electricity_trading_pb2_grpc.ElectricityTradingServiceAsyncStu # type-checker, so it can only be used for type hints. return self._stub # type: ignore - async def stream_gridpool_orders( + def stream_gridpool_orders( # pylint: disable=too-many-arguments, too-many-positional-arguments self, gridpool_id: int, @@ -285,7 +285,7 @@ async def stream_gridpool_orders( warn_on_overflow=warn_on_overflow, maxsize=max_size ) - async def stream_gridpool_trades( + def stream_gridpool_trades( # pylint: disable=too-many-arguments, too-many-positional-arguments self, gridpool_id: int, @@ -351,7 +351,7 @@ async def stream_gridpool_trades( warn_on_overflow=warn_on_overflow, maxsize=max_size ) - async def stream_public_trades( + def stream_public_trades( # pylint: disable=too-many-arguments, too-many-positional-arguments self, states: list[TradeState] | None = None, diff --git a/src/frequenz/client/electricity_trading/cli/etrading.py b/src/frequenz/client/electricity_trading/cli/etrading.py index ba85c09b..d31e5140 100644 --- a/src/frequenz/client/electricity_trading/cli/etrading.py +++ b/src/frequenz/client/electricity_trading/cli/etrading.py @@ -73,7 +73,7 @@ async def list_public_trades(url: str, key: str, *, delivery_start: datetime) -> if delivery_start <= datetime.now(timezone.utc): return - stream = await client.stream_public_trades(delivery_period=delivery_period) + stream = client.stream_public_trades(delivery_period=delivery_period) async for trade in stream: print_public_trade(trade) @@ -111,7 +111,7 @@ async def list_gridpool_trades( if delivery_start and delivery_start <= datetime.now(timezone.utc): return - stream = await client.stream_gridpool_trades(gid, delivery_period=delivery_period) + stream = client.stream_gridpool_trades(gid, delivery_period=delivery_period) async for trade in stream: print_trade(trade) @@ -154,7 +154,7 @@ async def list_gridpool_orders( if delivery_start and delivery_start <= datetime.now(timezone.utc): return - stream = await client.stream_gridpool_orders(gid, delivery_period=delivery_period) + stream = client.stream_gridpool_orders(gid, delivery_period=delivery_period) async for order in stream: print_order(order) diff --git a/tests/test_client.py b/tests/test_client.py index ba053cf7..0b75663a 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -131,27 +131,23 @@ def set_up_order_detail_response( ).to_pb() -def test_stream_gridpool_orders(set_up: SetupParams) -> None: +async def test_stream_gridpool_orders(set_up: SetupParams) -> None: """Test the method streaming gridpool orders.""" - set_up.loop.run_until_complete( - set_up.client.stream_gridpool_orders(set_up.gridpool_id) - ) + set_up.client.stream_gridpool_orders(set_up.gridpool_id) + await asyncio.sleep(0) set_up.mock_stub.ReceiveGridpoolOrdersStream.assert_called_once() args, _ = set_up.mock_stub.ReceiveGridpoolOrdersStream.call_args assert args[0].gridpool_id == set_up.gridpool_id -def test_stream_gridpool_orders_with_optional_inputs(set_up: SetupParams) -> None: +async def test_stream_gridpool_orders_with_optional_inputs(set_up: SetupParams) -> None: """Test the method streaming gridpool orders with some fields to filter for.""" # Fields to filter for order_states = [OrderState.ACTIVE] - set_up.loop.run_until_complete( - set_up.client.stream_gridpool_orders( - set_up.gridpool_id, order_states=order_states - ) - ) + set_up.client.stream_gridpool_orders(set_up.gridpool_id, order_states=order_states) + await asyncio.sleep(0) set_up.mock_stub.ReceiveGridpoolOrdersStream.assert_called_once() args, _ = set_up.mock_stub.ReceiveGridpoolOrdersStream.call_args @@ -161,15 +157,14 @@ def test_stream_gridpool_orders_with_optional_inputs(set_up: SetupParams) -> Non ] -def test_stream_gridpool_trades( +async def test_stream_gridpool_trades( set_up: SetupParams, ) -> None: """Test the method streaming gridpool trades.""" - set_up.loop.run_until_complete( - set_up.client.stream_gridpool_trades( - gridpool_id=set_up.gridpool_id, market_side=set_up.side - ) + set_up.client.stream_gridpool_trades( + gridpool_id=set_up.gridpool_id, market_side=set_up.side ) + await asyncio.sleep(0) set_up.mock_stub.ReceiveGridpoolTradesStream.assert_called_once() args, _ = set_up.mock_stub.ReceiveGridpoolTradesStream.call_args @@ -177,16 +172,15 @@ def test_stream_gridpool_trades( assert args[0].filter.side == set_up.side.to_pb() -def test_stream_public_trades( +async def test_stream_public_trades( set_up: SetupParams, ) -> None: """Test the method streaming public trades.""" # Fields to filter for trade_states = [TradeState.ACTIVE] - set_up.loop.run_until_complete( - set_up.client.stream_public_trades(states=trade_states) - ) + set_up.client.stream_public_trades(states=trade_states) + await asyncio.sleep(0) set_up.mock_stub.ReceivePublicTradesStream.assert_called_once() args, _ = set_up.mock_stub.ReceivePublicTradesStream.call_args From e14024d94812c29c7aff8dde6905f4e5b04b3dc5 Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Wed, 5 Mar 2025 11:47:58 +0100 Subject: [PATCH 2/4] Return `GrpcStreamBroadcaster` instances from the streaming methods This makes it easier to close the streamers when they are no longer needed. Also rename the methods from `stream_*` to `*_stream`. Signed-off-by: Sahas Subramanian --- .../client/electricity_trading/_client.py | 47 ++++++------------- .../electricity_trading/cli/etrading.py | 10 ++-- tests/test_client.py | 8 ++-- 3 files changed, 26 insertions(+), 39 deletions(-) diff --git a/src/frequenz/client/electricity_trading/_client.py b/src/frequenz/client/electricity_trading/_client.py index c8ceacb4..94155a0b 100644 --- a/src/frequenz/client/electricity_trading/_client.py +++ b/src/frequenz/client/electricity_trading/_client.py @@ -24,7 +24,6 @@ from frequenz.api.electricity_trading.v1.electricity_trading_pb2_grpc import ( ElectricityTradingServiceStub, ) -from frequenz.channels import Receiver from frequenz.client.base.client import BaseApiClient from frequenz.client.base.exception import ClientNotConnected from frequenz.client.base.streaming import GrpcStreamBroadcaster @@ -218,7 +217,7 @@ def stub(self) -> electricity_trading_pb2_grpc.ElectricityTradingServiceAsyncStu # type-checker, so it can only be used for type hints. return self._stub # type: ignore - def stream_gridpool_orders( + def gridpool_orders_stream( # pylint: disable=too-many-arguments, too-many-positional-arguments self, gridpool_id: int, @@ -227,9 +226,9 @@ def stream_gridpool_orders( delivery_area: DeliveryArea | None = None, delivery_period: DeliveryPeriod | None = None, tag: str | None = None, - max_size: int = 50, - warn_on_overflow: bool = False, - ) -> Receiver[OrderDetail]: + ) -> GrpcStreamBroadcaster[ + electricity_trading_pb2.ReceiveGridpoolOrdersStreamResponse, OrderDetail + ]: """ Stream gridpool orders. @@ -240,10 +239,6 @@ def stream_gridpool_orders( delivery_area: Delivery area to filter for. delivery_period: Delivery period to filter for. tag: Tag to filter for. - max_size: The maximum number of messages to buffer. - warn_on_overflow: Whether to log a warning when the receiver's - buffer is full and a message is dropped. - Returns: Async generator of orders. @@ -281,11 +276,9 @@ def stream_gridpool_orders( "Error occurred while streaming gridpool orders: %s", e ) raise - return self._gridpool_orders_streams[stream_key].new_receiver( - warn_on_overflow=warn_on_overflow, maxsize=max_size - ) + return self._gridpool_orders_streams[stream_key] - def stream_gridpool_trades( + def gridpool_trades_stream( # pylint: disable=too-many-arguments, too-many-positional-arguments self, gridpool_id: int, @@ -294,9 +287,9 @@ def stream_gridpool_trades( market_side: MarketSide | None = None, delivery_period: DeliveryPeriod | None = None, delivery_area: DeliveryArea | None = None, - max_size: int = 50, - warn_on_overflow: bool = False, - ) -> Receiver[Trade]: + ) -> GrpcStreamBroadcaster[ + electricity_trading_pb2.ReceiveGridpoolTradesStreamResponse, Trade + ]: """ Stream gridpool trades. @@ -307,9 +300,6 @@ def stream_gridpool_trades( market_side: The market side to filter for. delivery_period: The delivery period to filter for. delivery_area: The delivery area to filter for. - max_size: The maximum number of messages to buffer. - warn_on_overflow: Whether to log a warning when the receiver's - buffer is full and a message is dropped. Returns: The gridpool trades streamer. @@ -347,20 +337,18 @@ def stream_gridpool_trades( "Error occurred while streaming gridpool trades: %s", e ) raise - return self._gridpool_trades_streams[stream_key].new_receiver( - warn_on_overflow=warn_on_overflow, maxsize=max_size - ) + return self._gridpool_trades_streams[stream_key] - def stream_public_trades( + def public_trades_stream( # pylint: disable=too-many-arguments, too-many-positional-arguments self, states: list[TradeState] | None = None, delivery_period: DeliveryPeriod | None = None, buy_delivery_area: DeliveryArea | None = None, sell_delivery_area: DeliveryArea | None = None, - max_size: int = 50, - warn_on_overflow: bool = False, - ) -> Receiver[PublicTrade]: + ) -> GrpcStreamBroadcaster[ + electricity_trading_pb2.ReceivePublicTradesStreamResponse, PublicTrade + ]: """ Stream public trades. @@ -369,9 +357,6 @@ def stream_public_trades( delivery_period: Delivery period to filter for. buy_delivery_area: Buy delivery area to filter for. sell_delivery_area: Sell delivery area to filter for. - max_size: The maximum number of messages to buffer. - warn_on_overflow: Whether to log a warning when the receiver's - buffer is full and a message is dropped. Returns: Async generator of orders. @@ -405,9 +390,7 @@ def stream_public_trades( except grpc.RpcError as e: _logger.exception("Error occurred while streaming public trades: %s", e) raise - return self._public_trades_streams[public_trade_filter].new_receiver( - warn_on_overflow=warn_on_overflow, maxsize=max_size - ) + return self._public_trades_streams[public_trade_filter] def validate_params( # pylint: disable=too-many-arguments, too-many-positional-arguments, too-many-branches diff --git a/src/frequenz/client/electricity_trading/cli/etrading.py b/src/frequenz/client/electricity_trading/cli/etrading.py index d31e5140..af42a431 100644 --- a/src/frequenz/client/electricity_trading/cli/etrading.py +++ b/src/frequenz/client/electricity_trading/cli/etrading.py @@ -73,7 +73,7 @@ async def list_public_trades(url: str, key: str, *, delivery_start: datetime) -> if delivery_start <= datetime.now(timezone.utc): return - stream = client.stream_public_trades(delivery_period=delivery_period) + stream = client.public_trades_stream(delivery_period=delivery_period).new_receiver() async for trade in stream: print_public_trade(trade) @@ -111,7 +111,9 @@ async def list_gridpool_trades( if delivery_start and delivery_start <= datetime.now(timezone.utc): return - stream = client.stream_gridpool_trades(gid, delivery_period=delivery_period) + stream = client.gridpool_trades_stream( + gid, delivery_period=delivery_period + ).new_receiver() async for trade in stream: print_trade(trade) @@ -154,7 +156,9 @@ async def list_gridpool_orders( if delivery_start and delivery_start <= datetime.now(timezone.utc): return - stream = client.stream_gridpool_orders(gid, delivery_period=delivery_period) + stream = client.gridpool_orders_stream( + gid, delivery_period=delivery_period + ).new_receiver() async for order in stream: print_order(order) diff --git a/tests/test_client.py b/tests/test_client.py index 0b75663a..c562bc79 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -133,7 +133,7 @@ def set_up_order_detail_response( async def test_stream_gridpool_orders(set_up: SetupParams) -> None: """Test the method streaming gridpool orders.""" - set_up.client.stream_gridpool_orders(set_up.gridpool_id) + set_up.client.gridpool_orders_stream(set_up.gridpool_id) await asyncio.sleep(0) set_up.mock_stub.ReceiveGridpoolOrdersStream.assert_called_once() @@ -146,7 +146,7 @@ async def test_stream_gridpool_orders_with_optional_inputs(set_up: SetupParams) # Fields to filter for order_states = [OrderState.ACTIVE] - set_up.client.stream_gridpool_orders(set_up.gridpool_id, order_states=order_states) + set_up.client.gridpool_orders_stream(set_up.gridpool_id, order_states=order_states) await asyncio.sleep(0) set_up.mock_stub.ReceiveGridpoolOrdersStream.assert_called_once() @@ -161,7 +161,7 @@ async def test_stream_gridpool_trades( set_up: SetupParams, ) -> None: """Test the method streaming gridpool trades.""" - set_up.client.stream_gridpool_trades( + set_up.client.gridpool_trades_stream( gridpool_id=set_up.gridpool_id, market_side=set_up.side ) await asyncio.sleep(0) @@ -179,7 +179,7 @@ async def test_stream_public_trades( # Fields to filter for trade_states = [TradeState.ACTIVE] - set_up.client.stream_public_trades(states=trade_states) + set_up.client.public_trades_stream(states=trade_states) await asyncio.sleep(0) set_up.mock_stub.ReceivePublicTradesStream.assert_called_once() From e308e7be05e74163981ea71d55dc331a4719c030 Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Wed, 5 Mar 2025 11:54:57 +0100 Subject: [PATCH 3/4] Restart streams if they are closed Now that we are exposing the `GrpcStreamBroadcaster` instances which can be closed by external parties, we need to check that existing instances are active before reusing them. Signed-off-by: Sahas Subramanian --- .../client/electricity_trading/_client.py | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/src/frequenz/client/electricity_trading/_client.py b/src/frequenz/client/electricity_trading/_client.py index 94155a0b..57674e1e 100644 --- a/src/frequenz/client/electricity_trading/_client.py +++ b/src/frequenz/client/electricity_trading/_client.py @@ -258,7 +258,10 @@ def gridpool_orders_stream( stream_key = (gridpool_id, gridpool_order_filter) - if stream_key not in self._gridpool_orders_streams: + if ( + stream_key not in self._gridpool_orders_streams + or not self._gridpool_orders_streams[stream_key].is_running + ): try: self._gridpool_orders_streams[stream_key] = GrpcStreamBroadcaster( f"electricity-trading-{stream_key}", @@ -319,7 +322,10 @@ def gridpool_trades_stream( stream_key = (gridpool_id, gridpool_trade_filter) - if stream_key not in self._gridpool_trades_streams: + if ( + stream_key not in self._gridpool_trades_streams + or not self._gridpool_trades_streams[stream_key].is_running + ): try: self._gridpool_trades_streams[stream_key] = GrpcStreamBroadcaster( f"electricity-trading-{stream_key}", @@ -373,7 +379,10 @@ def public_trades_stream( sell_delivery_area=sell_delivery_area, ) - if public_trade_filter not in self._public_trades_streams: + if ( + public_trade_filter not in self._public_trades_streams + or not self._public_trades_streams[public_trade_filter].is_running + ): try: self._public_trades_streams[public_trade_filter] = ( GrpcStreamBroadcaster( From f64a0c972ff76ae1c87b014c1d470a51c6ace1cf Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Wed, 5 Mar 2025 12:45:05 +0100 Subject: [PATCH 4/4] Update release notes Signed-off-by: Sahas Subramanian --- RELEASE_NOTES.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index c0bb8760..81b4ba47 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -10,6 +10,8 @@ * Rename `receive_trades` to `receive_public_trades` * Rename `receive_orders` to `receive_gridpool_orders` +- The `stream_*` methods in the client have been renamed to `*_stream`. They no longer return `Receiver` instances, but rather `GrpcStreamBroadcaster` instances. They expose a `new_receiver()` method which can be used to get a new receiver. They also expose a `stop()` method which can be used to stop the background streaming task when the stream is no longer needed. + ## New Features * Print tags and filled (instead of open) quantity for gridpool orders in CLI tool.