Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 14 additions & 128 deletions src/api/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@
from ..deployment import (
get_db_vmi_identity,
kube_service,
load_simplyblock_credentials,
resolve_database_volume_identifiers,
resolve_storage_volume_identifiers,
)
from ..deployment.kubernetes._util import custom_api_client
from ..deployment.simplyblock_api import create_simplyblock_api
from ..models._util import Identifier
from ..models.branch import Branch, BranchServiceStatus, ResourceUsageDefinition
from ..models.project import Project
Expand Down Expand Up @@ -60,8 +60,6 @@

router = APIRouter(tags=["resource"])

SIMPLYBLOCK_API_TIMEOUT_SECONDS = 10.0


# ---------------------------
# Helper functions
Expand All @@ -83,46 +81,6 @@
logger = logging.getLogger(__name__)


def _require_int_stat(stats: dict[str, Any], field: str) -> int:
if field not in stats:
raise ValueError(f"Simplyblock IO stats missing required field {field!r}")
value = stats[field]
try:
return int(value)
except (TypeError, ValueError) as exc:
raise ValueError(f"Simplyblock IO stat {field!r} with value {value!r} is not an integer") from exc


async def _fetch_volume_stats(
*,
client: httpx.AsyncClient,
endpoint: str,
cluster_id: str,
cluster_secret: str,
volume_uuid: str,
required_fields: tuple[str, ...],
) -> dict[str, int]:
url = f"{endpoint}/lvol/iostats/{volume_uuid}"
headers = {
"Authorization": f"{cluster_id} {cluster_secret}",
"Accept": "application/json",
}

response = await client.get(url, headers=headers, timeout=SIMPLYBLOCK_API_TIMEOUT_SECONDS)
response.raise_for_status()

payload = response.json()

stats = payload.get("stats")
if not isinstance(stats, list) or not stats:
raise ValueError(f"Simplyblock IO stats payload missing stats list for volume {volume_uuid}")
entry = stats[0]
if not isinstance(entry, dict):
raise ValueError(f"Simplyblock IO stats entry malformed for volume {volume_uuid}")

return {field: _require_int_stat(entry, field) for field in required_fields}


# ---------------------------
# Provisioning endpoints
# ---------------------------
Expand Down Expand Up @@ -414,113 +372,41 @@ async def _resolve_volume_stats(
*,
volume_identifier_resolver: Callable[[str], Awaitable[tuple[str, str | None]]],
namespace: str,
branch: Branch,
resource_label: str,
sb_client: httpx.AsyncClient,
endpoint: str,
cluster_id: str,
cluster_secret: str,
required_fields: tuple[str, ...],
) -> dict[str, int]:
volume_uuid, pv_cluster_id = await volume_identifier_resolver(namespace)

if pv_cluster_id and pv_cluster_id != cluster_id:
logger.warning(
"Cluster mismatch for branch %s %s volume %s: PV cluster %s != credentials cluster %s",
branch.id,
resource_label,
volume_uuid,
pv_cluster_id,
cluster_id,
)
raise ValueError(
f"Cluster mismatch for branch {branch.id} {resource_label} volume {volume_uuid}: "
f"PV cluster {pv_cluster_id} != credentials cluster {cluster_id}"
)
volume_uuid, _ = await volume_identifier_resolver(namespace)

return await _fetch_volume_stats(
client=sb_client,
endpoint=endpoint,
cluster_id=cluster_id,
cluster_secret=cluster_secret,
volume_uuid=volume_uuid,
required_fields=required_fields,
)
async with httpx.AsyncClient() as sb_client:
sb_api = await create_simplyblock_api(sb_client)
return await sb_api.volume_iostats(volume_uuid=volume_uuid)


