Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,14 @@
* Removed `list_public_trades`
* Replaced `public_trades_stream` with `receive_public_trades`
* `receive_public_trades` now supports an optional time range (`start_time`, `end_time`)
* Update the `frequenz-api-electricity-trading` from >= 0.5.0, < 0.6.0 to >= 0.6.1, < 0.7.0
* Update repo-config from v0.11.0 to v0.13.0

## New Features

* Add the Public Order Book extension to the client
* Add the `PublicOrder` and `PublicOrderFilter` types
* Add the `receive_public_order()` endpoint

## Bug Fixes

Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ dependencies = [
"frequenz-channels >= 1.6.1, < 2",
"frequenz-client-base >= 0.9.0, < 0.10.0",
"frequenz-client-common >= 0.1.0, < 0.4.0",
"frequenz-api-electricity-trading >= 0.5.0, < 0.6.0",
"frequenz-api-electricity-trading >= 0.6.1, < 0.7.0",
"protobuf >= 5.29.2, < 6",
]
dynamic = ["version"]
Expand Down
4 changes: 4 additions & 0 deletions src/frequenz/client/electricity_trading/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,8 @@ async def stream_trades():
OrderType,
Power,
Price,
PublicOrder,
PublicOrderBookFilter,
PublicTrade,
PublicTradeFilter,
StateDetail,
Expand Down Expand Up @@ -232,6 +234,8 @@ async def stream_trades():
"OrderType",
"Power",
"Price",
"PublicOrder",
"PublicOrderBookFilter",
"PublicTrade",
"PublicTradeFilter",
"UpdateOrder",
Expand Down
81 changes: 81 additions & 0 deletions src/frequenz/client/electricity_trading/_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
OrderType,
Power,
Price,
PublicOrder,
PublicOrderBookFilter,
PublicTrade,
PublicTradeFilter,
Trade,
Expand Down Expand Up @@ -197,6 +199,14 @@ def __init__(
],
] = {}

self._public_orders_streams: dict[
PublicOrderBookFilter,
GrpcStreamBroadcaster[
electricity_trading_pb2.ReceivePublicOrderBookStreamResponse,
PublicOrder,
],
] = {}

self._metadata = (("key", auth_key),) if auth_key else ()

@property
Expand Down Expand Up @@ -963,3 +973,74 @@ def dt_to_pb_timestamp(dt: datetime) -> Timestamp:
_logger.exception("Error occurred while streaming public trades: %s", e)
raise
return self._public_trades_streams[public_trade_filter]

def receive_public_order_book(
# pylint: disable=too-many-arguments, too-many-positional-arguments
self,
delivery_period: DeliveryPeriod | None = None,
delivery_area: DeliveryArea | None = None,
side: MarketSide | None = None,
start_time: datetime | None = None,
end_time: datetime | None = None,
) -> GrpcStreamBroadcaster[
electricity_trading_pb2.ReceivePublicOrderBookStreamResponse, PublicOrder
]:
"""
Stream public orders with optional filters and time range.

Args:
delivery_period: Delivery period to filter for.
delivery_area: Delivery area to filter for.
side: Side of the market to filter for.
start_time: The starting timestamp to stream orders from. If None, streams from now.
end_time: The ending timestamp to stop streaming orders. If None, streams indefinitely.

Returns:
Async generator of orders.

Raises:
grpc.RpcError: If an error occurs while streaming public orders.
"""

def dt_to_pb_timestamp(dt: datetime) -> Timestamp:
ts = Timestamp()
ts.FromDatetime(dt)
return ts

public_order_filter = PublicOrderBookFilter(
delivery_period=delivery_period,
delivery_area=delivery_area,
side=side,
)

if (
public_order_filter not in self._public_orders_streams
or not self._public_orders_streams[public_order_filter].is_running
):
try:
self._public_orders_streams[public_order_filter] = (
GrpcStreamBroadcaster(
f"electricity-trading-{public_order_filter}",
lambda: self.stub.ReceivePublicOrderBookStream(
electricity_trading_pb2.ReceivePublicOrderBookStreamRequest(
filter=public_order_filter.to_pb(),
start_time=(
dt_to_pb_timestamp(start_time)
if start_time
else None
),
end_time=(
dt_to_pb_timestamp(end_time) if end_time else None
),
),
metadata=self._metadata,
),
lambda response: PublicOrder.from_pb(
response.public_order_book_record
),
)
)
except grpc.RpcError as e:
_logger.exception("Error occurred while streaming public orders: %s", e)
raise
return self._public_orders_streams[public_order_filter]
192 changes: 192 additions & 0 deletions src/frequenz/client/electricity_trading/_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -1944,3 +1944,195 @@ def to_pb(self) -> electricity_trading_pb2.UpdateGridpoolOrderRequest.UpdateOrde
payload=struct_pb2.Struct(fields=self.payload) if self.payload else None,
tag=self.tag if self.tag else None,
)


@dataclass()
class PublicOrder: # pylint: disable=too-many-instance-attributes
"""Represents a public order in the market."""

public_order_id: int
"""ID of the order from the public order book."""

delivery_area: DeliveryArea
"""Delivery area of the order."""

delivery_period: DeliveryPeriod
"""The delivery period for the contract."""

side: MarketSide
"""Indicates if the order is on the Buy or Sell side of the market."""

price: Price
"""The limit price at which the contract is to be traded."""

quantity: Power
"""The quantity of the contract being traded."""

execution_option: OrderExecutionOption
"""Order execution options such as All or None, Fill or Kill, etc."""

create_time: datetime
"""UTC Timestamp when the order was created."""

update_time: datetime
"""UTC Timestamp of the last update to the order."""

