Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ test:
test-coverage:
$(run) pytest tests/ --cov-report term-missing --cov=podme_api $(ARGS)

.PHONY: update-fixtures
update-fixtures:
$(run) pytest --update-fixtures tests/ $(ARGS)

.PHONY: coverage
coverage:
$(run) coverage html
Expand Down
133 changes: 86 additions & 47 deletions podme_api/auth/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,25 @@

import asyncio
from dataclasses import dataclass, field
from datetime import UTC, datetime
import hashlib
from http import HTTPStatus
import json
import logging
import os
import secrets
import socket
from typing import TYPE_CHECKING
from urllib.parse import unquote

from aiohttp import ClientError, ClientResponse, ClientResponseError, ClientSession
from aiohttp.hdrs import METH_GET, METH_POST
import pkce
from yarl import URL

from podme_api.auth.common import PodMeAuthClient
from podme_api.auth.models import SchibstedCredentials
from podme_api.auth.utils import get_now_iso, get_uuid, parse_schibsted_auth_html
from podme_api.const import (
PODME_AUTH_BASE_URL,
PODME_AUTH_RETURN_URL,
PODME_AUTH_USER_AGENT,
PODME_BASE_URL,
)
Expand All @@ -35,7 +37,7 @@
_LOGGER = logging.getLogger(__name__)


CLIENT_ID = "66fd26cdae6bde57ef206b35"
CLIENT_ID = "62557b19f552881812b7431c"


@dataclass
Expand All @@ -49,33 +51,25 @@ class PodMeDefaultAuthClient(PodMeAuthClient):
"""User agent string for API requests."""

device_data = {
"platform": "Ubuntu",
"userAgent": "Firefox",
"userAgentVersion": "131.0",
"platform": "Android",
"userAgent": "Chrome",
"userAgentVersion": "128.0.0.0",
"hasLiedOs": "0",
"hasLiedBrowser": "0",
"fonts": [
"Arial",
"Bitstream Vera Sans Mono",
"Bookman Old Style",
"Century Schoolbook",
"Courier",
"Courier New",
"Georgia",
"Helvetica",
"MS Gothic",
"MS PGothic",
"Monaco",
"Palatino",
"Palatino Linotype",
"Tahoma",
"Times",
"Times New Roman",
"Verdana",
],
"plugins": [
"PDF Viewer::Portable Document Format::application/pdf~pdf,text/pdf~pdf",
"Chrome PDF Viewer::Portable Document Format::application/pdf~pdf,text/pdf~pdf",
"Chromium PDF Viewer::Portable Document Format::application/pdf~pdf,text/pdf~pdf",
"Microsoft Edge PDF Viewer::Portable Document Format::application/pdf~pdf,text/pdf~pdf",
"WebKit built-in PDF::Portable Document Format::application/pdf~pdf,text/pdf~pdf",
],
"plugins": [],
}
"""Device information for authentication."""

