Skip to content

Commit b802d77

Browse files
committed
fix: update marketmetering types and client for v0.4.0 API
Signed-off-by: Mathias L. Baumann <[email protected]>
1 parent db9121a commit b802d77

File tree

2 files changed

+571
-13
lines changed

2 files changed

+571
-13
lines changed

src/frequenz/client/marketmetering/_client.py

Lines changed: 180 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,19 @@
2222

2323
from .types import (
2424
EnergyFlowDirection,
25+
MarketLocation,
26+
MarketLocationEntry,
2527
MarketLocationRef,
2628
MarketLocationSeries,
29+
MarketLocationsFilter,
30+
MarketLocationUpdate,
2731
MetricType,
32+
MetricUnit,
33+
PaginationParams,
2834
ResamplingOptions,
35+
RevisionSelection,
36+
UpsertResult,
37+
MarketLocationSample,
2938
)
3039

3140
DEFAULT_PORT = 443
@@ -148,15 +157,184 @@ def stream_timeout(self) -> timedelta:
148157
return timedelta(seconds=self._stream_timeout_seconds)
149158

150159
@property
151-
def stub(self) -> marketmetering_pb2_grpc.MarketMeteringServiceAsyncStub:
160+
def stub(self) -> marketmetering_pb2_grpc.MarketMeteringServiceStub:
152161
"""The stub for the service."""
153-
if self._channel is None or self._stub is None:
162+
# If we have an injected mock stub (for testing), return it directly
163+
if self._stub is not None:
164+
return self._stub # type: ignore
165+
166+
if self._channel is None:
154167
raise ClientNotConnected(server_url=self.server_url, operation="stub")
155168
# This type: ignore is needed because we need to cast the sync stub to
156169
# the async stub, but we can't use cast because the async stub doesn't
157170
# actually exist to the eyes of the interpreter.
158171
return self._stub # type: ignore
159172

173+
async def create_market_location(
174+
self,
175+
*,
176+
market_location_ref: MarketLocationRef,
177+
market_location: MarketLocation,
178+
) -> None:
179+
"""Create a new Market Location.
180+
181+
Args:
182+
market_location_ref: The reference ID for the new location.
183+
market_location: The configuration of the new location.
184+
"""
185+
request = pb.CreateMarketLocationRequest(
186+
market_location_ref=market_location_ref.to_protobuf(),
187+
market_location=market_location.to_protobuf(),
188+
)
189+
await self.stub.CreateMarketLocation(
190+
request,
191+
timeout=self._call_timeout_seconds,
192+
)
193+
194+
async def update_market_location(
195+
self,
196+
*,
197+
market_location_ref: MarketLocationRef,
198+
update: MarketLocationUpdate,
199+
) -> None:
200+
"""Update an existing Market Location.
201+
202+
Args:
203+
market_location_ref: The reference ID of the location to update.
204+
update: The fields to update.
205+
"""
206+
update_pb, update_mask_pb = update.to_protobuf()
207+
request = pb.UpdateMarketLocationRequest(
208+
market_location_ref=market_location_ref.to_protobuf(),
209+
update_fields=update_pb,
210+
update_mask=update_mask_pb,
211+
)
212+
await self.stub.UpdateMarketLocation(
213+
request,
214+
timeout=self._call_timeout_seconds,
215+
)
216+
217+
async def activate_market_location(
218+
self,
219+
*,
220+
market_location_ref: MarketLocationRef,
221+
) -> None:
222+
"""Activate a Market Location.
223+
224+
Args:
225+
market_location_ref: The reference ID of the location to activate.
226+
"""
227+
request = pb.ActivateMarketLocationRequest(
228+
market_location_refs=[market_location_ref.to_protobuf()],
229+
)
230+
await self.stub.ActivateMarketLocation(
231+
request,
232+
timeout=self._call_timeout_seconds,
233+
)
234+
235+
async def deactivate_market_location(
236+
self,
237+
*,
238+
market_location_ref: MarketLocationRef,
239+
) -> None:
240+
"""Deactivate a Market Location.
241+
242+
Args:
243+
market_location_ref: The reference ID of the location to deactivate.
244+
"""
245+
request = pb.DeactivateMarketLocationRequest(
246+
market_location_refs=[market_location_ref.to_protobuf()],
247+
)
248+
await self.stub.DeactivateMarketLocation(
249+
request,
250+
timeout=self._call_timeout_seconds,
251+
)
252+
253+
async def list_market_locations(
254+
self,
255+
*,
256+
enterprise_id: int,
257+
filters: MarketLocationsFilter | None = None,
258+
revision_selection: RevisionSelection | None = None,
259+
pagination_params: PaginationParams | None = None,
260+
) -> tuple[list[MarketLocationEntry], PaginationParams | None]:
261+
"""List Market Locations.
262+
263+
Args:
264+
enterprise_id: Filter by enterprise ID.
265+
filters: Optional filters for the query.
266+
revision_selection: Optional revision selection criteria.
267+
pagination_params: Optional pagination parameters.
268+
269+
Returns:
270+
A tuple containing a list of Market Location entries and optional
271+
pagination parameters for the next page.
272+
"""
273+
request = pb.ListMarketLocationsRequest(
274+
enterprise_id=enterprise_id,
275+
filter=filters.to_protobuf() if filters else None,
276+
revision_selection=(
277+
revision_selection.to_protobuf() if revision_selection else None
278+
),
279+
pagination_params=(
280+
pagination_params.to_protobuf() if pagination_params else None
281+
),
282+
)
283+
response = await self.stub.ListMarketLocations(
284+
request,
285+
timeout=self._call_timeout_seconds,
286+
)
287+
288+
market_locations = [
289+
MarketLocationEntry.from_protobuf(ml) for ml in response.market_locations
290+
]
291+
292+
next_page_params = None
293+
if response.HasField("pagination_info"):
294+
next_page_params = PaginationParams(
295+
page_token=response.pagination_info.next_page_token
296+
)
297+
298+
return market_locations, next_page_params
299+
300+
async def upsert_samples(
301+
self,
302+
samples_stream: AsyncIterator[tuple[MarketLocationRef, MarketLocationSeries]],
303+
) -> AsyncIterator[UpsertResult]:
304+
"""Upsert a stream of metering samples.
305+
306+
Args:
307+
samples_stream: An async iterator yielding (MarketLocationRef, MarketLocationSeries)
308+
tuples. Each series should contain exactly one sample.
309+
310+
Yields:
311+
UpsertResult objects indicating success or failure for each sample.
312+
"""
313+
314+
async def request_generator() -> AsyncIterator[
315+
pb.UpsertMarketLocationSamplesStreamRequest
316+
]:
317+
async for ml_ref, series in samples_stream:
318+
for sample in series.samples:
319+
yield pb.UpsertMarketLocationSamplesStreamRequest(
320+
market_location_ref=ml_ref.to_protobuf(),
321+
direction=series.direction.value,
322+
metric_type=series.metric_type.value,
323+
metric_unit=series.metric_unit.value,
324+
sample=sample.to_protobuf(),
325+
)
326+
327+
response_stream = cast(
328+
AsyncIterator[pb.UpsertMarketLocationSamplesStreamResponse],
329+
self.stub.UpsertMarketLocationSamplesStream(
330+
request_generator(),
331+
timeout=self._stream_timeout_seconds,
332+
),
333+
)
334+
335+
async for response in response_stream:
336+
yield UpsertResult.from_protobuf(response)
337+
160338
# pylint: disable=too-many-arguments
161339
async def stream_samples(
162340
self,

0 commit comments

Comments
 (0)