def __post_init__(self) -> None:
"""Post initialization checks to ensure that all datetimes are UTC."""
if self.delivery_period.start.tzinfo is None:
raise ValueError("Delivery period start must have timezone information")
if self.delivery_period.start.tzinfo != timezone.utc:
_logger.warning(
"Delivery period start is not in UTC timezone. Converting to UTC."
)
self.delivery_period.start = self.delivery_period.start.astimezone(
timezone.utc
)

@classmethod
@from_pb
def from_pb(
cls, public_order: electricity_trading_pb2.PublicOrderBookRecord
) -> Self:
"""Convert a protobuf PublicOrder to PublicOrder object.

Args:
public_order: PublicOrder to convert.

Returns:
PublicOrder object corresponding to the protobuf message.
"""
return cls(
public_order_id=public_order.id,
delivery_area=DeliveryArea.from_pb(public_order.delivery_area),
delivery_period=DeliveryPeriod.from_pb(public_order.delivery_period),
side=MarketSide.from_pb(public_order.side),
price=Price.from_pb(public_order.price),
quantity=Power.from_pb(public_order.quantity),
execution_option=OrderExecutionOption.from_pb(
public_order.execution_option
),
create_time=public_order.create_time.ToDatetime(tzinfo=timezone.utc),
update_time=public_order.update_time.ToDatetime(tzinfo=timezone.utc),
)

def to_pb(self) -> electricity_trading_pb2.PublicOrderBookRecord:
"""Convert a PublicOrder object to protobuf PublicOrder.

Returns:
Protobuf PublicOrder corresponding to the object.
"""
create_time = timestamp_pb2.Timestamp()
create_time.FromDatetime(self.create_time)
update_time = timestamp_pb2.Timestamp()
update_time.FromDatetime(self.update_time)

return electricity_trading_pb2.PublicOrderBookRecord(
id=self.public_order_id,
delivery_area=self.delivery_area.to_pb(),
delivery_period=self.delivery_period.to_pb(),
side=electricity_trading_pb2.MarketSide.ValueType(self.side.value),
price=self.price.to_pb(),
quantity=self.quantity.to_pb(),
execution_option=electricity_trading_pb2.OrderExecutionOption.ValueType(
self.execution_option.value
),
create_time=create_time,
update_time=update_time,
)


@dataclass()
class PublicOrderBookFilter:
"""Parameters for filtering the public orders in the market."""

delivery_period: DeliveryPeriod | None = None
"""Delivery period to filter for."""

delivery_area: DeliveryArea | None = None
"""Delivery area to filter for."""

side: MarketSide | None = None
"""Market side to filter for."""

def __eq__(self, other: object) -> bool:
"""
Check if two PublicOrderBookFilter objects are equal.

Args:
other: PublicOrderBookFilter object to compare with.

Returns:
True if the two PublicOrderBookFilter objects are equal, False otherwise.
"""
if not isinstance(other, PublicOrderBookFilter):
return NotImplemented
return (
self.delivery_period == other.delivery_period
and self.delivery_area == other.delivery_area
and self.side == other.side
)

def __hash__(self) -> int:
"""
Create hash of the PublicOrderBookFilter object.

Returns:
Hash of the PublicOrderBookFilter object.
"""
return hash(
(
self.delivery_period,
self.delivery_area,
self.side,
)
)

@classmethod
@from_pb
def from_pb(
cls, public_order_filter: electricity_trading_pb2.PublicOrderBookFilter
) -> Self:
"""Convert a protobuf PublicOrderBookFilter to PublicOrderBookFilter object.

Args:
public_order_filter: PublicOrderBookFilter to convert.

Returns:
PublicOrderBookFilter object corresponding to the protobuf message.
"""
return cls(
delivery_period=(
DeliveryPeriod.from_pb(public_order_filter.delivery_period)
if public_order_filter.HasField("delivery_period")
else None
),
delivery_area=(
DeliveryArea.from_pb(public_order_filter.delivery_area)
if public_order_filter.HasField("delivery_area")
else None
),
side=(
MarketSide.from_pb(public_order_filter.side)
if public_order_filter.HasField("side")
else None
),
)

def to_pb(self) -> electricity_trading_pb2.PublicOrderBookFilter:
"""Convert a PublicOrderBookFilter object to protobuf PublicOrderBookFilter.

Returns:
Protobuf PublicOrderBookFilter corresponding to the object.
"""
return electricity_trading_pb2.PublicOrderBookFilter(
delivery_period=(
self.delivery_period.to_pb() if self.delivery_period else None
),
delivery_area=self.delivery_area.to_pb() if self.delivery_area else None,
side=(
electricity_trading_pb2.MarketSide.ValueType(self.side.value)
if self.side
else None
),
)
18 changes: 18 additions & 0 deletions tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -465,3 +465,21 @@ async def test_update_gridpool_order_with_invalid_params( # pylint: disable=too
quantity=quantity,
valid_until=valid_until,
)


async def test_receive_public_order_book(
set_up: SetupParams,
) -> None:
"""Test the method receiving public order book."""
set_up.client.receive_public_order_book(
delivery_period=set_up.delivery_period,
delivery_area=set_up.delivery_area,
side=set_up.side,
)
await asyncio.sleep(0)

set_up.mock_stub.ReceivePublicOrderBookStream.assert_called_once()
args, _ = set_up.mock_stub.ReceivePublicOrderBookStream.call_args
assert args[0].filter.delivery_period == set_up.delivery_period.to_pb()
assert args[0].filter.delivery_area == set_up.delivery_area.to_pb()
assert args[0].filter.side == set_up.side.to_pb()
Loading
Loading