Skip to content

Commit 7b13fad

Browse files
test: add regression tests
Signed-off-by: Matthias Wende <[email protected]>
1 parent d74de71 commit 7b13fad

File tree

1 file changed

+268
-0
lines changed

1 file changed

+268
-0
lines changed

tests/test_order_book.py

Lines changed: 268 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,268 @@
1+
# License: MIT
2+
# Copyright © 2023 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Tests for the receive_public_order_book rpc."""
5+
6+
import asyncio
7+
import datetime as dt
8+
from collections.abc import AsyncIterator
9+
from datetime import datetime
10+
from typing import Any
11+
12+
import grpc
13+
import pytest
14+
from frequenz.api.common.v1.grid import delivery_area_pb2, delivery_duration_pb2
15+
from frequenz.api.common.v1.market import power_pb2, price_pb2
16+
from frequenz.api.common.v1.types import decimal_pb2
17+
from frequenz.api.electricity_trading.v1 import (
18+
electricity_trading_pb2,
19+
electricity_trading_pb2_grpc,
20+
)
21+
from google.protobuf import timestamp_pb2
22+
from grpc.aio import ServicerContext
23+
24+
from frequenz.client.electricity_trading import (
25+
Client,
26+
DeliveryArea,
27+
EnergyMarketCodeType,
28+
PublicOrder,
29+
)
30+
31+
START_TIME = datetime.fromisoformat("2023-01-01T12:00:00+00:00")
32+
START_TIME_PB = timestamp_pb2.Timestamp(seconds=1672574400)
33+
CREATE_TIME = datetime.fromisoformat("2023-01-01T12:00:00+00:00")
34+
CREATE_TIME_PB = timestamp_pb2.Timestamp(seconds=1672574400)
35+
MODIFICATION_TIME = datetime.fromisoformat("2023-01-01T12:00:00+00:00")
36+
MODIFICATION_TIME_PB = timestamp_pb2.Timestamp(seconds=1672574400)
37+
PUBLIC_ORDER_PB = electricity_trading_pb2.PublicOrderBookRecord(
38+
id=42,
39+
delivery_area=delivery_area_pb2.DeliveryArea(
40+
code="XYZ",
41+
code_type=delivery_area_pb2.EnergyMarketCodeType.ENERGY_MARKET_CODE_TYPE_EUROPE_EIC,
42+
),
43+
delivery_period=delivery_duration_pb2.DeliveryPeriod(
44+
start=START_TIME_PB,
45+
duration=delivery_duration_pb2.DeliveryDuration.DELIVERY_DURATION_15,
46+
),
47+
type=electricity_trading_pb2.OrderType.ORDER_TYPE_LIMIT,
48+
side=electricity_trading_pb2.MarketSide.MARKET_SIDE_BUY,
49+
price=price_pb2.Price(
50+
amount=decimal_pb2.Decimal(value="100.00"),
51+
currency=price_pb2.Price.Currency.CURRENCY_EUR,
52+
),
53+
quantity=power_pb2.Power(mw=decimal_pb2.Decimal(value="5.00")),
54+
execution_option=electricity_trading_pb2.OrderExecutionOption.ORDER_EXECUTION_OPTION_AON,
55+
create_time=CREATE_TIME_PB,
56+
update_time=MODIFICATION_TIME_PB,
57+
)
58+
59+
60+
class MockElectricityTradingService(
61+
electricity_trading_pb2_grpc.ElectricityTradingServiceServicer
62+
):
63+
"""A mock gRPC service to simulate historic vs real-time streams."""
64+
65+
@staticmethod
66+
def _construct_public_order_book_record(
67+
id: int,
68+
) -> electricity_trading_pb2.PublicOrderBookRecord:
69+
return electricity_trading_pb2.PublicOrderBookRecord(
70+
id=id,
71+
delivery_area=delivery_area_pb2.DeliveryArea(
72+
code="XYZ",
73+
code_type=delivery_area_pb2.EnergyMarketCodeType.ENERGY_MARKET_CODE_TYPE_EUROPE_EIC,
74+
),
75+
delivery_period=delivery_duration_pb2.DeliveryPeriod(
76+
start=START_TIME_PB,
77+
duration=delivery_duration_pb2.DeliveryDuration.DELIVERY_DURATION_15,
78+
),
79+
type=electricity_trading_pb2.OrderType.ORDER_TYPE_LIMIT,
80+
side=electricity_trading_pb2.MarketSide.MARKET_SIDE_BUY,
81+
price=price_pb2.Price(
82+
amount=decimal_pb2.Decimal(value="100.00"),
83+
currency=price_pb2.Price.Currency.CURRENCY_EUR,
84+
),
85+
quantity=power_pb2.Power(mw=decimal_pb2.Decimal(value="5.00")),
86+
execution_option=(
87+
electricity_trading_pb2.OrderExecutionOption.ORDER_EXECUTION_OPTION_AON
88+
),
89+
create_time=CREATE_TIME_PB,
90+
update_time=MODIFICATION_TIME_PB,
91+
)
92+
93+
def _mark_as_unimplemented(self, context: Any) -> None:
94+
"""Set the context to UNIMPLEMENTED."""
95+
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
96+
context.set_details("Method not implemented in the mock servicer.")
97+
98+
async def ReceivePublicOrderBookStream( # pylint: disable=invalid-overridden-method
99+
self,
100+
request: electricity_trading_pb2.ReceivePublicOrderBookStreamRequest,
101+
context: ServicerContext[
102+
electricity_trading_pb2.ReceivePublicOrderBookStreamRequest,
103+
electricity_trading_pb2.ReceivePublicOrderBookStreamResponse,
104+
],
105+
) -> AsyncIterator[electricity_trading_pb2.ReceivePublicOrderBookStreamResponse]:
106+
"""Send different data based on whether start_time is set."""
107+
is_historic = request.HasField("start_time") and request.start_time.seconds > 0
108+
109+
if is_historic:
110+
print("SERVER: Sending historic data...")
111+
yield electricity_trading_pb2.ReceivePublicOrderBookStreamResponse(
112+
public_order_book_records=[
113+
self._construct_public_order_book_record(1),
114+
self._construct_public_order_book_record(2),
115+
]
116+
)
117+
print("SERVER: Historic stream finished.")
118+
return
119+
120+
print("SERVER: Sending real-time data...")
121+
yield electricity_trading_pb2.ReceivePublicOrderBookStreamResponse(
122+
public_order_book_records=[
123+
self._construct_public_order_book_record(9),
124+
]
125+
)
126+
print("SERVER: Real-time stream is now idle.")
127+
await asyncio.sleep(5)
128+
129+
# --- Placeholder implementations for ALL other abstract methods ---
130+
131+
async def CancelAllGridpoolOrders( # pylint: disable=invalid-overridden-method
132+
self, request: Any, context: Any
133+
) -> electricity_trading_pb2.CancelAllGridpoolOrdersResponse:
134+
"""Handle CancelAllGridpoolOrders request."""
135+
self._mark_as_unimplemented(context)
136+
return electricity_trading_pb2.CancelAllGridpoolOrdersResponse()
137+
138+
async def CancelGridpoolOrder( # pylint: disable=invalid-overridden-method
139+
self, request: Any, context: Any
140+
) -> electricity_trading_pb2.CancelGridpoolOrderResponse:
141+
"""Handle CancelGridpoolOrder request."""
142+
self._mark_as_unimplemented(context)
143+
return electricity_trading_pb2.CancelGridpoolOrderResponse()
144+
145+
async def CreateGridpoolOrder( # pylint: disable=invalid-overridden-method
146+
self, request: Any, context: Any
147+
) -> electricity_trading_pb2.CreateGridpoolOrderResponse:
148+
"""Handle CreateGridpoolOrder request."""
149+
self._mark_as_unimplemented(context)
150+
return electricity_trading_pb2.CreateGridpoolOrderResponse()
151+
152+
async def GetGridpoolOrder( # pylint: disable=invalid-overridden-method
153+
self, request: Any, context: Any
154+
) -> electricity_trading_pb2.GetGridpoolOrderResponse:
155+
"""Handle GetGridpoolOrder request."""
156+
self._mark_as_unimplemented(context)
157+
return electricity_trading_pb2.GetGridpoolOrderResponse()
158+
159+
async def ListGridpoolOrders( # pylint: disable=invalid-overridden-method
160+
self, request: Any, context: Any
161+
) -> electricity_trading_pb2.ListGridpoolOrdersResponse:
162+
"""Handle ListGridpoolOrders request."""
163+
self._mark_as_unimplemented(context)
164+
return electricity_trading_pb2.ListGridpoolOrdersResponse()
165+
166+
async def ListGridpoolTrades( # pylint: disable=invalid-overridden-method
167+
self, request: Any, context: Any
168+
) -> electricity_trading_pb2.ListGridpoolTradesResponse:
169+
"""Handle ListGridpoolTrades request."""
170+
self._mark_as_unimplemented(context)
171+
return electricity_trading_pb2.ListGridpoolTradesResponse()
172+
173+
async def ReceiveGridpoolOrdersStream( # pylint: disable=invalid-overridden-method
174+
self, request: Any, context: Any
175+
) -> AsyncIterator[electricity_trading_pb2.ReceiveGridpoolOrdersStreamResponse]:
176+
"""Handle ReceiveGridpoolOrdersStream request."""
177+
self._mark_as_unimplemented(context)
178+
if False: # pylint: disable=using-constant-test
179+
yield
180+
181+
async def ReceiveGridpoolTradesStream( # pylint: disable=invalid-overridden-method
182+
self, request: Any, context: Any
183+
) -> AsyncIterator[electricity_trading_pb2.ReceiveGridpoolTradesStreamResponse]:
184+
"""Handle ReceiveGridpoolTradesStream request."""
185+
self._mark_as_unimplemented(context)
186+
if False: # pylint: disable=using-constant-test
187+
yield
188+
189+
async def ReceivePublicTradesStream( # pylint: disable=invalid-overridden-method
190+
self, request: Any, context: Any
191+
) -> AsyncIterator[electricity_trading_pb2.ReceivePublicTradesStreamResponse]:
192+
"""Handle ReceivePublicTradesStream request."""
193+
self._mark_as_unimplemented(context)
194+
if False: # pylint: disable=using-constant-test
195+
yield
196+
197+
async def UpdateGridpoolOrder( # pylint: disable=invalid-overridden-method
198+
self, request: Any, context: Any
199+
) -> electricity_trading_pb2.UpdateGridpoolOrderResponse:
200+
"""Handle UpdateGridpoolOrder request."""
201+
self._mark_as_unimplemented(context)
202+
return electricity_trading_pb2.UpdateGridpoolOrderResponse()
203+
204+
205+
@pytest.fixture
206+
async def mock_server() -> AsyncIterator[str]:
207+
"""Set up and tear down a mock gRPC server for the test session."""
208+
servicer = MockElectricityTradingService()
209+
server = grpc.aio.server()
210+
electricity_trading_pb2_grpc.add_ElectricityTradingServiceServicer_to_server(
211+
servicer, server
212+
)
213+
port = server.add_insecure_port("[::]:0")
214+
address = f"[::1]:{port}"
215+
await server.start()
216+
try:
217+
yield address
218+
finally:
219+
await server.stop(0)
220+
221+
222+
@pytest.mark.asyncio
223+
async def test_concurrent_historic_and_realtime_streams(mock_server: str) -> None:
224+
"""Verify that historic and real-time streams from one client instance are distinct."""
225+
client = Client(server_url=f"grpc://{mock_server}?ssl=false")
226+
227+
delivery_area = DeliveryArea(
228+
code="DE-TENNET", code_type=EnergyMarketCodeType.EUROPE_EIC
229+
)
230+
231+
end_time = dt.datetime.now(dt.timezone.utc)
232+
start_time = end_time - dt.timedelta(hours=1)
233+
234+
historic_stream = client.receive_public_order_book(
235+
delivery_area=delivery_area, start_time=start_time, end_time=end_time
236+
)
237+
realtime_stream = client.receive_public_order_book(delivery_area=delivery_area)
238+
239+
historic_orders_received: list[PublicOrder] = []
240+
realtime_orders_received: list[PublicOrder] = []
241+
242+
async def consume_historic() -> None:
243+
"""Consume all items from the historic stream."""
244+
async for batch in historic_stream.new_receiver():
245+
historic_orders_received.extend(batch)
246+
247+
async def consume_realtime() -> None:
248+
"""Consume the first item from the real-time stream."""
249+
async for batch in realtime_stream.new_receiver():
250+
realtime_orders_received.extend(batch)
251+
break
252+
253+
try:
254+
await asyncio.wait_for(
255+
asyncio.gather(consume_historic(), consume_realtime()), timeout=2.0
256+
)
257+
except asyncio.TimeoutError:
258+
pytest.fail(
259+
"Test timed out. The streams did not produce the expected data in time."
260+
)
261+
262+
assert (
263+
len(historic_orders_received) == 2
264+
), "Historic stream should receive a batch of 2"
265+
assert [order.public_order_id for order in historic_orders_received] == [1, 2]
266+
267+
assert len(realtime_orders_received) == 1, "Real-time stream should receive 1 item"
268+
assert realtime_orders_received[0].public_order_id == 9

0 commit comments

Comments
 (0)