Expand Down Expand Up @@ -154,6 +148,7 @@ async def _request(
url,
**kwargs,
)
await self._record_response(url.with_query(kwargs.get("params")), method, response)
response.raise_for_status()
except asyncio.TimeoutError as exception:
raise PodMeApiConnectionTimeoutError(
Expand Down Expand Up @@ -205,30 +200,33 @@ async def authorize(self, user_credentials: PodMeUserCredentials) -> SchibstedCr
PodMeApiConnectionError: For other API communication errors.

"""
# Authorize
self.start_pytest_recording("authorize")
code_verifier, code_challenge = pkce.generate_pkce_pair()
response = await self._request(
"oauth/authorize",
params={
"client_id": CLIENT_ID,
"redirect_uri": "https://podme.com/auth/handleSchibstedLogin",
"redirect_uri": f"pme.podme.{CLIENT_ID}:/login",
"response_type": "code",
"scope": "openid email offline_access",
"state": json.dumps(
{
"returnUrl": PODME_AUTH_RETURN_URL,
"uuid": get_uuid(),
"schibstedFlowInitiatedDate": get_now_iso(),
}
),
"scope": "openid offline_access",
"state": hashlib.sha256(os.urandom(1024)).hexdigest(),
"nonce": secrets.token_urlsafe(),
"code_challenge": code_challenge,
"code_challenge_method": "S256",
"prompt": "select_account",
},
allow_redirects=False,
)
# Login: step 1/3
await self._request("", METH_GET, response.headers.get("Location"))
# Login: step 2/4
response = await self._request(
"authn/api/settings/csrf",
params={"client_id": CLIENT_ID},
)
text = await response.text()
bff_data = parse_schibsted_auth_html(text)
_LOGGER.debug(f"BFF data: {bff_data}")
csrf_token = bff_data.csrf_token
csrf_token = (await response.json())["data"]["attributes"]["csrfToken"]

# Login: step 1/2
# Login: step 3/4
response = await self._request(
"authn/api/identity/email-status",
method=METH_POST,
Expand All @@ -245,7 +243,7 @@ async def authorize(self, user_credentials: PodMeUserCredentials) -> SchibstedCr
email_status = await response.json()
_LOGGER.debug(f"Email status: {email_status}")

# Login: step 2/2
# Login: step 4/4
response = await self._request(
"authn/api/identity/login/",
method=METH_POST,
Expand All @@ -269,19 +267,46 @@ async def authorize(self, user_credentials: PodMeUserCredentials) -> SchibstedCr
"authn/identity/finish/",
method=METH_POST,
params={"client_id": CLIENT_ID},
headers={
"Content-Type": "application/x-www-form-urlencoded",
},
data={
"deviceData": json.dumps(self.device_data),
"remember": "true",
"_csrf": csrf_token,
"redirectToAccountPage": "",
},
allow_redirects=False,
)
final_location = response.history[-1].headers.get("Location")
jwt_cookie = response.history[-1].cookies.get("jwt-cred").value
jwt_cred = unquote(jwt_cookie)

# Follow redirect manually
response = await self._request("", METH_GET, response.headers.get("Location"), allow_redirects=False)
code = URL(response.headers.get("Location")).query.get("code")

# Request tokens with authorization code
response = await self._request(
"oauth/token",
method=METH_POST,
headers={
"X-OIDC": "v1",
"X-Region": "NO", # @TODO: Support multiple regions.
},
data={
"client_id": CLIENT_ID,
"grant_type": "authorization_code",
"code": code,
"redirect_uri": f"pme.podme.{CLIENT_ID}:/login",
"code_verifier": code_verifier,
},
allow_redirects=False,
)
self.stop_pytest_recording()

jwt_cred = await response.json()
jwt_cred["expiration_time"] = int(datetime.now(tz=UTC).timestamp() + jwt_cred["expires_in"])
self.set_credentials(jwt_cred)

_LOGGER.debug(f"Login successful: (final location: {final_location})")
_LOGGER.debug("Login successful")

await self.close()

Expand All @@ -302,23 +327,37 @@ async def refresh_token(self, credentials: SchibstedCredentials | None = None):
if credentials is None:
credentials = self._credentials

self.start_pytest_recording("refresh_token")
response = await self._request(
"auth/refreshSchibstedSession",
base_url=PODME_BASE_URL,
json={
"code": credentials.refresh_token,
"state": get_uuid(),
"oauth/token",
method=METH_POST,
headers={
"Host": "payment.schibsted.no",
"Content-Type": "application/x-www-form-urlencoded",
"User-Agent": "AccountSDKAndroidWeb/6.4.0 (Linux; Android 15; API 35; Google; sdk_gphone64_arm64)",
"X-OIDC": "v1",
"X-Region": "NO", # @TODO: Support multiple regions.
},
data={
"client_id": CLIENT_ID,
"grant_type": "refresh_token",
"refresh_token": credentials.refresh_token,
},
allow_redirects=False,
)
self.stop_pytest_recording()
refreshed_credentials = await response.json()
refreshed_credentials["expiration_time"] = int(
datetime.now(tz=UTC).timestamp() + refreshed_credentials["expires_in"]
)
self.set_credentials(SchibstedCredentials.from_dict({
**credentials.to_dict(),
**refreshed_credentials,
}))
_LOGGER.debug(f"Refreshed credentials: {self.get_credentials()}")

await self.close()
_LOGGER.debug("Refresh token successful")

await self.close()
return self._credentials

def get_credentials(self) -> dict | None:
Expand Down
56 changes: 53 additions & 3 deletions podme_api/auth/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,19 @@
from __future__ import annotations

from abc import ABC, abstractmethod
from dataclasses import dataclass
from dataclasses import dataclass, field
import logging
import os
import sys
from typing import TYPE_CHECKING, Self

from podme_api.const import DEFAULT_REQUEST_TIMEOUT

if TYPE_CHECKING:
from aiohttp import ClientSession
from aiohttp import ClientResponse, ClientSession
from yarl import URL

from podme_api.auth.models import PodMeUserCredentials
from podme_api.auth.models import PodMeUserCredentials, PyTestHttpFixture

_LOGGER = logging.getLogger(__name__)

Expand All @@ -37,6 +40,10 @@ class PodMeAuthClient(ABC):
"""Flag to determine if the session should be closed."""
_access_token: str | None = None
"""Cached access token for authentication."""
_pytest_session: str | None = None
"""Name of the current pytest session."""
_pytest_sessions: dict[str, list[PyTestHttpFixture]] = field(default_factory=dict)
"""Store responses for pytest fixtures."""

@abstractmethod
async def async_get_access_token(self) -> str:
Expand Down Expand Up @@ -73,6 +80,49 @@ def invalidate_credentials(self):
"""Invalidate the current credentials."""
raise NotImplementedError # pragma: no cover

@property
def credentials_filename(self):
"""Get the filename for storing credentials."""
return "credentials.json"

def start_pytest_recording(self, name: str):
"""Start recording responses for pytest fixtures."""
if os.getenv("UPDATE_FIXTURES") is None or "pytest" not in sys.modules:
return
self._pytest_session = name
self._pytest_sessions[name] = []

def stop_pytest_recording(self):
"""Stop recording responses for pytest fixtures."""
self._pytest_session = None

def get_pytest_recordings(self):
"""Get the recorded responses for pytest fixtures."""
return self._pytest_sessions

async def _record_response(
self,
url: URL,
method: str,
response: ClientResponse,
):
"""Record a response for a fixture."""
if self._pytest_session is None:
return
fixture_name = "-".join(url.path.lstrip("/").rstrip("/").split("/"))
fixture_no = len(self._pytest_sessions[self._pytest_session]) + 1
self._pytest_sessions[self._pytest_session].append(
{
"no": fixture_no,
"status": response.status,
"headers": dict(response.headers),
"body": await response.text(),
"method": method,
"url": str(url),
"fixture_name": f"{self._pytest_session}_{fixture_no}_{fixture_name}",
}
)

async def close(self) -> None:
"""Close open client session."""
if self.session and self._close_session:
Expand Down
16 changes: 15 additions & 1 deletion podme_api/auth/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import TypedDict

from mashumaro import field_options

Expand All @@ -27,10 +28,11 @@ class SchibstedCredentials(BaseDataClassORJSONMixin):
expires_in: int
id_token: str
expiration_time: datetime = field(
compare=False,
metadata=field_options(
deserialize=datetime.fromtimestamp,
serialize=lambda v: int(datetime.timestamp(v)),
)
),
)
account_created: bool | None = field(default=None, metadata=field_options(alias="accountCreated"))
email: str | None = None
Expand Down Expand Up @@ -78,3 +80,15 @@ class PodMeBffData(BaseDataClassORJSONMixin):
pulse: dict
re_captcha_site_key: str = field(metadata=field_options(alias="reCaptchaSiteKey"))
spid_url: str = field(metadata=field_options(alias="spidUrl"))


class PyTestHttpFixture(TypedDict):
"""Represents a PyTest HTTP fixture."""

no: int
status: int
headers: dict[str, str]
body: str
method: str
url: str
fixture_name: str
Loading
Loading