Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
* Rename `receive_trades` to `receive_public_trades`
* Rename `receive_orders` to `receive_gridpool_orders`

- The `stream_*` methods in the client have been renamed to `*_stream`. They no longer return `Receiver` instances, but rather `GrpcStreamBroadcaster` instances. They expose a `new_receiver()` method which can be used to get a new receiver. They also expose a `stop()` method which can be used to stop the background streaming task when the stream is no longer needed.

## New Features

* Print tags and filled (instead of open) quantity for gridpool orders in CLI tool.
Expand Down
62 changes: 27 additions & 35 deletions src/frequenz/client/electricity_trading/_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
from frequenz.api.electricity_trading.v1.electricity_trading_pb2_grpc import (
ElectricityTradingServiceStub,
)
from frequenz.channels import Receiver
from frequenz.client.base.client import BaseApiClient
from frequenz.client.base.exception import ClientNotConnected
from frequenz.client.base.streaming import GrpcStreamBroadcaster
Expand Down Expand Up @@ -218,7 +217,7 @@ def stub(self) -> electricity_trading_pb2_grpc.ElectricityTradingServiceAsyncStu
# type-checker, so it can only be used for type hints.
return self._stub # type: ignore

async def stream_gridpool_orders(
def gridpool_orders_stream(
# pylint: disable=too-many-arguments, too-many-positional-arguments
self,
gridpool_id: int,
Expand All @@ -227,9 +226,9 @@ async def stream_gridpool_orders(
delivery_area: DeliveryArea | None = None,
delivery_period: DeliveryPeriod | None = None,
tag: str | None = None,
max_size: int = 50,
warn_on_overflow: bool = False,
) -> Receiver[OrderDetail]:
) -> GrpcStreamBroadcaster[
electricity_trading_pb2.ReceiveGridpoolOrdersStreamResponse, OrderDetail
]:
"""
Stream gridpool orders.

Expand All @@ -240,10 +239,6 @@ async def stream_gridpool_orders(
delivery_area: Delivery area to filter for.
delivery_period: Delivery period to filter for.
tag: Tag to filter for.
max_size: The maximum number of messages to buffer.
warn_on_overflow: Whether to log a warning when the receiver's
buffer is full and a message is dropped.


Returns:
Async generator of orders.
Expand All @@ -263,7 +258,10 @@ async def stream_gridpool_orders(

stream_key = (gridpool_id, gridpool_order_filter)

if stream_key not in self._gridpool_orders_streams:
if (
stream_key not in self._gridpool_orders_streams
or not self._gridpool_orders_streams[stream_key].is_running
):
try:
self._gridpool_orders_streams[stream_key] = GrpcStreamBroadcaster(
f"electricity-trading-{stream_key}",
Expand All @@ -281,11 +279,9 @@ async def stream_gridpool_orders(
"Error occurred while streaming gridpool orders: %s", e
)
raise
return self._gridpool_orders_streams[stream_key].new_receiver(
warn_on_overflow=warn_on_overflow, maxsize=max_size
)
return self._gridpool_orders_streams[stream_key]

async def stream_gridpool_trades(
def gridpool_trades_stream(
# pylint: disable=too-many-arguments, too-many-positional-arguments
self,
gridpool_id: int,
Expand All @@ -294,9 +290,9 @@ async def stream_gridpool_trades(
market_side: MarketSide | None = None,
delivery_period: DeliveryPeriod | None = None,
delivery_area: DeliveryArea | None = None,
max_size: int = 50,
warn_on_overflow: bool = False,
) -> Receiver[Trade]:
) -> GrpcStreamBroadcaster[
electricity_trading_pb2.ReceiveGridpoolTradesStreamResponse, Trade
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this mean that downstream apps need to import these types? Is there a way around this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Downstream apps can say: GrpcStreamBroadcaster[Any, PublicTrade] and still be fully typed, because they only care about the output type.

But we can improve during the refactoring.

]:
"""
Stream gridpool trades.

Expand All @@ -307,9 +303,6 @@ async def stream_gridpool_trades(
market_side: The market side to filter for.
delivery_period: The delivery period to filter for.
delivery_area: The delivery area to filter for.
max_size: The maximum number of messages to buffer.
warn_on_overflow: Whether to log a warning when the receiver's
buffer is full and a message is dropped.

Returns:
The gridpool trades streamer.
Expand All @@ -329,7 +322,10 @@ async def stream_gridpool_trades(

stream_key = (gridpool_id, gridpool_trade_filter)

if stream_key not in self._gridpool_trades_streams:
if (
stream_key not in self._gridpool_trades_streams
or not self._gridpool_trades_streams[stream_key].is_running
):
try:
self._gridpool_trades_streams[stream_key] = GrpcStreamBroadcaster(
f"electricity-trading-{stream_key}",
Expand All @@ -347,20 +343,18 @@ async def stream_gridpool_trades(
"Error occurred while streaming gridpool trades: %s", e
)
raise
return self._gridpool_trades_streams[stream_key].new_receiver(
warn_on_overflow=warn_on_overflow, maxsize=max_size
)
return self._gridpool_trades_streams[stream_key]

async def stream_public_trades(
def public_trades_stream(
# pylint: disable=too-many-arguments, too-many-positional-arguments
self,
states: list[TradeState] | None = None,
delivery_period: DeliveryPeriod | None = None,
buy_delivery_area: DeliveryArea | None = None,
sell_delivery_area: DeliveryArea | None = None,
max_size: int = 50,
warn_on_overflow: bool = False,
) -> Receiver[PublicTrade]:
) -> GrpcStreamBroadcaster[
electricity_trading_pb2.ReceivePublicTradesStreamResponse, PublicTrade
]:
"""
Stream public trades.

Expand All @@ -369,9 +363,6 @@ async def stream_public_trades(
delivery_period: Delivery period to filter for.
buy_delivery_area: Buy delivery area to filter for.
sell_delivery_area: Sell delivery area to filter for.
max_size: The maximum number of messages to buffer.
warn_on_overflow: Whether to log a warning when the receiver's
buffer is full and a message is dropped.

Returns:
Async generator of orders.
Expand All @@ -388,7 +379,10 @@ async def stream_public_trades(
sell_delivery_area=sell_delivery_area,
)

if public_trade_filter not in self._public_trades_streams:
if (
public_trade_filter not in self._public_trades_streams
or not self._public_trades_streams[public_trade_filter].is_running
):
try:
self._public_trades_streams[public_trade_filter] = (
GrpcStreamBroadcaster(
Expand All @@ -405,9 +399,7 @@ async def stream_public_trades(
except grpc.RpcError as e:
_logger.exception("Error occurred while streaming public trades: %s", e)
raise
return self._public_trades_streams[public_trade_filter].new_receiver(
warn_on_overflow=warn_on_overflow, maxsize=max_size
)
return self._public_trades_streams[public_trade_filter]

def validate_params(
# pylint: disable=too-many-arguments, too-many-positional-arguments, too-many-branches
Expand Down
10 changes: 7 additions & 3 deletions src/frequenz/client/electricity_trading/cli/etrading.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ async def list_public_trades(url: str, key: str, *, delivery_start: datetime) ->
if delivery_start <= datetime.now(timezone.utc):
return

stream = await client.stream_public_trades(delivery_period=delivery_period)
stream = client.public_trades_stream(delivery_period=delivery_period).new_receiver()
async for trade in stream:
print_public_trade(trade)

Expand Down Expand Up @@ -111,7 +111,9 @@ async def list_gridpool_trades(
if delivery_start and delivery_start <= datetime.now(timezone.utc):
return

stream = await client.stream_gridpool_trades(gid, delivery_period=delivery_period)
stream = client.gridpool_trades_stream(
gid, delivery_period=delivery_period
).new_receiver()
async for trade in stream:
print_trade(trade)

Expand Down Expand Up @@ -154,7 +156,9 @@ async def list_gridpool_orders(
if delivery_start and delivery_start <= datetime.now(timezone.utc):
return

stream = await client.stream_gridpool_orders(gid, delivery_period=delivery_period)
stream = client.gridpool_orders_stream(
gid, delivery_period=delivery_period
).new_receiver()
async for order in stream:
print_order(order)

Expand Down
32 changes: 13 additions & 19 deletions tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,27 +131,23 @@ def set_up_order_detail_response(
).to_pb()


def test_stream_gridpool_orders(set_up: SetupParams) -> None:
async def test_stream_gridpool_orders(set_up: SetupParams) -> None:
"""Test the method streaming gridpool orders."""
set_up.loop.run_until_complete(
set_up.client.stream_gridpool_orders(set_up.gridpool_id)
)
set_up.client.gridpool_orders_stream(set_up.gridpool_id)
await asyncio.sleep(0)

set_up.mock_stub.ReceiveGridpoolOrdersStream.assert_called_once()
args, _ = set_up.mock_stub.ReceiveGridpoolOrdersStream.call_args
assert args[0].gridpool_id == set_up.gridpool_id


def test_stream_gridpool_orders_with_optional_inputs(set_up: SetupParams) -> None:
async def test_stream_gridpool_orders_with_optional_inputs(set_up: SetupParams) -> None:
"""Test the method streaming gridpool orders with some fields to filter for."""
# Fields to filter for
order_states = [OrderState.ACTIVE]

set_up.loop.run_until_complete(
set_up.client.stream_gridpool_orders(
set_up.gridpool_id, order_states=order_states
)
)
set_up.client.gridpool_orders_stream(set_up.gridpool_id, order_states=order_states)
await asyncio.sleep(0)

set_up.mock_stub.ReceiveGridpoolOrdersStream.assert_called_once()
args, _ = set_up.mock_stub.ReceiveGridpoolOrdersStream.call_args
Expand All @@ -161,32 +157,30 @@ def test_stream_gridpool_orders_with_optional_inputs(set_up: SetupParams) -> Non
]


def test_stream_gridpool_trades(
async def test_stream_gridpool_trades(
set_up: SetupParams,
) -> None:
"""Test the method streaming gridpool trades."""
set_up.loop.run_until_complete(
set_up.client.stream_gridpool_trades(
gridpool_id=set_up.gridpool_id, market_side=set_up.side
)
set_up.client.gridpool_trades_stream(
gridpool_id=set_up.gridpool_id, market_side=set_up.side
)
await asyncio.sleep(0)

set_up.mock_stub.ReceiveGridpoolTradesStream.assert_called_once()
args, _ = set_up.mock_stub.ReceiveGridpoolTradesStream.call_args
assert args[0].gridpool_id == set_up.gridpool_id
assert args[0].filter.side == set_up.side.to_pb()


def test_stream_public_trades(
async def test_stream_public_trades(
set_up: SetupParams,
) -> None:
"""Test the method streaming public trades."""
# Fields to filter for
trade_states = [TradeState.ACTIVE]

set_up.loop.run_until_complete(
set_up.client.stream_public_trades(states=trade_states)
)
set_up.client.public_trades_stream(states=trade_states)
await asyncio.sleep(0)

set_up.mock_stub.ReceivePublicTradesStream.assert_called_once()
args, _ = set_up.mock_stub.ReceivePublicTradesStream.call_args
Expand Down
Loading