Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 79 additions & 0 deletions src/nimbus/cache_proxy/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ def enforce_storage_limit(self) -> None:
max_bytes = self.settings.max_storage_bytes
if not max_bytes:
return
quota = self.settings.org_storage_quota_bytes
storage_path = self.settings.storage_path
total = directory_size(storage_path)
if total <= max_bytes:
Expand All @@ -267,12 +268,25 @@ def enforce_storage_limit(self) -> None:
except OSError:
pass
self.metrics.delete(cache_key)
if quota is not None:
try:
org_prefix, _ = cache_key.split("/", 1)
if org_prefix.startswith("org-"):
org_id = int(org_prefix.split("-", 1)[1])
self.update_org_usage(org_id, -size)
except (ValueError, IndexError):
self.logger.debug("failed_to_parse_org_from_cache_key", cache_key=cache_key)
CACHE_EVICTIONS_COUNTER.inc()
total -= size
self.logger.info("cache_evicted", cache_key=cache_key, reclaimed_bytes=size)
TOTAL_ENTRIES_GAUGE.set(float(self.metrics.total_entries()))
self.logger.info("cache_eviction_completed", total_bytes=directory_size(storage_path))

def update_org_usage(self, org_id: Optional[int], delta: int) -> int:
    """Record a change of ``delta`` bytes against an organisation's usage total.

    Returns the org's new running total, or 0 when there is nothing to
    record (no org identified, or a zero-byte change).
    """
    if org_id is not None and delta != 0:
        return self.metrics.add_org_bytes(int(org_id), delta)
    return 0


