|
14 | 14 | from ..deployment import ( |
15 | 15 | get_db_vmi_identity, |
16 | 16 | kube_service, |
17 | | - load_simplyblock_credentials, |
18 | 17 | resolve_database_volume_identifiers, |
19 | 18 | resolve_storage_volume_identifiers, |
20 | 19 | ) |
21 | 20 | from ..deployment.kubernetes._util import custom_api_client |
| 21 | +from ..deployment.simplyblock_api import create_simplyblock_api |
22 | 22 | from ..models._util import Identifier |
23 | 23 | from ..models.branch import Branch, BranchServiceStatus, ResourceUsageDefinition |
24 | 24 | from ..models.project import Project |
|
60 | 60 |
|
61 | 61 | router = APIRouter(tags=["resource"]) |
62 | 62 |
|
63 | | -SIMPLYBLOCK_API_TIMEOUT_SECONDS = 10.0 |
64 | | - |
65 | 63 |
|
66 | 64 | # --------------------------- |
67 | 65 | # Helper functions |
|
83 | 81 | logger = logging.getLogger(__name__) |
84 | 82 |
|
85 | 83 |
|
86 | | -def _require_int_stat(stats: dict[str, Any], field: str) -> int: |
87 | | - if field not in stats: |
88 | | - raise ValueError(f"Simplyblock IO stats missing required field {field!r}") |
89 | | - value = stats[field] |
90 | | - try: |
91 | | - return int(value) |
92 | | - except (TypeError, ValueError) as exc: |
93 | | - raise ValueError(f"Simplyblock IO stat {field!r} with value {value!r} is not an integer") from exc |
94 | | - |
95 | | - |
96 | | -async def _fetch_volume_stats( |
97 | | - *, |
98 | | - client: httpx.AsyncClient, |
99 | | - endpoint: str, |
100 | | - cluster_id: str, |
101 | | - cluster_secret: str, |
102 | | - volume_uuid: str, |
103 | | - required_fields: tuple[str, ...], |
104 | | -) -> dict[str, int]: |
105 | | - url = f"{endpoint}/lvol/iostats/{volume_uuid}" |
106 | | - headers = { |
107 | | - "Authorization": f"{cluster_id} {cluster_secret}", |
108 | | - "Accept": "application/json", |
109 | | - } |
110 | | - |
111 | | - response = await client.get(url, headers=headers, timeout=SIMPLYBLOCK_API_TIMEOUT_SECONDS) |
112 | | - response.raise_for_status() |
113 | | - |
114 | | - payload = response.json() |
115 | | - |
116 | | - stats = payload.get("stats") |
117 | | - if not isinstance(stats, list) or not stats: |
118 | | - raise ValueError(f"Simplyblock IO stats payload missing stats list for volume {volume_uuid}") |
119 | | - entry = stats[0] |
120 | | - if not isinstance(entry, dict): |
121 | | - raise ValueError(f"Simplyblock IO stats entry malformed for volume {volume_uuid}") |
122 | | - |
123 | | - return {field: _require_int_stat(entry, field) for field in required_fields} |
124 | | - |
125 | | - |
126 | 84 | # --------------------------- |
127 | 85 | # Provisioning endpoints |
128 | 86 | # --------------------------- |
@@ -414,113 +372,41 @@ async def _resolve_volume_stats( |
414 | 372 | *, |
415 | 373 | volume_identifier_resolver: Callable[[str], Awaitable[tuple[str, str | None]]], |
416 | 374 | namespace: str, |
417 | | - branch: Branch, |
418 | | - resource_label: str, |
419 | | - sb_client: httpx.AsyncClient, |
420 | | - endpoint: str, |
421 | | - cluster_id: str, |
422 | | - cluster_secret: str, |
423 | | - required_fields: tuple[str, ...], |
424 | 375 | ) -> dict[str, int]: |
425 | | - volume_uuid, pv_cluster_id = await volume_identifier_resolver(namespace) |
426 | | - |
427 | | - if pv_cluster_id and pv_cluster_id != cluster_id: |
428 | | - logger.warning( |
429 | | - "Cluster mismatch for branch %s %s volume %s: PV cluster %s != credentials cluster %s", |
430 | | - branch.id, |
431 | | - resource_label, |
432 | | - volume_uuid, |
433 | | - pv_cluster_id, |
434 | | - cluster_id, |
435 | | - ) |
436 | | - raise ValueError( |
437 | | - f"Cluster mismatch for branch {branch.id} {resource_label} volume {volume_uuid}: " |
438 | | - f"PV cluster {pv_cluster_id} != credentials cluster {cluster_id}" |
439 | | - ) |
| 376 | + volume_uuid, _ = await volume_identifier_resolver(namespace) |
440 | 377 |
|
441 | | - return await _fetch_volume_stats( |
442 | | - client=sb_client, |
443 | | - endpoint=endpoint, |
444 | | - cluster_id=cluster_id, |
445 | | - cluster_secret=cluster_secret, |
446 | | - volume_uuid=volume_uuid, |
447 | | - required_fields=required_fields, |
448 | | - ) |
| 378 | + async with httpx.AsyncClient() as sb_client: |
| 379 | + sb_api = await create_simplyblock_api(sb_client) |
| 380 | + return await sb_api.volume_iostats(volume_uuid=volume_uuid) |
449 | 381 |
|
450 | 382 |
|
451 | | -async def _collect_database_volume_usage( |
452 | | - *, |
453 | | - namespace: str, |
454 | | - branch: Branch, |
455 | | - sb_client: httpx.AsyncClient, |
456 | | - endpoint: str, |
457 | | - cluster_id: str, |
458 | | - cluster_secret: str, |
459 | | -) -> tuple[int, int]: |
| 383 | +async def _collect_database_volume_usage(namespace: str) -> tuple[int, int]: |
460 | 384 | stats = await _resolve_volume_stats( |
461 | 385 | volume_identifier_resolver=resolve_database_volume_identifiers, |
462 | 386 | namespace=namespace, |
463 | | - branch=branch, |
464 | | - resource_label="database", |
465 | | - sb_client=sb_client, |
466 | | - endpoint=endpoint, |
467 | | - cluster_id=cluster_id, |
468 | | - cluster_secret=cluster_secret, |
469 | | - required_fields=("size_used", "read_io_ps", "write_io_ps"), |
470 | 387 | ) |
471 | 388 | nvme_bytes = stats["size_used"] |
472 | 389 | read_iops = stats["read_io_ps"] |
473 | 390 | write_iops = stats["write_io_ps"] |
474 | 391 | return nvme_bytes, read_iops + write_iops |
475 | 392 |
|
476 | 393 |
|
477 | | -async def _collect_storage_volume_usage( |
478 | | - *, |
479 | | - namespace: str, |
480 | | - branch: Branch, |
481 | | - sb_client: httpx.AsyncClient, |
482 | | - endpoint: str, |
483 | | - cluster_id: str, |
484 | | - cluster_secret: str, |
485 | | -) -> int: |
| 394 | +async def _collect_storage_volume_usage(namespace: str) -> int: |
486 | 395 | stats = await _resolve_volume_stats( |
487 | 396 | volume_identifier_resolver=resolve_storage_volume_identifiers, |
488 | 397 | namespace=namespace, |
489 | | - branch=branch, |
490 | | - resource_label="storage", |
491 | | - sb_client=sb_client, |
492 | | - endpoint=endpoint, |
493 | | - cluster_id=cluster_id, |
494 | | - cluster_secret=cluster_secret, |
495 | | - required_fields=("size_used",), |
496 | 398 | ) |
497 | 399 | return stats["size_used"] |
498 | 400 |
|
499 | 401 |
|
500 | 402 | async def _collect_branch_volume_usage(branch: Branch, namespace: str) -> tuple[int, int, int | None]: |
501 | | - endpoint, cluster_id, cluster_secret = await load_simplyblock_credentials() |
502 | | - async with httpx.AsyncClient() as sb_client: |
503 | | - db_task = _collect_database_volume_usage( |
504 | | - namespace=namespace, |
505 | | - branch=branch, |
506 | | - sb_client=sb_client, |
507 | | - endpoint=endpoint, |
508 | | - cluster_id=cluster_id, |
509 | | - cluster_secret=cluster_secret, |
510 | | - ) |
511 | | - if branch.enable_file_storage: |
512 | | - storage_task = _collect_storage_volume_usage( |
513 | | - namespace=namespace, |
514 | | - branch=branch, |
515 | | - sb_client=sb_client, |
516 | | - endpoint=endpoint, |
517 | | - cluster_id=cluster_id, |
518 | | - cluster_secret=cluster_secret, |
519 | | - ) |
520 | | - (nvme_bytes, iops), storage_bytes = await asyncio.gather(db_task, storage_task) |
521 | | - else: |
522 | | - nvme_bytes, iops = await db_task |
523 | | - storage_bytes = None |
| 403 | + db_task = _collect_database_volume_usage(namespace) |
| 404 | + if branch.enable_file_storage: |
| 405 | + storage_task = _collect_storage_volume_usage(namespace) |
| 406 | + (nvme_bytes, iops), storage_bytes = await asyncio.gather(db_task, storage_task) |
| 407 | + else: |
| 408 | + nvme_bytes, iops = await db_task |
| 409 | + storage_bytes = None |
524 | 410 |
|
525 | 411 | return nvme_bytes, iops, storage_bytes |
526 | 412 |
|
|
0 commit comments