async def _collect_database_volume_usage(
*,
namespace: str,
branch: Branch,
sb_client: httpx.AsyncClient,
endpoint: str,
cluster_id: str,
cluster_secret: str,
) -> tuple[int, int]:
async def _collect_database_volume_usage(namespace: str) -> tuple[int, int]:
stats = await _resolve_volume_stats(
volume_identifier_resolver=resolve_database_volume_identifiers,
namespace=namespace,
branch=branch,
resource_label="database",
sb_client=sb_client,
endpoint=endpoint,
cluster_id=cluster_id,
cluster_secret=cluster_secret,
required_fields=("size_used", "read_io_ps", "write_io_ps"),
)
nvme_bytes = stats["size_used"]
read_iops = stats["read_io_ps"]
write_iops = stats["write_io_ps"]
return nvme_bytes, read_iops + write_iops


async def _collect_storage_volume_usage(
*,
namespace: str,
branch: Branch,
sb_client: httpx.AsyncClient,
endpoint: str,
cluster_id: str,
cluster_secret: str,
) -> int:
async def _collect_storage_volume_usage(namespace: str) -> int:
stats = await _resolve_volume_stats(
volume_identifier_resolver=resolve_storage_volume_identifiers,
namespace=namespace,
branch=branch,
resource_label="storage",
sb_client=sb_client,
endpoint=endpoint,
cluster_id=cluster_id,
cluster_secret=cluster_secret,
required_fields=("size_used",),
)
return stats["size_used"]


async def _collect_branch_volume_usage(branch: Branch, namespace: str) -> tuple[int, int, int | None]:
endpoint, cluster_id, cluster_secret = await load_simplyblock_credentials()
async with httpx.AsyncClient() as sb_client:
db_task = _collect_database_volume_usage(
namespace=namespace,
branch=branch,
sb_client=sb_client,
endpoint=endpoint,
cluster_id=cluster_id,
cluster_secret=cluster_secret,
)
if branch.enable_file_storage:
storage_task = _collect_storage_volume_usage(
namespace=namespace,
branch=branch,
sb_client=sb_client,
endpoint=endpoint,
cluster_id=cluster_id,
cluster_secret=cluster_secret,
)
(nvme_bytes, iops), storage_bytes = await asyncio.gather(db_task, storage_task)
else:
nvme_bytes, iops = await db_task
storage_bytes = None
db_task = _collect_database_volume_usage(namespace)
if branch.enable_file_storage:
storage_task = _collect_storage_volume_usage(namespace)
(nvme_bytes, iops), storage_bytes = await asyncio.gather(db_task, storage_task)
else:
nvme_bytes, iops = await db_task
storage_bytes = None

return nvme_bytes, iops, storage_bytes

Expand Down
25 changes: 7 additions & 18 deletions src/deployment/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
from .kubernetes.kubevirt import get_virtualmachine_status
from .logflare import create_branch_logflare_objects, delete_branch_logflare_objects
from .settings import get_settings
from .simplyblock_api import SIMPLYBLOCK_API_TIMEOUT_SECONDS, create_simplyblock_api