REQUEST_COUNTER = GLOBAL_REGISTRY.register(Counter("nimbus_cache_requests_total", "Total cache proxy requests"))
HIT_COUNTER = GLOBAL_REGISTRY.register(Counter("nimbus_cache_hits_total", "Cache hits"))
Expand All @@ -281,6 +295,8 @@ def enforce_storage_limit(self) -> None:
BYTES_WRITTEN_COUNTER = GLOBAL_REGISTRY.register(Counter("nimbus_cache_bytes_written_total", "Bytes written to cache"))
TOTAL_ENTRIES_GAUGE = GLOBAL_REGISTRY.register(Gauge("nimbus_cache_entries", "Number of cache entries"))
CACHE_EVICTIONS_COUNTER = GLOBAL_REGISTRY.register(Counter("nimbus_cache_evictions_total", "Cache entries evicted to enforce limits"))
ORG_QUOTA_VIOLATIONS_COUNTER = GLOBAL_REGISTRY.register(
Counter("nimbus_cache_org_quota_violations_total", "Cache writes rejected due to org quota"))
CACHE_LATENCY_HISTOGRAM = GLOBAL_REGISTRY.register(
Histogram(
"nimbus_cache_proxy_request_latency_seconds",
Expand Down Expand Up @@ -323,6 +339,17 @@ def _initialise(self) -> None:
"""
)
)
conn.execute(
text(
"""
CREATE TABLE IF NOT EXISTS cache_org_usage (
org_id INTEGER PRIMARY KEY,
total_bytes INTEGER NOT NULL,
updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
)
"""
)
)

def record_hit(self, cache_key: str, bytes_served: int) -> None:
with self._engine.begin() as conn:
Expand Down Expand Up @@ -397,6 +424,39 @@ def delete(self, cache_key: str) -> None:
with self._engine.begin() as conn:
conn.execute(text("DELETE FROM cache_metrics WHERE cache_key = :cache_key"), {"cache_key": cache_key})

def add_org_bytes(self, org_id: int, delta: int) -> int:
    """Atomically apply ``delta`` bytes to an org's usage row and return the new total.

    The stored counter is clamped at zero both on first insert and on
    update, so a negative delta can never drive the total below zero.
    """
    with self._engine.begin() as conn:
        conn.execute(
            text(
                """
                INSERT INTO cache_org_usage (org_id, total_bytes)
                VALUES (:org_id, CASE WHEN :delta < 0 THEN 0 ELSE :delta END)
                ON CONFLICT(org_id) DO UPDATE SET
                total_bytes = CASE
                WHEN cache_org_usage.total_bytes + :delta < 0 THEN 0
                ELSE cache_org_usage.total_bytes + :delta
                END,
                updated_at = CURRENT_TIMESTAMP
                """
            ),
            {"org_id": org_id, "delta": delta},
        )
        # Read back inside the same transaction so the returned value reflects
        # exactly the row this statement produced.
        total = conn.execute(
            text("SELECT total_bytes FROM cache_org_usage WHERE org_id=:org_id"),
            {"org_id": org_id},
        ).scalar_one()
    return int(total)

def get_org_bytes(self, org_id: int) -> int:
    """Return the recorded byte total for ``org_id``, or 0 when no row exists."""
    with self._engine.connect() as conn:
        value = conn.execute(
            text("SELECT total_bytes FROM cache_org_usage WHERE org_id=:org_id"),
            {"org_id": org_id},
        ).scalar()
    return int(value) if value is not None else 0


def build_backend(settings: CacheProxySettings) -> CacheBackend:
if settings.s3_bucket:
Expand Down Expand Up @@ -502,6 +562,24 @@ async def put_cache(
with TRACER.start_as_current_span("cache_proxy.put", attributes={"nimbus.cache_key": namespaced_key}) as span:
await state.backend.write(namespaced_key, request.stream())
bytes_written = await state.backend.head(namespaced_key)
new_total = state.update_org_usage(org_id, bytes_written)
quota = state.settings.org_storage_quota_bytes
if quota is not None and new_total > quota:
state.update_org_usage(org_id, -bytes_written)
Comment on lines 504 to +568

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Delete cache object when rejecting on org quota

The PUT handler increments the org’s byte counter only after the blob has been written, and when the quota is exceeded it merely subtracts the counter and returns 413 but never removes the already-written object. This allows rejected writes to remain stored and retrievable, so disk usage can exceed the configured quota while metrics claim otherwise. Consider deleting the object (e.g. via the backend) before raising the exception so quota enforcement actually prevents the artifact from being cached.

Useful? React with 👍 / 👎.

await state.backend.delete(namespaced_key)
state.metrics.delete(namespaced_key)
state.logger.warning(
"org_quota_exceeded",
org_id=org_id,
wrote_bytes=bytes_written,
new_total=new_total,
quota=quota,
)
ORG_QUOTA_VIOLATIONS_COUNTER.inc()
raise HTTPException(
status_code=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE,
detail=f"Org {org_id} exceeded cache quota",
)
state.metrics.record_hit(namespaced_key, bytes_written)
BYTES_WRITTEN_COUNTER.inc(bytes_written)
HIT_COUNTER.inc()
Expand All @@ -510,6 +588,7 @@ async def put_cache(
state.enforce_storage_limit()
span.set_attribute("nimbus.bytes_written", bytes_written)
span.set_attribute("nimbus.org_id", org_id)
span.set_attribute("nimbus.org_bytes_total", new_total)
return Response(status_code=status.HTTP_201_CREATED)

@app.head("/cache/{cache_key:path}")
Expand Down
5 changes: 5 additions & 0 deletions src/nimbus/common/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,9 @@ class HostAgentSettings(BaseSettings):
seccomp_filter_path: Optional[Path] = env_field(None, "NIMBUS_SECCOMP_FILTER")
kernel_image_path: str = env_field(..., "NIMBUS_KERNEL_IMAGE")
rootfs_image_path: str = env_field(..., "NIMBUS_ROOTFS_IMAGE")
rootfs_manifest_path: Optional[Path] = env_field(None, "NIMBUS_ROOTFS_MANIFEST")
rootfs_version: Optional[str] = env_field(None, "NIMBUS_ROOTFS_VERSION")
require_rootfs_attestation: bool = env_field(False, "NIMBUS_ROOTFS_ATTESTATION_REQUIRED")
snapshot_state_path: Optional[str] = env_field(None, "NIMBUS_SNAPSHOT_STATE_PATH")
snapshot_memory_path: Optional[str] = env_field(None, "NIMBUS_SNAPSHOT_MEMORY_PATH")
snapshot_enable_diff: bool = env_field(False, "NIMBUS_SNAPSHOT_ENABLE_DIFF")
Expand Down Expand Up @@ -273,6 +276,7 @@ class CacheProxySettings(BaseSettings):
"postgresql+psycopg://localhost/nimbus_cache_metrics",
"NIMBUS_CACHE_METRICS_DB",
)
org_storage_quota_bytes: Optional[int] = env_field(None, "NIMBUS_CACHE_ORG_QUOTA_BYTES")
s3_max_retries: int = env_field(3, "NIMBUS_CACHE_S3_MAX_RETRIES")
s3_retry_base_seconds: float = env_field(0.2, "NIMBUS_CACHE_S3_RETRY_BASE")
s3_retry_max_seconds: float = env_field(2.0, "NIMBUS_CACHE_S3_RETRY_MAX")
Expand Down Expand Up @@ -328,6 +332,7 @@ class DockerCacheSettings(BaseSettings):
)
shared_secret: SecretStr = env_field(SecretStr("local-cache-secret"), "NIMBUS_CACHE_SHARED_SECRET")
max_storage_bytes: Optional[int] = env_field(None, "NIMBUS_DOCKER_CACHE_MAX_BYTES")
org_storage_quota_bytes: Optional[int] = env_field(None, "NIMBUS_DOCKER_ORG_QUOTA_BYTES")
log_level: str = env_field("INFO", "NIMBUS_LOG_LEVEL")
otel_exporter_endpoint: Optional[str] = env_field(None, "NIMBUS_OTEL_EXPORTER_ENDPOINT")
otel_exporter_headers: Optional[str] = env_field(None, "NIMBUS_OTEL_EXPORTER_HEADERS")
Expand Down
69 changes: 69 additions & 0 deletions src/nimbus/docker_cache/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,17 @@ def _initialise(self) -> None:
"""
)
)
conn.execute(
text(
"""
CREATE TABLE IF NOT EXISTS org_usage (
org_id INTEGER PRIMARY KEY,
total_bytes INTEGER NOT NULL,
updated_at DOUBLE PRECISION NOT NULL
)
"""
)
)
conn.execute(
text(
"""
Expand Down Expand Up @@ -249,6 +260,40 @@ def get_blob_org_id(self, digest: str) -> Optional[int]:
row = result.fetchone()
return row[0] if row else None

def add_org_bytes(self, org_id: int, delta: int) -> int:
    """Atomically apply ``delta`` bytes to an org's usage row and return the new total.

    The counter is clamped at zero on both the insert and update paths, so
    a negative delta cannot push the stored total below zero. ``updated_at``
    is stamped with the current wall-clock time on every write.
    """
    stamp = time.time()
    with self._engine.begin() as conn:
        conn.execute(
            text(
                """
                INSERT INTO org_usage (org_id, total_bytes, updated_at)
                VALUES (:org_id, CASE WHEN :delta < 0 THEN 0 ELSE :delta END, :updated_at)
                ON CONFLICT(org_id) DO UPDATE SET
                total_bytes = CASE
                WHEN org_usage.total_bytes + :delta < 0 THEN 0
                ELSE org_usage.total_bytes + :delta
                END,
                updated_at = :updated_at
                """
            ),
            {"org_id": org_id, "delta": delta, "updated_at": stamp},
        )
        # Same-transaction read-back: the value returned is exactly what the
        # upsert above committed.
        total = conn.execute(
            text("SELECT total_bytes FROM org_usage WHERE org_id=:org_id"),
            {"org_id": org_id},
        ).scalar_one()
    return int(total)

def get_org_bytes(self, org_id: int) -> int:
    """Return the recorded byte total for ``org_id``, or 0 when no row exists."""
    with self._engine.connect() as conn:
        value = conn.execute(
            text("SELECT total_bytes FROM org_usage WHERE org_id=:org_id"),
            {"org_id": org_id},
        ).scalar()
    return int(value) if value is not None else 0


class DockerCacheState:
def __init__(self, settings: DockerCacheSettings, metrics: DockerCacheMetrics):
Expand Down Expand Up @@ -331,12 +376,20 @@ def ensure_storage_limit(self) -> None:
path = self.blob_path(digest)
if path.exists():
path.unlink(missing_ok=True)
blob_org = self.metrics.get_blob_org_id(digest)
self.metrics.delete_blob(digest)
if blob_org is not None:
self.update_org_usage(blob_org, -size)
total -= size
EVICTION_COUNTER.inc()
self.logger.info("blob_evicted", digest=digest, reclaimed_bytes=size)
TOTAL_BLOB_BYTES_GAUGE.set(float(self.metrics.total_blob_bytes()))

def update_org_usage(self, org_id: Optional[int], delta: int) -> int:
    """Apply a byte-count change for an organisation and return its new total.

    A missing org id or a zero delta is a no-op that returns 0.
    """
    if org_id is not None and delta != 0:
        return self.metrics.add_org_bytes(int(org_id), delta)
    return 0


def get_state(request: Request) -> DockerCacheState:
state = getattr(request.app.state, "cache_state", None)
Expand Down Expand Up @@ -541,6 +594,22 @@ async def finalize_blob_upload(
final_session = await state.finalize_upload(upload_id)
target_path = state.blob_path(expected_digest)
target_path.parent.mkdir(parents=True, exist_ok=True)
quota = state.settings.org_storage_quota_bytes
new_total = state.update_org_usage(token.organization_id, final_session.size)
if quota is not None and new_total > quota:
state.update_org_usage(token.organization_id, -final_session.size)
final_session.file_path.unlink(missing_ok=True)
Comment on lines +597 to +601

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Avoid counting deduplicated blobs toward org quota

update_org_usage is called before checking whether the uploaded digest already exists. Re‑pushing a blob that is already present (a common case with Docker layers) bumps the org’s stored byte total even though the file is discarded in the target_path.exists() branch. This will cause quotas to trip even when no additional disk space is consumed. The quota update should happen only when a new blob is stored or should be rolled back when deduplication occurs.

Useful? React with 👍 / 👎.

state.logger.warning(
"docker_org_quota_exceeded",
org_id=token.organization_id,
wrote_bytes=final_session.size,
new_total=new_total,
quota=quota,
)
raise HTTPException(
status_code=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE,
detail=f"Org {token.organization_id} exceeded docker cache quota",
)
if target_path.exists():
target_path.touch() # update mtime for eviction ordering
final_session.file_path.unlink(missing_ok=True)
Expand Down
25 changes: 25 additions & 0 deletions src/nimbus/host_agent/firecracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

from ..common.schemas import JobAssignment
from ..common.settings import HostAgentSettings
from ..rootfs.attestation import RootfsAttestationError, RootfsAttestor

LOGGER = structlog.get_logger("nimbus.host_agent.firecracker")

Expand Down Expand Up @@ -98,6 +99,25 @@ def __init__(self, settings: HostAgentSettings, config: Optional[MicroVMConfig]
self._snapshot_memory = Path(settings.snapshot_memory_path) if settings.snapshot_memory_path else None
self._snapshot_enabled = self._snapshot_state is not None and self._snapshot_memory is not None
self._snapshot_enable_diff = settings.snapshot_enable_diff
manifest_path = settings.rootfs_manifest_path
if manifest_path:
try:
self._attestor: Optional[RootfsAttestor] = RootfsAttestor(
manifest_path,
required=settings.require_rootfs_attestation,
version=settings.rootfs_version,
)
except RootfsAttestationError as exc:
if settings.require_rootfs_attestation:
raise
LOGGER.warning(
"rootfs_attestor_initialisation_failed",
error=str(exc),
manifest=str(manifest_path),
)
self._attestor = None
else:
self._attestor = None

async def execute_job(
self,
Expand Down Expand Up @@ -244,6 +264,11 @@ def _prepare_rootfs(self, workdir: Path) -> tuple[Path, str]:
except OSError as exc:
LOGGER.warning("Unable to mark rootfs copy read-only", path=str(destination), error=str(exc))
checksum = self._compute_checksum(destination)
if self._attestor:
try:
self._attestor.verify(source, checksum)
except RootfsAttestationError as exc:
raise FirecrackerError(f"Rootfs attestation failed: {exc}") from exc
return destination, checksum

def _build_vm_config(self, rootfs_path: str, kernel_path: str, tap_name: str) -> dict:
Expand Down
3 changes: 3 additions & 0 deletions src/nimbus/rootfs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@

from .config import RootfsPipelineConfig, RootfsVersionConfig
from .pipeline import RootfsPipeline
from .attestation import RootfsAttestationError, RootfsAttestor

__all__ = [
"RootfsPipeline",
"RootfsPipelineConfig",
"RootfsVersionConfig",
"RootfsAttestor",
"RootfsAttestationError",
]
Loading