Skip to content

Commit 58a4a86

Browse files
fix: fix order book client
The client object acts as a stateful manager for data streams. The public order book stream was cached using a key that only includes the filter parameters (delivery_area, side, etc.) and completely ignores the start_time and end_time. This has been fixed by including start and end time parameters in the cache. Signed-off-by: Matthias Wende <[email protected]>
1 parent 79b5aa2 commit 58a4a86

File tree

1 file changed

+24
-18
lines changed

1 file changed

+24
-18
lines changed

src/frequenz/client/electricity_trading/_client.py

Lines changed: 24 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,9 @@ def __init__(
241241
] = {}
242242

243243
self._public_orders_streams: dict[
244-
PublicOrderBookFilter,
244+
tuple[
245+
PublicOrderBookFilter, tuple[int, int] | None, tuple[int, int] | None
246+
],
245247
GrpcStreamBroadcaster[
246248
electricity_trading_pb2.ReceivePublicOrderBookStreamResponse,
247249
list[PublicOrder],
@@ -1057,28 +1059,32 @@ def receive_public_order_book(
10571059
start_time_utc = dt_to_pb_timestamp_utc(start_time) if start_time else None
10581060
end_time_utc = dt_to_pb_timestamp_utc(end_time) if end_time else None
10591061

1062+
cache_key = (
1063+
public_order_filter,
1064+
(start_time_utc.seconds, start_time_utc.nanos) if start_time_utc else None,
1065+
(end_time_utc.seconds, end_time_utc.nanos) if end_time_utc else None,
1066+
)
1067+
10601068
if (
1061-
public_order_filter not in self._public_orders_streams
1062-
or not self._public_orders_streams[public_order_filter].is_running
1069+
cache_key not in self._public_orders_streams
1070+
or not self._public_orders_streams[cache_key].is_running
10631071
):
10641072
try:
1065-
self._public_orders_streams[public_order_filter] = (
1066-
GrpcStreamBroadcaster(
1067-
f"electricity-trading-{public_order_filter}",
1068-
lambda: self.stub.ReceivePublicOrderBookStream(
1069-
electricity_trading_pb2.ReceivePublicOrderBookStreamRequest(
1070-
filter=public_order_filter.to_pb(),
1071-
start_time=start_time_utc,
1072-
end_time=end_time_utc,
1073-
),
1073+
self._public_orders_streams[cache_key] = GrpcStreamBroadcaster(
1074+
f"electricity-trading-{cache_key}",
1075+
lambda: self.stub.ReceivePublicOrderBookStream(
1076+
electricity_trading_pb2.ReceivePublicOrderBookStreamRequest(
1077+
filter=public_order_filter.to_pb(),
1078+
start_time=start_time_utc,
1079+
end_time=end_time_utc,
10741080
),
1075-
lambda response: [
1076-
PublicOrder.from_pb(public_order)
1077-
for public_order in response.public_order_book_records
1078-
],
1079-
)
1081+
),
1082+
lambda response: [
1083+
PublicOrder.from_pb(public_order)
1084+
for public_order in response.public_order_book_records
1085+
],
10801086
)
10811087
except grpc.RpcError as e:
10821088
_logger.exception("Error occurred while streaming public orders: %s", e)
10831089
raise
1084-
return self._public_orders_streams[public_order_filter]
1090+
return self._public_orders_streams[cache_key]

0 commit comments

Comments
 (0)