Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/api/models/branch.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,13 +259,14 @@ class BranchStatus(BaseModel):
"FAILED",
]

# Names of the branch capacity parameters that a resize request may change.
# NOTE(review): this alias is assigned twice in a row (without and with "iops");
# the second assignment is the one that takes effect. It looks like both sides
# of a diff were captured here — confirm only the "iops" variant is intended.
CapaResizeKey = Literal["database_size", "storage_size", "milli_vcpu", "memory_bytes"]
CapaResizeKey = Literal["database_size", "storage_size", "milli_vcpu", "memory_bytes", "iops"]

# Names of the per-resource resize operations whose status is tracked for a branch.
BranchResizeService = Literal[
"database_disk_resize",
"storage_api_disk_resize",
"database_cpu_resize",
"database_memory_resize",
"database_iops_resize",
]

RESIZE_STATUS_PRIORITY: dict[BranchResizeStatus, int] = {
Expand Down
18 changes: 18 additions & 0 deletions src/api/organization/project/branch/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
kube_service,
resize_deployment,
update_branch_database_password,
update_branch_volume_iops,
)
from .....deployment.kubernetes.kubevirt import KubevirtSubresourceAction, call_kubevirt_subresource
from .....deployment.settings import settings as deployment_settings
Expand Down Expand Up @@ -132,6 +133,7 @@ async def _collect_branch_service_health(namespace: str) -> BranchStatus:
"storage_size": "storage_api_disk_resize",
"milli_vcpu": "database_cpu_resize",
"memory_bytes": "database_memory_resize",
"iops": "database_iops_resize",
}


