Skip to content

Commit 26f96b1

Browse files
iMicknlCopilot
andcommitted
Refactor authentication handling in OverkizClient and add new credential classes (#1867)
- Introduced UsernamePasswordCredentials and LocalTokenCredentials for better credential management. - Updated OverkizClient to utilize the new credential classes and refactored login logic. - Added authentication strategies for various servers, including Somfy and Rexel. - Created new modules for auth strategies and credentials to improve code organization. - Enhanced README with updated usage examples for the new authentication methods. ## Breaking - `OverkizServer` class is renamed to `ServerConfig` and has additional `server` and `type` (cloud/local) property - `generate_local_server` is renamed to `create_local_server_config` - `client.api_type` is removed and now available via `ServerConfig` (e.g. `client.server_config.type`) - The `OverkizClient` constructor now requires passing a `ServerConfig` via `server` - The `OverkizClient` constructur now requires passing an `Credentials` class via `credentials`, e.g. `UsernamePasswordCredentials(USERNAME, PASSWORD)` for most server. ## Features - The OverkizClient constructor now supports passing a Server enum directly, such as `OverkizClient(server=Server.SOMFY_EUROPE, ...)`. --------- Co-authored-by: Copilot <[email protected]> Co-authored-by: iMicknl <[email protected]>
1 parent be9b854 commit 26f96b1

File tree

13 files changed

+1447
-420
lines changed

13 files changed

+1447
-420
lines changed

README.md

Lines changed: 7 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ pip install pyoverkiz
3737
import asyncio
3838
import time
3939

40-
from pyoverkiz.const import SUPPORTED_SERVERS
40+
from pyoverkiz.auth.credentials import UsernamePasswordCredentials
4141
from pyoverkiz.client import OverkizClient
4242
from pyoverkiz.models import Action
4343
from pyoverkiz.enums import Server, OverkizCommand
@@ -48,7 +48,8 @@ PASSWORD = ""
4848

4949
async def main() -> None:
5050
async with OverkizClient(
51-
USERNAME, PASSWORD, server=SUPPORTED_SERVERS[Server.SOMFY_EUROPE]
51+
server=Server.SOMFY_EUROPE,
52+
credentials=UsernamePasswordCredentials(USERNAME, PASSWORD),
5253
) as client:
5354
try:
5455
await client.login()
@@ -90,38 +91,22 @@ asyncio.run(main())
9091
```python
9192
import asyncio
9293
import time
93-
import aiohttp
9494

95+
from pyoverkiz.auth.credentials import LocalTokenCredentials
9596
from pyoverkiz.client import OverkizClient
96-
from pyoverkiz.const import SUPPORTED_SERVERS, OverkizServer
97-
from pyoverkiz.enums import Server
97+
from pyoverkiz.utils import create_local_server_config
9898

99-
USERNAME = ""
100-
PASSWORD = ""
10199
LOCAL_GATEWAY = "gateway-xxxx-xxxx-xxxx.local" # or use the IP address of your gateway
102100
VERIFY_SSL = True # set verify_ssl to False if you don't use the .local hostname
103101

104102

105103
async def main() -> None:
106104
token = "" # generate your token via the Somfy app and include it here
107105

108-
# Local Connection
109-
session = aiohttp.ClientSession(
110-
connector=aiohttp.TCPConnector(verify_ssl=VERIFY_SSL)
111-
)
112-
113106
async with OverkizClient(
114-
username="",
115-
password="",
116-
token=token,
117-
session=session,
107+
server=create_local_server_config(host=LOCAL_GATEWAY),
108+
credentials=LocalTokenCredentials(token),
118109
verify_ssl=VERIFY_SSL,
119-
server=OverkizServer(
120-
name="Somfy TaHoma (local)",
121-
endpoint=f"https://{LOCAL_GATEWAY}:8443/enduser-mobile-web/1/enduserAPI/",
122-
manufacturer="Somfy",
123-
configuration_url=None,
124-
),
125110
) as client:
126111
await client.login()
127112

pyoverkiz/auth/__init__.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
"""Authentication module for pyoverkiz."""
2+
3+
from __future__ import annotations
4+
5+
from pyoverkiz.auth.base import AuthContext, AuthStrategy
6+
from pyoverkiz.auth.credentials import (
7+
Credentials,
8+
LocalTokenCredentials,
9+
RexelOAuthCodeCredentials,
10+
TokenCredentials,
11+
UsernamePasswordCredentials,
12+
)
13+
from pyoverkiz.auth.factory import build_auth_strategy
14+
15+
__all__ = [
16+
"AuthContext",
17+
"AuthStrategy",
18+
"Credentials",
19+
"LocalTokenCredentials",
20+
"RexelOAuthCodeCredentials",
21+
"TokenCredentials",
22+
"UsernamePasswordCredentials",
23+
"build_auth_strategy",
24+
]

pyoverkiz/auth/base.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
"""Base classes for authentication strategies."""
2+
3+
from __future__ import annotations
4+
5+
import datetime
6+
from collections.abc import Mapping
7+
from dataclasses import dataclass
8+
from typing import Protocol
9+
10+
11+
@dataclass(slots=True)
12+
class AuthContext:
13+
"""Authentication context holding tokens and expiration."""
14+
15+
access_token: str | None = None
16+
refresh_token: str | None = None
17+
expires_at: datetime.datetime | None = None
18+
19+
def is_expired(self, *, skew_seconds: int = 5) -> bool:
20+
"""Check if the access token is expired, considering a skew time."""
21+
if not self.expires_at:
22+
return False
23+
24+
return datetime.datetime.now(
25+
datetime.UTC
26+
) >= self.expires_at - datetime.timedelta(seconds=skew_seconds)
27+
28+
29+
class AuthStrategy(Protocol):
30+
"""Protocol for authentication strategies."""
31+
32+
async def login(self) -> None:
33+
"""Perform login to obtain tokens."""
34+
35+
async def refresh_if_needed(self) -> bool:
36+
"""Refresh tokens if they are expired. Return True if refreshed."""
37+
38+
def auth_headers(self, path: str | None = None) -> Mapping[str, str]:
39+
"""Generate authentication headers for requests."""
40+
41+
async def close(self) -> None:
42+
"""Clean up any resources held by the strategy."""

pyoverkiz/auth/credentials.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
"""Credentials for authentication strategies."""
2+
3+
from __future__ import annotations
4+
5+
from dataclasses import dataclass
6+
7+
8+
class Credentials:
9+
"""Marker base class for auth credentials."""
10+
11+
12+
@dataclass(slots=True)
13+
class UsernamePasswordCredentials(Credentials):
14+
"""Credentials using username and password."""
15+
16+
username: str
17+
password: str
18+
19+
20+
@dataclass(slots=True)
21+
class TokenCredentials(Credentials):
22+
"""Credentials using an (API) token."""
23+
24+
token: str
25+
26+
27+
@dataclass(slots=True)
28+
class LocalTokenCredentials(TokenCredentials):
29+
"""Credentials using a local API token."""
30+
31+
32+
@dataclass(slots=True)
33+
class RexelOAuthCodeCredentials(Credentials):
34+
"""Credentials using Rexel OAuth2 authorization code."""
35+
36+
code: str
37+
redirect_uri: str

pyoverkiz/auth/factory.py

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
"""Factory to build authentication strategies based on server and credentials."""
2+
3+
from __future__ import annotations
4+
5+
import ssl
6+
7+
from aiohttp import ClientSession
8+
9+
from pyoverkiz.auth.credentials import (
10+
Credentials,
11+
LocalTokenCredentials,
12+
RexelOAuthCodeCredentials,
13+
TokenCredentials,
14+
UsernamePasswordCredentials,
15+
)
16+
from pyoverkiz.auth.strategies import (
17+
AuthStrategy,
18+
BearerTokenAuthStrategy,
19+
CozytouchAuthStrategy,
20+
LocalTokenAuthStrategy,
21+
NexityAuthStrategy,
22+
RexelAuthStrategy,
23+
SessionLoginStrategy,
24+
SomfyAuthStrategy,
25+
)
26+
from pyoverkiz.enums import APIType, Server
27+
from pyoverkiz.models import ServerConfig
28+
29+
30+
def build_auth_strategy(
31+
*,
32+
server_config: ServerConfig,
33+
credentials: Credentials,
34+
session: ClientSession,
35+
ssl_context: ssl.SSLContext | bool,
36+
) -> AuthStrategy:
37+
"""Build the correct auth strategy for the given server and credentials."""
38+
server: Server | None = server_config.server
39+
40+
if server == Server.SOMFY_EUROPE:
41+
return SomfyAuthStrategy(
42+
_ensure_username_password(credentials),
43+
session,
44+
server_config,
45+
ssl_context,
46+
server_config.type,
47+
)
48+
49+
if server in {
50+
Server.ATLANTIC_COZYTOUCH,
51+
Server.THERMOR_COZYTOUCH,
52+
Server.SAUTER_COZYTOUCH,
53+
}:
54+
return CozytouchAuthStrategy(
55+
_ensure_username_password(credentials),
56+
session,
57+
server_config,
58+
ssl_context,
59+
server_config.type,
60+
)
61+
62+
if server == Server.NEXITY:
63+
return NexityAuthStrategy(
64+
_ensure_username_password(credentials),
65+
session,
66+
server_config,
67+
ssl_context,
68+
server_config.type,
69+
)
70+
71+
if server == Server.REXEL:
72+
return RexelAuthStrategy(
73+
_ensure_rexel(credentials),
74+
session,
75+
server_config,
76+
ssl_context,
77+
server_config.type,
78+
)
79+
80+
if server_config.type == APIType.LOCAL:
81+
if isinstance(credentials, LocalTokenCredentials):
82+
return LocalTokenAuthStrategy(
83+
credentials, session, server_config, ssl_context, server_config.type
84+
)
85+
return BearerTokenAuthStrategy(
86+
_ensure_token(credentials),
87+
session,
88+
server_config,
89+
ssl_context,
90+
server_config.type,
91+
)
92+
93+
if isinstance(credentials, TokenCredentials) and not isinstance(
94+
credentials, LocalTokenCredentials
95+
):
96+
return BearerTokenAuthStrategy(
97+
credentials, session, server_config, ssl_context, server_config.type
98+
)
99+
100+
return SessionLoginStrategy(
101+
_ensure_username_password(credentials),
102+
session,
103+
server_config,
104+
ssl_context,
105+
server_config.type,
106+
)
107+
108+
109+
def _ensure_username_password(credentials: Credentials) -> UsernamePasswordCredentials:
110+
"""Validate that credentials are username/password based."""
111+
if not isinstance(credentials, UsernamePasswordCredentials):
112+
raise TypeError("UsernamePasswordCredentials are required for this server.")
113+
return credentials
114+
115+
116+
def _ensure_token(credentials: Credentials) -> TokenCredentials:
117+
"""Validate that credentials carry a bearer token."""
118+
if not isinstance(credentials, TokenCredentials):
119+
raise TypeError("TokenCredentials are required for this server.")
120+
return credentials
121+
122+
123+
def _ensure_rexel(credentials: Credentials) -> RexelOAuthCodeCredentials:
124+
"""Validate that credentials are of Rexel OAuth code type."""
125+
if not isinstance(credentials, RexelOAuthCodeCredentials):
126+
raise TypeError("RexelOAuthCodeCredentials are required for this server.")
127+
return credentials

0 commit comments

Comments
 (0)