diff --git a/.gitignore b/.gitignore index 144b8b5ea..c1a94f925 100644 --- a/.gitignore +++ b/.gitignore @@ -21,4 +21,6 @@ node_modules logs docs/superpowers/ -.env \ No newline at end of file +.env +*.db +ddl/ diff --git a/pyproject.toml b/pyproject.toml index 5d67c2969..2ae7df85d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -43,6 +43,7 @@ admin = [ "alibabacloud_cr20181201==2.0.5", "sqlmodel", "aiosqlite", + "asyncpg", "boto3", "ray[default]==2.43.0", "pip", @@ -171,5 +172,6 @@ markers = [ "need_ray: need ray start", "need_docker: need docker daemon running", "need_admin: need admin start", - "need_admin_and_network: need install from network" + "need_admin_and_network: need install from network", + "need_database: need database Docker containers (PostgreSQL, Redis)" ] diff --git a/requirements_admin.txt b/requirements_admin.txt index 379fcc1fb..332a15229 100644 --- a/requirements_admin.txt +++ b/requirements_admin.txt @@ -108,7 +108,10 @@ arckit==0.1.0 async-timeout==5.0.1 ; python_full_version < '3.11.3' # via # aiohttp + # asyncpg # redis +asyncpg==0.31.0 + # via rl-rock attrs==25.4.0 # via # aiohttp diff --git a/rock/actions/sandbox/response.py b/rock/actions/sandbox/response.py index 3fda9ee2d..3ec324719 100644 --- a/rock/actions/sandbox/response.py +++ b/rock/actions/sandbox/response.py @@ -15,6 +15,7 @@ class SandboxResponse(BaseModel): class State(str, Enum): PENDING = "pending" RUNNING = "running" + STOPPED = "stopped" class IsAliveResponse(BaseModel): diff --git a/rock/admin/core/db_provider.py b/rock/admin/core/db_provider.py index 4c6d5dcba..1519da951 100644 --- a/rock/admin/core/db_provider.py +++ b/rock/admin/core/db_provider.py @@ -1,21 +1,57 @@ +"""Generic async SQLAlchemy engine provider.""" + +from __future__ import annotations + from typing import TYPE_CHECKING from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine -from rock.admin.core.schema import DBModelBase +from rock.admin.core.schema import Base +from rock.logger import init_logger if TYPE_CHECKING: from rock.config import DatabaseConfig +logger = init_logger(__name__) + class DatabaseProvider: - def __init__(self, db_config: "DatabaseConfig"): - self.db_config = db_config - self.engine: AsyncEngine + """Async SQLAlchemy engine provider. + + Supports SQLite (via ``aiosqlite``) and PostgreSQL (via ``asyncpg``). + """ + + def __init__(self, db_config: DatabaseConfig) -> None: + self._url = self._convert_url(db_config.url) + self._engine: AsyncEngine | None = None + + @property + def engine(self) -> AsyncEngine: + if self._engine is None: + raise RuntimeError("DatabaseProvider not initialised. Call init_pool() first.") + return self._engine + + async def init_pool(self) -> None: + """Create the async engine and ensure tables exist.""" + logger.info("Initializing database connection pool ...") + self._engine = create_async_engine(self._url, echo=False) + async with self._engine.begin() as conn: + await conn.run_sync(Base.metadata.create_all) + logger.info("Database connection pool initialised; tables created.") - async def init(self): - self.engine = create_async_engine(self.db_config.url, echo=True) + async def close_pool(self) -> None: + """Dispose of the engine and release all connections.""" + if self._engine is not None: + logger.info("Closing database connection pool ...") + await self._engine.dispose() + logger.info("Database connection pool closed.") - async def create_tables(self): - async with self.engine.begin() as conn: - await conn.run_sync(DBModelBase.metadata.create_all) + @staticmethod + def _convert_url(url: str) -> str: + """Convert synchronous database URLs to their async equivalents.""" + if url.startswith("sqlite:///"): + return url.replace("sqlite:///", "sqlite+aiosqlite:///", 1) + if url.startswith("postgresql://") or url.startswith("postgres://"): + prefix = "postgresql://" if url.startswith("postgresql://") else "postgres://" + return "postgresql+asyncpg://" + url[len(prefix):] + return url diff --git a/rock/admin/core/sandbox_table.py b/rock/admin/core/sandbox_table.py index 299f7af47..9141b0776 100644 --- a/rock/admin/core/sandbox_table.py +++ b/rock/admin/core/sandbox_table.py @@ -1,34 +1,125 @@ -from collections.abc import Sequence +"""SandboxTable: sandbox-specific CRUD and query operations over DatabaseProvider.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, Any from sqlalchemy import select -from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession +from sqlalchemy.ext.asyncio import AsyncSession +from rock.admin.core.db_provider import DatabaseProvider from rock.admin.core.schema import SandboxRecord +from rock.logger import init_logger + +if TYPE_CHECKING: + from rock.actions.sandbox.sandbox_info import SandboxInfo + from rock.deployments.config import DockerDeploymentConfig + +logger = init_logger(__name__) class SandboxTable: - def __init__(self, engine: AsyncEngine): - self._engine = engine + """Sandbox-specific database access layer backed by DatabaseProvider. + + All methods use plain ``dict`` for both input and output. + + Write path (create / update): + - Fields from ``SandboxInfo`` / ``DockerDeploymentConfig`` that match a + ``SandboxRecord`` column are written to the corresponding scalar column. + - ``status`` column stores the full ``SandboxInfo`` dict. + - ``spec`` column stores the full ``DockerDeploymentConfig.model_dump()`` dict. + + Read path (get / list_by / list_by_in): + - Returns ``record.to_dict()`` — a plain dict with all non-None column values, + including ``spec`` and ``status``. + """ + + def __init__(self, db_provider: DatabaseProvider) -> None: + self._db = db_provider + + async def create( + self, + sandbox_id: str, + info: SandboxInfo, + config: DockerDeploymentConfig | None = None, + ) -> None: + """Insert a new sandbox record. + + Scalar columns are populated from the union of *config* and *info* + (``info`` takes priority on conflicts). + Raises ``IntegrityError`` if ``sandbox_id`` already exists. + """ + config_dict = config.model_dump() if config is not None else {} + merged = {**config_dict, **info} + filtered = _pick_columns(merged) - async def create(self, sandbox_record: SandboxRecord): - async with AsyncSession(self._engine) as session: - session.add(sandbox_record) + for col, default in SandboxRecord._NOT_NULL_DEFAULTS.items(): + if col not in filtered: + filtered[col] = default + + filtered["status"] = dict(info) + if config_dict: + filtered["spec"] = config_dict + + record = SandboxRecord(sandbox_id=sandbox_id, **filtered) + async with AsyncSession(self._db.engine) as session: + session.add(record) + await session.commit() + + async def get(self, sandbox_id: str) -> dict | None: + """Return a sandbox row as a plain dict, or ``None`` if not found.""" + async with AsyncSession(self._db.engine) as session: + record = await session.get(SandboxRecord, sandbox_id) + if record is None: + return None + return record.to_dict() + + async def update(self, sandbox_id: str, info: SandboxInfo) -> None: + """Partial update of scalar columns; always overwrites ``status`` with *info*.""" + filtered = _pick_columns(info) + filtered["status"] = dict(info) + + async with AsyncSession(self._db.engine) as session: + record = await session.get(SandboxRecord, sandbox_id) + if record is None: + logger.warning("update: sandbox_id=%s not found", sandbox_id) + return + for key, value in filtered.items(): + setattr(record, key, value) await session.commit() - async def list( - self, namespace: str | None = None, user: str | None = None, experiment_id: str | None = None - ) -> Sequence[SandboxRecord]: - async with AsyncSession(self._engine) as session: - stmt = select(SandboxRecord) - if None is not namespace: - stmt = stmt.where(SandboxRecord.namespace == namespace) - if None is not user: - stmt = stmt.where(SandboxRecord.user == user) - if None is not experiment_id: - stmt = stmt.where(SandboxRecord.experiment_id == experiment_id) + async def delete(self, sandbox_id: str) -> None: + """Hard-delete a sandbox record.""" + async with AsyncSession(self._db.engine) as session: + record = await session.get(SandboxRecord, sandbox_id) + if record is not None: + await session.delete(record) + await session.commit() + + async def list_by(self, column: str, value: str | int | float | bool) -> list[dict]: + """Equality query on a single column. Only columns in ``SandboxRecord.LIST_BY_ALLOWLIST`` are permitted.""" + if column not in SandboxRecord.LIST_BY_ALLOWLIST: + raise ValueError(f"Querying by column '{column}' is not allowed") + col_attr = getattr(SandboxRecord, column) + stmt = select(SandboxRecord).where(col_attr == value) + async with AsyncSession(self._db.engine) as session: result = await session.execute(stmt) - return result.scalars().all() + return [r.to_dict() for r in result.scalars().all()] + + async def list_by_in(self, column: str, values: list[str | int | float | bool]) -> list[dict]: + """IN query on a single column. Only columns in ``SandboxRecord.LIST_BY_ALLOWLIST`` are permitted.""" + if column not in SandboxRecord.LIST_BY_ALLOWLIST: + raise ValueError(f"Querying by column '{column}' is not allowed") + if not values: + return [] + col_attr = getattr(SandboxRecord, column) + stmt = select(SandboxRecord).where(col_attr.in_(values)) + async with AsyncSession(self._db.engine) as session: + result = await session.execute(stmt) + return [r.to_dict() for r in result.scalars().all()] + - async def get(self, id: str) -> SandboxRecord: - async with AsyncSession(self._engine) as session: - return await session.get(SandboxRecord, id) +def _pick_columns(data: dict[str, Any]) -> dict[str, Any]: + """Return only keys matching a scalar SandboxRecord column, excluding sandbox_id/spec/status.""" + columns = SandboxRecord.column_names() - {"sandbox_id", "spec", "status"} + return {k: v for k, v in data.items() if k in columns} diff --git a/rock/admin/core/schema.py b/rock/admin/core/schema.py index 21904ccae..cbb36df71 100644 --- a/rock/admin/core/schema.py +++ b/rock/admin/core/schema.py @@ -1,26 +1,105 @@ -from sqlalchemy import Column, DateTime, String -from sqlalchemy.ext.declarative import declarative_base +"""ORM base and models for the ROCK admin database. -DBModelBase = declarative_base() +``Base`` is the single SQLAlchemy ``DeclarativeBase`` for all ROCK tables. +``SandboxRecord`` is the canonical persistence model for sandbox metadata. +""" +from __future__ import annotations -class SandboxRecord(DBModelBase): - __tablename__ = "sandboxes" +from typing import Any, ClassVar - id = Column(String, primary_key=True) +from sqlalchemy import Boolean, Column, Float, Index, String +from sqlalchemy.dialects.postgresql import JSONB +from sqlalchemy.orm import DeclarativeBase +from sqlalchemy.types import JSON - # Some grouping fields for filtering - namespace = Column(String) - user = Column(String) - experiment_id = Column(String) - created_at = Column(DateTime) - # Last call time - last_called_at = Column(DateTime) - # Stop time/auto cleanup time - stopped_at = Column(DateTime) +_JSONB_VARIANT = JSON().with_variant(JSONB(), "postgresql") - # Some metadata information - image = Column(String) - spec_meta = Column(String) - status_meta = Column(String) + +class Base(DeclarativeBase): + pass + + +# All ORM models that inherit from Base must be imported (or defined) in this file +# so that Base.metadata is fully populated before DatabaseProvider.init_pool() calls +# create_all. When adding a new table in a separate module, add its import here: +# from rock.admin.core. import # noqa: F401 +class SandboxRecord(Base): + """ORM model for the ``sandbox_record`` table.""" + + __tablename__ = "sandbox_record" + + sandbox_id = Column(String(128), primary_key=True) + user_id = Column(String(128), nullable=False, default="default") + image = Column(String(128), nullable=False, default="default") + experiment_id = Column(String(128), nullable=False, default="default") + namespace = Column(String(128), nullable=False, default="default") + cluster_name = Column(String(128), nullable=False, default="default") + state = Column(String(32), nullable=False, default="pending") + host_ip = Column(String(128), nullable=False, default="default") + create_time = Column(String(64), nullable=False, default="") + start_time = Column(String(64), nullable=True) + stop_time = Column(String(64), nullable=True) + host_name = Column(String(255), nullable=True) + auth_token = Column(String(512), nullable=True) + rock_authorization_encrypted = Column(String(1024), nullable=True) + cpus = Column(Float, nullable=True) + memory = Column(String(64), nullable=True) + create_user_gray_flag = Column(Boolean, nullable=True) + phases = Column(_JSONB_VARIANT, nullable=True) + port_mapping = Column(_JSONB_VARIANT, nullable=True) + spec = Column(_JSONB_VARIANT, nullable=True) + status = Column(_JSONB_VARIANT, nullable=True) + + __table_args__ = ( + Index("ix_sandbox_record_user_id", "user_id"), + Index("ix_sandbox_record_state", "state"), + Index("ix_sandbox_record_namespace", "namespace"), + Index("ix_sandbox_record_experiment_id", "experiment_id"), + Index("ix_sandbox_record_cluster_name", "cluster_name"), + Index("ix_sandbox_record_image", "image"), + Index("ix_sandbox_record_host_ip", "host_ip"), + Index("ix_sandbox_record_host_name", "host_name"), + Index("ix_sandbox_record_create_user_gray_flag", "create_user_gray_flag"), + ) + + # Columns allowed as the filter key in list_by(). + # Only include columns with an index (or PK); exclude JSONB, sensitive, and internal columns. + LIST_BY_ALLOWLIST: ClassVar[frozenset[str]] = frozenset( + { + "sandbox_id", + "user_id", + "image", + "experiment_id", + "namespace", + "cluster_name", + "state", + "host_ip", + "host_name", + "create_user_gray_flag", + } + ) + + _column_names: ClassVar[set[str] | None] = None + + @classmethod + def column_names(cls) -> set[str]: + if cls._column_names is None: + cls._column_names = {c.key for c in cls.__table__.columns} + return cls._column_names + + _NOT_NULL_DEFAULTS: ClassVar[dict[str, Any]] = { + "user_id": "default", + "image": "default", + "experiment_id": "default", + "namespace": "default", + "cluster_name": "default", + "state": "pending", + "host_ip": "default", + "create_time": "", + } + + def to_dict(self) -> dict[str, Any]: + """Return all non-``None`` column values as a plain dict.""" + return {c.key: getattr(self, c.key) for c in self.__table__.columns if getattr(self, c.key) is not None} diff --git a/rock/admin/entrypoints/sandbox_proxy_api.py b/rock/admin/entrypoints/sandbox_proxy_api.py index 67d8f8760..10a0ae53a 100644 --- a/rock/admin/entrypoints/sandbox_proxy_api.py +++ b/rock/admin/entrypoints/sandbox_proxy_api.py @@ -131,8 +131,10 @@ async def run(action: SandboxBashAction) -> RockResponse[BashObservation]: @sandbox_proxy_router.post("/sandboxes/batch") @handle_exceptions(error_message="batch get sandbox status failed") -async def batch_get_status(request: BatchSandboxStatusRequest) -> RockResponse[BatchSandboxStatusResponse]: - statuses_list = await sandbox_proxy_service.batch_get_sandbox_status_from_redis(request.sandbox_ids) +async def batch_get_status( + request: BatchSandboxStatusRequest, use_legacy_states: bool = True +) -> RockResponse[BatchSandboxStatusResponse]: + statuses_list = await sandbox_proxy_service.batch_get_sandbox_status(request.sandbox_ids, use_legacy_states) response = BatchSandboxStatusResponse(statuses=statuses_list) return RockResponse(result=response) @@ -175,7 +177,10 @@ async def upload( @handle_exceptions(error_message="list sandboxes failed") async def list_sandboxes(request: Request) -> RockResponse[SandboxListResponse]: query_params: SandboxQueryParams = dict(request.query_params) - result = await sandbox_proxy_service.list_sandboxes(query_params) + kwargs = {} + if "use_legacy_states" in query_params: + kwargs["use_legacy_states"] = query_params.pop("use_legacy_states").lower() != "false" + result = await sandbox_proxy_service.list_sandboxes(query_params, **kwargs) return RockResponse(result=result) diff --git a/rock/admin/main.py b/rock/admin/main.py index 2cb242833..93bc69abe 100644 --- a/rock/admin/main.py +++ b/rock/admin/main.py @@ -13,13 +13,15 @@ from starlette.responses import JSONResponse from rock import env_vars +from rock.admin.core.db_provider import DatabaseProvider +from rock.admin.core.sandbox_table import SandboxTable from rock.admin.core.ray_service import RayService from rock.admin.entrypoints.sandbox_api import sandbox_router, set_sandbox_manager from rock.admin.entrypoints.sandbox_proxy_api import sandbox_proxy_router, set_sandbox_proxy_service from rock.admin.entrypoints.warmup_api import set_warmup_service, warmup_router from rock.admin.gem.api import gem_router, set_env_service from rock.admin.scheduler.scheduler import SchedulerThread -from rock.config import RockConfig +from rock.config import RockConfig, DatabaseConfig from rock.logger import init_logger from rock.sandbox.gem_manager import GemManager from rock.sandbox.operator.factory import OperatorContext, OperatorFactory @@ -51,10 +53,12 @@ async def lifespan(app: FastAPI): env_vars.ROCK_ADMIN_ENV = args.env env_vars.ROCK_ADMIN_ROLE = args.role - # init redis provider - if args.env in ["local", "test", "dev"]: + # init redis provider (fallback to fakeredis if no host configured) + if args.env in ["local", "test", "dev"] or not rock_config.redis.host: from fakeredis import aioredis + if not rock_config.redis.host: + logger.info("redis.host is not configured, falling back to FakeRedis") redis_provider = RedisProvider(host=None, port=None, password="") redis_provider.client = aioredis.FakeRedis(decode_responses=True) else: @@ -65,6 +69,18 @@ async def lifespan(app: FastAPI): ) await redis_provider.init_pool() + # init database provider (fallback to sqlite in-memory if no url configured) + db_url = rock_config.database.url or "sqlite+aiosqlite:///:memory:" + if not rock_config.database.url: + logger.info("database.url is not configured, falling back to SQLite in-memory") + db_provider = DatabaseProvider(db_config=DatabaseConfig(url=db_url)) + await db_provider.init_pool() + sandbox_table = SandboxTable(db_provider) + + from rock.sandbox.sandbox_meta_store import SandboxMetaStore + + meta_store = SandboxMetaStore(redis_provider=redis_provider, sandbox_table=sandbox_table) + # init scheduler thread scheduler_thread = None @@ -78,6 +94,7 @@ async def lifespan(app: FastAPI): operator_context = OperatorContext( runtime_config=rock_config.runtime, ray_service=ray_service, + redis_provider=redis_provider, nacos_provider=rock_config.nacos_provider, k8s_config=rock_config.k8s, ) @@ -87,20 +104,20 @@ async def lifespan(app: FastAPI): if rock_config.runtime.enable_auto_clear: sandbox_manager = GemManager( rock_config, - redis_provider=redis_provider, ray_namespace=rock_config.ray.namespace, ray_service=ray_service, enable_runtime_auto_clear=True, operator=operator, + meta_store=meta_store, ) else: sandbox_manager = GemManager( rock_config, - redis_provider=redis_provider, ray_namespace=rock_config.ray.namespace, ray_service=ray_service, enable_runtime_auto_clear=False, operator=operator, + meta_store=meta_store, ) set_sandbox_manager(sandbox_manager) warmup_service = WarmupService(rock_config.warmup) @@ -118,7 +135,7 @@ async def lifespan(app: FastAPI): logger.info("Scheduler thread skipped on non-primary pod") else: - sandbox_manager = SandboxProxyService(rock_config=rock_config, redis_provider=redis_provider) + sandbox_manager = SandboxProxyService(rock_config=rock_config, meta_store=meta_store) set_sandbox_proxy_service(sandbox_manager) logger.info("rock-admin start") @@ -130,6 +147,9 @@ async def lifespan(app: FastAPI): scheduler_thread.stop() logger.info("Scheduler thread stopped") + if db_provider: + await db_provider.close_pool() + if redis_provider: await redis_provider.close_pool() diff --git a/rock/admin/proto/request.py b/rock/admin/proto/request.py index 08e384964..ac69dc7cb 100644 --- a/rock/admin/proto/request.py +++ b/rock/admin/proto/request.py @@ -120,6 +120,7 @@ class SandboxQueryParams(TypedDict, total=False): page: str page_size: str + use_legacy_states: str user_id: str experiment_id: str namespace: str diff --git a/rock/config.py b/rock/config.py index f43952732..b9a7a6efd 100644 --- a/rock/config.py +++ b/rock/config.py @@ -78,6 +78,9 @@ class ProxyServiceConfig: @dataclass class DatabaseConfig: + # Supported URL formats: + # SQLite: sqlite:///relative/path.db or sqlite:////absolute/path.db + # PostgreSQL: postgresql://user:password@host:port/dbname url: str = "" @@ -194,6 +197,7 @@ class RockConfig: runtime: RuntimeConfig = field(default_factory=RuntimeConfig) proxy_service: ProxyServiceConfig = field(default_factory=ProxyServiceConfig) scheduler: SchedulerConfig = field(default_factory=SchedulerConfig) + database: DatabaseConfig = field(default_factory=DatabaseConfig) nacos_provider: NacosConfigProvider | None = None @classmethod @@ -235,6 +239,8 @@ def from_env(cls, config_path: str | None = None): kwargs["proxy_service"] = ProxyServiceConfig(**config["proxy_service"]) if "scheduler" in config: kwargs["scheduler"] = SchedulerConfig(**config["scheduler"]) + if "database" in config: + kwargs["database"] = DatabaseConfig(**config["database"]) return cls(**kwargs) diff --git a/rock/sandbox/base_manager.py b/rock/sandbox/base_manager.py index d084b2cfc..568199a06 100644 --- a/rock/sandbox/base_manager.py +++ b/rock/sandbox/base_manager.py @@ -5,32 +5,30 @@ from apscheduler.schedulers.asyncio import AsyncIOScheduler from apscheduler.triggers.interval import IntervalTrigger -from rock.admin.core.redis_key import ALIVE_PREFIX from rock.admin.metrics.constants import MetricsConstants from rock.admin.metrics.monitor import MetricsMonitor, aggregate_metrics from rock.config import RockConfig from rock.deployments.manager import DeploymentManager from rock.logger import init_logger from rock.utils import get_executor -from rock.utils.providers.redis_provider import RedisProvider +from rock.sandbox.sandbox_meta_store import SandboxMetaStore logger = init_logger(__name__) class BaseManager: _check_job_bg_task: object = None - _redis_provider: RedisProvider = None rock_config: RockConfig = None def __init__( self, rock_config: RockConfig, - redis_provider: RedisProvider | None = None, + meta_store: SandboxMetaStore | None = None, enable_runtime_auto_clear: bool = False, ): self.rock_config = rock_config self._executor = get_executor() - self._redis_provider = redis_provider + self._meta_store = meta_store self.metrics_monitor = MetricsMonitor.create( export_interval_millis=20_000, metrics_endpoint=rock_config.runtime.metrics_endpoint, @@ -91,16 +89,15 @@ async def _collect_and_report_metrics(self): async def _collect_and_report_metrics_internal(self): """Collect and report metrics for all sandboxes""" overall_start = time.perf_counter() - if not self._redis_provider: + if not self._meta_store: return await self._report_system_resource_metrics() - if not await self._redis_provider.pattern_exists(f"{ALIVE_PREFIX}*"): + sandbox_cnt, sandbox_meta = await self._collect_sandbox_meta() + if sandbox_cnt == 0: logger.debug("No sandboxes to monitor") self.metrics_monitor.record_gauge_by_name(MetricsConstants.SANDBOX_TOTAL_COUNT, 0) return - - sandbox_cnt, sandbox_meta = await self._collect_sandbox_meta() aggregated_metrics = aggregate_metrics(sandbox_meta, "image") for image, count in aggregated_metrics.items(): self.metrics_monitor.record_gauge_by_name(MetricsConstants.SANDBOX_COUNT_IMAGE, count, {"image": image}) @@ -133,16 +130,12 @@ async def _collect_system_resource_metrics(self): async def _collect_sandbox_meta(self) -> tuple[int, dict[str, dict[str, str]]]: meta: dict = {} cnt = 0 - # type: ignore - async for key in self._redis_provider.client.scan_iter(match=f"{ALIVE_PREFIX}*", count=100): - sandbox_id = key.removeprefix(ALIVE_PREFIX) + if not self._meta_store: + return cnt, meta + async for sandbox_id in self._meta_store.iter_alive_sandbox_ids(): cnt += 1 - if self._sandbox_meta.get(sandbox_id) is not None: - try: - image = self._sandbox_meta[sandbox_id]["image"] - except Exception: - image = "default" - meta[sandbox_id] = {"image": image} + image = self._sandbox_meta.get(sandbox_id, {}).get("image", "default") + meta[sandbox_id] = {"image": image} return cnt, meta def stop_monitoring(self): diff --git a/rock/sandbox/gem_manager.py b/rock/sandbox/gem_manager.py index 2cb2aab1c..d0ec3d470 100644 --- a/rock/sandbox/gem_manager.py +++ b/rock/sandbox/gem_manager.py @@ -18,20 +18,26 @@ from rock.deployments.config import DockerDeploymentConfig from rock.sandbox.sandbox_actor import SandboxActor from rock.sandbox.sandbox_manager import SandboxManager -from rock.utils.providers import RedisProvider - +from rock.sandbox.sandbox_meta_store import SandboxMetaStore class GemManager(SandboxManager): def __init__( self, rock_config: RockConfig, - redis_provider: RedisProvider | None = None, + meta_store: SandboxMetaStore | None = None, ray_namespace: str = env_vars.ROCK_RAY_NAMESPACE, ray_service: RayService | None = None, enable_runtime_auto_clear: bool = False, operator=None, ): - super().__init__(rock_config, redis_provider, ray_namespace, ray_service, enable_runtime_auto_clear, operator) + super().__init__( + rock_config, + meta_store=meta_store, + ray_namespace=ray_namespace, + ray_service=ray_service, + enable_runtime_auto_clear=enable_runtime_auto_clear, + operator=operator, + ) async def env_make(self, env_id: str) -> EnvMakeResponse: config = DockerDeploymentConfig(image=env_vars.ROCK_ENVHUB_DEFAULT_DOCKER_IMAGE) diff --git a/rock/sandbox/operator/factory.py b/rock/sandbox/operator/factory.py index 3b0fc0d72..d7289c6ee 100644 --- a/rock/sandbox/operator/factory.py +++ b/rock/sandbox/operator/factory.py @@ -10,6 +10,7 @@ from rock.sandbox.operator.k8s.operator import K8sOperator from rock.sandbox.operator.ray import RayOperator from rock.utils.providers.nacos_provider import NacosConfigProvider +from rock.utils.providers.redis_provider import RedisProvider logger = init_logger(__name__) @@ -25,6 +26,7 @@ class OperatorContext: runtime_config: RuntimeConfig ray_service: RayService | None = None + redis_provider: RedisProvider | None = None # K8s operator dependencies k8s_config: K8sConfig | None = None nacos_provider: NacosConfigProvider | None = None @@ -59,6 +61,8 @@ def create_operator(context: OperatorContext) -> AbstractOperator: raise ValueError("RayService is required for RayOperator") logger.info("Creating RayOperator") ray_operator = RayOperator(ray_service=context.ray_service, runtime_config=context.runtime_config) + if context.redis_provider is not None: + ray_operator.set_redis_provider(context.redis_provider) if context.nacos_provider is not None: ray_operator.set_nacos_provider(context.nacos_provider) return ray_operator diff --git a/rock/sandbox/operator/ray.py b/rock/sandbox/operator/ray.py index b23b1fd82..6412d1269 100644 --- a/rock/sandbox/operator/ray.py +++ b/rock/sandbox/operator/ray.py @@ -90,7 +90,7 @@ async def get_status(self, sandbox_id: str) -> SandboxInfo: actor: SandboxActor = await self._ray_service.async_ray_get_actor(self._get_actor_name(sandbox_id)) sandbox_info: SandboxInfo = await self._ray_service.async_ray_get(actor.sandbox_info.remote()) remote_status: ServiceStatus = await self._ray_service.async_ray_get(actor.get_status.remote()) - sandbox_info["phases"] = remote_status.phases + sandbox_info["phases"] = {name: phase.to_dict() for name, phase in remote_status.phases.items()} sandbox_info["port_mapping"] = remote_status.get_port_mapping() alive = await self._ray_service.async_ray_get(actor.is_alive.remote()) # TODO: sink update state according to is_alive logic into SandboxInfo @@ -101,7 +101,6 @@ async def get_status(self, sandbox_id: str) -> SandboxInfo: redis_info = await self.get_sandbox_info_from_redis(sandbox_id) if redis_info: redis_info.update(sandbox_info) - redis_info["phases"] = {name: phase.to_dict() for name, phase in remote_status.phases.items()} return redis_info else: return sandbox_info diff --git a/rock/sandbox/sandbox_manager.py b/rock/sandbox/sandbox_manager.py index 3dbe2652d..15b84e888 100644 --- a/rock/sandbox/sandbox_manager.py +++ b/rock/sandbox/sandbox_manager.py @@ -1,5 +1,4 @@ import asyncio -import time from fastapi import UploadFile @@ -16,7 +15,6 @@ from rock.actions.sandbox.response import State from rock.actions.sandbox.sandbox_info import SandboxInfo from rock.admin.core.ray_service import RayService -from rock.admin.core.redis_key import ALIVE_PREFIX, alive_sandbox_key, timeout_sandbox_key from rock.admin.metrics.billing import log_billing_info from rock.admin.metrics.decorator import monitor_sandbox_operation from rock.admin.proto.request import ClusterInfo, UserInfo @@ -35,9 +33,11 @@ from rock.sandbox.base_manager import BaseManager from rock.sandbox.operator.abstract import AbstractOperator from rock.sandbox.sandbox_actor import SandboxActor +from rock.sandbox.sandbox_meta_store import SandboxMetaStore from rock.sandbox.service.sandbox_proxy_service import SandboxProxyService from rock.sdk.common.exceptions import BadRequestRockError, InternalServerRockError from rock.utils.crypto_utils import AESEncryption +from rock.sandbox.utils.timeout import SandboxTimeoutHelper from rock.utils.format import convert_to_gb, parse_size_to_bytes from rock.utils.providers.redis_provider import RedisProvider from rock.utils.service import build_sandbox_from_redis @@ -52,22 +52,22 @@ class SandboxManager(BaseManager): def __init__( self, rock_config: RockConfig, - redis_provider: RedisProvider | None = None, + meta_store: SandboxMetaStore | None = None, ray_namespace: str = env_vars.ROCK_RAY_NAMESPACE, ray_service: RayService | None = None, enable_runtime_auto_clear: bool = False, operator: AbstractOperator | None = None, ): super().__init__( - rock_config, redis_provider=redis_provider, enable_runtime_auto_clear=enable_runtime_auto_clear + rock_config, + meta_store=meta_store, + enable_runtime_auto_clear=enable_runtime_auto_clear, ) self._ray_service = ray_service self._ray_namespace = ray_namespace self._operator = operator self._aes_encrypter = AESEncryption() - self._proxy_service = SandboxProxyService(rock_config=rock_config, redis_provider=redis_provider) - if redis_provider: - self._operator.set_redis_provider(redis_provider) + self._proxy_service = SandboxProxyService(rock_config=rock_config, meta_store=meta_store) logger.info("sandbox service init success") async def refresh_aes_key(self): @@ -82,7 +82,7 @@ async def refresh_aes_key(self): async def _check_sandbox_exists_in_redis(self, config: DeploymentConfig): if isinstance(config, DockerDeploymentConfig) and config.container_name: sandbox_id = config.container_name - if self._redis_provider and await self._redis_provider.json_get(alive_sandbox_key(sandbox_id), "$"): + if self._meta_store and await self._meta_store.exists(sandbox_id): raise BadRequestRockError(f"Sandbox {sandbox_id} already exists") def _setup_sandbox_actor_metadata(self, sandbox_actor: SandboxActor, user_info: UserInfo) -> None: @@ -126,15 +126,15 @@ async def start_async( docker_deployment_config.cpus = self.rock_config.runtime.standard_spec.cpus docker_deployment_config.memory = self.rock_config.runtime.standard_spec.memory sandbox_info: SandboxInfo = await self._operator.submit(docker_deployment_config, user_info) - stop_time = str(int(time.time()) + docker_deployment_config.auto_clear_time * 60) - auto_clear_time_dict = { - env_vars.ROCK_SANDBOX_AUTO_CLEAR_TIME_KEY: str(docker_deployment_config.auto_clear_time), - env_vars.ROCK_SANDBOX_EXPIRE_TIME_KEY: stop_time, - } await self._build_sandbox_info_metadata(sandbox_info, user_info, cluster_info) - if self._redis_provider: - await self._redis_provider.json_set(alive_sandbox_key(sandbox_id), "$", sandbox_info) - await self._redis_provider.json_set(timeout_sandbox_key(sandbox_id), "$", auto_clear_time_dict) + if self._meta_store: + timeout_info = SandboxTimeoutHelper.make_timeout_info(docker_deployment_config.auto_clear_time) + await self._meta_store.create( + sandbox_id, + sandbox_info, + timeout_info=timeout_info, + deployment_config=docker_deployment_config, + ) return SandboxStartResponse( sandbox_id=sandbox_id, host_name=sandbox_info.get("host_name"), @@ -171,28 +171,35 @@ async def start(self, config: DeploymentConfig) -> SandboxStartResponse: @monitor_sandbox_operation() async def stop(self, sandbox_id): logger.info(f"stop sandbox {sandbox_id}") - sandbox_info: SandboxInfo = await build_sandbox_from_redis(self._redis_provider, sandbox_id) - if sandbox_info and sandbox_info.get("start_time"): + sandbox_info: SandboxInfo | None = await self._meta_store.get(sandbox_id) if self._meta_store else None + if sandbox_info is None: + sandbox_info = {} + sandbox_info["state"] = State.STOPPED + if sandbox_info.get("start_time"): sandbox_info["stop_time"] = get_iso8601_timestamp() log_billing_info(sandbox_info=sandbox_info) try: await self._operator.stop(sandbox_id) except ValueError as e: logger.error(f"ray get actor, actor {sandbox_id} not exist", exc_info=e) - await self._clear_redis_keys(sandbox_id) + if self._meta_store: + await self._meta_store.archive(sandbox_id, sandbox_info) + return try: self._sandbox_meta.pop(sandbox_id) except KeyError: logger.debug(f"{sandbox_id} key not found") logger.info(f"sandbox {sandbox_id} stopped") - await self._clear_redis_keys(sandbox_id) + if self._meta_store: + await self._meta_store.archive(sandbox_id, sandbox_info) async def get_mount(self, sandbox_id): async with self._ray_service.get_ray_rwlock().read_lock(): actor_name = self.deployment_manager.get_actor_name(sandbox_id) sandbox_actor = await self._ray_service.async_ray_get_actor(actor_name, self._ray_namespace) if sandbox_actor is None: - await self._clear_redis_keys(sandbox_id) + if self._meta_store: + await self._meta_store.archive(sandbox_id, {}) raise Exception(f"sandbox {sandbox_id} not found to get mount") result = await self._ray_service.async_ray_get(sandbox_actor.get_mount.remote()) logger.info(f"get_mount: {result}") @@ -205,27 +212,24 @@ async def commit(self, sandbox_id, image_tag: str, username: str, password: str) actor_name = self.deployment_manager.get_actor_name(sandbox_id) sandbox_actor = await self._ray_service.async_ray_get_actor(actor_name, self._ray_namespace) if sandbox_actor is None: - await self._clear_redis_keys(sandbox_id) + if self._meta_store: + await self._meta_store.archive(sandbox_id, {}) raise Exception(f"sandbox {sandbox_id} not found to commit") logger.info(f"begin to commit {sandbox_id} to {image_tag}") result = await self._ray_service.async_ray_get(sandbox_actor.commit.remote(image_tag, username, password)) logger.info(f"commit {sandbox_id} to {image_tag} finished, result {result}") return result - async def _clear_redis_keys(self, sandbox_id): - if self._redis_provider: - await self._redis_provider.json_delete(alive_sandbox_key(sandbox_id)) - await self._redis_provider.json_delete(timeout_sandbox_key(sandbox_id)) - logger.info(f"sandbox {sandbox_id} deleted from redis") - @monitor_sandbox_operation() async def get_status(self, sandbox_id) -> SandboxStatusResponse: sandbox_info: SandboxInfo = await self._operator.get_status(sandbox_id=sandbox_id) is_alive = sandbox_info.get("state") == State.RUNNING self._update_sandbox_alive_info(sandbox_info, is_alive) - if self._redis_provider: - await self._redis_provider.json_set(alive_sandbox_key(sandbox_id), "$", sandbox_info) - await self._update_expire_time(sandbox_id) + if self._meta_store: + current = await self._meta_store.get(sandbox_id) + if current is None or current.get("state") != sandbox_info.get("state"): + await self._meta_store.update(sandbox_id, sandbox_info) + await self._refresh_timeout(sandbox_id) return SandboxStatusResponse( sandbox_id=sandbox_id, status=sandbox_info.get("phases"), @@ -245,9 +249,9 @@ async def get_status(self, sandbox_id) -> SandboxStatusResponse: ) async def build_sandbox_info_from_redis(self, sandbox_id: str, deployment_info: SandboxInfo) -> SandboxInfo | None: - sandbox_status = await self._redis_provider.json_get(alive_sandbox_key(sandbox_id), "$") - if sandbox_status and len(sandbox_status) > 0: - sandbox_info = sandbox_status[0] + sandbox_info_from_store = await self._meta_store.get(sandbox_id) if self._meta_store else None + if sandbox_info_from_store: + sandbox_info = sandbox_info_from_store remote_info = { k: v for k, v in deployment_info.items() if k in ["phases", "port_mapping", "alive", "state"] } @@ -296,17 +300,27 @@ async def write_file(self, request: WriteFileRequest) -> WriteFileResponse: async def upload(self, file: UploadFile, target_path: str, sandbox_id: str) -> UploadResponse: return await self._proxy_service.upload(file, target_path, sandbox_id) - async def _is_expired(self, sandbox_id): - timeout_dict = await self._redis_provider.json_get(timeout_sandbox_key(sandbox_id), "$") - if timeout_dict is None or len(timeout_dict) == 0: - raise Exception(f"sandbox {sandbox_id} timeout key not found") + async def _refresh_timeout(self, sandbox_id: str) -> None: + if not self._meta_store: + return + timeout_info = await self._meta_store.get_timeout(sandbox_id) + if timeout_info is None: + logger.warning("refresh_timeout: timeout key not found for sandbox_id=%s", sandbox_id) + return + new_timeout = SandboxTimeoutHelper.refresh_timeout(timeout_info) + if new_timeout is None: + logger.warning("refresh_timeout: auto_clear_time missing for sandbox_id=%s", sandbox_id) + return + await self._meta_store.update_timeout(sandbox_id, new_timeout) - if timeout_dict is not None and len(timeout_dict) > 0: - expire_time: int = int(timeout_dict[0].get(env_vars.ROCK_SANDBOX_EXPIRE_TIME_KEY)) - return int(time.time()) > expire_time - else: - logger.info(f"sandbox_id:[{sandbox_id}] is already cleared") - return True + async def _is_expired(self, sandbox_id: str) -> bool: + if not self._meta_store: + return False + timeout_info = await self._meta_store.get_timeout(sandbox_id) + if timeout_info is None: + logger.warning("is_expired: timeout key not found for sandbox_id=%s", sandbox_id) + return False + return SandboxTimeoutHelper.is_expired(timeout_info) async def _is_actor_alive(self, sandbox_id): try: @@ -318,11 +332,10 @@ async def _is_actor_alive(self, sandbox_id): return False async def _check_job_background(self): - if not self._redis_provider: + if not self._meta_store: return logger.debug("check job background") - async for key in self._redis_provider.client.scan_iter(match=f"{ALIVE_PREFIX}*", count=100): - sandbox_id = key.removeprefix(ALIVE_PREFIX) + async for sandbox_id in self._meta_store.iter_alive_sandbox_ids(): try: is_expired = await self._is_expired(sandbox_id) if is_expired: @@ -341,26 +354,6 @@ async def get_sandbox_statistics(self, sandbox_id): resource_metrics = await self._ray_service.async_ray_get(sandbox_actor.get_sandbox_statistics.remote()) return resource_metrics - async def _update_expire_time(self, sandbox_id): - if self._redis_provider is None: - return - sandbox_status_dict = await self._redis_provider.json_get(alive_sandbox_key(sandbox_id), "$") - if not sandbox_status_dict or len(sandbox_status_dict) == 0: - logger.info(f"sandbox-{sandbox_id} is not alive, skip update expire time") - return - origin_info = await self._redis_provider.json_get(timeout_sandbox_key(sandbox_id), "$") - if origin_info is None or len(origin_info) == 0: - logger.info(f"sandbox-{sandbox_id} is not initialized, skip update expire time") - return - auto_clear_time: str = origin_info[0].get(env_vars.ROCK_SANDBOX_AUTO_CLEAR_TIME_KEY) - expire_time: int = int(time.time()) + int(auto_clear_time) * 60 - logger.info(f"sandbox-{sandbox_id} update expire time: {expire_time}") - new_dict = { - env_vars.ROCK_SANDBOX_AUTO_CLEAR_TIME_KEY: auto_clear_time, - env_vars.ROCK_SANDBOX_EXPIRE_TIME_KEY: str(expire_time), - } - await self._redis_provider.json_set(timeout_sandbox_key(sandbox_id), "$", new_dict) - def validate_sandbox_spec(self, runtime_config: RuntimeConfig, deployment_config: DeploymentConfig) -> None: try: memory = parse_size_to_bytes(deployment_config.memory) diff --git a/rock/sandbox/sandbox_meta_store.py b/rock/sandbox/sandbox_meta_store.py new file mode 100644 index 000000000..82091f5ca --- /dev/null +++ b/rock/sandbox/sandbox_meta_store.py @@ -0,0 +1,143 @@ +"""SandboxMetaStore - coordinator for Redis (hot path) + DB (query path) dual-write. + +Redis remains the source of truth for live sandbox state. +The database is an async replica used for indexed queries (list_by, etc.). +All DB operations are awaited for consistency. +""" + +from __future__ import annotations + +from collections.abc import AsyncIterator +from typing import Any + +from typing import TYPE_CHECKING + +from rock.actions.sandbox.response import State +from rock.actions.sandbox.sandbox_info import SandboxInfo +from rock.admin.core.redis_key import alive_sandbox_key, timeout_sandbox_key +from rock.admin.core.sandbox_table import SandboxTable + +if TYPE_CHECKING: + from rock.deployments.config import DockerDeploymentConfig +from rock.logger import init_logger +from rock.utils.providers.redis_provider import RedisProvider + +logger = init_logger(__name__) + +# States that indicate an active sandbox (not yet stopped/archived). +_ACTIVE_STATES: list[str] = [State.RUNNING, State.PENDING] + + +class SandboxMetaStore: + """Coordinates sandbox metadata across Redis (hot path) and DB (query path). + + Both providers are required. Use FakeRedis / SQLite-memory for local/test environments. + """ + + def __init__( + self, + redis_provider: RedisProvider, + sandbox_table: SandboxTable, + ) -> None: + self._redis: RedisProvider = redis_provider + self._db: SandboxTable = sandbox_table + + # ------------------------------------------------------------------ + # Public API + # ------------------------------------------------------------------ + + async def create( + self, + sandbox_id: str, + sandbox_info: SandboxInfo, + timeout_info: dict[str, str] | None = None, + deployment_config: DockerDeploymentConfig | None = None, + ) -> None: + """Write sandbox info to the Redis alive key and await DB insert. + + Parameters + ---------- + timeout_info: + If provided, also write the timeout key (``auto_clear_time`` / ``expire_time``). + deployment_config: + ``DockerDeploymentConfig`` snapshot written once to the ``spec`` DB column. + Redis does not store this. + """ + await self._redis.json_set(alive_sandbox_key(sandbox_id), "$", sandbox_info) + if timeout_info is not None: + await self._redis.json_set(timeout_sandbox_key(sandbox_id), "$", timeout_info) + + await self._db.create(sandbox_id, sandbox_info, deployment_config) + + async def update(self, sandbox_id: str, sandbox_info: SandboxInfo) -> None: + """Merge *sandbox_info* into the existing Redis alive key and await DB update.""" + current = await self._redis.json_get(alive_sandbox_key(sandbox_id), "$") + merged: dict[str, Any] = {**(current[0] if current else {}), **sandbox_info} + await self._redis.json_set(alive_sandbox_key(sandbox_id), "$", merged) + + await self._db.update(sandbox_id, sandbox_info) + + async def delete(self, sandbox_id: str) -> None: + """Delete Redis alive + timeout keys and await DB delete.""" + await self._redis.json_delete(alive_sandbox_key(sandbox_id)) + await self._redis.json_delete(timeout_sandbox_key(sandbox_id)) + + await self._db.delete(sandbox_id) + + async def archive(self, sandbox_id: str, final_info: SandboxInfo) -> None: + """Persist final state to DB, then remove sandbox from Redis. + + Unlike ``delete``, the DB record is preserved and updated with + ``final_info`` (e.g. ``stop_time``, ``state``). Use this when a + sandbox has finished its lifecycle and the final state should be + queryable from the DB. + + The DB write is awaited before the Redis keys are deleted so that + the final state is always durably stored before the alive key + disappears. If the DB write fails the exception propagates and + Redis cleanup is skipped. + """ + await self._db.update(sandbox_id, final_info) + + await self._redis.json_delete(alive_sandbox_key(sandbox_id)) + await self._redis.json_delete(timeout_sandbox_key(sandbox_id)) + + async def get(self, sandbox_id: str) -> SandboxInfo | None: + """Read sandbox info from the Redis alive key.""" + result = await self._redis.json_get(alive_sandbox_key(sandbox_id), "$") + if result and len(result) > 0: + return result[0] + return None + + async def exists(self, sandbox_id: str) -> bool: + """Return ``True`` when the Redis alive key exists for ``sandbox_id``.""" + return await self.get(sandbox_id) is not None + + async def get_timeout(self, sandbox_id: str) -> dict[str, str] | None: + """Read timeout info from the Redis timeout key.""" + timeout_info = await self._redis.json_get(timeout_sandbox_key(sandbox_id), "$") + if timeout_info and len(timeout_info) > 0: + return timeout_info[0] + return None + + async def update_timeout(self, sandbox_id: str, timeout_info: dict[str, str]) -> None: + """Overwrite the Redis timeout key with *timeout_info*.""" + await self._redis.json_set(timeout_sandbox_key(sandbox_id), "$", timeout_info) + + async def iter_alive_sandbox_ids(self) -> AsyncIterator[str]: + """Yield active sandbox IDs from the DB.""" + for sandbox_info in await self._db.list_by_in("state", _ACTIVE_STATES): + sandbox_id = sandbox_info.get("sandbox_id") + if sandbox_id: + yield sandbox_id + + async def batch_get(self, sandbox_ids: list[str]) -> list[SandboxInfo]: + """Fetch sandbox info for multiple IDs from the DB. Missing IDs are omitted.""" + if not sandbox_ids: + return [] + + return await self._db.list_by_in("sandbox_id", sandbox_ids) + + async def list_by(self, field: str, value: str | int | float | bool) -> list[SandboxInfo]: + """Query sandboxes by *field* == *value* from the DB.""" + return await self._db.list_by(field, value) diff --git a/rock/sandbox/service/sandbox_proxy_service.py b/rock/sandbox/service/sandbox_proxy_service.py index 99430d034..53037d3dd 100644 --- a/rock/sandbox/service/sandbox_proxy_service.py +++ b/rock/sandbox/service/sandbox_proxy_service.py @@ -1,6 +1,5 @@ import asyncio # noqa: I001 import json -import time from fastapi.responses import JSONResponse, StreamingResponse from starlette.datastructures import Headers @@ -23,8 +22,8 @@ UploadResponse, WriteFileResponse, ) +from rock.actions.sandbox.response import State from rock.actions.sandbox.sandbox_info import SandboxInfo -from rock.admin.core.redis_key import ALIVE_PREFIX, alive_sandbox_key, timeout_sandbox_key from rock.admin.metrics.decorator import monitor_sandbox_operation from rock.admin.metrics.monitor import MetricsMonitor from rock.admin.proto.request import SandboxBashAction as BashAction @@ -40,20 +39,21 @@ from rock.deployments.status import ServiceStatus from rock.common.port_validation import validate_port_forward_port from rock.logger import init_logger +from rock.sandbox.sandbox_meta_store import SandboxMetaStore +from rock.sandbox.utils.timeout import SandboxTimeoutHelper from rock.sdk.common.exceptions import BadRequestRockError from rock.utils import EAGLE_EYE_TRACE_ID, trace_id_ctx_var -from rock.utils.providers import RedisProvider logger = init_logger(__name__) class SandboxProxyService: - _redis_provider: RedisProvider = None + _meta_store: SandboxMetaStore = None _httpx_client = None - def __init__(self, rock_config: RockConfig, redis_provider: RedisProvider | None = None): + def __init__(self, rock_config: RockConfig, meta_store: SandboxMetaStore | None = None): self._rock_config = rock_config - self._redis_provider = redis_provider + self._meta_store = meta_store self.metrics_monitor = MetricsMonitor.create( export_interval_millis=20_000, metrics_endpoint=rock_config.runtime.metrics_endpoint, @@ -162,9 +162,11 @@ async def execute(self, command: Command) -> CommandResponse: return CommandResponse(**response) @monitor_sandbox_operation() - async def batch_get_sandbox_status_from_redis(self, sandbox_ids: list[str]) -> list[SandboxStatusResponse]: - if self._redis_provider is None: - logger.info("batch_get_sandbox_status_from_redis, redis provider is None, return empty") + async def batch_get_sandbox_status( + self, sandbox_ids: list[str], use_legacy_states: bool = True + ) -> list[SandboxStatusResponse]: + if self._meta_store is None: + logger.info("batch_get_sandbox_status, meta_store is None, return empty") return [] if sandbox_ids is None: raise BadRequestRockError(message="sandbox_ids is None") @@ -172,20 +174,23 @@ async def batch_get_sandbox_status_from_redis(self, sandbox_ids: list[str]) -> l raise BadRequestRockError( message=f"sandbox_ids count too large, max count is {self._batch_get_status_max_count}" ) - logger.info(f"batch_get_sandbox_status_from_redis, sandbox_ids count is {len(sandbox_ids)}") + logger.info(f"batch_get_sandbox_status, sandbox_ids count is {len(sandbox_ids)}") results = [] - alive_keys = [alive_sandbox_key(sandbox_id) for sandbox_id in sandbox_ids] - sandbox_infos: list[SandboxInfo] = await self._redis_provider.json_mget(alive_keys, "$") + sandbox_infos: list[SandboxInfo] = await self._meta_store.batch_get(sandbox_ids) for sandbox_info in sandbox_infos: - if sandbox_info: - results.append(SandboxStatusResponse.from_sandbox_info(sandbox_info)) - logger.info(f"batch_get_sandbox_status_from_redis succ, result count is {len(results)}") + state = sandbox_info.get("state") + if use_legacy_states and state not in (State.RUNNING, State.PENDING): + continue + results.append(SandboxStatusResponse.from_sandbox_info(sandbox_info)) + logger.info(f"batch_get_sandbox_status succ, result count is {len(results)}") return results @monitor_sandbox_operation() - async def list_sandboxes(self, query_params: SandboxQueryParams) -> SandboxListResponse: - if self._redis_provider is None: - logger.warning("Redis provider is not available, list_sandboxes returning empty result") + async def list_sandboxes( + self, query_params: SandboxQueryParams, use_legacy_states: bool = True + ) -> SandboxListResponse: + if self._meta_store is None: + logger.warning("meta_store is not available, list_sandboxes returning empty result") return SandboxListResponse() page = int(query_params.pop("page", "1")) page_size = int(query_params.pop("page_size", "500")) @@ -195,7 +200,7 @@ async def list_sandboxes(self, query_params: SandboxQueryParams) -> SandboxListR raise BadRequestRockError(f"page_size exceeds maximum {self._batch_get_status_max_count}") logger.info(f"list sandboxes with filters: {query_params}, page: {page}, page_size: {page_size}") try: - all_sandbox_data = await self.list_all_sandboxes_by_query_params(query_params) + all_sandbox_data = await self.list_all_sandboxes_by_query_params(query_params, use_legacy_states) total = len(all_sandbox_data) start_index = (page - 1) * page_size end_index = start_index + page_size @@ -585,10 +590,10 @@ async def _forward_tcp_to_websocket(self, reader, websocket, direction: str, idl logger.info(f"Connection closed in {direction}: {e}") async def get_service_status(self, sandbox_id: str): - sandbox_status_dicts = await self._redis_provider.json_get(alive_sandbox_key(sandbox_id), "$") - if not sandbox_status_dicts or sandbox_status_dicts[0].get("host_ip") is None: + sandbox_info = await self._meta_store.get(sandbox_id) if self._meta_store else None + if not sandbox_info or sandbox_info.get("host_ip") is None: raise Exception(f"sandbox {sandbox_id} not started") - return sandbox_status_dicts + return [sandbox_info] async def _send_request( self, @@ -715,40 +720,32 @@ async def _forward_messages(self, source_ws, target_ws, direction: str): logger.error(f"Error forwarding message {direction}: {e}") async def _update_expire_time(self, sandbox_id): - if self._redis_provider is None: + if not self._meta_store: return - sandbox_status_dict = await self._redis_provider.json_get(alive_sandbox_key(sandbox_id), "$") - if not sandbox_status_dict or len(sandbox_status_dict) == 0: - logger.info(f"sandbox-{sandbox_id} is not alive, skip update expire time") + timeout_info = await self._meta_store.get_timeout(sandbox_id) + if timeout_info is None: return - origin_info = await self._redis_provider.json_get(timeout_sandbox_key(sandbox_id), "$") - if origin_info is None or len(origin_info) == 0: - logger.info(f"sandbox-{sandbox_id} is not initialized, skip update expire time") - return - auto_clear_time: str = origin_info[0].get(env_vars.ROCK_SANDBOX_AUTO_CLEAR_TIME_KEY) - expire_time: int = int(time.time()) + int(auto_clear_time) * 60 - logger.info(f"sandbox-{sandbox_id} update expire time: {expire_time}") - new_dict = { - env_vars.ROCK_SANDBOX_AUTO_CLEAR_TIME_KEY: auto_clear_time, - env_vars.ROCK_SANDBOX_EXPIRE_TIME_KEY: str(expire_time), - } - await self._redis_provider.json_set(timeout_sandbox_key(sandbox_id), "$", new_dict) - - async def list_all_sandboxes_by_query_params(self, query_params: SandboxQueryParams): - all_keys = [] - async for key in self._redis_provider.client.scan_iter(match=f"{ALIVE_PREFIX}*", count=1000): # type: ignore - all_keys.append(key) - if not all_keys: + new_timeout = SandboxTimeoutHelper.refresh_timeout(timeout_info) + if new_timeout is not None: + await self._meta_store.update_timeout(sandbox_id, new_timeout) + + async def list_all_sandboxes_by_query_params( + self, query_params: SandboxQueryParams, use_legacy_states: bool = True + ): + all_ids = [] + async for sandbox_id in self._meta_store.iter_alive_sandbox_ids(): + all_ids.append(sandbox_id) + if not all_ids: return [] all_sandbox_data = [] batch_size = self._batch_get_status_max_count - for i in range(0, len(all_keys), batch_size): - batch_keys = all_keys[i : i + batch_size] - sandbox_infos_list = await self._redis_provider.json_mget(batch_keys, "$") - + for i in range(0, len(all_ids), batch_size): + batch_ids = all_ids[i : i + batch_size] + sandbox_infos_list = await self._meta_store.batch_get(batch_ids) for sandbox_info in sandbox_infos_list: - if not sandbox_info: + state = sandbox_info.get("state") + if use_legacy_states and state not in (State.RUNNING, State.PENDING): continue if self._matches_query_params(sandbox_info, query_params): all_sandbox_data.append(SandboxListStatusResponse.from_sandbox_info(sandbox_info)) diff --git a/rock/sandbox/utils/__init__.py b/rock/sandbox/utils/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/rock/sandbox/utils/timeout.py b/rock/sandbox/utils/timeout.py new file mode 100644 index 000000000..c63fc65c2 --- /dev/null +++ b/rock/sandbox/utils/timeout.py @@ -0,0 +1,51 @@ +"""Sandbox timeout helpers. + +Pure calculation utilities — no I/O. Callers are responsible for reading +timeout_info from SandboxMetaStore and writing the result back. +""" + +from __future__ import annotations + +import time + +from rock import env_vars + + +class SandboxTimeoutHelper: + """Stateless timeout calculation helpers.""" + + @staticmethod + def make_timeout_info(auto_clear_time: int) -> dict[str, str]: + """Build the initial timeout dict for a newly created sandbox. + + Parameters + ---------- + auto_clear_time: + Sandbox lifetime in **minutes**. + """ + expire_time = int(time.time()) + auto_clear_time * 60 + return { + env_vars.ROCK_SANDBOX_AUTO_CLEAR_TIME_KEY: str(auto_clear_time), + env_vars.ROCK_SANDBOX_EXPIRE_TIME_KEY: str(expire_time), + } + + @staticmethod + def refresh_timeout(timeout_info: dict[str, str]) -> dict[str, str] | None: + """Return a new timeout dict with ``expire_time`` recalculated from ``auto_clear_time``. + + Returns *None* if ``auto_clear_time`` is missing from *timeout_info*. + """ + auto_clear_time = timeout_info.get(env_vars.ROCK_SANDBOX_AUTO_CLEAR_TIME_KEY) + if auto_clear_time is None: + return None + expire_time = int(time.time()) + int(auto_clear_time) * 60 + return { + env_vars.ROCK_SANDBOX_AUTO_CLEAR_TIME_KEY: str(auto_clear_time), + env_vars.ROCK_SANDBOX_EXPIRE_TIME_KEY: str(expire_time), + } + + @staticmethod + def is_expired(timeout_info: dict[str, str]) -> bool: + """Return *True* when ``expire_time`` in *timeout_info* is in the past.""" + expire_time = int(timeout_info.get(env_vars.ROCK_SANDBOX_EXPIRE_TIME_KEY, 0)) + return int(time.time()) > expire_time diff --git a/scripts/gen_ddl.py b/scripts/gen_ddl.py new file mode 100644 index 000000000..02a701319 --- /dev/null +++ b/scripts/gen_ddl.py @@ -0,0 +1,57 @@ +"""Generate DDL SQL from SQLAlchemy ORM schema definitions. + +Usage: + uv run python scripts/gen_ddl.py # default: postgresql + uv run python scripts/gen_ddl.py --dialect sqlite + uv run python scripts/gen_ddl.py --dialect postgresql --out ddl.sql +""" + +import argparse +import sys + +from sqlalchemy.schema import CreateIndex, CreateTable + + +def get_dialect(name: str): + if name == "postgresql": + from sqlalchemy.dialects import postgresql + return postgresql.dialect() + if name == "sqlite": + from sqlalchemy.dialects import sqlite + return sqlite.dialect() + print(f"Unsupported dialect: {name}", file=sys.stderr) + sys.exit(1) + + +def gen_ddl(dialect) -> str: + # Import here so all models are registered onto Base.metadata + from rock.admin.core.schema import Base # noqa: F401 (side-effect: registers SandboxRecord) + + lines: list[str] = [] + for table in Base.metadata.sorted_tables: + lines.append(str(CreateTable(table).compile(dialect=dialect)).strip() + ";") + for index in table.indexes: + lines.append(str(CreateIndex(index).compile(dialect=dialect)).strip() + ";") + + return "\n\n".join(lines) + + +def main() -> None: + parser = argparse.ArgumentParser(description="Generate DDL from ORM schema") + parser.add_argument("--dialect", default="postgresql", choices=["postgresql", "sqlite"]) + parser.add_argument("--out", default=None, help="Output file path (default: stdout)") + args = parser.parse_args() + + dialect = get_dialect(args.dialect) + ddl = gen_ddl(dialect) + + if args.out: + with open(args.out, "w") as f: + f.write(ddl + "\n") + print(f"Written to {args.out}") + else: + print(ddl) + + +if __name__ == "__main__": + main() diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 40e69b6b3..a54d4aa6f 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -9,6 +9,7 @@ import urllib.request from dataclasses import dataclass from pathlib import Path +from urllib.parse import urlparse import pytest import uvicorn @@ -99,6 +100,20 @@ def admin_client_fixture(): @pytest.fixture(scope="session") def admin_remote_server(): + external_base_url = os.getenv("ROCK_TEST_ADMIN_BASE_URL") + if external_base_url: + # Accept either "localhost:8080" or "http://localhost:8080". + normalized = external_base_url if "://" in external_base_url else f"http://{external_base_url}" + parsed = urlparse(normalized) + if not parsed.hostname or not parsed.port: + raise ValueError( + "Invalid ROCK_TEST_ADMIN_BASE_URL. Expected host:port or http://host:port, " + f"got: {external_base_url!r}" + ) + logger.info("Using external admin server from ROCK_TEST_ADMIN_BASE_URL=%s", external_base_url) + yield RemoteServer(port=parsed.port, endpoint=f"{parsed.scheme}://{parsed.hostname}") + return + port = run_until_complete(find_free_port()) proxy_port = run_until_complete(find_free_port()) diff --git a/tests/unit/admin/core/test_sandbox_table.py b/tests/unit/admin/core/test_sandbox_table.py index a14c4fd42..cb310f3c2 100644 --- a/tests/unit/admin/core/test_sandbox_table.py +++ b/tests/unit/admin/core/test_sandbox_table.py @@ -1,47 +1,196 @@ +"""Tests for SandboxTable — SQLite in-memory (fast) and PostgreSQL (Docker).""" + import pytest +from sqlalchemy.exc import IntegrityError from rock.admin.core.db_provider import DatabaseProvider from rock.admin.core.sandbox_table import SandboxTable -from rock.admin.core.schema import DBModelBase as Base -from rock.admin.core.schema import SandboxRecord from rock.config import DatabaseConfig -class DatabaseProviderUtil: - @staticmethod - async def reset(provider: DatabaseProvider): - async with provider.engine.begin() as conn: - await conn.run_sync(Base.metadata.drop_all) - await conn.run_sync(Base.metadata.create_all) - - -@pytest.mark.asyncio -async def _create_sandbox_table(): - db_provider = DatabaseProvider(DatabaseConfig(url="sqlite+aiosqlite:///:memory:")) - await db_provider.init() - await DatabaseProviderUtil.reset(db_provider) - return SandboxTable(db_provider.engine) - - -@pytest.mark.asyncio -async def test_sandbox_table(): - table = await _create_sandbox_table() - await table.create(SandboxRecord(id="1", experiment_id="1", image="1")) - sandbox_record = await table.get("1") - assert "1" == sandbox_record.id - assert "1" == sandbox_record.image - - -@pytest.mark.asyncio -async def test_list(): - table = await _create_sandbox_table() - await table.create(SandboxRecord(id="1", experiment_id="1", image="1")) - await table.create(SandboxRecord(id="2", experiment_id="1", image="2")) - await table.create(SandboxRecord(id="3", experiment_id="2", image="3")) - - sandbox_records = await table.list(experiment_id="1") - assert 2 == len(sandbox_records) - sandbox_records = await table.list(experiment_id="2") - assert 1 == len(sandbox_records) - sandbox_records = await table.list() - assert 3 == len(sandbox_records) +class TestSandboxTableWithSQLite: + """Unit tests for SandboxTable using an in-memory SQLite database. + + These tests cover all CRUD paths and run without any external dependencies. + """ + + @pytest.fixture + async def db(self): + provider = DatabaseProvider(db_config=DatabaseConfig(url="sqlite:///:memory:")) + await provider.init_pool() + table = SandboxTable(provider) + yield table + await provider.close_pool() + + async def test_insert_and_get(self, db): + sandbox_id = "sqlite-sbx-001" + data = { + "user_id": "user-1", + "image": "python:3.11", + "experiment_id": "exp-1", + "namespace": "default", + "cluster_name": "local", + "state": "running", + "host_ip": "127.0.0.1", + "create_time": "2025-01-01T00:00:00Z", + } + await db.create(sandbox_id, data) + record = await db.get(sandbox_id) + assert record is not None + assert record["sandbox_id"] == sandbox_id + assert record["user_id"] == "user-1" + assert record["state"] == "running" + + async def test_insert_duplicate_raises(self, db): + sandbox_id = "sqlite-sbx-002" + data = {"state": "pending", "create_time": "2025-01-01T00:00:00Z"} + await db.create(sandbox_id, data) + with pytest.raises(IntegrityError): + await db.create(sandbox_id, {**data, "state": "running"}) + + async def test_update(self, db): + sandbox_id = "sqlite-sbx-003" + await db.create(sandbox_id, {"state": "pending", "create_time": "2025-01-01T00:00:00Z"}) + await db.update(sandbox_id, {"state": "running", "host_ip": "10.0.0.2"}) + record = await db.get(sandbox_id) + assert record["state"] == "running" + assert record["host_ip"] == "10.0.0.2" + + async def test_delete(self, db): + sandbox_id = "sqlite-sbx-004" + await db.create(sandbox_id, {"create_time": "2025-01-01T00:00:00Z"}) + await db.delete(sandbox_id) + assert await db.get(sandbox_id) is None + + async def test_list_by(self, db): + await db.create("lb-s1", {"user_id": "alice", "create_time": "2025-01-01T00:00:00Z"}) + await db.create("lb-s2", {"user_id": "alice", "create_time": "2025-01-01T00:00:00Z"}) + await db.create("lb-s3", {"user_id": "bob", "create_time": "2025-01-01T00:00:00Z"}) + results = await db.list_by("user_id", "alice") + assert len(results) == 2 + + async def test_list_by_in(self, db): + await db.create("lbi-1", {"user_id": "alice", "create_time": "2025-01-01T00:00:00Z"}) + await db.create("lbi-2", {"user_id": "bob", "create_time": "2025-01-01T00:00:00Z"}) + await db.create("lbi-3", {"user_id": "carol", "create_time": "2025-01-01T00:00:00Z"}) + results = await db.list_by_in("sandbox_id", ["lbi-1", "lbi-3"]) + assert {r["sandbox_id"] for r in results} == {"lbi-1", "lbi-3"} + + async def test_list_by_rejects_blacklisted_column(self, db): + with pytest.raises(ValueError, match="not allowed"): + await db.list_by("phases", "{}") + + async def test_get_nonexistent_returns_none(self, db): + assert await db.get("does-not-exist") is None + + async def test_update_nonexistent_is_noop(self, db): + """update() on a non-existent ID should log a warning and not raise.""" + await db.update("does-not-exist", {"state": "running"}) # should not raise + + async def test_not_null_defaults_applied_on_insert(self, db): + """Insert with minimal data should fill NOT NULL columns from _NOT_NULL_DEFAULTS.""" + sandbox_id = "sqlite-sbx-defaults" + await db.create(sandbox_id, {}) + record = await db.get(sandbox_id) + assert record is not None + assert record["user_id"] == "default" + assert record["state"] == "pending" + + +@pytest.mark.need_docker +@pytest.mark.need_database +class TestSandboxTableWithPostgres: + """Integration tests for SandboxTable using a real PostgreSQL container.""" + + @pytest.fixture + async def db(self, pg_container): + """Create a SandboxTable connected to the test PostgreSQL container.""" + provider = DatabaseProvider(db_config=DatabaseConfig(url=pg_container["url"])) + await provider.init_pool() + table = SandboxTable(provider) + yield table + await provider.close_pool() + + async def test_fixture_connection(self, db): + """Verify that the fixture can connect and create tables.""" + assert db._db._engine is not None + + async def test_insert_and_get(self, db): + sandbox_id = "test-sandbox-001" + data = { + "user_id": "user-1", + "image": "python:3.11", + "experiment_id": "exp-1", + "namespace": "default", + "cluster_name": "local", + "state": "RUNNING", + "host_ip": "10.0.0.1", + "create_time": "2025-01-01T00:00:00Z", + } + + await db.create(sandbox_id, data) + record = await db.get(sandbox_id) + + assert record is not None + assert record["sandbox_id"] == sandbox_id + assert record["user_id"] == "user-1" + assert record["state"] == "RUNNING" + + async def test_insert_duplicate_raises(self, db): + sandbox_id = "test-sandbox-002" + data = { + "user_id": "user-1", + "state": "PENDING", + "create_time": "2025-01-01T00:00:00Z", + } + await db.create(sandbox_id, data) + + with pytest.raises(IntegrityError): + await db.create(sandbox_id, {**data, "state": "RUNNING"}) + + async def test_update(self, db): + sandbox_id = "test-sandbox-003" + await db.create(sandbox_id, { + "state": "PENDING", + "create_time": "2025-01-01T00:00:00Z", + }) + + await db.update(sandbox_id, {"state": "RUNNING", "host_ip": "10.0.0.2"}) + record = await db.get(sandbox_id) + assert record["state"] == "RUNNING" + assert record["host_ip"] == "10.0.0.2" + + async def test_delete(self, db): + sandbox_id = "test-sandbox-004" + await db.create(sandbox_id, {"create_time": "2025-01-01T00:00:00Z"}) + await db.delete(sandbox_id) + assert await db.get(sandbox_id) is None + + async def test_list_by(self, db): + await db.create("lb-1", {"user_id": "alice", "create_time": "2025-01-01T00:00:00Z"}) + await db.create("lb-2", {"user_id": "alice", "create_time": "2025-01-01T00:00:00Z"}) + await db.create("lb-3", {"user_id": "bob", "create_time": "2025-01-01T00:00:00Z"}) + + results = await db.list_by("user_id", "alice") + assert len(results) == 2 + + async def test_list_by_rejects_blacklisted_column(self, db): + with pytest.raises(ValueError, match="not allowed"): + await db.list_by("phases", "{}") + + async def test_json_fields_postgresql(self, db): + """Verify JSONB variant works correctly on PostgreSQL.""" + sandbox_id = "json-test-001" + data = { + "create_time": "2025-01-01T00:00:00Z", + "phases": {"build": "done", "deploy": "pending"}, + "port_mapping": {"8080": 30080, "22": 30022}, + } + await db.create(sandbox_id, data) + record = await db.get(sandbox_id) + + assert record["phases"] == {"build": "done", "deploy": "pending"} + assert record["port_mapping"] == {"8080": 30080, "22": 30022} + + async def test_get_nonexistent_returns_none(self, db): + assert await db.get("does-not-exist") is None diff --git a/tests/unit/conftest.py b/tests/unit/conftest.py index 750c7e54a..b1058fd68 100644 --- a/tests/unit/conftest.py +++ b/tests/unit/conftest.py @@ -9,8 +9,10 @@ from kubernetes import client from ray.util.state import list_actors +from rock.admin.core.db_provider import DatabaseProvider from rock.admin.core.ray_service import RayService -from rock.config import K8sConfig, RockConfig +from rock.admin.core.sandbox_table import SandboxTable +from rock.config import DatabaseConfig, K8sConfig, RockConfig from rock.deployments.abstract import AbstractDeployment from rock.deployments.config import DeploymentConfig, DockerDeploymentConfig from rock.logger import init_logger @@ -19,6 +21,7 @@ from rock.sandbox.operator.k8s.template_loader import K8sTemplateLoader from rock.sandbox.operator.ray import RayOperator from rock.sandbox.sandbox_manager import SandboxManager +from rock.sandbox.sandbox_meta_store import SandboxMetaStore from rock.sandbox.service.sandbox_proxy_service import SandboxProxyService from rock.utils.providers.redis_provider import RedisProvider @@ -78,13 +81,24 @@ def ray_operator(ray_service, runtime_config): return ray_operator +@pytest.fixture +async def _memory_sandbox_table(): + provider = DatabaseProvider(db_config=DatabaseConfig(url="sqlite+aiosqlite:///:memory:")) + await provider.init_pool() + table = SandboxTable(provider) + yield table + await provider.close_pool() + + @pytest.fixture async def sandbox_manager( - rock_config: RockConfig, redis_provider: RedisProvider, ray_init_shutdown, ray_service, ray_operator + rock_config: RockConfig, redis_provider: RedisProvider, ray_init_shutdown, ray_service, ray_operator, + _memory_sandbox_table: SandboxTable, ): + meta_store = SandboxMetaStore(redis_provider=redis_provider, sandbox_table=_memory_sandbox_table) sandbox_manager = SandboxManager( rock_config, - redis_provider=redis_provider, + meta_store=meta_store, ray_namespace=rock_config.ray.namespace, ray_service=ray_service, enable_runtime_auto_clear=rock_config.runtime.enable_auto_clear, @@ -94,8 +108,11 @@ async def sandbox_manager( @pytest.fixture -async def sandbox_proxy_service(rock_config: RockConfig, redis_provider: RedisProvider): - sandbox_proxy_service = SandboxProxyService(rock_config, redis_provider=redis_provider) +async def sandbox_proxy_service( + rock_config: RockConfig, redis_provider: RedisProvider, _memory_sandbox_table: SandboxTable +): + meta_store = SandboxMetaStore(redis_provider=redis_provider, sandbox_table=_memory_sandbox_table) + sandbox_proxy_service = SandboxProxyService(rock_config, meta_store=meta_store) return sandbox_proxy_service @@ -248,3 +265,140 @@ def deployment_config(): container_name="test-sandbox", template_name="default", ) + + +# --------------------------------------------------------------------------- +# Docker container fixtures - shared across all unit test subdirectories +# (lazy-import docker so non-Docker tests don't require the package) +# --------------------------------------------------------------------------- + +_PG_IMAGE = "postgres:16-alpine" +_PG_USER = "rock_test" +_PG_PASSWORD = "rock_test_pass" +_PG_DB = "rock_test_db" +_PG_PORT = 5432 +_REDIS_IMAGE = "redis/redis-stack-server:latest" +_REDIS_PORT = 6379 + + +def _docker_keep_containers() -> bool: + import os + return os.getenv("ROCK_TEST_KEEP_DOCKER_CONTAINERS", "").lower() in {"1", "true", "yes", "on"} + + +def _docker_detect_network(client) -> str | None: + import socket + import docker + hostname = socket.gethostname() + try: + current = client.containers.get(hostname) + networks = current.attrs["NetworkSettings"]["Networks"] + if "bridge" in networks: + return "bridge" + return next(iter(networks), None) + except (docker.errors.NotFound, docker.errors.APIError): + return None + + +def _docker_resolve_host_port(container, network_name: str | None, internal_port: int) -> tuple[str, int]: + container.reload() + if network_name: + host = container.attrs["NetworkSettings"]["Networks"][network_name]["IPAddress"] + return host, internal_port + host = "127.0.0.1" + port = int(container.ports[f"{internal_port}/tcp"][0]["HostPort"]) + return host, port + + +def _docker_start_container(client, image, name, network_name, internal_port, **extra_kwargs): + keep = _docker_keep_containers() + run_kwargs = {"image": image, "name": name, "detach": True, "remove": not keep, **extra_kwargs} + if network_name: + run_kwargs["network"] = network_name + else: + run_kwargs["ports"] = {f"{internal_port}/tcp": None} + return client.containers.run(**run_kwargs) + + +@pytest.fixture(scope="session") +def pg_container(): + """Start a PostgreSQL 16 Docker container for the test session.""" + import uuid + import docker + + client = docker.from_env() + container_name = f"rock-test-pg-{uuid.uuid4().hex[:8]}" + network_name = _docker_detect_network(client) + container = _docker_start_container( + client, + image=_PG_IMAGE, + name=container_name, + network_name=network_name, + internal_port=_PG_PORT, + environment={ + "POSTGRES_USER": _PG_USER, + "POSTGRES_PASSWORD": _PG_PASSWORD, + "POSTGRES_DB": _PG_DB, + }, + ) + try: + # wait for readiness + import time as _t + deadline = _t.time() + 30 + while _t.time() < deadline: + code, _ = container.exec_run(f"pg_isready -U {_PG_USER}") + if code == 0: + break + _t.sleep(0.5) + else: + raise TimeoutError("PostgreSQL container did not become ready within 30s") + + host, port = _docker_resolve_host_port(container, network_name, _PG_PORT) + yield { + "host": host, "port": port, + "user": _PG_USER, "password": _PG_PASSWORD, "database": _PG_DB, + "url": f"postgresql://{_PG_USER}:{_PG_PASSWORD}@{host}:{port}/{_PG_DB}", + } + finally: + if not _docker_keep_containers(): + try: + container.stop(timeout=5) + except Exception: + pass + + +@pytest.fixture(scope="session") +def redis_container(): + """Start a Redis Stack Docker container (with RedisJSON) for the test session.""" + import uuid + import docker + + client = docker.from_env() + container_name = f"rock-test-redis-{uuid.uuid4().hex[:8]}" + network_name = _docker_detect_network(client) + container = _docker_start_container( + client, + image=_REDIS_IMAGE, + name=container_name, + network_name=network_name, + internal_port=_REDIS_PORT, + ) + try: + import time as _t + deadline = _t.time() + 30 + while _t.time() < deadline: + code, output = container.exec_run("redis-cli ping") + if code == 0 and b"PONG" in output: + break + _t.sleep(0.5) + else: + raise TimeoutError("Redis container did not become ready within 30s") + + host, port = _docker_resolve_host_port(container, network_name, _REDIS_PORT) + yield {"host": host, "port": port, "password": "", "url": f"redis://{host}:{port}"} + finally: + if not _docker_keep_containers(): + try: + container.stop(timeout=5) + except Exception: + pass diff --git a/tests/unit/sandbox/test_proxy_enhancements.py b/tests/unit/sandbox/test_proxy_enhancements.py index 9f3c3ece4..a6b64e37d 100644 --- a/tests/unit/sandbox/test_proxy_enhancements.py +++ b/tests/unit/sandbox/test_proxy_enhancements.py @@ -1,16 +1,27 @@ """Tests for proxy enhancements: 1. WebSocket proxy supports user-specified port 2. HTTP proxy supports all HTTP methods +3. batch_get_sandbox_status legacy-states filtering """ from unittest.mock import AsyncMock, MagicMock, patch import pytest +from fakeredis import aioredis from fastapi import FastAPI from httpx import ASGITransport, AsyncClient from starlette.datastructures import Headers, MutableHeaders from starlette.responses import JSONResponse +from rock.actions.sandbox.response import State +from rock.admin.core.db_provider import DatabaseProvider +from rock.admin.core.sandbox_table import SandboxTable +from rock.config import DatabaseConfig, RockConfig +from rock.sandbox.sandbox_meta_store import SandboxMetaStore +from rock.sandbox.service.sandbox_proxy_service import SandboxProxyService +from rock.utils.providers.redis_provider import RedisProvider + +from rock.admin.proto.response import SandboxListResponse from rock.admin.entrypoints.sandbox_proxy_api import ( sandbox_proxy_router, set_sandbox_proxy_service, @@ -1023,3 +1034,176 @@ async def test_vnc_ws_route_ignores_query_param_port(self, app): call = svc.websocket_proxy.call_args port = call.kwargs.get("port") or (call.args[3] if len(call.args) > 3 else None) assert port == 8006 + + +# ───────────────────────────────────────────────────────────────────────────── +# batch_get_sandbox_status — legacy-states filtering +# ───────────────────────────────────────────────────────────────────────────── + +_BASE_INFO = { + "sandbox_id": None, + "user_id": "u1", + "image": "python:3.11", + "experiment_id": "exp-1", + "namespace": "default", + "cluster_name": "c1", + "host_ip": "10.0.0.1", + "create_time": "2025-01-01T00:00:00Z", +} + + +@pytest.fixture +async def _redis(): + provider = RedisProvider(host=None, port=None, password="") + provider.client = aioredis.FakeRedis(decode_responses=True) + yield provider + await provider.close_pool() + + +@pytest.fixture +async def _db(): + provider = DatabaseProvider(db_config=DatabaseConfig(url="sqlite+aiosqlite:///:memory:")) + await provider.init_pool() + table = SandboxTable(provider) + yield table + await provider.close_pool() + + +@pytest.fixture +def _svc(_redis, _db, rock_config): + meta_store = SandboxMetaStore(redis_provider=_redis, sandbox_table=_db) + return SandboxProxyService(rock_config, meta_store=meta_store), meta_store + + +@pytest.fixture +def rock_config(): + return RockConfig() + + +async def _seed(meta_store: SandboxMetaStore, sandbox_id: str, state: str) -> None: + info = {**_BASE_INFO, "sandbox_id": sandbox_id, "state": state} + await meta_store.create(sandbox_id, info) + + +class TestBatchGetLegacyStates: + """batch_get_sandbox_status with use_legacy_states=True (default).""" + + async def test_running_sandbox_included(self, _svc): + svc, meta_store = _svc + await _seed(meta_store, "sb-running", State.RUNNING) + result = await svc.batch_get_sandbox_status(["sb-running"]) + assert len(result) == 1 + assert result[0].sandbox_id == "sb-running" + + async def test_pending_sandbox_included(self, _svc): + svc, meta_store = _svc + await _seed(meta_store, "sb-pending", State.PENDING) + result = await svc.batch_get_sandbox_status(["sb-pending"]) + assert len(result) == 1 + assert result[0].sandbox_id == "sb-pending" + + async def test_stopped_sandbox_excluded(self, _svc): + """stopped sandbox must be filtered out when use_legacy_states=True.""" + svc, meta_store = _svc + await _seed(meta_store, "sb-stopped", State.STOPPED) + result = await svc.batch_get_sandbox_status(["sb-stopped"]) + assert result == [] + + async def test_mixed_states_only_active_returned(self, _svc): + """Only running/pending survive; stopped is silently dropped.""" + svc, meta_store = _svc + await _seed(meta_store, "sb-r", State.RUNNING) + await _seed(meta_store, "sb-p", State.PENDING) + await _seed(meta_store, "sb-s", State.STOPPED) + result = await svc.batch_get_sandbox_status(["sb-r", "sb-p", "sb-s"]) + ids = {r.sandbox_id for r in result} + assert ids == {"sb-r", "sb-p"} + + async def test_unknown_id_omitted(self, _svc): + """sandbox_id not in DB → entry absent from result (not None/empty status).""" + svc, meta_store = _svc + await _seed(meta_store, "sb-exists", State.RUNNING) + result = await svc.batch_get_sandbox_status(["sb-exists", "sb-ghost"]) + assert len(result) == 1 + assert result[0].sandbox_id == "sb-exists" + + async def test_stopped_included_when_legacy_disabled(self, _svc): + """use_legacy_states=False → stopped sandbox is returned.""" + svc, meta_store = _svc + await _seed(meta_store, "sb-stopped2", State.STOPPED) + result = await svc.batch_get_sandbox_status(["sb-stopped2"], use_legacy_states=False) + assert len(result) == 1 + assert result[0].sandbox_id == "sb-stopped2" + + +# ───────────────────────────────────────────────────────────────────────────── +# GET /sandboxes — use_legacy_states query param +# ───────────────────────────────────────────────────────────────────────────── + + +class TestListSandboxesLegacyStates: + """GET /sandboxes should forward use_legacy_states to the service.""" + + async def test_default_uses_legacy_states(self, app): + a, svc = app + svc.list_sandboxes = AsyncMock(return_value=SandboxListResponse()) + async with AsyncClient(transport=ASGITransport(app=a), base_url="http://test") as client: + await client.get("/sandboxes") + _, kwargs = svc.list_sandboxes.call_args + assert "use_legacy_states" not in kwargs + + async def test_false_string_disables_legacy_states(self, app): + a, svc = app + svc.list_sandboxes = AsyncMock(return_value=SandboxListResponse()) + async with AsyncClient(transport=ASGITransport(app=a), base_url="http://test") as client: + await client.get("/sandboxes?use_legacy_states=false") + _, kwargs = svc.list_sandboxes.call_args + assert kwargs["use_legacy_states"] is False + + async def test_true_string_keeps_legacy_states(self, app): + a, svc = app + svc.list_sandboxes = AsyncMock(return_value=SandboxListResponse()) + async with AsyncClient(transport=ASGITransport(app=a), base_url="http://test") as client: + await client.get("/sandboxes?use_legacy_states=true") + _, kwargs = svc.list_sandboxes.call_args + assert kwargs["use_legacy_states"] is True + + async def test_use_legacy_states_not_passed_as_filter(self, app): + """use_legacy_states must be popped from query_params, not forwarded as a filter.""" + a, svc = app + svc.list_sandboxes = AsyncMock(return_value=SandboxListResponse()) + async with AsyncClient(transport=ASGITransport(app=a), base_url="http://test") as client: + await client.get("/sandboxes?use_legacy_states=false&user_id=u1") + query_params, _ = svc.list_sandboxes.call_args + assert "use_legacy_states" not in query_params[0] + assert query_params[0].get("user_id") == "u1" + + +# ───────────────────────────────────────────────────────────────────────────── +# POST /sandboxes/batch — use_legacy_states query param +# ───────────────────────────────────────────────────────────────────────────── + + +class TestBatchGetStatusLegacyStates: + """POST /sandboxes/batch should forward use_legacy_states to the service.""" + + async def test_default_uses_legacy_states(self, app): + a, svc = app + svc.batch_get_sandbox_status = AsyncMock(return_value=[]) + async with AsyncClient(transport=ASGITransport(app=a), base_url="http://test") as client: + await client.post("/sandboxes/batch", json={"sandbox_ids": ["sbx-1"]}) + svc.batch_get_sandbox_status.assert_called_once_with(["sbx-1"], True) + + async def test_false_disables_legacy_states(self, app): + a, svc = app + svc.batch_get_sandbox_status = AsyncMock(return_value=[]) + async with AsyncClient(transport=ASGITransport(app=a), base_url="http://test") as client: + await client.post("/sandboxes/batch?use_legacy_states=false", json={"sandbox_ids": ["sbx-1"]}) + svc.batch_get_sandbox_status.assert_called_once_with(["sbx-1"], False) + + async def test_true_keeps_legacy_states(self, app): + a, svc = app + svc.batch_get_sandbox_status = AsyncMock(return_value=[]) + async with AsyncClient(transport=ASGITransport(app=a), base_url="http://test") as client: + await client.post("/sandboxes/batch?use_legacy_states=true", json={"sandbox_ids": ["sbx-1"]}) + svc.batch_get_sandbox_status.assert_called_once_with(["sbx-1"], True) diff --git a/tests/unit/sandbox/test_sandbox_meta_store.py b/tests/unit/sandbox/test_sandbox_meta_store.py new file mode 100644 index 000000000..24ffe5c28 --- /dev/null +++ b/tests/unit/sandbox/test_sandbox_meta_store.py @@ -0,0 +1,487 @@ +"""Tests for SandboxMetaStore - Redis + DB dual-write coordinator.""" + +import asyncio +import time +import uuid + +import pytest +from fakeredis import aioredis + +from rock.actions.sandbox.response import State +from rock.admin.core.db_provider import DatabaseProvider +from rock.admin.core.redis_key import ALIVE_PREFIX, alive_sandbox_key, timeout_sandbox_key +from rock.admin.core.sandbox_table import SandboxTable +from rock.config import DatabaseConfig +from rock.sandbox.sandbox_meta_store import SandboxMetaStore +from rock.utils.providers.redis_provider import RedisProvider + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + + +@pytest.fixture +async def redis(): + provider = RedisProvider(host=None, port=None, password="") + provider.client = aioredis.FakeRedis(decode_responses=True) + yield provider + await provider.close_pool() + + +@pytest.fixture +async def db(tmp_path): + provider = DatabaseProvider(db_config=DatabaseConfig(url=f"sqlite:///{tmp_path / 'test.db'}")) + await provider.init_pool() + table = SandboxTable(provider) + yield table + await provider.close_pool() + + +@pytest.fixture +async def db_memory(): + provider = DatabaseProvider(db_config=DatabaseConfig(url="sqlite:///:memory:")) + await provider.init_pool() + table = SandboxTable(provider) + yield table + await provider.close_pool() + + +@pytest.fixture +def repo(redis, db): + return SandboxMetaStore(redis_provider=redis, sandbox_table=db) + + + +@pytest.fixture +def repo_with_memory_db(redis, db_memory): + return SandboxMetaStore(redis_provider=redis, sandbox_table=db_memory) + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +SANDBOX_ID = "sbx-test-001" + +SANDBOX_INFO = { + "sandbox_id": SANDBOX_ID, + "user_id": "user-1", + "image": "python:3.11", + "experiment_id": "exp-1", + "namespace": "default", + "cluster_name": "cluster-1", + "state": State.RUNNING, + "host_ip": "10.0.0.1", + "create_time": "2025-01-01T00:00:00Z", +} + + +# --------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- + + +class TestSave: + async def test_save_writes_redis_and_db(self, repo, redis, db): + """save() should persist to Redis alive key AND fire a DB upsert.""" + await repo.create(SANDBOX_ID, SANDBOX_INFO) + + # Give fire-and-forget task time to complete + await asyncio.sleep(0.1) + + # Verify Redis + result = await redis.json_get(alive_sandbox_key(SANDBOX_ID), "$") + assert result is not None + assert result[0]["sandbox_id"] == SANDBOX_ID + assert result[0]["user_id"] == "user-1" + + # Verify DB + db_record = await db.get(SANDBOX_ID) + assert db_record is not None + assert db_record["user_id"] == "user-1" + + async def test_save_with_timeout_info(self, repo, redis): + """save() with timeout_info should also write the timeout key.""" + timeout = {"auto_clear_time": "30", "expire_time": "9999999999"} + await repo.create(SANDBOX_ID, SANDBOX_INFO, timeout_info=timeout) + + result = await redis.json_get(timeout_sandbox_key(SANDBOX_ID), "$") + assert result is not None + assert result[0]["auto_clear_time"] == "30" + + +class TestUpdate: + async def test_update_writes_redis_and_db(self, repo, redis, db): + """update() should merge new fields into Redis and fire DB update.""" + await repo.create(SANDBOX_ID, SANDBOX_INFO) + await asyncio.sleep(0.1) + + update_data = {"state": "stopped", "stop_time": "2025-01-01T01:00:00Z"} + await repo.update(SANDBOX_ID, update_data) + await asyncio.sleep(0.1) + + # Verify Redis - should have merged (old fields + new fields) + result = await redis.json_get(alive_sandbox_key(SANDBOX_ID), "$") + assert result is not None + info = result[0] + assert info["state"] == "stopped" + assert info["stop_time"] == "2025-01-01T01:00:00Z" + # Original fields should still be present + assert info["user_id"] == "user-1" + assert info["image"] == "python:3.11" + + # Verify DB + db_record = await db.get(SANDBOX_ID) + assert db_record is not None + assert db_record["state"] == "stopped" + + +class TestRemove: + async def test_remove_deletes_redis_and_db(self, repo, redis, db): + """remove() should delete from both Redis alive+timeout keys and DB.""" + # Setup: save sandbox and a timeout key + await repo.create(SANDBOX_ID, SANDBOX_INFO) + timeout_data = {"auto_clear_time": "30", "expire_time": str(int(time.time()) + 1800)} + await redis.json_set(timeout_sandbox_key(SANDBOX_ID), "$", timeout_data) + await asyncio.sleep(0.1) + + # Act + await repo.delete(SANDBOX_ID) + await asyncio.sleep(0.1) + + # Verify Redis - both keys gone + alive_result = await redis.json_get(alive_sandbox_key(SANDBOX_ID), "$") + assert alive_result is None + timeout_result = await redis.json_get(timeout_sandbox_key(SANDBOX_ID), "$") + assert timeout_result is None + + # Verify DB + db_record = await db.get(SANDBOX_ID) + assert db_record is None + + +class TestArchive: + async def test_archive_removes_redis_and_updates_db(self, repo, redis, db): + """archive() should update DB first, then remove Redis keys.""" + await repo.create(SANDBOX_ID, SANDBOX_INFO) + await redis.json_set(timeout_sandbox_key(SANDBOX_ID), "$", {"auto_clear_time": "30", "expire_time": "9999"}) + await asyncio.sleep(0.1) # let the create fire-and-forget DB insert settle + + final_info: dict = {"state": "stopped", "stop_time": "2025-06-01T00:00:00Z"} + await repo.archive(SANDBOX_ID, final_info) + # No extra sleep needed: archive() awaits the DB write before returning. + + # Redis: both keys gone + assert await redis.json_get(alive_sandbox_key(SANDBOX_ID), "$") is None + assert await redis.json_get(timeout_sandbox_key(SANDBOX_ID), "$") is None + + # DB: record still present with updated fields + db_record = await db.get(SANDBOX_ID) + assert db_record is not None + assert db_record["state"] == "stopped" + assert db_record["stop_time"] == "2025-06-01T00:00:00Z" + assert db_record["user_id"] == "user-1" # original fields preserved + + async def test_archive_db_written_before_redis_deleted(self, repo, redis, db): + """DB must be durably updated before the Redis alive key is removed.""" + await repo.create(SANDBOX_ID, SANDBOX_INFO) + await asyncio.sleep(0.1) + + # Intercept: check DB state immediately after archive returns (no extra sleep). + await repo.archive(SANDBOX_ID, {"state": "stopped"}) + + # At this point archive() has already awaited the DB write and deleted Redis. + assert await redis.json_get(alive_sandbox_key(SANDBOX_ID), "$") is None + db_record = await db.get(SANDBOX_ID) + assert db_record is not None + assert db_record["state"] == "stopped" + + +class TestGet: + async def test_get_reads_from_redis(self, repo, redis): + """get() should read from Redis alive key.""" + await repo.create(SANDBOX_ID, SANDBOX_INFO) + + result = await repo.get(SANDBOX_ID) + assert result is not None + assert result["sandbox_id"] == SANDBOX_ID + assert result["user_id"] == "user-1" + + async def test_get_nonexistent_returns_none(self, repo): + """get() on a non-existent sandbox should return None.""" + result = await repo.get("does-not-exist") + assert result is None + + +class TestExists: + async def test_exists_returns_true_when_present(self, repo, redis): + """exists() should return True when the sandbox alive key exists.""" + await repo.create(SANDBOX_ID, SANDBOX_INFO) + + assert await repo.exists(SANDBOX_ID) is True + + async def test_exists_returns_false_when_absent(self, repo): + """exists() should return False for a non-existent sandbox.""" + assert await repo.exists("does-not-exist") is False + + +class TestGetTimeout: + async def test_get_timeout_returns_timeout_info(self, repo, redis): + """get_timeout() should return the timeout dict from Redis.""" + timeout_data = {"auto_clear_time": "30", "expire_time": "9999999999"} + await redis.json_set(timeout_sandbox_key(SANDBOX_ID), "$", timeout_data) + + result = await repo.get_timeout(SANDBOX_ID) + assert result is not None + assert result["auto_clear_time"] == "30" + assert result["expire_time"] == "9999999999" + + async def test_get_timeout_returns_none_when_absent(self, repo): + """get_timeout() should return None when the timeout key does not exist.""" + result = await repo.get_timeout("does-not-exist") + assert result is None + + +class TestIterAliveSandboxIds: + async def test_iter_alive_sandbox_ids_yields_running_and_pending(self, repo): + """iter_alive_sandbox_ids() should yield IDs for both RUNNING and PENDING sandboxes.""" + await repo.create("sbx-running", {**SANDBOX_INFO, "sandbox_id": "sbx-running", "state": State.RUNNING}) + await repo.create("sbx-pending", {**SANDBOX_INFO, "sandbox_id": "sbx-pending", "state": State.PENDING}) + await asyncio.sleep(0.1) # let fire-and-forget DB writes settle + + ids = {sid async for sid in repo.iter_alive_sandbox_ids()} + assert "sbx-running" in ids + assert "sbx-pending" in ids + + async def test_iter_alive_sandbox_ids_excludes_stopped(self, repo): + """iter_alive_sandbox_ids() should not yield sandboxes with terminal state.""" + await repo.create("sbx-running", {**SANDBOX_INFO, "sandbox_id": "sbx-running"}) + await repo.create("sbx-stopped", {**SANDBOX_INFO, "sandbox_id": "sbx-stopped", "state": "stopped"}) + await asyncio.sleep(0.1) + + ids = [sid async for sid in repo.iter_alive_sandbox_ids()] + assert "sbx-running" in ids + assert "sbx-stopped" not in ids + + async def test_iter_alive_sandbox_ids_works_with_sqlite_memory(self, repo_with_memory_db): + """iter_alive_sandbox_ids() should work with sqlite in-memory DB + Redis fallback.""" + await repo_with_memory_db.create("sbx-running", {**SANDBOX_INFO, "sandbox_id": "sbx-running", "state": State.RUNNING}) + await repo_with_memory_db.create("sbx-pending", {**SANDBOX_INFO, "sandbox_id": "sbx-pending", "state": State.PENDING}) + await repo_with_memory_db.create("sbx-stopped", {**SANDBOX_INFO, "sandbox_id": "sbx-stopped", "state": "stopped"}) + await asyncio.sleep(0.1) + + ids = {sid async for sid in repo_with_memory_db.iter_alive_sandbox_ids()} + assert "sbx-running" in ids + assert "sbx-pending" in ids + assert "sbx-stopped" not in ids + + async def test_iter_alive_sandbox_ids_consistent_with_redis_scan(self, repo, redis): + """DB list_by_in(state IN active_states) should be consistent with Redis alive-key scan. + + Both approaches must agree: every active sandbox (PENDING or RUNNING) found in DB + must also have a Redis alive key. The inverse may not hold for sandboxes whose state + was updated to a terminal value without calling archive()/remove(). + """ + await repo.create("sbx-a", {**SANDBOX_INFO, "sandbox_id": "sbx-a", "state": State.RUNNING}) + await repo.create("sbx-b", {**SANDBOX_INFO, "sandbox_id": "sbx-b", "state": State.PENDING}) + await asyncio.sleep(0.1) + + # new approach: DB-backed iter_alive_sandbox_ids (PENDING + RUNNING) + db_ids = {sid async for sid in repo.iter_alive_sandbox_ids()} + + # old approach: Redis scan_iter on alive: prefix + redis_ids = set() + async for key in redis.client.scan_iter(match=f"{ALIVE_PREFIX}*", count=100): + if isinstance(key, str) and key.startswith(ALIVE_PREFIX): + redis_ids.add(key.removeprefix(ALIVE_PREFIX)) + + assert db_ids == redis_ids + + +class TestBatchGet: + async def test_batch_get_returns_db_results(self, repo): + """batch_get() returns sandbox info from the DB.""" + await repo.create(SANDBOX_ID, SANDBOX_INFO) + + results = await repo.batch_get([SANDBOX_ID]) + assert len(results) == 1 + assert results[0]["sandbox_id"] == SANDBOX_ID + + async def test_batch_get_omits_unknown_id(self, repo): + """batch_get() omits IDs not found in DB.""" + results = await repo.batch_get(["does-not-exist"]) + assert results == [] + + async def test_batch_get_multiple_ids(self, repo): + """batch_get() returns only found sandboxes; missing IDs are omitted.""" + await repo.create("sbx-a", {**SANDBOX_INFO, "sandbox_id": "sbx-a"}) + await repo.create("sbx-b", {**SANDBOX_INFO, "sandbox_id": "sbx-b"}) + + results = await repo.batch_get(["sbx-a", "sbx-b", "sbx-missing"]) + assert len(results) == 2 + sandbox_ids = {r["sandbox_id"] for r in results} + assert sandbox_ids == {"sbx-a", "sbx-b"} + + async def test_batch_get_empty_list(self, repo): + """batch_get([]) should return [].""" + assert await repo.batch_get([]) == [] + + +class TestListBy: + async def test_list_by_queries_db(self, repo, db): + """list_by() should query the DB by a given field.""" + info_a = {**SANDBOX_INFO, "sandbox_id": "sbx-a", "user_id": "user-1"} + info_b = {**SANDBOX_INFO, "sandbox_id": "sbx-b", "user_id": "user-1"} + info_c = {**SANDBOX_INFO, "sandbox_id": "sbx-c", "user_id": "user-2"} + + await repo.create("sbx-a", info_a) + await repo.create("sbx-b", info_b) + await repo.create("sbx-c", info_c) + await asyncio.sleep(0.2) + + results = await repo.list_by("user_id", "user-1") + assert len(results) == 2 + sandbox_ids = {r["sandbox_id"] for r in results} + assert sandbox_ids == {"sbx-a", "sbx-b"} + + async def test_list_by_raises_for_non_allowlisted_field(self, repo): + """list_by() should raise ValueError for fields not in the DB allowlist.""" + with pytest.raises(ValueError): + await repo.list_by("create_time", "t-1") + + +class TestUpdateTimeout: + async def test_update_timeout_writes_redis(self, repo, redis): + """update_timeout() should overwrite the timeout key in Redis.""" + new_info = {"auto_clear_time": "60", "expire_time": "9999999999"} + await repo.update_timeout(SANDBOX_ID, new_info) + + result = await redis.json_get(timeout_sandbox_key(SANDBOX_ID), "$") + assert result is not None + assert result[0]["auto_clear_time"] == "60" + assert result[0]["expire_time"] == "9999999999" + + +# --------------------------------------------------------------------------- +# Docker-backed fixtures (real Redis Stack + real PostgreSQL) +# --------------------------------------------------------------------------- + + +@pytest.fixture +async def real_redis(redis_container): + provider = RedisProvider( + host=redis_container["host"], + port=redis_container["port"], + password=redis_container["password"], + ) + await provider.init_pool() + yield provider + await provider.close_pool() + + +@pytest.fixture +async def real_db(pg_container): + provider = DatabaseProvider(db_config=DatabaseConfig(url=pg_container["url"])) + await provider.init_pool() + table = SandboxTable(provider) + yield table + await provider.close_pool() + + +@pytest.fixture +def docker_repo(real_redis, real_db): + return SandboxMetaStore(redis_provider=real_redis, sandbox_table=real_db) + + +# --------------------------------------------------------------------------- +# Docker-backed tests +# --------------------------------------------------------------------------- + + +@pytest.mark.need_docker +@pytest.mark.need_database +class TestSandboxMetaStoreWithDocker: + """SandboxMetaStore verified against real Redis Stack + PostgreSQL. + + Uses unique sandbox IDs per test to avoid cross-test pollution across + the shared session-scoped containers. + """ + + async def test_save_writes_redis_and_db(self, docker_repo, real_redis, real_db): + """save() persists to real Redis and fires a real DB insert.""" + sid = f"docker-{uuid.uuid4().hex[:8]}" + info = {**SANDBOX_INFO, "sandbox_id": sid, "user_id": "docker-user"} + + await docker_repo.create(sid, info) + await asyncio.sleep(0.15) + + result = await real_redis.json_get(alive_sandbox_key(sid), "$") + assert result is not None + assert result[0]["sandbox_id"] == sid + + db_record = await real_db.get(sid) + assert db_record is not None + assert db_record["user_id"] == "docker-user" + + async def test_update_writes_redis_and_db(self, docker_repo, real_redis, real_db): + """update() merges into Redis and fires a real DB update.""" + sid = f"docker-{uuid.uuid4().hex[:8]}" + await docker_repo.create(sid, {**SANDBOX_INFO, "sandbox_id": sid}) + await asyncio.sleep(0.15) + + await docker_repo.update(sid, {"state": "stopped"}) + await asyncio.sleep(0.15) + + redis_result = await real_redis.json_get(alive_sandbox_key(sid), "$") + assert redis_result[0]["state"] == "stopped" + assert redis_result[0]["user_id"] == "user-1" # old fields still present + + db_record = await real_db.get(sid) + assert db_record["state"] == "stopped" + + async def test_remove_deletes_redis_and_db(self, docker_repo, real_redis, real_db): + """remove() deletes Redis alive+timeout keys and the DB row.""" + sid = f"docker-{uuid.uuid4().hex[:8]}" + timeout = {"auto_clear_time": "30", "expire_time": "9999999999"} + await docker_repo.create(sid, {**SANDBOX_INFO, "sandbox_id": sid}, timeout_info=timeout) + await asyncio.sleep(0.15) + + await docker_repo.delete(sid) + await asyncio.sleep(0.15) + + assert await real_redis.json_get(alive_sandbox_key(sid), "$") is None + assert await real_db.get(sid) is None + + async def test_list_by_queries_db(self, docker_repo, real_db): + """list_by() returns DB rows matching the given field value.""" + uid = f"docker-user-{uuid.uuid4().hex[:8]}" + for _ in range(3): + sid = f"docker-{uuid.uuid4().hex[:8]}" + await docker_repo.create(sid, {**SANDBOX_INFO, "sandbox_id": sid, "user_id": uid}) + await asyncio.sleep(0.2) + + results = await docker_repo.list_by("user_id", uid) + assert len(results) == 3 + assert all(r["user_id"] == uid for r in results) + + async def test_iter_alive_sandbox_ids(self, docker_repo, real_redis): + """iter_alive_sandbox_ids() yields RUNNING sandbox IDs from DB, consistent with Redis alive keys.""" + sids = [f"docker-{uuid.uuid4().hex[:8]}" for _ in range(3)] + for sid in sids: + await docker_repo.create(sid, {**SANDBOX_INFO, "sandbox_id": sid}) + await asyncio.sleep(0.2) # let fire-and-forget DB writes settle + + # new approach: DB-backed + db_found = [s async for s in docker_repo.iter_alive_sandbox_ids()] + assert set(sids).issubset(set(db_found)) + + # consistency: every RUNNING sandbox in DB must have a Redis alive key + redis_ids = set() + async for key in real_redis.client.scan_iter(match=f"{ALIVE_PREFIX}*", count=100): + if isinstance(key, str) and key.startswith(ALIVE_PREFIX): + redis_ids.add(key.removeprefix(ALIVE_PREFIX)) + assert set(sids).issubset(redis_ids) + # Only assert consistency for the sandboxes created in this test; + # db_found may include leftover sandboxes from other tests whose + # alive keys have already been cleaned up. + assert set(sids).issubset(redis_ids) diff --git a/tests/unit/sandbox/test_sandbox_proxy.py b/tests/unit/sandbox/test_sandbox_proxy.py index 55603c577..f98802c6b 100644 --- a/tests/unit/sandbox/test_sandbox_proxy.py +++ b/tests/unit/sandbox/test_sandbox_proxy.py @@ -20,7 +20,7 @@ async def test_batch_get_sandbox_status(sandbox_manager: SandboxManager, sandbox sandbox_ids.append(response.sandbox_id) await check_sandbox_status_until_alive(sandbox_manager, response.sandbox_id) # batch get status - batch_response = await sandbox_proxy_service.batch_get_sandbox_status_from_redis(sandbox_ids) + batch_response = await sandbox_proxy_service.batch_get_sandbox_status(sandbox_ids) assert len(batch_response) == sandbox_count response_sandbox_ids = [status.sandbox_id for status in batch_response] @@ -33,7 +33,7 @@ async def test_batch_get_sandbox_status(sandbox_manager: SandboxManager, sandbox assert status.state == State.RUNNING invalid_ids = sandbox_ids + ["invalid_sandbox_id_1", "invalid_sandbox_id_2"] - batch_response_with_invalid = await sandbox_proxy_service.batch_get_sandbox_status_from_redis(invalid_ids) + batch_response_with_invalid = await sandbox_proxy_service.batch_get_sandbox_status(invalid_ids) assert len(batch_response_with_invalid) == len(sandbox_ids) for sandbox_id in sandbox_ids: await sandbox_manager.stop(sandbox_id) diff --git a/tests/unit/sandbox/test_sandbox_timeout.py b/tests/unit/sandbox/test_sandbox_timeout.py new file mode 100644 index 000000000..1118c25ad --- /dev/null +++ b/tests/unit/sandbox/test_sandbox_timeout.py @@ -0,0 +1,55 @@ +"""Tests for SandboxTimeoutHelper — pure calculation, no I/O.""" + +import time + +from rock.sandbox.utils.timeout import SandboxTimeoutHelper + + +class TestMakeTimeoutInfo: + def test_contains_correct_keys(self): + info = SandboxTimeoutHelper.make_timeout_info(30) + assert "auto_clear_time" in info + assert "expire_time" in info + + def test_expire_time_is_now_plus_duration(self): + before = int(time.time()) + info = SandboxTimeoutHelper.make_timeout_info(30) + after = int(time.time()) + expire = int(info["expire_time"]) + assert before + 30 * 60 <= expire <= after + 30 * 60 + + def test_auto_clear_time_stored_as_string(self): + info = SandboxTimeoutHelper.make_timeout_info(45) + assert info["auto_clear_time"] == "45" + + +class TestRefreshTimeout: + def test_recalculates_expire_time(self): + old_expire = int(time.time()) - 100 + timeout_info = {"auto_clear_time": "30", "expire_time": str(old_expire)} + result = SandboxTimeoutHelper.refresh_timeout(timeout_info) + assert result is not None + assert int(result["expire_time"]) >= int(time.time()) + 30 * 60 - 5 + + def test_preserves_auto_clear_time(self): + timeout_info = {"auto_clear_time": "60", "expire_time": "0"} + result = SandboxTimeoutHelper.refresh_timeout(timeout_info) + assert result["auto_clear_time"] == "60" + + def test_returns_none_when_auto_clear_time_missing(self): + result = SandboxTimeoutHelper.refresh_timeout({"expire_time": "0"}) + assert result is None + + +class TestIsExpired: + def test_returns_true_when_past(self): + timeout_info = {"auto_clear_time": "30", "expire_time": str(int(time.time()) - 100)} + assert SandboxTimeoutHelper.is_expired(timeout_info) is True + + def test_returns_false_when_future(self): + timeout_info = {"auto_clear_time": "30", "expire_time": str(int(time.time()) + 3600)} + assert SandboxTimeoutHelper.is_expired(timeout_info) is False + + def test_returns_true_when_expire_time_missing(self): + # Missing key → defaults to 0, always in the past + assert SandboxTimeoutHelper.is_expired({}) is True diff --git a/tests/unit/utils/test_redis_provider_docker.py b/tests/unit/utils/test_redis_provider_docker.py new file mode 100644 index 000000000..2e63d3d3d --- /dev/null +++ b/tests/unit/utils/test_redis_provider_docker.py @@ -0,0 +1,90 @@ +"""Tests for RedisProvider against a real Redis Stack container (with RedisJSON).""" + +import pytest + +from rock.utils.providers.redis_provider import RedisProvider + + +@pytest.mark.need_docker +@pytest.mark.need_database +class TestRedisProviderWithDocker: + """Integration tests for RedisProvider using a real Redis Stack container.""" + + @pytest.fixture + async def redis(self, redis_container): + """Create a RedisProvider connected to the test Redis container.""" + provider = RedisProvider( + host=redis_container["host"], + port=redis_container["port"], + password=redis_container["password"], + ) + await provider.init_pool() + yield provider + await provider.close_pool() + + async def test_fixture_connection(self, redis): + """Verify the fixture can connect and ping Redis.""" + assert redis.client is not None + pong = await redis.client.ping() + assert pong is True + + async def test_json_set_and_get(self, redis): + key = "test:sandbox:001" + data = {"sandbox_id": "sb-001", "state": "RUNNING", "user_id": "alice"} + + await redis.json_set(key, "$", data) + result = await redis.json_get(key, "$") + + assert result is not None + assert result[0]["sandbox_id"] == "sb-001" + assert result[0]["state"] == "RUNNING" + + async def test_json_get_subpath(self, redis): + key = "test:sandbox:002" + data = {"sandbox_id": "sb-002", "spec": {"cpus": 4, "memory": "8Gi"}} + + await redis.json_set(key, "$", data) + result = await redis.json_get(key, "$.spec.cpus") + + assert result == 4 + + async def test_json_set_with_ttl(self, redis): + key = "test:sandbox:ttl" + data = {"sandbox_id": "sb-ttl"} + + await redis.json_set_with_ttl(key, "$", data, ttl_seconds=300) + + ttl = await redis.get_ttl(key) + assert ttl is not None + assert ttl > 0 + assert ttl <= 300 + + async def test_json_delete(self, redis): + key = "test:sandbox:del" + await redis.json_set(key, "$", {"sandbox_id": "sb-del"}) + + deleted = await redis.json_delete(key) + assert deleted >= 1 + + result = await redis.json_get(key, "$") + assert result is None + + async def test_json_mget(self, redis): + keys = ["test:mget:1", "test:mget:2", "test:mget:3"] + for i, key in enumerate(keys): + await redis.json_set(key, "$", {"name": f"user-{i}"}) + + results = await redis.json_mget(keys, "$") + assert len(results) == 3 + assert results[0][0]["name"] == "user-0" + assert results[2][0]["name"] == "user-2" + + async def test_pattern_exists(self, redis): + await redis.json_set("test:pattern:hit", "$", {"v": 1}) + + assert await redis.pattern_exists("test:pattern:*") is True + assert await redis.pattern_exists("nonexistent:prefix:*") is False + + async def test_get_nonexistent_key(self, redis): + result = await redis.json_get("does-not-exist", "$") + assert result is None diff --git a/uv.lock b/uv.lock index 7ae9b8b4c..cf569139e 100644 --- a/uv.lock +++ b/uv.lock @@ -467,6 +467,65 @@ wheels = [ { url = "https://mirrors.aliyun.com/pypi/packages/fe/ba/e2081de779ca30d473f21f5b30e0e737c438205440784c7dfc81efc2b029/async_timeout-5.0.1-py3-none-any.whl", hash = "sha256:39e3809566ff85354557ec2398b55e096c8364bacac9405a7a1fa429e77fe76c" }, ] +[[package]] +name = "asyncpg" +version = "0.31.0" +source = { registry = "https://mirrors.aliyun.com/pypi/simple/" } +dependencies = [ + { name = "async-timeout", marker = "python_full_version < '3.11'" }, +] +sdist = { url = "https://mirrors.aliyun.com/pypi/packages/fe/cc/d18065ce2380d80b1bcce927c24a2642efd38918e33fd724bc4bca904877/asyncpg-0.31.0.tar.gz", hash = "sha256:c989386c83940bfbd787180f2b1519415e2d3d6277a70d9d0f0145ac73500735" } +wheels = [ + { url = "https://mirrors.aliyun.com/pypi/packages/c3/d9/507c80bdac2e95e5a525644af94b03fa7f9a44596a84bd48a6e80f854f92/asyncpg-0.31.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:831712dd3cf117eec68575a9b50da711893fd63ebe277fc155ecae1c6c9f0f61" }, + { url = "https://mirrors.aliyun.com/pypi/packages/ea/03/f93b5e543f65c5f504e91405e8d21bb9e600548be95032951a754781a41d/asyncpg-0.31.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:0b17c89312c2f4ccea222a3a6571f7df65d4ba2c0e803339bfc7bed46a96d3be" }, + { url = "https://mirrors.aliyun.com/pypi/packages/e5/1e/de2177e57e03a06e697f6c1ddf2a9a7fcfdc236ce69966f54ffc830fd481/asyncpg-0.31.0-cp310-cp310-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:3faa62f997db0c9add34504a68ac2c342cfee4d57a0c3062fcf0d86c7f9cb1e8" }, + { url = "https://mirrors.aliyun.com/pypi/packages/d0/98/1a853f6870ac7ad48383a948c8ff3c85dc278066a4d69fc9af7d3d4b1106/asyncpg-0.31.0-cp310-cp310-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:8ea599d45c361dfbf398cb67da7fd052affa556a401482d3ff1ee99bd68808a1" }, + { url = "https://mirrors.aliyun.com/pypi/packages/11/29/7e76f2a51f2360a7c90d2cf6d0d9b210c8bb0ae342edebd16173611a55c2/asyncpg-0.31.0-cp310-cp310-musllinux_1_2_aarch64.whl", hash = "sha256:795416369c3d284e1837461909f58418ad22b305f955e625a4b3a2521d80a5f3" }, + { url = "https://mirrors.aliyun.com/pypi/packages/5d/3f/716e10cb57c4f388248db46555e9226901688fbfabd0afb85b5e1d65d5a7/asyncpg-0.31.0-cp310-cp310-musllinux_1_2_x86_64.whl", hash = "sha256:a8d758dac9d2e723e173d286ef5e574f0b350ec00e9186fce84d0fc5f6a8e6b8" }, + { url = "https://mirrors.aliyun.com/pypi/packages/7e/ec/3ebae9dfb23a1bd3f68acfd4f795983b65b413291c0e2b0d982d6ae6c920/asyncpg-0.31.0-cp310-cp310-win32.whl", hash = "sha256:2d076d42eb583601179efa246c5d7ae44614b4144bc1c7a683ad1222814ed095" }, + { url = "https://mirrors.aliyun.com/pypi/packages/20/b4/9fbb4b0af4e36d96a61d026dd37acab3cf521a70290a09640b215da5ab7c/asyncpg-0.31.0-cp310-cp310-win_amd64.whl", hash = "sha256:9ea33213ac044171f4cac23740bed9a3805abae10e7025314cfbd725ec670540" }, + { url = "https://mirrors.aliyun.com/pypi/packages/08/17/cc02bc49bc350623d050fa139e34ea512cd6e020562f2a7312a7bcae4bc9/asyncpg-0.31.0-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:eee690960e8ab85063ba93af2ce128c0f52fd655fdff9fdb1a28df01329f031d" }, + { url = "https://mirrors.aliyun.com/pypi/packages/a4/62/4ded7d400a7b651adf06f49ea8f73100cca07c6df012119594d1e3447aa6/asyncpg-0.31.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:2657204552b75f8288de08ca60faf4a99a65deef3a71d1467454123205a88fab" }, + { url = "https://mirrors.aliyun.com/pypi/packages/d6/5b/4179538a9a72166a0bf60ad783b1ef16efb7960e4d7b9afe9f77a5551680/asyncpg-0.31.0-cp311-cp311-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:a429e842a3a4b4ea240ea52d7fe3f82d5149853249306f7ff166cb9948faa46c" }, + { url = "https://mirrors.aliyun.com/pypi/packages/e6/35/c27719ae0536c5b6e61e4701391ffe435ef59539e9360959240d6e47c8c8/asyncpg-0.31.0-cp311-cp311-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:c0807be46c32c963ae40d329b3a686356e417f674c976c07fa49f1b30303f109" }, + { url = "https://mirrors.aliyun.com/pypi/packages/43/f4/01ebb9207f29e645a64699b9ce0eefeff8e7a33494e1d29bb53736f7766b/asyncpg-0.31.0-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:e5d5098f63beeae93512ee513d4c0c53dc12e9aa2b7a1af5a81cddf93fe4e4da" }, + { url = "https://mirrors.aliyun.com/pypi/packages/3e/f4/03ff1426acc87be0f4e8d40fa2bff5c3952bef0080062af9efc2212e3be8/asyncpg-0.31.0-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:37fc6c00a814e18eef51833545d1891cac9aa69140598bb076b4cd29b3e010b9" }, + { url = "https://mirrors.aliyun.com/pypi/packages/c7/39/cc788dfca3d4060f9d93e67be396ceec458dfc429e26139059e58c2c244d/asyncpg-0.31.0-cp311-cp311-win32.whl", hash = "sha256:5a4af56edf82a701aece93190cc4e094d2df7d33f6e915c222fb09efbb5afc24" }, + { url = "https://mirrors.aliyun.com/pypi/packages/28/fc/735af5384c029eb7f1ca60ccb8fa95521dbdaeef788edf4cecfc604c3cab/asyncpg-0.31.0-cp311-cp311-win_amd64.whl", hash = "sha256:480c4befbdf079c14c9ca43c8c5e1fe8b6296c96f1f927158d4f1e750aacc047" }, + { url = "https://mirrors.aliyun.com/pypi/packages/2a/a6/59d0a146e61d20e18db7396583242e32e0f120693b67a8de43f1557033e2/asyncpg-0.31.0-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:b44c31e1efc1c15188ef183f287c728e2046abb1d26af4d20858215d50d91fad" }, + { url = "https://mirrors.aliyun.com/pypi/packages/36/01/ffaa189dcb63a2471720615e60185c3f6327716fdc0fc04334436fbb7c65/asyncpg-0.31.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:0c89ccf741c067614c9b5fc7f1fc6f3b61ab05ae4aaa966e6fd6b93097c7d20d" }, + { url = "https://mirrors.aliyun.com/pypi/packages/9f/62/3f699ba45d8bd24c5d65392190d19656d74ff0185f42e19d0bbd973bb371/asyncpg-0.31.0-cp312-cp312-manylinux_2_28_aarch64.whl", hash = "sha256:12b3b2e39dc5470abd5e98c8d3373e4b1d1234d9fbdedf538798b2c13c64460a" }, + { url = "https://mirrors.aliyun.com/pypi/packages/8c/d1/a867c2150f9c6e7af6462637f613ba67f78a314b00db220cd26ff559d532/asyncpg-0.31.0-cp312-cp312-manylinux_2_28_x86_64.whl", hash = "sha256:aad7a33913fb8bcb5454313377cc330fbb19a0cd5faa7272407d8a0c4257b671" }, + { url = "https://mirrors.aliyun.com/pypi/packages/7a/1a/cce4c3f246805ecd285a3591222a2611141f1669d002163abef999b60f98/asyncpg-0.31.0-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:3df118d94f46d85b2e434fd62c84cb66d5834d5a890725fe625f498e72e4d5ec" }, + { url = "https://mirrors.aliyun.com/pypi/packages/40/ae/0fc961179e78cc579e138fad6eb580448ecae64908f95b8cb8ee2f241f67/asyncpg-0.31.0-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:bd5b6efff3c17c3202d4b37189969acf8927438a238c6257f66be3c426beba20" }, + { url = "https://mirrors.aliyun.com/pypi/packages/52/b2/b20e09670be031afa4cbfabd645caece7f85ec62d69c312239de568e058e/asyncpg-0.31.0-cp312-cp312-win32.whl", hash = "sha256:027eaa61361ec735926566f995d959ade4796f6a49d3bde17e5134b9964f9ba8" }, + { url = "https://mirrors.aliyun.com/pypi/packages/b5/f0/f2ed1de154e15b107dc692262395b3c17fc34eafe2a78fc2115931561730/asyncpg-0.31.0-cp312-cp312-win_amd64.whl", hash = "sha256:72d6bdcbc93d608a1158f17932de2321f68b1a967a13e014998db87a72ed3186" }, + { url = "https://mirrors.aliyun.com/pypi/packages/95/11/97b5c2af72a5d0b9bc3fa30cd4b9ce22284a9a943a150fdc768763caf035/asyncpg-0.31.0-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:c204fab1b91e08b0f47e90a75d1b3c62174dab21f670ad6c5d0f243a228f015b" }, + { url = "https://mirrors.aliyun.com/pypi/packages/1b/71/157d611c791a5e2d0423f09f027bd499935f0906e0c2a416ce712ba51ef3/asyncpg-0.31.0-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:54a64f91839ba59008eccf7aad2e93d6e3de688d796f35803235ea1c4898ae1e" }, + { url = "https://mirrors.aliyun.com/pypi/packages/2e/fc/9e3486fb2bbe69d4a867c0b76d68542650a7ff1574ca40e84c3111bb0c6e/asyncpg-0.31.0-cp313-cp313-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:c0e0822b1038dc7253b337b0f3f676cadc4ac31b126c5d42691c39691962e403" }, + { url = "https://mirrors.aliyun.com/pypi/packages/12/c6/8c9d076f73f07f995013c791e018a1cd5f31823c2a3187fc8581706aa00f/asyncpg-0.31.0-cp313-cp313-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:bef056aa502ee34204c161c72ca1f3c274917596877f825968368b2c33f585f4" }, + { url = "https://mirrors.aliyun.com/pypi/packages/ae/3b/60683a0baf50fbc546499cfb53132cb6835b92b529a05f6a81471ab60d0c/asyncpg-0.31.0-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:0bfbcc5b7ffcd9b75ab1558f00db2ae07db9c80637ad1b2469c43df79d7a5ae2" }, + { url = "https://mirrors.aliyun.com/pypi/packages/50/dc/8487df0f69bd398a61e1792b3cba0e47477f214eff085ba0efa7eac9ce87/asyncpg-0.31.0-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:22bc525ebbdc24d1261ecbf6f504998244d4e3be1721784b5f64664d61fbe602" }, + { url = "https://mirrors.aliyun.com/pypi/packages/13/a1/c5bbeeb8531c05c89135cb8b28575ac2fac618bcb60119ee9696c3faf71c/asyncpg-0.31.0-cp313-cp313-win32.whl", hash = "sha256:f890de5e1e4f7e14023619399a471ce4b71f5418cd67a51853b9910fdfa73696" }, + { url = "https://mirrors.aliyun.com/pypi/packages/91/66/b25ccb84a246b470eb943b0107c07edcae51804912b824054b3413995a10/asyncpg-0.31.0-cp313-cp313-win_amd64.whl", hash = "sha256:dc5f2fa9916f292e5c5c8b2ac2813763bcd7f58e130055b4ad8a0531314201ab" }, + { url = "https://mirrors.aliyun.com/pypi/packages/3c/36/e9450d62e84a13aea6580c83a47a437f26c7ca6fa0f0fd40b6670793ea30/asyncpg-0.31.0-cp314-cp314-macosx_10_15_x86_64.whl", hash = "sha256:f6b56b91bb0ffc328c4e3ed113136cddd9deefdf5f79ab448598b9772831df44" }, + { url = "https://mirrors.aliyun.com/pypi/packages/82/4b/1d0a2b33b3102d210439338e1beea616a6122267c0df459ff0265cd5807a/asyncpg-0.31.0-cp314-cp314-macosx_11_0_arm64.whl", hash = "sha256:334dec28cf20d7f5bb9e45b39546ddf247f8042a690bff9b9573d00086e69cb5" }, + { url = "https://mirrors.aliyun.com/pypi/packages/41/aa/e7f7ac9a7974f08eff9183e392b2d62516f90412686532d27e196c0f0eeb/asyncpg-0.31.0-cp314-cp314-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:98cc158c53f46de7bb677fd20c417e264fc02b36d901cc2a43bd6cb0dc6dbfd2" }, + { url = "https://mirrors.aliyun.com/pypi/packages/6f/de/bf1b60de3dede5c2731e6788617a512bc0ebd9693eac297ee74086f101d7/asyncpg-0.31.0-cp314-cp314-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:9322b563e2661a52e3cdbc93eed3be7748b289f792e0011cb2720d278b366ce2" }, + { url = "https://mirrors.aliyun.com/pypi/packages/46/78/fc3ade003e22d8bd53aaf8f75f4be48f0b460fa73738f0391b9c856a9147/asyncpg-0.31.0-cp314-cp314-musllinux_1_2_aarch64.whl", hash = "sha256:19857a358fc811d82227449b7ca40afb46e75b33eb8897240c3839dd8b744218" }, + { url = "https://mirrors.aliyun.com/pypi/packages/bf/e9/73eb8a6789e927816f4705291be21f2225687bfa97321e40cd23055e903a/asyncpg-0.31.0-cp314-cp314-musllinux_1_2_x86_64.whl", hash = "sha256:ba5f8886e850882ff2c2ace5732300e99193823e8107e2c53ef01c1ebfa1e85d" }, + { url = "https://mirrors.aliyun.com/pypi/packages/08/4b/f10b880534413c65c5b5862f79b8e81553a8f364e5238832ad4c0af71b7f/asyncpg-0.31.0-cp314-cp314-win32.whl", hash = "sha256:cea3a0b2a14f95834cee29432e4ddc399b95700eb1d51bbc5bfee8f31fa07b2b" }, + { url = "https://mirrors.aliyun.com/pypi/packages/d3/2d/7aa40750b7a19efa5d66e67fc06008ca0f27ba1bd082e457ad82f59aba49/asyncpg-0.31.0-cp314-cp314-win_amd64.whl", hash = "sha256:04d19392716af6b029411a0264d92093b6e5e8285ae97a39957b9a9c14ea72be" }, + { url = "https://mirrors.aliyun.com/pypi/packages/ce/fe/b9dfe349b83b9dee28cc42360d2c86b2cdce4cb551a2c2d27e156bcac84d/asyncpg-0.31.0-cp314-cp314t-macosx_10_15_x86_64.whl", hash = "sha256:bdb957706da132e982cc6856bb2f7b740603472b54c3ebc77fe60ea3e57e1bd2" }, + { url = "https://mirrors.aliyun.com/pypi/packages/6a/81/e6be6e37e560bd91e6c23ea8a6138a04fd057b08cf63d3c5055c98e81c1d/asyncpg-0.31.0-cp314-cp314t-macosx_11_0_arm64.whl", hash = "sha256:6d11b198111a72f47154fa03b85799f9be63701e068b43f84ac25da0bda9cb31" }, + { url = "https://mirrors.aliyun.com/pypi/packages/a6/45/6009040da85a1648dd5bc75b3b0a062081c483e75a1a29041ae63a0bf0dc/asyncpg-0.31.0-cp314-cp314t-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:18c83b03bc0d1b23e6230f5bf8d4f217dc9bc08644ce0502a9d91dc9e634a9c7" }, + { url = "https://mirrors.aliyun.com/pypi/packages/7e/06/2e3d4d7608b0b2b3adbee0d0bd6a2d29ca0fc4d8a78f8277df04e2d1fd7b/asyncpg-0.31.0-cp314-cp314t-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:e009abc333464ff18b8f6fd146addffd9aaf63e79aa3bb40ab7a4c332d0c5e9e" }, + { url = "https://mirrors.aliyun.com/pypi/packages/7d/aa/7d75ede780033141c51d83577ea23236ba7d3a23593929b32b49db8ed36e/asyncpg-0.31.0-cp314-cp314t-musllinux_1_2_aarch64.whl", hash = "sha256:3b1fbcb0e396a5ca435a8826a87e5c2c2cc0c8c68eb6fadf82168056b0e53a8c" }, + { url = "https://mirrors.aliyun.com/pypi/packages/ba/7a/15e37d45e7f7c94facc1e9148c0e455e8f33c08f0b8a0b1deb2c5171771b/asyncpg-0.31.0-cp314-cp314t-musllinux_1_2_x86_64.whl", hash = "sha256:8df714dba348efcc162d2adf02d213e5fab1bd9f557e1305633e851a61814a7a" }, + { url = "https://mirrors.aliyun.com/pypi/packages/13/d5/71437c5f6ae5f307828710efbe62163974e71237d5d46ebd2869ea052d10/asyncpg-0.31.0-cp314-cp314t-win32.whl", hash = "sha256:1b41f1afb1033f2b44f3234993b15096ddc9cd71b21a42dbd87fc6a57b43d65d" }, + { url = "https://mirrors.aliyun.com/pypi/packages/3c/d7/8fb3044eaef08a310acfe23dae9a8e2e07d305edc29a53497e52bc76eca7/asyncpg-0.31.0-cp314-cp314t-win_amd64.whl", hash = "sha256:bd4107bb7cdd0e9e65fae66a62afd3a249663b844fa34d479f6d5b3bef9c04c3" }, +] + [[package]] name = "attrs" version = "25.4.0" @@ -4003,6 +4062,7 @@ admin = [ { name = "aiosqlite" }, { name = "alibabacloud-cr20181201" }, { name = "apscheduler" }, + { name = "asyncpg" }, { name = "bashlex" }, { name = "boto3" }, { name = "cryptography" }, @@ -4028,6 +4088,7 @@ all = [ { name = "aiosqlite" }, { name = "alibabacloud-cr20181201" }, { name = "apscheduler" }, + { name = "asyncpg" }, { name = "bashlex" }, { name = "boto3" }, { name = "cryptography" }, @@ -4106,6 +4167,7 @@ requires-dist = [ { name = "anyio" }, { name = "apscheduler", marker = "extra == 'admin'" }, { name = "apscheduler", marker = "extra == 'sandbox-actor'", specifier = ">=3.11.0" }, + { name = "asyncpg", marker = "extra == 'admin'" }, { name = "bashlex", marker = "extra == 'rocklet'" }, { name = "boto3", marker = "extra == 'admin'" }, { name = "build" },