diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index c0bb8760..81b4ba47 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -10,6 +10,8 @@ * Rename `receive_trades` to `receive_public_trades` * Rename `receive_orders` to `receive_gridpool_orders` +- The `stream_*` methods in the client have been renamed to `*_stream`. They no longer return `Receiver` instances, but rather `GrpcStreamBroadcaster` instances. They expose a `new_receiver()` method which can be used to get a new receiver. They also expose a `stop()` method which can be used to stop the background streaming task when the stream is no longer needed. + ## New Features * Print tags and filled (instead of open) quantity for gridpool orders in CLI tool. diff --git a/src/frequenz/client/electricity_trading/_client.py b/src/frequenz/client/electricity_trading/_client.py index 6da36483..57674e1e 100644 --- a/src/frequenz/client/electricity_trading/_client.py +++ b/src/frequenz/client/electricity_trading/_client.py @@ -24,7 +24,6 @@ from frequenz.api.electricity_trading.v1.electricity_trading_pb2_grpc import ( ElectricityTradingServiceStub, ) -from frequenz.channels import Receiver from frequenz.client.base.client import BaseApiClient from frequenz.client.base.exception import ClientNotConnected from frequenz.client.base.streaming import GrpcStreamBroadcaster @@ -218,7 +217,7 @@ def stub(self) -> electricity_trading_pb2_grpc.ElectricityTradingServiceAsyncStu # type-checker, so it can only be used for type hints. return self._stub # type: ignore - async def stream_gridpool_orders( + def gridpool_orders_stream( # pylint: disable=too-many-arguments, too-many-positional-arguments self, gridpool_id: int, @@ -227,9 +226,9 @@ async def stream_gridpool_orders( delivery_area: DeliveryArea | None = None, delivery_period: DeliveryPeriod | None = None, tag: str | None = None, - max_size: int = 50, - warn_on_overflow: bool = False, - ) -> Receiver[OrderDetail]: + ) -> GrpcStreamBroadcaster[ + electricity_trading_pb2.ReceiveGridpoolOrdersStreamResponse, OrderDetail + ]: """ Stream gridpool orders. @@ -240,10 +239,6 @@ async def stream_gridpool_orders( delivery_area: Delivery area to filter for. delivery_period: Delivery period to filter for. tag: Tag to filter for. - max_size: The maximum number of messages to buffer. - warn_on_overflow: Whether to log a warning when the receiver's - buffer is full and a message is dropped. - Returns: Async generator of orders. @@ -263,7 +258,10 @@ async def stream_gridpool_orders( stream_key = (gridpool_id, gridpool_order_filter) - if stream_key not in self._gridpool_orders_streams: + if ( + stream_key not in self._gridpool_orders_streams + or not self._gridpool_orders_streams[stream_key].is_running + ): try: self._gridpool_orders_streams[stream_key] = GrpcStreamBroadcaster( f"electricity-trading-{stream_key}", @@ -281,11 +279,9 @@ async def stream_gridpool_orders( "Error occurred while streaming gridpool orders: %s", e ) raise - return self._gridpool_orders_streams[stream_key].new_receiver( - warn_on_overflow=warn_on_overflow, maxsize=max_size - ) + return self._gridpool_orders_streams[stream_key] - async def stream_gridpool_trades( + def gridpool_trades_stream( # pylint: disable=too-many-arguments, too-many-positional-arguments self, gridpool_id: int, @@ -294,9 +290,9 @@ async def stream_gridpool_trades( market_side: MarketSide | None = None, delivery_period: DeliveryPeriod | None = None, delivery_area: DeliveryArea | None = None, - max_size: int = 50, - warn_on_overflow: bool = False, - ) -> Receiver[Trade]: + ) -> GrpcStreamBroadcaster[ + electricity_trading_pb2.ReceiveGridpoolTradesStreamResponse, Trade + ]: """ Stream gridpool trades. @@ -307,9 +303,6 @@ async def stream_gridpool_trades( market_side: The market side to filter for. delivery_period: The delivery period to filter for. delivery_area: The delivery area to filter for. - max_size: The maximum number of messages to buffer. - warn_on_overflow: Whether to log a warning when the receiver's - buffer is full and a message is dropped. Returns: The gridpool trades streamer. @@ -329,7 +322,10 @@ async def stream_gridpool_trades( stream_key = (gridpool_id, gridpool_trade_filter) - if stream_key not in self._gridpool_trades_streams: + if ( + stream_key not in self._gridpool_trades_streams + or not self._gridpool_trades_streams[stream_key].is_running + ): try: self._gridpool_trades_streams[stream_key] = GrpcStreamBroadcaster( f"electricity-trading-{stream_key}", @@ -347,20 +343,18 @@ async def stream_gridpool_trades( "Error occurred while streaming gridpool trades: %s", e ) raise - return self._gridpool_trades_streams[stream_key].new_receiver( - warn_on_overflow=warn_on_overflow, maxsize=max_size - ) + return self._gridpool_trades_streams[stream_key] - async def stream_public_trades( + def public_trades_stream( # pylint: disable=too-many-arguments, too-many-positional-arguments self, states: list[TradeState] | None = None, delivery_period: DeliveryPeriod | None = None, buy_delivery_area: DeliveryArea | None = None, sell_delivery_area: DeliveryArea | None = None, - max_size: int = 50, - warn_on_overflow: bool = False, - ) -> Receiver[PublicTrade]: + ) -> GrpcStreamBroadcaster[ + electricity_trading_pb2.ReceivePublicTradesStreamResponse, PublicTrade + ]: """ Stream public trades. @@ -369,9 +363,6 @@ async def stream_public_trades( delivery_period: Delivery period to filter for. buy_delivery_area: Buy delivery area to filter for. sell_delivery_area: Sell delivery area to filter for. - max_size: The maximum number of messages to buffer. - warn_on_overflow: Whether to log a warning when the receiver's - buffer is full and a message is dropped. Returns: Async generator of orders. @@ -388,7 +379,10 @@ async def stream_public_trades( sell_delivery_area=sell_delivery_area, ) - if public_trade_filter not in self._public_trades_streams: + if ( + public_trade_filter not in self._public_trades_streams + or not self._public_trades_streams[public_trade_filter].is_running + ): try: self._public_trades_streams[public_trade_filter] = ( GrpcStreamBroadcaster( @@ -405,9 +399,7 @@ async def stream_public_trades( except grpc.RpcError as e: _logger.exception("Error occurred while streaming public trades: %s", e) raise - return self._public_trades_streams[public_trade_filter].new_receiver( - warn_on_overflow=warn_on_overflow, maxsize=max_size - ) + return self._public_trades_streams[public_trade_filter] def validate_params( # pylint: disable=too-many-arguments, too-many-positional-arguments, too-many-branches diff --git a/src/frequenz/client/electricity_trading/cli/etrading.py b/src/frequenz/client/electricity_trading/cli/etrading.py index ba85c09b..af42a431 100644 --- a/src/frequenz/client/electricity_trading/cli/etrading.py +++ b/src/frequenz/client/electricity_trading/cli/etrading.py @@ -73,7 +73,7 @@ async def list_public_trades(url: str, key: str, *, delivery_start: datetime) -> if delivery_start <= datetime.now(timezone.utc): return - stream = await client.stream_public_trades(delivery_period=delivery_period) + stream = client.public_trades_stream(delivery_period=delivery_period).new_receiver() async for trade in stream: print_public_trade(trade) @@ -111,7 +111,9 @@ async def list_gridpool_trades( if delivery_start and delivery_start <= datetime.now(timezone.utc): return - stream = await client.stream_gridpool_trades(gid, delivery_period=delivery_period) + stream = client.gridpool_trades_stream( + gid, delivery_period=delivery_period + ).new_receiver() async for trade in stream: print_trade(trade) @@ -154,7 +156,9 @@ async def list_gridpool_orders( if delivery_start and delivery_start <= datetime.now(timezone.utc): return - stream = await client.stream_gridpool_orders(gid, delivery_period=delivery_period) + stream = client.gridpool_orders_stream( + gid, delivery_period=delivery_period + ).new_receiver() async for order in stream: print_order(order) diff --git a/tests/test_client.py b/tests/test_client.py index ba053cf7..c562bc79 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -131,27 +131,23 @@ def set_up_order_detail_response( ).to_pb() -def test_stream_gridpool_orders(set_up: SetupParams) -> None: +async def test_stream_gridpool_orders(set_up: SetupParams) -> None: """Test the method streaming gridpool orders.""" - set_up.loop.run_until_complete( - set_up.client.stream_gridpool_orders(set_up.gridpool_id) - ) + set_up.client.gridpool_orders_stream(set_up.gridpool_id) + await asyncio.sleep(0) set_up.mock_stub.ReceiveGridpoolOrdersStream.assert_called_once() args, _ = set_up.mock_stub.ReceiveGridpoolOrdersStream.call_args assert args[0].gridpool_id == set_up.gridpool_id -def test_stream_gridpool_orders_with_optional_inputs(set_up: SetupParams) -> None: +async def test_stream_gridpool_orders_with_optional_inputs(set_up: SetupParams) -> None: """Test the method streaming gridpool orders with some fields to filter for.""" # Fields to filter for order_states = [OrderState.ACTIVE] - set_up.loop.run_until_complete( - set_up.client.stream_gridpool_orders( - set_up.gridpool_id, order_states=order_states - ) - ) + set_up.client.gridpool_orders_stream(set_up.gridpool_id, order_states=order_states) + await asyncio.sleep(0) set_up.mock_stub.ReceiveGridpoolOrdersStream.assert_called_once() args, _ = set_up.mock_stub.ReceiveGridpoolOrdersStream.call_args @@ -161,15 +157,14 @@ def test_stream_gridpool_orders_with_optional_inputs(set_up: SetupParams) -> Non ] -def test_stream_gridpool_trades( +async def test_stream_gridpool_trades( set_up: SetupParams, ) -> None: """Test the method streaming gridpool trades.""" - set_up.loop.run_until_complete( - set_up.client.stream_gridpool_trades( - gridpool_id=set_up.gridpool_id, market_side=set_up.side - ) + set_up.client.gridpool_trades_stream( + gridpool_id=set_up.gridpool_id, market_side=set_up.side ) + await asyncio.sleep(0) set_up.mock_stub.ReceiveGridpoolTradesStream.assert_called_once() args, _ = set_up.mock_stub.ReceiveGridpoolTradesStream.call_args @@ -177,16 +172,15 @@ def test_stream_gridpool_trades( assert args[0].filter.side == set_up.side.to_pb() -def test_stream_public_trades( +async def test_stream_public_trades( set_up: SetupParams, ) -> None: """Test the method streaming public trades.""" # Fields to filter for trade_states = [TradeState.ACTIVE] - set_up.loop.run_until_complete( - set_up.client.stream_public_trades(states=trade_states) - ) + set_up.client.public_trades_stream(states=trade_states) + await asyncio.sleep(0) set_up.mock_stub.ReceivePublicTradesStream.assert_called_once() args, _ = set_up.mock_stub.ReceivePublicTradesStream.call_args