Skip to content

Commit 01b207d

Browse files
committed
more wip
1 parent 093dfd8 commit 01b207d

File tree

9 files changed

+130
-104
lines changed

9 files changed

+130
-104
lines changed

pyproject.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,10 @@ classifiers = [
3636
]
3737
requires-python = ">= 3.11, < 4"
3838
dependencies = [
39-
"typing-extensions >= 4.6.1, < 5",
39+
"typing-extensions >= 4.13.0, < 5",
4040
"frequenz-api-dispatch == 1.0.0-rc2",
4141
"frequenz-client-base >= 0.8.0, < 0.12.0",
42-
"frequenz-client-common >= 0.1.0, < 0.4.0",
42+
"frequenz-client-common >= 0.3.2, < 0.4.0",
4343
"grpcio >= 1.70.0, < 2",
4444
"python-dateutil >= 2.8.2, < 3.0",
4545
]

src/frequenz/client/dispatch/_client.py

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,8 @@ def __init__(
8585
)
8686
self._metadata = (("key", key),)
8787
self._streams: dict[
88-
int, GrpcStreamBroadcaster[StreamMicrogridDispatchesResponse, DispatchEvent]
88+
MicrogridId,
89+
GrpcStreamBroadcaster[StreamMicrogridDispatchesResponse, DispatchEvent],
8990
] = {}
9091
"""A dictionary of streamers, keyed by microgrid_id."""
9192