if TYPE_CHECKING:
from cloudflare.types.dns.record_list_params import Name as CloudflareRecordName
Expand Down Expand Up @@ -283,32 +284,20 @@ async def resolve_storage_volume_identifiers(namespace: str) -> tuple[str, str |
async def update_branch_volume_iops(branch_id: Identifier, iops: int) -> None:
namespace = deployment_namespace(branch_id)

endpoint, cluster_id, cluster_secret = await load_simplyblock_credentials()
volume_uuid, pv_cluster_id = await resolve_database_volume_identifiers(namespace)
if pv_cluster_id and pv_cluster_id != cluster_id:
raise VelaDeploymentError(
f"Cluster ID mismatch for Simplyblock volume {volume_uuid!r}: PV reports {pv_cluster_id}, "
f"but credentials reference {cluster_id}"
)
url = f"{endpoint}/lvol/{volume_uuid}"
headers = {
"Content-Type": "application/json",
"Authorization": f"{cluster_id} {cluster_secret}",
}

volume_uuid, _ = await resolve_database_volume_identifiers(namespace)
try:
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.put(url, headers=headers, json={"max-rw-iops": iops})
response.raise_for_status()
async with httpx.AsyncClient(timeout=SIMPLYBLOCK_API_TIMEOUT_SECONDS) as client:
sb_api = await create_simplyblock_api(client)
await sb_api.update_volume(volume_uuid=volume_uuid, payload={"max-rw-iops": iops})
except httpx.HTTPStatusError as exc:
detail = exc.response.text.strip() or exc.response.reason_phrase or str(exc)
raise VelaDeploymentError(
f"Simplyblock volume API rejected IOPS update for volume {volume_uuid!r}: {detail}"
) from exc
except httpx.HTTPError as exc:
raise VelaDeploymentError(f"Failed to reach Simplyblock volume API at {url!r}") from exc
raise VelaDeploymentError("Failed to reach Simplyblock volume API") from exc

logger.info("Updated Simplyblock volume %s IOPS to %s using endpoint %s", volume_uuid, iops, endpoint)
logger.info("Updated Simplyblock volume %s IOPS to %s", volume_uuid, iops)


async def ensure_branch_storage_class(branch_id: Identifier, *, iops: int) -> str:
Expand Down
97 changes: 97 additions & 0 deletions src/deployment/simplyblock_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
from __future__ import annotations

from typing import TYPE_CHECKING, Any
from uuid import UUID

if TYPE_CHECKING:
import httpx

SIMPLYBLOCK_API_TIMEOUT_SECONDS = 10.0
SIMPLYBLOCK_STORAGE_POOL_NAME = "testing1"


class SimplyblockApi:
def __init__(
self,
client: httpx.AsyncClient,
endpoint: str,
cluster_id: str,
cluster_secret: str,
) -> None:
self._client = client
self._endpoint = endpoint.rstrip("/")
self._cluster_id = cluster_id
self._cluster_secret = cluster_secret
self._pool_id_cache: dict[str, UUID] = {}

@property
def _cluster_base(self) -> str:
return f"{self._endpoint}/api/v2/clusters/{self._cluster_id}"

def _headers(self) -> dict[str, str]:
return {
"Authorization": f"Bearer {self._cluster_secret}",
"Accept": "application/json",
}

async def _cluster_pool_base(self) -> str:
pool_id = await self.pool_id()
return f"{self._endpoint}/api/v2/clusters/{self._cluster_id}/storage-pools/{pool_id}"

async def pool(self, name: str = SIMPLYBLOCK_STORAGE_POOL_NAME) -> dict[str, Any]:
url = f"{self._cluster_base}/storage-pools/"
response = await self._client.get(url, headers=self._headers(), timeout=SIMPLYBLOCK_API_TIMEOUT_SECONDS)
response.raise_for_status()

pools = response.json()
if isinstance(pools, list):
for pool in pools:
if isinstance(pool, dict) and pool.get("name") == name:
return pool
raise KeyError(f"Storage pool {name!r} not found")

async def pool_id(self, name: str = SIMPLYBLOCK_STORAGE_POOL_NAME) -> UUID:
cached = self._pool_id_cache.get(name)
if cached:
return cached
pool = await self.pool(name)
identifier = UUID(str(pool["id"]))
self._pool_id_cache[name] = identifier
return identifier

async def volume_iostats(self, volume_uuid: str) -> dict[str, Any]:
base_url = await self._cluster_pool_base()
url = f"{base_url}/volumes/{volume_uuid}/iostats"
response = await self._client.get(url, headers=self._headers(), timeout=SIMPLYBLOCK_API_TIMEOUT_SECONDS)
response.raise_for_status()
payload = response.json()
return payload[0] # return the most recent one

async def update_volume(
self,
volume_uuid: str,
payload: dict[str, Any],
) -> None:
headers = self._headers()
headers["Content-Type"] = "application/json"
base_url = await self._cluster_pool_base()
url = f"{base_url}/volumes/{volume_uuid}"
response = await self._client.put(
url,
headers=headers,
json=payload,
timeout=SIMPLYBLOCK_API_TIMEOUT_SECONDS,
)
response.raise_for_status()


async def create_simplyblock_api(client: httpx.AsyncClient) -> SimplyblockApi:
from . import load_simplyblock_credentials

endpoint, cluster_id, cluster_secret = await load_simplyblock_credentials()
return SimplyblockApi(
client=client,
endpoint=endpoint,
cluster_id=cluster_id,
cluster_secret=cluster_secret,
)