Expand Down Expand Up @@ -162,6 +164,9 @@ async def _apply_resize_operations(branch: Branch, effective_parameters: dict[Ca

resize_deployment(branch.id, resize_params)

if "iops" in effective_parameters:
await update_branch_volume_iops(branch.id, effective_parameters["iops"])

if cpu_limit is not None and cpu_request is not None:
namespace, vm_name = get_db_vmi_identity(branch.id)
await kube_service.resize_vm_compute_cpu(
Expand Down Expand Up @@ -621,9 +626,22 @@ async def resize(
effective=effective_parameters,
timestamp=timestamp,
)
_track_resize_change(
parameter_key="iops",
new_value=parameters.iops,
current_value=branch_in_session.iops,
statuses=updated_statuses,
effective=effective_parameters,
timestamp=timestamp,
)

if effective_parameters:
await _apply_resize_operations(branch_in_session, effective_parameters)
if "iops" in effective_parameters:
updated_statuses["database_iops_resize"] = {
"status": "COMPLETED",
"timestamp": datetime.now(UTC).replace(microsecond=0).isoformat().replace("+00:00", "Z"),
}

branch_in_session.resize_statuses = updated_statuses
branch_in_session.resize_status = aggregate_resize_statuses(updated_statuses)
Expand Down
178 changes: 172 additions & 6 deletions src/deployment/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import asyncio
import base64
import json
import logging
import subprocess
import tempfile
Expand All @@ -8,6 +10,7 @@
from typing import Annotated, Any, Literal

import asyncpg
import httpx
import yaml
from cloudflare import AsyncCloudflare, CloudflareError
from kubernetes_asyncio.client.exceptions import ApiException
Expand Down Expand Up @@ -42,6 +45,14 @@
DEFAULT_DATABASE_VM_NAME = "supabase-supabase-db"
CHECK_ENCRYPTED_HEADER_PLUGIN_NAME = "check-x-connection-encrypted"
CPU_REQUEST_FRACTION = 0.25 # request = 25% of limit
# Namespace from which the Simplyblock CSI ConfigMap and Secret are read.
SIMPLYBLOCK_NAMESPACE = "simplyblock"
# ConfigMap whose "config.json" entry provides the Simplyblock endpoint ("ip") and cluster "uuid".
SIMPLYBLOCK_CSI_CONFIGMAP = "simplyblock-csi-cm"
# Secret whose base64-encoded "secret.json" entry provides the cluster secret.
SIMPLYBLOCK_CSI_SECRET = "simplyblock-csi-secret"
# Appended to the release name to form the database PersistentVolumeClaim name.
DATABASE_PVC_SUFFIX = "-supabase-db-pvc"


def branch_storage_class_name(branch_id: Identifier) -> str:
    """Return the per-branch StorageClass name: ``sc-`` followed by the lower-cased branch ID."""
    lowered_id = str(branch_id).lower()
    return "sc-" + lowered_id


def deployment_namespace(branch_id: Identifier) -> str:
Expand Down Expand Up @@ -140,6 +151,148 @@ class DeploymentStatus(BaseModel):
status: StatusType


def _build_storage_class_manifest(*, storage_class_name: str, iops: int, base_storage_class: Any) -> dict[str, Any]:
    """Derive a StorageClass manifest from ``base_storage_class`` with IOPS QoS parameters set.

    The provisioner and all parameters are copied from the base class (parameter
    values are stringified); ``qos_rw_iops`` is set to ``iops`` and the three
    ``qos_*_mbytes`` parameters are set to ``"0"``. Reclaim policy, volume binding
    mode, volume expansion and mount options are carried over only when the base
    class defines them.

    Args:
        storage_class_name: Value for ``metadata.name`` of the new manifest.
        iops: Value written (as a string) to the ``qos_rw_iops`` parameter.
        base_storage_class: Object exposing ``provisioner``, ``parameters`` and the
            optional attributes listed above.

    Returns:
        The StorageClass manifest as a plain dictionary.

    Raises:
        VelaKubernetesError: If the base class has no provisioner or no
            ``cluster_id`` parameter.
    """
    provisioner = getattr(base_storage_class, "provisioner", None)
    if not provisioner:
        raise VelaKubernetesError("Base storage class missing provisioner")

    inherited = dict(getattr(base_storage_class, "parameters", {}) or {})
    if not inherited.get("cluster_id"):
        raise VelaKubernetesError("Base storage class missing required parameter 'cluster_id'")

    # QoS settings win over whatever the base class declared for the same keys.
    qos_overrides = {
        "qos_rw_iops": str(iops),
        "qos_rw_mbytes": "0",
        "qos_r_mbytes": "0",
        "qos_w_mbytes": "0",
    }
    manifest: dict[str, Any] = {
        "apiVersion": "storage.k8s.io/v1",
        "kind": "StorageClass",
        "metadata": {"name": storage_class_name},
        "provisioner": provisioner,
        "parameters": {**{name: str(raw) for name, raw in inherited.items()}, **qos_overrides},
    }

    reclaim_policy = getattr(base_storage_class, "reclaim_policy", None)
    if reclaim_policy is not None:
        manifest["reclaimPolicy"] = reclaim_policy

    binding_mode = getattr(base_storage_class, "volume_binding_mode", None)
    if binding_mode is not None:
        manifest["volumeBindingMode"] = binding_mode

    expansion = getattr(base_storage_class, "allow_volume_expansion", None)
    if expansion is not None:
        manifest["allowVolumeExpansion"] = bool(expansion)

    # Empty or missing mount options are omitted entirely.
    mount_opts = getattr(base_storage_class, "mount_options", None)
    if mount_opts:
        manifest["mountOptions"] = list(mount_opts)

    return manifest


async def _load_simplyblock_credentials() -> tuple[str, str, str]:
    """Read the Simplyblock endpoint, cluster ID and cluster secret from Kubernetes.

    The endpoint (``ip``) and cluster ID (``uuid``) come from the ``simplybk``
    section of ``config.json`` in the Simplyblock CSI ConfigMap; the cluster
    secret comes from the ``simplybk`` section of the base64-encoded
    ``secret.json`` in the Simplyblock CSI Secret.

    Returns:
        ``(endpoint, cluster_id, cluster_secret)`` with trailing slashes stripped
        from the endpoint.

    Raises:
        VelaDeploymentError: If an entry is missing, cannot be decoded or parsed,
            or does not have the expected structure.
    """
    config_map = await kube_service.get_config_map(SIMPLYBLOCK_NAMESPACE, SIMPLYBLOCK_CSI_CONFIGMAP)
    config_data = (config_map.data or {}).get("config.json")
    if not config_data:
        raise VelaDeploymentError("ConfigMap simplyblock-csi-cm missing 'config.json'")
    try:
        config = json.loads(config_data)
    except (TypeError, ValueError) as exc:
        raise VelaDeploymentError("Failed to parse Simplyblock CSI config JSON") from exc

    # json.loads may legitimately return a list, string or number; only a JSON
    # object can carry the 'simplybk' section, so guard before calling .get().
    cluster_cfg = config.get("simplybk") if isinstance(config, dict) else None
    if not isinstance(cluster_cfg, dict):
        raise VelaDeploymentError("Simplyblock CSI config missing 'simplybk' section")

    endpoint = cluster_cfg.get("ip")
    cluster_id = cluster_cfg.get("uuid")
    if not endpoint or not cluster_id:
        raise VelaDeploymentError("Simplyblock CSI config missing required 'ip' or 'uuid'")
    # The endpoint is post-processed with str.rstrip below, so reject non-strings
    # with a domain error instead of letting an AttributeError escape.
    if not isinstance(endpoint, str):
        raise VelaDeploymentError("Simplyblock CSI config 'ip' must be a string")

    secret = await kube_service.get_secret(SIMPLYBLOCK_NAMESPACE, SIMPLYBLOCK_CSI_SECRET)
    secret_blob = (secret.data or {}).get("secret.json")
    if not secret_blob:
        raise VelaDeploymentError("Secret simplyblock-csi-secret missing 'secret.json'")
    try:
        decoded_secret = base64.b64decode(secret_blob).decode()
    except (TypeError, ValueError, UnicodeDecodeError) as exc:
        raise VelaDeploymentError("Failed to decode Simplyblock CSI secret") from exc
    try:
        secret_json = json.loads(decoded_secret)
    except (TypeError, ValueError) as exc:
        raise VelaDeploymentError("Failed to parse Simplyblock CSI secret JSON") from exc

    # Same guard as above: the decoded secret must be a JSON object.
    secret_cfg = secret_json.get("simplybk") if isinstance(secret_json, dict) else None
    if not isinstance(secret_cfg, dict):
        raise VelaDeploymentError("Simplyblock CSI secret missing 'simplybk' section")
    cluster_secret = secret_cfg.get("secret")
    if not cluster_secret:
        raise VelaDeploymentError("Simplyblock CSI secret missing 'secret'")

    return endpoint.rstrip("/"), cluster_id, cluster_secret


async def _resolve_database_volume_identifiers(namespace: str) -> tuple[str, str | None]:
    """Look up the Simplyblock volume UUID (and cluster ID, if present) of the database volume.

    The database PVC in ``namespace`` is resolved to its bound PersistentVolume,
    whose CSI volume attributes supply ``uuid`` and, optionally, ``cluster_id``.

    Returns:
        ``(volume_uuid, volume_cluster_id)``; the cluster ID is whatever the
        attributes contain under ``cluster_id`` and may be ``None``.

    Raises:
        VelaDeploymentError: If the PVC is unbound, the PV carries no CSI volume
            attributes, or the attributes lack ``uuid``.
    """
    pvc_name = f"{_release_name(namespace)}{DATABASE_PVC_SUFFIX}"
    claim = await kube_service.get_persistent_volume_claim(namespace, pvc_name)

    claim_spec = getattr(claim, "spec", None)
    volume_name = getattr(claim_spec, "volume_name", None) if claim_spec else None
    if not volume_name:
        raise VelaDeploymentError(f"PersistentVolumeClaim {namespace}/{pvc_name} is not bound to a PersistentVolume")

    volume = await kube_service.get_persistent_volume(volume_name)

    # Walk spec -> csi -> volume_attributes, tolerating a missing link at any level.
    volume_spec = getattr(volume, "spec", None)
    csi_source = getattr(volume_spec, "csi", None) if volume_spec else None
    attributes = getattr(csi_source, "volume_attributes", None) if csi_source else None
    if not isinstance(attributes, dict):
        raise VelaDeploymentError(
            f"PersistentVolume {volume_name} missing CSI volume attributes; cannot resolve Simplyblock volume UUID"
        )

    volume_uuid = attributes.get("uuid")
    if not volume_uuid:
        raise VelaDeploymentError(f"PersistentVolume {volume_name} missing 'uuid' attribute in CSI volume attributes")
    return volume_uuid, attributes.get("cluster_id")


async def update_branch_volume_iops(branch_id: Identifier, iops: int) -> None:
    """Set the read/write IOPS limit of a branch's database volume via the Simplyblock API.

    Credentials and the volume UUID are resolved from Kubernetes, then a PUT with
    ``{"max-rw-iops": iops}`` is sent to ``{endpoint}/lvol/{volume_uuid}``.

    Raises:
        VelaDeploymentError: If the PV reports a different cluster ID than the
            credentials, the API answers with an error status, or the request
            fails for any other HTTP-level reason.
    """
    namespace = deployment_namespace(branch_id)

    endpoint, cluster_id, cluster_secret = await _load_simplyblock_credentials()
    volume_uuid, pv_cluster_id = await _resolve_database_volume_identifiers(namespace)

    # A PV that names its cluster must agree with the credentials being used.
    if pv_cluster_id and pv_cluster_id != cluster_id:
        raise VelaDeploymentError(
            f"Cluster ID mismatch for Simplyblock volume {volume_uuid!r}: PV reports {pv_cluster_id}, "
            f"but credentials reference {cluster_id}"
        )

    url = f"{endpoint}/lvol/{volume_uuid}"
    request_kwargs = {
        "headers": {
            "Content-Type": "application/json",
            "Authorization": f"{cluster_id} {cluster_secret}",
        },
        "json": {"max-rw-iops": iops},
    }

    try:
        async with httpx.AsyncClient(timeout=10.0) as http:
            reply = await http.put(url, **request_kwargs)
            reply.raise_for_status()
    except httpx.HTTPStatusError as exc:
        # Prefer the response body, then the reason phrase, then the exception text.
        detail = exc.response.text.strip() or exc.response.reason_phrase or str(exc)
        raise VelaDeploymentError(
            f"Simplyblock volume API rejected IOPS update for volume {volume_uuid!r}: {detail}"
        ) from exc
    except httpx.HTTPError as exc:
        raise VelaDeploymentError(f"Failed to reach Simplyblock volume API at {url!r}") from exc

    logger.info("Updated Simplyblock volume %s IOPS to %s using endpoint %s", volume_uuid, iops, endpoint)


async def create_vela_config(
branch_id: Identifier,
parameters: DeploymentParameters,
Expand Down Expand Up @@ -207,16 +360,27 @@ async def create_vela_config(
"cpu": cpu_request,
}

# Set database volume size
db_spec.setdefault("persistence", {})["size"] = f"{bytes_to_gb(parameters.database_size)}G"

# Set storage volume size
storage_spec = values_content.setdefault("storage", {})
storage_persistence = storage_spec.setdefault("persistence", {})
storage_persistence["size"] = f"{bytes_to_gb(parameters.storage_size)}G"

# todo: create an storage class with the given IOPS
values_content["provisioning"] = {"iops": parameters.iops}
# Set database volume size
db_spec.setdefault("persistence", {})["size"] = f"{bytes_to_gb(parameters.database_size)}G"

# Create and apply custom StorageClass for database volume with specified IOPS
storage_class_name = branch_storage_class_name(branch_id)
base_storage_class = await kube_service.get_storage_class("simplyblock-csi-sc")
db_spec.setdefault("persistence", {})["storageClassName"] = storage_class_name

storage_class_manifest = _build_storage_class_manifest(
storage_class_name=storage_class_name,
iops=parameters.iops,
base_storage_class=base_storage_class,
)

await kube_service.apply_storage_class(storage_class_manifest)

with (
tempfile.NamedTemporaryFile(mode="w", suffix=".yaml", delete=False) as temp_values,
tempfile.NamedTemporaryFile(mode="w", suffix=".yaml", delete=False) as modified_compose,
Expand Down Expand Up @@ -311,6 +475,7 @@ class ResizeParameters(BaseModel):
storage_size: Annotated[int, Field(**STORAGE_SIZE_CONSTRAINTS)] | None = None
memory_bytes: Annotated[int, Field(**MEMORY_CONSTRAINTS)] | None = None
milli_vcpu: Annotated[int, Field(**CPU_CONSTRAINTS)] | None = None
iops: Annotated[int, Field(**IOPS_CONSTRAINTS)] | None = None

@model_validator(mode="after")
def ensure_at_least_one(self) -> "ResizeParameters":
Expand All @@ -319,8 +484,9 @@ def ensure_at_least_one(self) -> "ResizeParameters":
and self.storage_size is None
and self.memory_bytes is None
and self.milli_vcpu is None
and self.iops is None
):
raise ValueError("Specify at least one of database_size, storage_size, memory_bytes, or milli_vcpu")
raise ValueError("Specify at least one of database_size, storage_size, memory_bytes, milli_vcpu, or iops")
return self


Expand Down
60 changes: 59 additions & 1 deletion src/deployment/kubernetes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from kubernetes_asyncio import client

from ...exceptions import VelaKubernetesError
from ._util import core_v1_client, custom_api_client
from ._util import core_v1_client, custom_api_client, storage_v1_client

# Configure logging
logging.basicConfig(level=logging.INFO)
Expand Down Expand Up @@ -100,6 +100,64 @@ async def apply_kong_plugin(self, namespace: str, plugin: dict[str, Any]) -> Non
else:
raise

async def apply_storage_class(self, manifest: dict[str, Any]) -> None:
    """Create the StorageClass described by ``manifest``, replacing it when it already exists.

    The name is taken from ``manifest["metadata"]["name"]``. A 409 response to
    the create call triggers a replace with the same manifest; any other API
    error is re-raised unchanged.
    """
    sc_name = manifest["metadata"]["name"]
    async with storage_v1_client() as storage_api:
        try:
            await storage_api.create_storage_class(body=manifest)
        except client.exceptions.ApiException as exc:
            if exc.status != 409:
                raise
            # NOTE(review): Kubernetes may refuse changes to provisioner/parameters
            # of an existing StorageClass — confirm a replace is sufficient when
            # the requested IOPS differs from the stored one.
            logger.info("StorageClass %s already exists; replacing", sc_name)
            await storage_api.replace_storage_class(name=sc_name, body=manifest)
        else:
            logger.info("Created StorageClass %s", sc_name)

async def get_storage_class(self, name: str) -> Any:
    """Read the StorageClass called ``name``.

    Raises:
        VelaKubernetesError: If the API answers 404.
        ApiException: Any other API failure, re-raised unchanged.
    """
    async with storage_v1_client() as storage_api:
        try:
            storage_class = await storage_api.read_storage_class(name)
        except client.exceptions.ApiException as exc:
            if exc.status != 404:
                raise
            raise VelaKubernetesError(f"StorageClass {name!r} not found") from exc
        return storage_class

async def get_config_map(self, namespace: str, name: str) -> Any:
    """Read the ConfigMap ``name`` from ``namespace``.

    Raises:
        VelaKubernetesError: If the API answers 404.
        ApiException: Any other API failure, re-raised unchanged.
    """
    async with core_v1_client() as core_api:
        try:
            config_map = await core_api.read_namespaced_config_map(name=name, namespace=namespace)
        except client.exceptions.ApiException as exc:
            if exc.status != 404:
                raise
            raise VelaKubernetesError(f"ConfigMap {namespace!r}/{name!r} not found") from exc
        return config_map

async def get_secret(self, namespace: str, name: str) -> Any:
    """Read the Secret ``name`` from ``namespace``.

    Raises:
        VelaKubernetesError: If the API answers 404.
        ApiException: Any other API failure, re-raised unchanged.
    """
    async with core_v1_client() as core_api:
        try:
            secret = await core_api.read_namespaced_secret(name=name, namespace=namespace)
        except client.exceptions.ApiException as exc:
            if exc.status != 404:
                raise
            raise VelaKubernetesError(f"Secret {namespace!r}/{name!r} not found") from exc
        return secret

async def get_persistent_volume_claim(self, namespace: str, name: str) -> Any:
    """Read the PersistentVolumeClaim ``name`` from ``namespace``.

    Raises:
        VelaKubernetesError: If the API answers 404.
        ApiException: Any other API failure, re-raised unchanged.
    """
    async with core_v1_client() as core_api:
        try:
            claim = await core_api.read_namespaced_persistent_volume_claim(name=name, namespace=namespace)
        except client.exceptions.ApiException as exc:
            if exc.status != 404:
                raise
            raise VelaKubernetesError(f"PersistentVolumeClaim {namespace!r}/{name!r} not found") from exc
        return claim

async def get_persistent_volume(self, name: str) -> Any:
    """Read the cluster-scoped PersistentVolume called ``name``.

    Raises:
        VelaKubernetesError: If the API answers 404.
        ApiException: Any other API failure, re-raised unchanged.
    """
    async with core_v1_client() as core_api:
        try:
            volume = await core_api.read_persistent_volume(name=name)
        except client.exceptions.ApiException as exc:
            if exc.status != 404:
                raise
            raise VelaKubernetesError(f"PersistentVolume {name!r} not found") from exc
        return volume

async def get_vm_pod_name(self, namespace: str, vm_name: str) -> str:
"""
Resolve the virt-launcher pod name backing the supplied KubeVirt VirtualMachine.
Expand Down
8 changes: 7 additions & 1 deletion src/deployment/kubernetes/_util.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from contextlib import asynccontextmanager

from aiohttp import ClientTimeout
from kubernetes_asyncio.client import ApiClient, CoreV1Api, CustomObjectsApi
from kubernetes_asyncio.client import ApiClient, CoreV1Api, CustomObjectsApi, StorageV1Api
from kubernetes_asyncio.config import load_incluster_config, load_kube_config
from kubernetes_asyncio.config.config_exception import ConfigException

Expand Down Expand Up @@ -49,3 +49,9 @@ async def core_v1_client():
async def custom_api_client():
async with api_client() as client:
yield CustomObjectsApi(api_client=client)


@asynccontextmanager
async def storage_v1_client():
    """Yield a ``StorageV1Api`` bound to a client obtained from ``api_client()``."""
    async with api_client() as shared_client:
        storage_api = StorageV1Api(api_client=shared_client)
        yield storage_api