diff --git a/src/api/resources.py b/src/api/resources.py index 46a278595..b9683eee0 100644 --- a/src/api/resources.py +++ b/src/api/resources.py @@ -4,7 +4,6 @@ from datetime import UTC, datetime from typing import TYPE_CHECKING, Any, cast -import httpx from fastapi import APIRouter, Depends, HTTPException from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine from sqlmodel import select @@ -14,11 +13,11 @@ from ..deployment import ( get_db_vmi_identity, kube_service, - load_simplyblock_credentials, resolve_database_volume_identifiers, resolve_storage_volume_identifiers, ) from ..deployment.kubernetes._util import custom_api_client +from ..deployment.simplyblock_api import create_simplyblock_api from ..models._util import Identifier from ..models.branch import Branch, BranchServiceStatus, ResourceUsageDefinition from ..models.project import Project @@ -60,8 +59,6 @@ router = APIRouter(tags=["resource"]) -SIMPLYBLOCK_API_TIMEOUT_SECONDS = 10.0 - # --------------------------- # Helper functions @@ -83,46 +80,6 @@ logger = logging.getLogger(__name__) -def _require_int_stat(stats: dict[str, Any], field: str) -> int: - if field not in stats: - raise ValueError(f"Simplyblock IO stats missing required field {field!r}") - value = stats[field] - try: - return int(value) - except (TypeError, ValueError) as exc: - raise ValueError(f"Simplyblock IO stat {field!r} with value {value!r} is not an integer") from exc - - -async def _fetch_volume_stats( - *, - client: httpx.AsyncClient, - endpoint: str, - cluster_id: str, - cluster_secret: str, - volume_uuid: str, - required_fields: tuple[str, ...], -) -> dict[str, int]: - url = f"{endpoint}/lvol/iostats/{volume_uuid}" - headers = { - "Authorization": f"{cluster_id} {cluster_secret}", - "Accept": "application/json", - } - - response = await client.get(url, headers=headers, timeout=SIMPLYBLOCK_API_TIMEOUT_SECONDS) - response.raise_for_status() - - payload = response.json() - - stats = payload.get("stats") - if not isinstance(stats, list) or not stats: - raise ValueError(f"Simplyblock IO stats payload missing stats list for volume {volume_uuid}") - entry = stats[0] - if not isinstance(entry, dict): - raise ValueError(f"Simplyblock IO stats entry malformed for volume {volume_uuid}") - - return {field: _require_int_stat(entry, field) for field in required_fields} - - # --------------------------- # Provisioning endpoints # --------------------------- @@ -414,59 +371,17 @@ async def _resolve_volume_stats( *, volume_identifier_resolver: Callable[[str], Awaitable[tuple[str, str | None]]], namespace: str, - branch: Branch, - resource_label: str, - sb_client: httpx.AsyncClient, - endpoint: str, - cluster_id: str, - cluster_secret: str, - required_fields: tuple[str, ...], ) -> dict[str, int]: - volume_uuid, pv_cluster_id = await volume_identifier_resolver(namespace) - - if pv_cluster_id and pv_cluster_id != cluster_id: - logger.warning( - "Cluster mismatch for branch %s %s volume %s: PV cluster %s != credentials cluster %s", - branch.id, - resource_label, - volume_uuid, - pv_cluster_id, - cluster_id, - ) - raise ValueError( - f"Cluster mismatch for branch {branch.id} {resource_label} volume {volume_uuid}: " - f"PV cluster {pv_cluster_id} != credentials cluster {cluster_id}" - ) + volume_uuid, _ = await volume_identifier_resolver(namespace) - return await _fetch_volume_stats( - client=sb_client, - endpoint=endpoint, - cluster_id=cluster_id, - cluster_secret=cluster_secret, - volume_uuid=volume_uuid, - required_fields=required_fields, - ) + async with create_simplyblock_api() as sb_api: + return await sb_api.volume_iostats(volume_uuid=volume_uuid) -async def _collect_database_volume_usage( - *, - namespace: str, - branch: Branch, - sb_client: httpx.AsyncClient, - endpoint: str, - cluster_id: str, - cluster_secret: str, -) -> tuple[int, int]: +async def _collect_database_volume_usage(namespace: str) -> tuple[int, int]: stats = await _resolve_volume_stats( volume_identifier_resolver=resolve_database_volume_identifiers, namespace=namespace, - branch=branch, - resource_label="database", - sb_client=sb_client, - endpoint=endpoint, - cluster_id=cluster_id, - cluster_secret=cluster_secret, - required_fields=("size_used", "read_io_ps", "write_io_ps"), ) nvme_bytes = stats["size_used"] read_iops = stats["read_io_ps"] @@ -474,53 +389,22 @@ async def _collect_database_volume_usage( return nvme_bytes, read_iops + write_iops -async def _collect_storage_volume_usage( - *, - namespace: str, - branch: Branch, - sb_client: httpx.AsyncClient, - endpoint: str, - cluster_id: str, - cluster_secret: str, -) -> int: +async def _collect_storage_volume_usage(namespace: str) -> int: stats = await _resolve_volume_stats( volume_identifier_resolver=resolve_storage_volume_identifiers, namespace=namespace, - branch=branch, - resource_label="storage", - sb_client=sb_client, - endpoint=endpoint, - cluster_id=cluster_id, - cluster_secret=cluster_secret, - required_fields=("size_used",), ) return stats["size_used"] async def _collect_branch_volume_usage(branch: Branch, namespace: str) -> tuple[int, int, int | None]: - endpoint, cluster_id, cluster_secret = await load_simplyblock_credentials() - async with httpx.AsyncClient() as sb_client: - db_task = _collect_database_volume_usage( - namespace=namespace, - branch=branch, - sb_client=sb_client, - endpoint=endpoint, - cluster_id=cluster_id, - cluster_secret=cluster_secret, - ) - if branch.enable_file_storage: - storage_task = _collect_storage_volume_usage( - namespace=namespace, - branch=branch, - sb_client=sb_client, - endpoint=endpoint, - cluster_id=cluster_id, - cluster_secret=cluster_secret, - ) - (nvme_bytes, iops), storage_bytes = await asyncio.gather(db_task, storage_task) - else: - nvme_bytes, iops = await db_task - storage_bytes = None + db_task = _collect_database_volume_usage(namespace) + if branch.enable_file_storage: + storage_task = _collect_storage_volume_usage(namespace) + (nvme_bytes, iops), storage_bytes = await asyncio.gather(db_task, storage_task) + else: + nvme_bytes, iops = await db_task + storage_bytes = None return nvme_bytes, iops, storage_bytes diff --git a/src/deployment/__init__.py b/src/deployment/__init__.py index 8aa130415..5017c0a85 100644 --- a/src/deployment/__init__.py +++ b/src/deployment/__init__.py @@ -44,6 +44,7 @@ from .kubernetes.kubevirt import get_virtualmachine_status from .logflare import create_branch_logflare_objects, delete_branch_logflare_objects from .settings import get_settings +from .simplyblock_api import create_simplyblock_api if TYPE_CHECKING: from cloudflare.types.dns.record_list_params import Name as CloudflareRecordName @@ -283,32 +284,19 @@ async def resolve_storage_volume_identifiers(namespace: str) -> tuple[str, str | async def update_branch_volume_iops(branch_id: Identifier, iops: int) -> None: namespace = deployment_namespace(branch_id) - endpoint, cluster_id, cluster_secret = await load_simplyblock_credentials() - volume_uuid, pv_cluster_id = await resolve_database_volume_identifiers(namespace) - if pv_cluster_id and pv_cluster_id != cluster_id: - raise VelaDeploymentError( - f"Cluster ID mismatch for Simplyblock volume {volume_uuid!r}: PV reports {pv_cluster_id}, " - f"but credentials reference {cluster_id}" - ) - url = f"{endpoint}/lvol/{volume_uuid}" - headers = { - "Content-Type": "application/json", - "Authorization": f"{cluster_id} {cluster_secret}", - } - + volume_uuid, _ = await resolve_database_volume_identifiers(namespace) try: - async with httpx.AsyncClient(timeout=10.0) as client: - response = await client.put(url, headers=headers, json={"max-rw-iops": iops}) - response.raise_for_status() + async with create_simplyblock_api() as sb_api: + await sb_api.update_volume(volume_uuid=volume_uuid, payload={"max-rw-iops": iops}) except httpx.HTTPStatusError as exc: detail = exc.response.text.strip() or exc.response.reason_phrase or str(exc) raise VelaDeploymentError( f"Simplyblock volume API rejected IOPS update for volume {volume_uuid!r}: {detail}" ) from exc except httpx.HTTPError as exc: - raise VelaDeploymentError(f"Failed to reach Simplyblock volume API at {url!r}") from exc + raise VelaDeploymentError("Failed to reach Simplyblock volume API") from exc - logger.info("Updated Simplyblock volume %s IOPS to %s using endpoint %s", volume_uuid, iops, endpoint) + logger.info("Updated Simplyblock volume %s IOPS to %s", volume_uuid, iops) async def ensure_branch_storage_class(branch_id: Identifier, *, iops: int) -> str: diff --git a/src/deployment/simplyblock_api.py b/src/deployment/simplyblock_api.py new file mode 100644 index 000000000..ba03f316b --- /dev/null +++ b/src/deployment/simplyblock_api.py @@ -0,0 +1,126 @@ +from __future__ import annotations + +from contextlib import asynccontextmanager +from typing import TYPE_CHECKING, Any +from uuid import UUID + +import httpx + +if TYPE_CHECKING: + from collections.abc import AsyncIterator + + +class SimplyblockApi: + API_TIMEOUT_SECONDS: float = 10.0 + STORAGE_POOL_NAME: str = "testing1" + + def __init__( + self, + endpoint: str, + cluster_id: str, + cluster_secret: str, + *, + client: httpx.AsyncClient | None = None, + timeout: float | httpx.Timeout | None = None, + ) -> None: + self._endpoint = endpoint.rstrip("/") + self._cluster_id = cluster_id + self._cluster_secret = cluster_secret + self._pool_id_cache: dict[str, UUID] = {} + fallback_timeout = client.timeout if client is not None else self.API_TIMEOUT_SECONDS + self._timeout = timeout if timeout is not None else fallback_timeout + self._owns_client = client is None + self._client = client or httpx.AsyncClient( + base_url=self._endpoint, + headers=self._headers(), + timeout=self._timeout, + ) + + async def __aenter__(self) -> SimplyblockApi: + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: + await self.aclose() + + async def aclose(self) -> None: + if self._owns_client: + await self._client.aclose() + + @property + def _cluster_base(self) -> str: + return f"{self._endpoint}/api/v2/clusters/{self._cluster_id}" + + def _headers(self) -> dict[str, str]: + return { + "Authorization": f"Bearer {self._cluster_secret}", + "Accept": "application/json", + } + + async def _cluster_pool_base(self) -> str: + pool_id = await self.pool_id() + return f"{self._cluster_base}/storage-pools/{pool_id}" + + async def pool(self, name: str | None = None) -> dict[str, Any]: + pool_name = name or self.STORAGE_POOL_NAME + url = f"{self._cluster_base}/storage-pools/" + response = await self._client.get(url, headers=self._headers(), timeout=self._timeout) + response.raise_for_status() + + pools = response.json() + if isinstance(pools, list): + for pool in pools: + if isinstance(pool, dict) and pool.get("name") == pool_name: + return pool + raise KeyError(f"Storage pool {pool_name!r} not found") + + async def pool_id(self, name: str | None = None) -> UUID: + pool_name = name or self.STORAGE_POOL_NAME + cached = self._pool_id_cache.get(pool_name) + if cached: + return cached + pool = await self.pool(pool_name) + identifier = UUID(str(pool["id"])) + self._pool_id_cache[pool_name] = identifier + return identifier + + async def volume_iostats(self, volume_uuid: str) -> dict[str, Any]: + base_url = await self._cluster_pool_base() + url = f"{base_url}/volumes/{volume_uuid}/iostats" + response = await self._client.get(url, headers=self._headers(), timeout=self._timeout) + response.raise_for_status() + payload = response.json() + return payload[0] # return the most recent one + + async def update_volume( + self, + volume_uuid: str, + payload: dict[str, Any], + ) -> None: + headers = self._headers() + headers["Content-Type"] = "application/json" + base_url = await self._cluster_pool_base() + url = f"{base_url}/volumes/{volume_uuid}" + response = await self._client.put( + url, + headers=headers, + json=payload, + timeout=self._timeout, + ) + response.raise_for_status() + + +@asynccontextmanager +async def create_simplyblock_api( + client: httpx.AsyncClient | None = None, +) -> AsyncIterator[SimplyblockApi]: + from . import load_simplyblock_credentials + + endpoint, cluster_id, cluster_secret = await load_simplyblock_credentials() + api = SimplyblockApi( + endpoint=endpoint, + cluster_id=cluster_id, + cluster_secret=cluster_secret, + client=client, + ) + async with api: + yield api