Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@

* Updated frequenz-client-common version range to >=0.1.0, <0.4.0
* Upgraded grpcio to >=1.68.1, <2 and protobuf to >=5.29.2, <6 to resolve compatibility issues
* Unify Public Trades streaming and listing (to align with the proto changes in v0.5.0)
* Removed `list_public_trades`
* Replaced `public_trades_stream` with `receive_public_trades`
* `receive_public_trades` now supports an optional time range (`start_time`, `end_time`)

## New Features

<!-- Here goes the main new features and examples or instructions on how to use them -->

## Bug Fixes

Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ dependencies = [
"frequenz-channels >= 1.6.1, < 2",
"frequenz-client-base >= 0.9.0, < 0.10.0",
"frequenz-client-common >= 0.1.0, < 0.4.0",
"frequenz-api-electricity-trading >= 0.2.4, < 0.5",
"frequenz-api-electricity-trading >= 0.5.0, < 0.6.0",
"protobuf >= 5.29.2, < 6",
]
dynamic = ["version"]
Expand Down
150 changes: 48 additions & 102 deletions src/frequenz/client/electricity_trading/_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from frequenz.client.base.streaming import GrpcStreamBroadcaster
from frequenz.client.common.pagination import Params
from google.protobuf import field_mask_pb2, struct_pb2
from google.protobuf.timestamp_pb2 import Timestamp