@@ -140,7 +141,7 @@ async def list(
140141
key="key",
141142
server_url="grpc://dispatch.url.goes.here.example.com"
142143
)
143-
async for page in client.list(microgrid_id=1):
144+
async for page in client.list(microgrid_id=MicrogridId(1)):
144145
for dispatch in page:
145146
print(dispatch)
146147
```
@@ -220,17 +221,6 @@ def stream(self, microgrid_id: MicrogridId) -> channels.Receiver[DispatchEvent]:
220221
dispatch events.
221222
An event is one of [CREATE, UPDATE, DELETE].
222223
223-
Example usage:
224-
225-
```
226-
client = DispatchApiClient(
227-
key="key",
228-
server_url="grpc://dispatch.url.goes.here.example.com"
229-
)
230-
async for message in client.stream(microgrid_id=1):
231-
print(message.event, message.dispatch)
232-
```
233-
234224
Args:
235225
microgrid_id: The microgrid_id to receive dispatches for.
236226
@@ -361,7 +351,7 @@ async def update(
361351
ValueError: If updating `type` or `dry_run`.
362352
"""
363353
msg = UpdateMicrogridDispatchRequest(
364-
dispatch_id=dispatch_id, microgrid_id=microgrid_id
354+
dispatch_id=int(dispatch_id), microgrid_id=int(microgrid_id)
365355
)
366356

367357
for key, val in new_fields.items():

src/frequenz/client/dispatch/_internal_types.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,9 @@
1212
from frequenz.api.dispatch.v1.dispatch_pb2 import (
1313
CreateMicrogridDispatchRequest as PBDispatchCreateRequest,
1414
)
15-
from frequenz.api.dispatch.v1.dispatch_pb2 import DispatchData
15+
from frequenz.api.dispatch.v1.dispatch_pb2 import (
16+
DispatchData,
17+
)
1618
from google.protobuf.json_format import MessageToDict
1719
from google.protobuf.struct_pb2 import Struct
1820
from google.protobuf.timestamp_pb2 import Timestamp

src/frequenz/client/dispatch/test/_service.py

Lines changed: 40 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
from frequenz.client.common.microgrid import MicrogridId
3939

4040
from .._internal_types import DispatchCreateRequest
41-
from ..types import Dispatch, DispatchEvent, Event
41+
from ..types import Dispatch, DispatchEvent, DispatchId, Event
4242

4343
ALL_KEY = "all"
4444
"""Key that has access to all resources in the FakeService."""
@@ -70,9 +70,20 @@ def __init__(self) -> None:
7070
self.dispatches: dict[MicrogridId, list[Dispatch]] = {}
7171
"""List of dispatches per microgrid."""
7272

73-
self._last_id: int = 0
73+
self._last_id: DispatchId = DispatchId(0)
7474
"""Last used dispatch id."""
7575

76+
def refresh_last_id_for(self, microgrid_id: MicrogridId) -> None:
77+
"""Update last id to be the next highest number."""
78+
dispatches = self.dispatches.get(microgrid_id, [])
79+
80+
if len(dispatches) == 0:
81+
return
82+
83+
self._last_id = DispatchId(
84+
max(int(self._last_id), max(int(dispatch.id) for dispatch in dispatches))
85+
)
86+
7687
def _check_access(self, metadata: grpc.aio.Metadata) -> None:
7788
"""Check if the access key is valid.
7889
@@ -170,7 +181,7 @@ async def StreamMicrogridDispatches(
170181

171182
async for message in receiver:
172183
logging.debug("Received message: %s", message)
173-
if message.microgrid_id == request.microgrid_id:
184+
if message.microgrid_id == MicrogridId(request.microgrid_id):
174185
response = StreamMicrogridDispatchesResponse(
175186
event=message.event.event.value,
176187
dispatch=message.event.dispatch.to_protobuf(),
@@ -225,7 +236,8 @@ async def CreateMicrogridDispatch(
225236
) -> CreateMicrogridDispatchResponse:
226237
"""Create a new dispatch."""
227238
self._check_access(metadata)
228-
self._last_id += 1
239+
microgrid_id = MicrogridId(request.microgrid_id)
240+
self._last_id = DispatchId(int(self._last_id) + 1)
229241

230242
new_dispatch = _dispatch_from_request(
231243
DispatchCreateRequest.from_protobuf(request),
@@ -235,11 +247,11 @@ async def CreateMicrogridDispatch(
235247
)
236248

237249
# implicitly create the list if it doesn't exist
238-
self.dispatches.setdefault(request.microgrid_id, []).append(new_dispatch)
250+
self.dispatches.setdefault(microgrid_id, []).append(new_dispatch)
239251

240252
await self._stream_sender.send(
241253
self.StreamEvent(
242-
request.microgrid_id,
254+
microgrid_id,
243255
DispatchEvent(dispatch=new_dispatch, event=Event.CREATED),
244256
)
245257
)
@@ -254,9 +266,15 @@ async def UpdateMicrogridDispatch(
254266
) -> UpdateMicrogridDispatchResponse:
255267
"""Update a dispatch."""
256268
self._check_access(metadata)
257-
grid_dispatches = self.dispatches[request.microgrid_id]
269+
270+
microgrid_id = MicrogridId(request.microgrid_id)
271+
grid_dispatches = self.dispatches.get(microgrid_id, [])
258272
index = next(
259-
(i for i, d in enumerate(grid_dispatches) if d.id == request.dispatch_id),
273+
(
274+
i
275+
for i, d in enumerate(grid_dispatches)
276+
if d.id == DispatchId(request.dispatch_id)
277+
),
260278
None,
261279
)
262280

@@ -313,9 +331,9 @@ async def UpdateMicrogridDispatch(
313331
| "bymonthdays"
314332
| "bymonths"
315333
):
316-
getattr(pb_dispatch.data.recurrence, split_path[1])[
317-
:
318-
] = getattr(request.update.recurrence, split_path[1])[:]
334+
getattr(pb_dispatch.data.recurrence, split_path[1])[:] = (
335+
getattr(request.update.recurrence, split_path[1])[:]
336+
)
319337

320338
dispatch = Dispatch.from_protobuf(pb_dispatch)
321339
dispatch = replace(
@@ -327,7 +345,7 @@ async def UpdateMicrogridDispatch(
327345

328346
await self._stream_sender.send(
329347
self.StreamEvent(
330-
request.microgrid_id,
348+
microgrid_id,
331349
DispatchEvent(dispatch=dispatch, event=Event.UPDATED),
332350
)
333351
)
@@ -342,9 +360,11 @@ async def GetMicrogridDispatch(
342360
) -> GetMicrogridDispatchResponse:
343361
"""Get a single dispatch."""
344362
self._check_access(metadata)
345-
grid_dispatches = self.dispatches.get(request.microgrid_id, [])
363+
microgrid_id = MicrogridId(request.microgrid_id)
364+
grid_dispatches = self.dispatches.get(microgrid_id, [])
346365
dispatch = next(
347-
(d for d in grid_dispatches if d.id == request.dispatch_id), None
366+
(d for d in grid_dispatches if d.id == DispatchId(request.dispatch_id)),
367+
None,
348368
)
349369

350370
if dispatch is None:
@@ -365,10 +385,12 @@ async def DeleteMicrogridDispatch(
365385
) -> Empty:
366386
"""Delete a given dispatch."""
367387
self._check_access(metadata)
368-
grid_dispatches = self.dispatches.get(request.microgrid_id, [])
388+
microgrid_id = MicrogridId(request.microgrid_id)
389+
grid_dispatches = self.dispatches.get(microgrid_id, [])
369390

370391
dispatch_to_delete = next(
371-
(d for d in grid_dispatches if d.id == request.dispatch_id), None
392+
(d for d in grid_dispatches if d.id == DispatchId(request.dispatch_id)),
393+
None,
372394
)
373395

374396
if dispatch_to_delete is None:
@@ -383,7 +405,7 @@ async def DeleteMicrogridDispatch(
383405

384406
await self._stream_sender.send(
385407
self.StreamEvent(
386-
request.microgrid_id,
408+
microgrid_id,
387409
DispatchEvent(
388410
dispatch=dispatch_to_delete,
389411
event=Event.DELETED,
@@ -398,7 +420,7 @@ async def DeleteMicrogridDispatch(
398420

399421
def _dispatch_from_request(
400422
_request: DispatchCreateRequest,
401-
_id: int,
423+
_id: DispatchId,
402424
create_time: datetime,
403425
update_time: datetime,
404426
) -> Dispatch:

src/frequenz/client/dispatch/test/client.py

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -55,16 +55,7 @@ def set_dispatches(self, microgrid_id: MicrogridId, value: list[Dispatch]) -> No
5555
value: The list of dispatches to set.
5656
"""
5757
self._service.dispatches[microgrid_id] = value
58-
59-
if len(value) == 0:
60-
return
61-
62-
# Max between last id and the max id in the list
63-
# pylint: disable=protected-access
64-
self._service._last_id = max(
65-
self._service._last_id, max(dispatch.id for dispatch in value)
66-
)
67-
# pylint: enable=protected-access
58+
self._service.refresh_last_id_for(microgrid_id)
6859

6960
@property
7061
def _service(self) -> FakeService:

0 commit comments

Comments
 (0)