Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 1 addition & 7 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,10 @@

## Upgrading

<!-- Here goes notes on how to upgrade from previous versions, including deprecations and what they should be replaced with -->
* Update base client from version `0.6.1` to `0.7.0` and upgrade the `Client` constructor accordingly.

## New Features

* Replace assert statements with proper exception handling
* Implement client instance reuse to avoid redundant TCP connections
* Move documentation and code examples to the documentation website
* Replace the local `PaginationParams` type with the `frequenz-client-common` one
* Remove dependency to `googleapis-common-protos`
* Replace `Energy` with `Power` for the `quantity` representation
* Add str function for `DeliveryPeriod` object
* Add integration tests for the API
* Add an equality function to the Order type
Expand Down
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ dependencies = [
"frequenz-api-common >= 0.6.3, < 0.7.0",
"grpcio >= 1.66.2, < 2",
"frequenz-channels >= 1.0.0, < 2",
"frequenz-client-base >= 0.6.1, < 0.7.0",
"frequenz-client-base >= 0.7.0, < 0.8.0",
"frequenz-client-common >= 0.1.0, < 0.3.0",
"frequenz-api-electricity-trading >= 0.2.4, < 1",
"protobuf >= 5.28.0, < 6",
Expand Down Expand Up @@ -145,6 +145,7 @@ disable = [
"unsubscriptable-object",
# Checked by mypy
"no-member",
"no-name-in-module",
# Checked by flake8
"f-string-without-interpolation",
"line-too-long",
Expand Down
60 changes: 54 additions & 6 deletions src/frequenz/client/electricity_trading/_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@

"""Module to define the client class."""

from __future__ import annotations

import logging
from datetime import datetime, timezone
from decimal import Decimal, InvalidOperation
from typing import Awaitable, cast
from typing import TYPE_CHECKING, Any, Awaitable, cast

import grpc

Expand All @@ -17,9 +19,11 @@
)
from frequenz.channels import Receiver
from frequenz.client.base.client import BaseApiClient
from frequenz.client.base.exception import ClientNotConnected
from frequenz.client.base.streaming import GrpcStreamBroadcaster
from frequenz.client.common.pagination import Params
from google.protobuf import field_mask_pb2, struct_pb2
from typing_extensions import override

from ._types import (
DeliveryArea,
Expand All @@ -41,6 +45,12 @@
UpdateOrder,
)

if TYPE_CHECKING:
from frequenz.api.electricity_trading.v1.electricity_trading_pb2_grpc import (
ElectricityTradingServiceAsyncStub,
)


_logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -81,7 +91,7 @@ def validate_decimal_places(value: Decimal, decimal_places: int, name: str) -> N
) from exc


class Client(BaseApiClient[ElectricityTradingServiceStub]):
class Client(BaseApiClient):
"""Electricity trading client."""

_instances: dict[tuple[str, str | None], "Client"] = {}
Expand Down Expand Up @@ -123,7 +133,10 @@ def __init__(
if not hasattr(
self, "_initialized"
): # Prevent re-initialization of existing instances
super().__init__(server_url, ElectricityTradingServiceStub, connect=connect)
super().__init__(server_url, connect=connect)
self._stub: ElectricityTradingServiceAsyncStub | None = None
if connect:
self._create_stub()
self._initialized = True

self._gridpool_orders_streams: dict[
Expand All @@ -149,6 +162,41 @@ def __init__(

self._metadata = (("key", auth_key),) if auth_key else ()

def _create_stub(self) -> None:
"""Create a new gRPC stub for the Electricity Trading service."""
stub: Any = ElectricityTradingServiceStub(self.channel)
self._stub = stub

@override
def connect(self, server_url: str | None = None) -> None:
"""Connect to the server, possibly using a new URL.

If the client is already connected and the URL is the same as the previous URL,
this method does nothing. If you want to force a reconnection, you can call
[disconnect()][frequenz.client.base.client.BaseApiClient.disconnect] first.

Args:
server_url: The URL of the server to connect to. If not provided, the
previously used URL is used.
"""
super().connect(server_url)
self._create_stub()

@property
def stub(self) -> ElectricityTradingServiceAsyncStub:
"""
Get the gRPC stub for the Electricity Trading service.

Returns:
The gRPC stub.

Raises:
ClientNotConnected: If the client is not connected to the server.
"""
if self._stub is None:
raise ClientNotConnected(server_url=self.server_url, operation="stub")
return self._stub

async def stream_gridpool_orders(
# pylint: disable=too-many-arguments, too-many-positional-arguments
self,
Expand Down Expand Up @@ -192,7 +240,7 @@ async def stream_gridpool_orders(
try:
self._gridpool_orders_streams[stream_key] = GrpcStreamBroadcaster(
f"electricity-trading-{stream_key}",
lambda: self.stub.ReceiveGridpoolOrdersStream( # type: ignore
lambda: self.stub.ReceiveGridpoolOrdersStream(
electricity_trading_pb2.ReceiveGridpoolOrdersStreamRequest(
gridpool_id=gridpool_id,
filter=gridpool_order_filter.to_pb(),
Expand Down Expand Up @@ -251,7 +299,7 @@ async def stream_gridpool_trades(
try:
self._gridpool_trades_streams[stream_key] = GrpcStreamBroadcaster(
f"electricity-trading-{stream_key}",
lambda: self.stub.ReceiveGridpoolTradesStream( # type: ignore
lambda: self.stub.ReceiveGridpoolTradesStream(
electricity_trading_pb2.ReceiveGridpoolTradesStreamRequest(
gridpool_id=gridpool_id,
filter=gridpool_trade_filter.to_pb(),
Expand Down Expand Up @@ -303,7 +351,7 @@ async def stream_public_trades(
self._public_trades_streams[public_trade_filter] = (
GrpcStreamBroadcaster(
f"electricity-trading-{public_trade_filter}",
lambda: self.stub.ReceivePublicTradesStream( # type: ignore
lambda: self.stub.ReceivePublicTradesStream(
electricity_trading_pb2.ReceivePublicTradesStreamRequest(
filter=public_trade_filter.to_pb(),
),
Expand Down
Loading
Loading