from ._types import (
DeliveryArea,
Expand Down Expand Up @@ -345,62 +346,6 @@ def gridpool_trades_stream(
raise
return self._gridpool_trades_streams[stream_key]

def public_trades_stream(
# pylint: disable=too-many-arguments, too-many-positional-arguments
self,
states: list[TradeState] | None = None,
delivery_period: DeliveryPeriod | None = None,
buy_delivery_area: DeliveryArea | None = None,
sell_delivery_area: DeliveryArea | None = None,
) -> GrpcStreamBroadcaster[
electricity_trading_pb2.ReceivePublicTradesStreamResponse, PublicTrade
]:
"""
Stream public trades.

Args:
states: List of order states to filter for.
delivery_period: Delivery period to filter for.
buy_delivery_area: Buy delivery area to filter for.
sell_delivery_area: Sell delivery area to filter for.

Returns:
Async generator of orders.

Raises:
grpc.RpcError: If an error occurs while streaming public trades.
"""
self.validate_params(delivery_period=delivery_period)

public_trade_filter = PublicTradeFilter(
states=states,
delivery_period=delivery_period,
buy_delivery_area=buy_delivery_area,
sell_delivery_area=sell_delivery_area,
)

if (
public_trade_filter not in self._public_trades_streams
or not self._public_trades_streams[public_trade_filter].is_running
):
try:
self._public_trades_streams[public_trade_filter] = (
GrpcStreamBroadcaster(
f"electricity-trading-{public_trade_filter}",
lambda: self.stub.ReceivePublicTradesStream(
electricity_trading_pb2.ReceivePublicTradesStreamRequest(
filter=public_trade_filter.to_pb(),
),
metadata=self._metadata,
),
lambda response: PublicTrade.from_pb(response.public_trade),
)
)
except grpc.RpcError as e:
_logger.exception("Error occurred while streaming public trades: %s", e)
raise
return self._public_trades_streams[public_trade_filter]

def validate_params(
# pylint: disable=too-many-arguments, too-many-positional-arguments, too-many-branches
self,
Expand Down Expand Up @@ -947,73 +892,74 @@ async def list_gridpool_trades(
_logger.exception("Error occurred while listing gridpool trades: %s", e)
raise

async def list_public_trades(
def receive_public_trades(
# pylint: disable=too-many-arguments, too-many-positional-arguments
self,
states: list[TradeState] | None = None,
delivery_period: DeliveryPeriod | None = None,
buy_delivery_area: DeliveryArea | None = None,
sell_delivery_area: DeliveryArea | None = None,
page_size: int | None = None,
timeout: timedelta | None = None,
) -> AsyncIterator[PublicTrade]:
start_time: datetime | None = None,
end_time: datetime | None = None,
) -> GrpcStreamBroadcaster[
electricity_trading_pb2.ReceivePublicTradesStreamResponse, PublicTrade
]:
"""
List all executed public orders with optional filters and pagination.
Stream public trades with optional filters and time range.

Args:
states: List of order states to filter by.
delivery_period: The delivery period to filter by.
buy_delivery_area: The buy delivery area to filter by.
sell_delivery_area: The sell delivery area to filter by.
page_size: The number of public trades to return per page.
timeout: Timeout duration, defaults to None.
states: List of order states to filter for.
delivery_period: Delivery period to filter for.
buy_delivery_area: Buy delivery area to filter for.
sell_delivery_area: Sell delivery area to filter for.
start_time: The starting timestamp to stream trades from. If None, streams from now.
end_time: The ending timestamp to stop streaming trades. If None, streams indefinitely.

Yields:
The list of public trades for each page.
Returns:
Async generator of orders.

Raises:
grpc.RpcError: If an error occurs while listing public trades.
grpc.RpcError: If an error occurs while streaming public trades.
"""

def dt_to_pb_timestamp(dt: datetime) -> Timestamp:
ts = Timestamp()
ts.FromDatetime(dt)
return ts

public_trade_filter = PublicTradeFilter(
states=states,
delivery_period=delivery_period,
buy_delivery_area=buy_delivery_area,
sell_delivery_area=sell_delivery_area,
)

request = electricity_trading_pb2.ListPublicTradesRequest(
filter=public_trade_filter.to_pb(),
pagination_params=(
Params(page_size=page_size, page_token="").to_proto()
if page_size
else None
),
)

while True:
if (
public_trade_filter not in self._public_trades_streams
or not self._public_trades_streams[public_trade_filter].is_running
):
try:
response = await cast(
Awaitable[electricity_trading_pb2.ListPublicTradesResponse],
grpc_call_with_timeout(
self.stub.ListPublicTrades,
request,
metadata=self._metadata,
timeout=timeout,
),
)

for public_trade in response.public_trades:
yield PublicTrade.from_pb(public_trade)

if response.pagination_info.next_page_token:
request.pagination_params.CopyFrom(
PaginationParams(
page_token=response.pagination_info.next_page_token
)
self._public_trades_streams[public_trade_filter] = (
GrpcStreamBroadcaster(
f"electricity-trading-{public_trade_filter}",
lambda: self.stub.ReceivePublicTradesStream(
electricity_trading_pb2.ReceivePublicTradesStreamRequest(
filter=public_trade_filter.to_pb(),
start_time=(
dt_to_pb_timestamp(start_time)
if start_time
else None
),
end_time=(
dt_to_pb_timestamp(end_time) if end_time else None
),
),
metadata=self._metadata,
),
lambda response: PublicTrade.from_pb(response.public_trade),
)
else:
break

)
except grpc.RpcError as e:
_logger.exception("Error occurred while listing public trades: %s", e)
_logger.exception("Error occurred while streaming public trades: %s", e)
raise
return self._public_trades_streams[public_trade_filter]
14 changes: 11 additions & 3 deletions src/frequenz/client/electricity_trading/cli/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
list_gridpool_trades as run_list_gridpool_trades,
)
from frequenz.client.electricity_trading.cli.etrading import (
list_public_trades as run_list_public_trades,
receive_public_trades as run_receive_public_trades,
)

TZ = ZoneInfo("Europe/Berlin")
Expand All @@ -47,10 +47,18 @@ def cli() -> None:
@cli.command()
@click.option("--url", required=True, type=str)
@click.option("--key", required=True, type=str)
@click.option("--delivery-start", default=None, type=iso)
@click.option("--start", default=None, type=iso)
def receive_public_trades(url: str, key: str, *, start: datetime) -> None:
@click.option("--end", default=None, type=iso)
def receive_public_trades(
url: str, key: str, *, start: datetime, end: datetime, delivery_start: datetime
) -> None:
"""List and/or stream public trades."""
asyncio.run(run_list_public_trades(url=url, key=key, delivery_start=start))
asyncio.run(
run_receive_public_trades(
url=url, key=key, delivery_start=delivery_start, start=start, end=end
)
)


@cli.command()
Expand Down
31 changes: 16 additions & 15 deletions src/frequenz/client/electricity_trading/cli/etrading.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,22 @@ def check_delivery_start(
raise ValueError("Delivery period must be a multiple of `duration`.")


async def list_public_trades(url: str, key: str, *, delivery_start: datetime) -> None:
async def receive_public_trades(
url: str,
key: str,
*,
delivery_start: datetime | None = None,
start: datetime | None = None,
end: datetime | None = None,
) -> None:
"""List trades and stream new public trades.

If delivery_start is provided, list historical trades and stream new trades
for the 15 minute delivery period starting at delivery_start.
If no delivery_start is provided, stream new trades for any delivery period.

Args:
url: URL of the trading API.
key: API key.
delivery_start: Start of the delivery period or None.
start: First execution time to list trades from.
end: Last execution time to list trades until.
"""
client = Client(server_url=url, auth_key=key)

Expand All @@ -65,16 +70,12 @@ async def list_public_trades(url: str, key: str, *, delivery_start: datetime) ->
start=delivery_start,
duration=timedelta(minutes=15),
)
lst = client.list_public_trades(delivery_period=delivery_period)

async for trade in lst:
print_public_trade(trade)

if delivery_start <= datetime.now(timezone.utc):
return

stream = client.public_trades_stream(delivery_period=delivery_period).new_receiver()
async for trade in stream:
stream = client.receive_public_trades(
delivery_period=delivery_period,
start_time=start,
end_time=end,
)
async for trade in stream.new_receiver():
print_public_trade(trade)


Expand Down
6 changes: 3 additions & 3 deletions tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,14 +172,14 @@ async def test_stream_gridpool_trades(
assert args[0].filter.side == set_up.side.to_pb()


async def test_stream_public_trades(
async def test_receive_public_trades(
set_up: SetupParams,
) -> None:
"""Test the method streaming public trades."""
"""Test the method receiving public trades."""
# Fields to filter for
trade_states = [TradeState.ACTIVE]

set_up.client.public_trades_stream(states=trade_states)
set_up.client.receive_public_trades(states=trade_states)
await asyncio.sleep(0)

set_up.mock_stub.ReceivePublicTradesStream.assert_called_once()
Expand Down
Loading