diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 000000000..0169f884b --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,255 @@ +# CLAUDE.md + +## Project Overview + +ROCK (Reinforcement Open Construction Kit) is a sandbox environment management framework for agentic AI / reinforcement learning. It provides multi-protocol sandbox lifecycle management with Docker, Ray, and Kubernetes deployment backends. + +Package name: `rl-rock`, version 1.3.0, Python 3.10–3.12. + +## Quick Reference + +```bash +# Setup +make init # Create venv, install deps, hooks, preflight checks +uv sync --all-extras --all-groups # Install all dependencies + +# Test +uv run pytest -m "not need_ray and not need_admin and not need_admin_and_network" --reruns 1 # Fast tests +uv run pytest -m "need_ray" --reruns 1 # Ray tests +uv run pytest -m "need_admin" --reruns 1 # Admin tests + +# Lint / Format +uv run ruff check --fix . # Lint with autofix +uv run ruff format . # Format +``` + +## Architecture + +### Services (Entry Points) + +| Command | Module | Role | +|-----------|---------------------------|----------------------------------------------------| +| `rock` | `rock.cli.main` | CLI tool (admin start, sandbox build/push/run) | +| `admin` | `rock.admin.main` | FastAPI orchestrator — sandbox lifecycle via API | +| `rocklet` | `rock.rocklet.server` | FastAPI proxy — runs inside containers, executes commands | +| `envhub` | `rock.envhub.server` | Environment repository CRUD server | + +### Core Module Map + +``` +rock/ +├── admin/ # Admin service: API routers, Ray service, scheduler, metrics +├── sandbox/ # SandboxManager, Operators (Ray/K8s), SandboxActor +├── deployments/ # AbstractDeployment → Docker/Ray/Local/Remote, configs, validator +├── rocklet/ # Lightweight sandbox runtime server +├── sdk/ # Client SDK: Sandbox client, agent integrations, EnvHub client +├── envhub/ # Environment hub service with SQLModel database +├── actions/ # Request/response models for sandbox and env actions 
+├── config.py # Dataclass-based config (RayConfig, K8sConfig, RuntimeConfig, ...) +├── env_vars.py # Environment variables with lazy defaults via __getattr__ +├── cli/ # CLI commands (argparse) +└── utils/ # Docker wrapper, Redis/Nacos providers, HTTP, retry, crypto, etc. +``` + +### Key Patterns + +- **Operator pattern**: `AbstractOperator` → `RayOperator` / `K8sOperator` — decouples scheduling from execution +- **Deployment hierarchy**: `AbstractDeployment` → `DockerDeployment` → `RayDeployment`, plus `LocalDeployment`, `RemoteDeployment` +- **Actor pattern (Ray)**: `SandboxActor` (remote, detached) wraps a `DockerDeployment` instance +- **Config flow**: `SandboxManager` → `DeploymentManager.init_config()` (normalize config) → `Operator.submit()` (orchestrate) +- **Validation**: `SandboxValidator` / `DockerSandboxValidator` → `DockerUtil` (shell out to docker CLI) + +## Code Conventions + +### Style + +- Line length: 120 (`ruff`) +- Lint rules: `E, F, I, W, UP` (pycodestyle, pyflakes, isort, warnings, pyupgrade) +- Ignored: `E501` (line length), `F811` (redefinition), `E741` (ambiguous names) +- Import order: stdlib → third-party → local, managed by `ruff` isort + +### Naming + +- Classes: `PascalCase` (`SandboxManager`, `RayOperator`) +- Functions/methods: `snake_case` +- Constants: `UPPER_SNAKE_CASE` +- Private: `_leading_underscore` + +### Logging + +- Always use `from rock.logger import init_logger; logger = init_logger(__name__)` +- Context vars for distributed tracing: `sandbox_id_ctx_var`, `trace_id_ctx_var` + +### Error Handling + +- Custom exceptions: `BadRequestRockError` (4xxx), `InternalServerRockError` (5xxx), `CommandRockError` (6xxx) +- Status codes defined in `rock._codes` + +### Data Models + +- Pydantic v2 for API request/response models and deployment configs +- `dataclass` for internal configs (`RockConfig`, `RayConfig`, etc.) 
+- SQLModel for database ORM (`envhub`) + +### Async + +- `asyncio_mode = "auto"` in pytest — all async tests run automatically +- FastAPI async handlers throughout +- Ray async operations via `async_ray_get()`, `async_ray_get_actor()` + +## Testing + +### Structure + +``` +tests/ +├── unit/ # Fast isolated tests +│ ├── conftest.py # Fixtures: rock_config, redis_provider (FakeRedis), sandbox_manager, ray_* +│ ├── sandbox/ # SandboxManager tests +│ ├── rocklet/ # Rocklet tests +│ └── admin/ # Admin tests +├── integration/ # Tests needing external services (Docker, network) +│ └── conftest.py # SKIP_IF_NO_DOCKER, rocklet/admin remote servers +└── conftest.py # Global config +``` + +### Markers + +| Marker | Purpose | +|----------------------------|--------------------------------------| +| `@pytest.mark.need_ray` | Requires running Ray cluster | +| `@pytest.mark.need_admin` | Requires admin service | +| `@pytest.mark.need_admin_and_network` | Requires admin + network | +| `@pytest.mark.slow` | Long-running tests | +| `@pytest.mark.integration` | Integration tests | + +Strict markers enabled (`--strict-markers`). All markers must be registered in `pyproject.toml`. + +### CI Pipeline (`.github/workflows/python-ci.yml`) + +Runs in 4 phases on `self-hosted` runner: +1. Fast tests (no external deps) +2. Ray-dependent tests +3. Admin-dependent tests +4. Network-dependent tests + +## Configuration + +### Environment Variables + +All defined in `rock/env_vars.py` with lazy evaluation via module `__getattr__`. 
Key variables:
+
+| Variable | Default | Purpose |
+|-------------------------|------------------------|-----------------------------|
+| `ROCK_ADMIN_ENV` | `dev` | Environment: local/dev/test |
+| `ROCK_WORKER_ENV_TYPE` | `local` | Worker type: local/docker/uv/pip |
+| `ROCK_CONFIG` | (none) | Path to YAML config file |
+| `ROCK_BASE_URL` | `http://localhost:8080` | Admin service URL |
+| `ROCK_LOGGING_LEVEL` | `INFO` | Log level |
+| `ROCK_TIME_ZONE` | `Asia/Shanghai` | Timezone |
+| `ROCK_RAY_NAMESPACE` | `xrl-sandbox` | Ray namespace |
+
+### YAML Config (`rock-conf/`)
+
+Loaded by `RockConfig.from_env()`. Files: `rock-local.yml`, `rock-dev.yml`, `rock-test.yml`.
+
+Key sections: `ray`, `k8s`, `runtime` (operator_type, standard_spec, max_allowed_spec), `redis`, `proxy_service`, `scheduler`.
+
+## Git Workflow
+
+### Branch & PR Rules
+
+1. **Open an Issue first** — every code change must start with a GitHub Issue describing the problem or requirement
+2. **Create a branch** — branch off `master` for development
+3. **PRs must link an Issue** — CI enforces this via `pr-issue-link-check.yml`; unlinked PRs are blocked
+
+Ways to link a PR to an Issue (either one works):
+- Use a keyword in the PR body: `fixes #123`, `closes #123`, `resolves #123`, `refs #123`
+- Include the issue number in the PR title: `[FEATURE] Add new feature (#123)`
+
+```bash
+# Typical flow
+# 1. Create an Issue on GitHub
+# 2. Create a local branch
+git checkout -b feat/my-feature master
+
+# 3. Develop and commit
+git add .
+git commit -m "feat: add my feature"
+
+# 4. Push and open a PR (link the issue in the body)
+git push -u origin feat/my-feature
+gh pr create --title "feat: add my feature" --body "fixes #123"
+```
+
+### Pre-commit Hooks
+
+- `ruff --fix` (lint) + `ruff format` (format)
+- Custom hook: prevents mixing internal (`xrl/`, `intetest/`) and external files in one commit
+
+## Development Workflow with Claude Code
+
+### Recommended Skills
+
+When developing with Claude Code, use skills in the following order:
+
+#### 1.
Planning — plan before you code
+
+For multi-step tasks, use the planning skills to settle on a clear approach before executing:
+
+- **`/brainstorming`** — before any new feature or behavior change, use this skill to explore requirements and pin down the design
+- **`/writing-plans`** — once there is a clear requirement/spec, use this skill to generate a step-by-step implementation plan as a plan file
+- **`/executing-plans`** — with a plan file in hand, use this skill to execute step by step, with review checkpoints
+
+```
+requirements → /brainstorming → /writing-plans → plan.md → /executing-plans → done
+```
+
+#### 2. TDD — Test-Driven Development
+
+All feature work and bugfixes must follow the TDD loop:
+
+- **`/test-driven-development`** — invoke before writing implementation code: write a failing test first, then write the implementation that makes it pass
+
+```
+clarify requirements → write tests (RED) → implement (GREEN) → refactor (REFACTOR)
+```
+
+TDD essentials:
+- Write the test case first and confirm it fails (RED)
+- Write the minimum implementation that makes it pass (GREEN)
+- Refactor and optimize only after the test passes (REFACTOR)
+- Keep each cycle a small iteration
+
+#### 3. Debugging
+
+- **`/systematic-debugging`** — use when hitting bugs, test failures, or unexpected behavior; diagnose before fixing instead of guessing blindly
+
+#### 4. Parallel Execution
+
+- **`/dispatching-parallel-agents`** — when there are 2+ independent tasks, dispatch them to multiple agents in parallel
+- **`/subagent-driven-development`** — execute independent plan steps in parallel within the current session
+
+#### 5. Review & Completion
+
+- **`/verification-before-completion`** — before declaring a task done, run verification commands to confirm the result
+- **`/requesting-code-review`** — after finishing the implementation, request a code review
+- **`/finishing-a-development-branch`** — once the implementation is done and tests pass, decide how to integrate (merge / PR / cleanup)
+
+### Typical Feature Development Flow
+
+```
+1. /brainstorming — explore requirements, settle the design direction
+2. /writing-plans — generate a step-by-step implementation plan
+3. /test-driven-development — for each step: write tests first → then implement
+4. /executing-plans — execute the plan step by step, with checkpoints
+5. /verification-before-completion — run the full test suite, confirm it passes
+6. /requesting-code-review — request a review
+7. /finishing-a-development-branch — open the PR
+```
+
+## Dependencies
+
+Core: FastAPI, Pydantic v2, Ray 2.43.0, Redis, Docker CLI, Kubernetes client, APScheduler, OpenTelemetry, httpx.
+
+Package manager: `uv` (Rust-based). Build: setuptools + wheel.
diff --git a/README.md b/README.md
index e8743e747..37960c709 100644
--- a/README.md
+++ b/README.md
@@ -29,6 +29,7 @@ ROCK adopts a client-server architecture, supports different levels of isolation
 ## 📢 News
 
 | 📣 Update |
 |:--|
+| **[02/28/2026]** 🎉 ROCK v1.2.5 Released!
Custom metrics endpoint, user-defined metric tags, and Aliyun MSE Nacos support. |
 | **[01/01/2026]** 🎉 Our [Let It Flow: Agentic Crafting on Rock and Roll](https://arxiv.org/abs/2512.24873) report released! Introducing ALE ecosystem and ROME, an open-source agentic model with novel IPA algorithm. |
 
 ---
@@ -161,7 +162,7 @@ if __name__ == "__main__":
 | 📣 Update Content |
 |:-----------|
-| **[Latest]** 🎉 ROCK v0.2.0 Released |
+| **[Latest]** 🎉 ROCK v1.2.5 Released |
 
 ---
diff --git a/docs/dev/deployment/README.md b/docs/dev/deployment/README.md
new file mode 100644
index 000000000..ff24c6d8c
--- /dev/null
+++ b/docs/dev/deployment/README.md
@@ -0,0 +1,482 @@
+# Deployment Architecture
+
+## 1. Overall Architecture
+
+### 1.1 Class Hierarchy & Operator Design
+
+#### Deployment Classes
+
+```mermaid
+classDiagram
+    class AbstractDeployment {
+        <<abstract>>
+        +add_hook(hook)
+        +is_alive() IsAliveResponse
+        +start()
+        +stop()
+        +creator_actor(actor_name)
+        +runtime AbstractSandbox
+    }
+
+    class DockerDeployment {
+        -_config : DockerDeploymentConfig
+        -_runtime : RemoteSandboxRuntime
+        -_container_process
+        -_service_status : PersistedServiceStatus
+        +sandbox_validator : DockerSandboxValidator
+        +start()
+        +stop()
+        +is_alive()
+        +get_status() ServiceStatus
+        -_pull_image()
+        -_build_image()
+        -_docker_run(cmd)
+        -_wait_until_alive(timeout)
+    }
+
+    class RayDeployment {
+        +creator_actor(actor_name)
+        -_create_sandbox_actor(actor_name)
+        -_generate_actor_options(actor_name)
+    }
+
+    class LocalDeployment {
+        -_runtime : LocalSandboxRuntime
+        +start()
+        +stop()
+        +is_alive()
+    }
+
+    class RemoteDeployment {
+        -_config : RemoteDeploymentConfig
+        -_runtime : RemoteSandboxRuntime
+        +start()
+        +stop()
+        +is_alive()
+    }
+
+    AbstractDeployment <|-- DockerDeployment
+    AbstractDeployment <|-- LocalDeployment
+    AbstractDeployment <|-- RemoteDeployment
+    DockerDeployment <|-- RayDeployment
+```
+
+#### Operator & DeploymentManager
+
+`DeploymentManager` and `Operator` divide the work between them, with `SandboxManager` coordinating the calls:
+
+- **DeploymentManager** — owns the **config preparation phase**: takes the user's raw `DeploymentConfig`, normalizes it into a `RayDeploymentConfig`, generates the sandbox_id, and fills in defaults such as role/env/actor_resource.
+- **Operator** — owns the **deployment orchestration phase**: takes the prepared config, creates the Actor / Pod, and manages the lifecycle (start, get_status, stop). Internally, `RayOperator` creates a `DockerDeployment` instance via `config.get_deployment()`.
+
+> **Note**: `DeploymentManager.get_actor_name()` and `RayOperator._get_actor_name()` duplicate the same logic (both return `f"sandbox-{sandbox_id}"`). On non-Operator paths (such as `get_sandbox_statistics` and `get_mount`), `SandboxManager` calls `DeploymentManager.get_actor_name()` directly.
+
+```mermaid
+classDiagram
+    class DeploymentManager {
+        +rock_config : RockConfig
+        -_enable_runtime_auto_clear : bool
+        +init_config(config) RayDeploymentConfig
+        +get_actor_name(sandbox_id) str
+        +get_deployment(config) AbstractDeployment
+    }
+    note for DeploymentManager "Config preparation phase:\n1. generate sandbox_id\n2. convert to RayDeploymentConfig\n3. fill role, env, actor_resource"
+
+    class AbstractOperator {
+        <<abstract>>
+        -_redis_provider : RedisProvider
+        -_nacos_provider : NacosConfigProvider
+        +submit(config, user_info) SandboxInfo
+        +get_status(sandbox_id) SandboxInfo
+        +stop(sandbox_id) bool
+        +set_redis_provider(provider)
+        +set_nacos_provider(provider)
+    }
+
+    class RayOperator {
+        -_ray_service : RayService
+        +submit(config, user_info) SandboxInfo
+        +get_status(sandbox_id) SandboxInfo
+        +stop(sandbox_id) bool
+        +create_actor(config) SandboxActor
+        -_get_actor_name(sandbox_id) str
+        -_generate_actor_options(config) dict
+        -_check_alive_status() bool
+        +get_remote_status() ServiceStatus
+    }
+    note for RayOperator "Orchestration phase:\n1. create_actor calls config.get_deployment()\n2. SandboxActor.remote(config, deployment)\n3.
actor.start.remote()"
+
+    class K8sOperator {
+        -_provider : BatchSandboxProvider
+        -_template_loader : K8sTemplateLoader
+        +submit(config, user_info) SandboxInfo
+        +get_status(sandbox_id) SandboxInfo
+        +stop(sandbox_id) bool
+    }
+
+    AbstractOperator <|-- RayOperator
+    AbstractOperator <|-- K8sOperator
+
+    class SandboxManager {
+        -_operator : AbstractOperator
+        -_proxy_service : SandboxProxyService
+        +start_async(config, user_info) SandboxStartResponse
+        +stop(sandbox_id)
+        +get_status(sandbox_id) SandboxStatusResponse
+    }
+
+    class BaseManager {
+        +rock_config : RockConfig
+        +deployment_manager : DeploymentManager
+        -_redis_provider : RedisProvider
+    }
+
+    BaseManager <|-- SandboxManager
+    SandboxManager --> DeploymentManager : 1. init_config
+    SandboxManager --> AbstractOperator : 2. submit / stop / get_status
+    RayOperator --> SandboxActor : creates
+    SandboxActor --> DockerDeployment : wraps
+```
+
+Call flow:
+
+```mermaid
+sequenceDiagram
+    participant SM as SandboxManager
+    participant DM as DeploymentManager
+    participant OP as RayOperator
+    participant CFG as RayDeploymentConfig
+
+    SM->>DM: init_config(user_config)
+    Note over DM: generate sandbox_id
+    Note over DM: convert to RayDeploymentConfig
+    Note over DM: fill role, env, actor_resource
+    DM-->>SM: RayDeploymentConfig
+
+    SM->>OP: submit(config, user_info)
+    OP->>CFG: config.get_deployment()
+    CFG-->>OP: DockerDeployment instance
+    Note over OP: create SandboxActor with deployment
+    Note over OP: actor.start.remote()
+    OP-->>SM: SandboxInfo
+```
+
+### 1.2 Configuration Hierarchy
+
+```mermaid
+classDiagram
+    class DeploymentConfig {
+        <<abstract>>
+        +role : str
+        +env : str
+        +get_deployment() AbstractDeployment
+    }
+
+    class DockerDeploymentConfig {
+        +image : str
+        +image_os : str
+        +port : int
+        +docker_args : list
+        +startup_timeout : float
+        +pull : str
+        +memory : str
+        +cpus : float
+        +container_name : str
+        +use_kata_runtime : bool
+        +runtime_config : RuntimeConfig
+        +extended_params : dict
+get_deployment() DockerDeployment + } + + class RayDeploymentConfig { + +actor_resource : str + +actor_resource_num : int + +get_deployment() RayDeployment + } + + class LocalDeploymentConfig { + +get_deployment() LocalDeployment + } + + class RemoteDeploymentConfig { + +host : str + +port : int + +timeout : float + +get_deployment() RemoteDeployment + } + + DeploymentConfig <|-- DockerDeploymentConfig + DeploymentConfig <|-- LocalDeploymentConfig + DeploymentConfig <|-- RemoteDeploymentConfig + DockerDeploymentConfig <|-- RayDeploymentConfig +``` + +### 1.3 Sandbox Manager & Operator Architecture + +```mermaid +flowchart TB + subgraph Manager Layer + SM[SandboxManager] + BM[BaseManager] + DM[DeploymentManager] + SM -.extends.-> BM + SM -->|uses| DM + end + + subgraph Operator Layer + AO[AbstractOperator] + RO[RayOperator] + KO[K8sOperator] + RO -.implements.-> AO + KO -.implements.-> AO + end + + subgraph Actor Layer + SA[SandboxActor] + GA[GemActor] + BA[BaseActor] + SA -.extends.-> GA -.extends.-> BA + end + + subgraph Deployment Layer + AD[AbstractDeployment] + DD[DockerDeployment] + RD[RayDeployment] + LD[LocalDeployment] + ReD[RemoteDeployment] + DD -.implements.-> AD + RD -.extends.-> DD + LD -.implements.-> AD + ReD -.implements.-> AD + end + + subgraph Validation Layer + SV[SandboxValidator] + DSV[DockerSandboxValidator] + DU[DockerUtil] + DSV -.implements.-> SV + DSV -->|uses| DU + end + + subgraph Runtime Layer + RSR[RemoteSandboxRuntime] + LSR[LocalSandboxRuntime] + end + + SM -->|uses| AO + RO -->|creates| SA + SA -->|wraps| DD + DD -->|validates via| DSV + DD -->|communicates via| RSR + LD -->|uses| LSR +``` + +### 1.4 Sandbox Startup Flow (via RayOperator) + +```mermaid +sequenceDiagram + participant Client + participant SM as SandboxManager + participant DM as DeploymentManager + participant RO as RayOperator + participant SA as SandboxActor + participant DD as DockerDeployment + participant DV as DockerSandboxValidator + participant Docker as 
Docker Daemon + + Client->>SM: start_async(config) + SM->>SM: validate_sandbox_spec() + SM->>DM: init_config(config) + DM-->>SM: DockerDeploymentConfig + + SM->>RO: submit(config, user_info) + RO->>RO: create_actor(config) + RO->>SA: SandboxActor.remote(config, deployment) + RO->>SA: start.remote() [async] + RO-->>SM: SandboxInfo + + Note over SA,Docker: Actor starts asynchronously + + SA->>DD: start() + DD->>DV: check_availability() + DV->>Docker: docker --version, docker info + Docker-->>DV: OK or Error + DV-->>DD: bool + + alt Docker not available + DD-->>SA: raise Exception + end + + DD->>DD: _pull_image() + DD->>Docker: docker pull image + DD->>DV: check_resource(image_id) + DD->>Docker: docker run image + DD->>DD: _wait_until_alive(timeout) + DD-->>SA: started + + SM->>SM: store in Redis + SM-->>Client: SandboxStartResponse +``` + +## 2. Docker Usage Analysis + +### 2.1 Docker Dependency Points + +Docker is used in the following scenarios: + +| Component | Method | Docker Operation | +|-----------|--------|------------------| +| `DockerDeployment.start()` | `sandbox_validator.check_availability()` | `docker --version` + `docker info` | +| `DockerDeployment._pull_image()` | `DockerUtil.pull_image()` | `docker pull {image}` | +| `DockerDeployment._build_image()` | subprocess | `docker build` | +| `DockerDeployment.start()` | `sandbox_validator.check_resource()` | `docker inspect {image}` | +| `DockerDeployment.start()` | `_docker_run()` | `docker run ...` | +| `DockerDeployment._stop()` | subprocess | `docker kill {container}` | +| `DockerDeployment._stop()` | `DockerUtil.remove_image()` | `docker rmi {image}` | + +### 2.2 Current Docker Validation + +Docker availability is currently validated **only inside `DockerDeployment.start()`** (line 290 of `docker.py`): + +```python +async def start(self): + if not self.sandbox_validator.check_availability(): + raise Exception("Docker is not available") +``` + +This means Docker unavailability is only detected 
**after** the sandbox actor has been created and submitted to Ray, which wastes resources and makes the error harder to trace.
+
+### 2.3 Current Test Markers
+
+- `need_ray` - registered in `pyproject.toml`, used to mark tests requiring a Ray cluster
+- `SKIP_IF_NO_DOCKER` - defined in `tests/integration/conftest.py`, used via `skipif` for integration tests
+- **No `need_docker` marker** exists for unit tests
+
+## 3. Proposal: `need_docker` Test Marker
+
+### 3.1 Problem
+
+`test_get_status` and similar tests in `tests/unit/sandbox/test_sandbox_manager.py` use `DockerDeploymentConfig`, which ultimately calls `DockerDeployment.start()`. When Docker is not installed or the daemon is not running, these tests fail with an unclear error instead of being skipped.
+
+Tests using Docker:
+- `test_async_sandbox_start` - uses `DockerDeploymentConfig()`
+- `test_get_status` - uses `DockerDeploymentConfig(image="python:3.11")`
+- `test_ray_actor_is_alive` - uses `DockerDeploymentConfig()`
+- `test_resource_limit_exception` - uses `docker_deployment_config` fixture
+- `test_resource_limit_exception_memory` - uses `docker_deployment_config` fixture
+- `test_get_status_state` - uses `DockerDeploymentConfig(cpus=0.5, memory="1g")`
+- `test_sandbox_start_with_sandbox_id` - uses `DockerDeploymentConfig(...)`
+
+### 3.2 Solution
+
+#### Step 1: Register `need_docker` marker in `pyproject.toml`
+
+```toml
+markers = [
+    "slow: marks tests as slow",
+    "integration: marks tests as integration tests",
+    "need_ray: need ray start",
+    "need_docker: need docker daemon running",
+    "need_admin: need admin start",
+    "need_admin_and_network: need install from network"
+]
+```
+
+#### Step 2: Add auto-skip logic in `tests/unit/conftest.py`
+
+```python
+import pytest
+
+from rock.utils.docker import DockerUtil
+
+def pytest_collection_modifyitems(config, items):
+    if not DockerUtil.is_docker_available():
+        skip_docker = pytest.mark.skip(reason="Docker is not available")
+        for item in items:
+            if "need_docker" in
item.keywords:
+                item.add_marker(skip_docker)
+```
+
+#### Step 3: Mark Docker-dependent tests
+
+Add `@pytest.mark.need_docker` to all test cases that use `DockerDeploymentConfig`:
+
+```python
+@pytest.mark.need_ray
+@pytest.mark.need_docker
+@pytest.mark.asyncio
+async def test_get_status(sandbox_manager):
+    ...
+```
+
+## 4. Proposal: Early Docker Validation in SandboxManager
+
+### 4.1 Problem
+
+Currently Docker availability is only checked inside `DockerDeployment.start()`, which is called asynchronously inside the Ray actor. This means:
+
+1. A Ray actor is allocated (consuming CPU/memory resources)
+2. The actor starts and calls `deployment.start()`
+3. The Docker check fails and the actor throws an exception
+4. Resources are wasted and the error is hard to trace
+
+### 4.2 Solution: Pre-validate in `SandboxManager.start_async()`
+
+Add a Docker availability check in `SandboxManager.start_async()` **before** submitting to the operator, specifically after `validate_sandbox_spec()` and before `self._operator.submit()`.
+ +```mermaid +flowchart TD + A[start_async called] --> B[check sandbox exists in Redis] + B --> C[validate_sandbox_spec] + C --> D[init_config] + D --> E{Config is DockerDeploymentConfig?} + E -- Yes --> F{Docker available?} + F -- No --> G[raise BadRequestRockError] + F -- Yes --> H[operator.submit] + E -- No --> H + H --> I[store in Redis] + I --> J[return SandboxStartResponse] +``` + +#### Implementation Location + +In `rock/sandbox/sandbox_manager.py`, method `start_async()`, add validation between `init_config()` and `_operator.submit()`: + +```python +@monitor_sandbox_operation() +async def start_async( + self, config: DeploymentConfig, user_info: UserInfo = {}, cluster_info: ClusterInfo = {} +) -> SandboxStartResponse: + await self._check_sandbox_exists_in_redis(config) + self.validate_sandbox_spec(self.rock_config.runtime, config) + docker_deployment_config: DockerDeploymentConfig = await self.deployment_manager.init_config(config) + + # Early Docker validation - fail fast before allocating Ray resources + self._validate_deployment_prerequisites(docker_deployment_config) + + sandbox_id = docker_deployment_config.container_name + sandbox_info: SandboxInfo = await self._operator.submit(docker_deployment_config, user_info) + ... +``` + +New method in `SandboxManager`: + +```python +def _validate_deployment_prerequisites(self, config: DeploymentConfig) -> None: + """Validate that deployment prerequisites are met before submitting to the operator. + + For Docker-based deployments, checks that Docker daemon is available. + This prevents resource waste from allocating Ray actors that will fail immediately. + """ + if isinstance(config, DockerDeploymentConfig): + from rock.deployments.sandbox_validator import DockerSandboxValidator + validator = DockerSandboxValidator() + if not validator.check_availability(): + raise BadRequestRockError( + "Docker is not available. Please ensure Docker daemon is running." 
+ ) +``` + +### 4.3 Keep Existing Validation + +The existing `DockerDeployment.start()` check should be kept as a defense-in-depth measure. Docker could become unavailable between the manager-level check and the actor executing `start()`. The two checks serve different purposes: + +- **Manager-level check**: Fail-fast, avoid resource waste +- **Deployment-level check**: Runtime safety, handles edge cases diff --git a/docs/versioned_docs/version-1.0.x/Release Notes/v1.0.4.md b/docs/versioned_docs/version-1.0.x/Release Notes/v1.0.4.md index 0b45800f2..edd2d08ac 100644 --- a/docs/versioned_docs/version-1.0.x/Release Notes/v1.0.4.md +++ b/docs/versioned_docs/version-1.0.x/Release Notes/v1.0.4.md @@ -1,4 +1,4 @@ -# v1.0.2 +# v1.0.4 ## Release Date February 4, 2026 diff --git a/docs/versioned_docs/version-1.2.x/Release Notes/index.md b/docs/versioned_docs/version-1.2.x/Release Notes/index.md index bf2c85834..b7058a640 100644 --- a/docs/versioned_docs/version-1.2.x/Release Notes/index.md +++ b/docs/versioned_docs/version-1.2.x/Release Notes/index.md @@ -2,5 +2,6 @@ sidebar_position: 1 --- # Release Notes -* [release v1.2.0](v1.2.0.md) +* [release v1.2.5](v1.2.5.md) * [release v1.2.1](v1.2.1.md) +* [release v1.2.0](v1.2.0.md) diff --git a/docs/versioned_docs/version-1.2.x/Release Notes/v1.2.5.md b/docs/versioned_docs/version-1.2.x/Release Notes/v1.2.5.md new file mode 100644 index 000000000..f8693ce1c --- /dev/null +++ b/docs/versioned_docs/version-1.2.x/Release Notes/v1.2.5.md @@ -0,0 +1,53 @@ +# v1.2.5 + +## Release Date +February 28, 2026 + +## Overview +This release focuses on monitoring and metrics enhancements, adding support for custom OTLP endpoints, user-defined metric tags, and Aliyun MSE Nacos connectivity. + +--- + +## Admin + +### New Features + +#### Custom Metrics Endpoint +- **NEW**: Support specifying a custom OTLP metrics endpoint via `metrics_endpoint` in `RuntimeConfig`. 
Previously the endpoint was hardcoded to `http://{host}:{port}/v1/metrics`; now it can be configured to any OTLP-compatible collector. + +**Configuration Example:** +```yaml +runtime: + metrics_endpoint: "http://my-otel-collector:4317/v1/metrics" +``` + +#### User-Defined Metric Tags +- **NEW**: Support adding custom tags to all metrics via `user_defined_tags` in `RuntimeConfig`. These tags are merged into the base attributes of every metric emitted by Admin, SandboxProxyService, and SandboxActor. + +**Configuration Example:** +```yaml +runtime: + user_defined_tags: + cluster: "prod-cluster" + region: "cn-hz" + service: "rock-sandbox" +``` + +#### Metrics IP Tag +- **NEW**: Added `ip` field to metrics base attributes for better instance identification in monitoring dashboards. + +#### Actor Metrics Endpoint +- **NEW**: Exposed `metrics_endpoint` configuration to `SandboxActor`, allowing Ray actors to report metrics to custom OTLP endpoints. The endpoint is propagated from `RuntimeConfig` to actors during sandbox creation. + +### Enhancements + +#### Nacos Configuration +- **NEW**: Added `server_addresses` field to `NacosConfig`, enabling direct connection to Nacos servers (e.g., Aliyun MSE) without relying solely on the `endpoint` parameter. 
+ +**Configuration Example:** +```yaml +nacos: + server_addresses: "mse-xxx.nacos.mse.aliyuncs.com:8848" + group: "DEFAULT_GROUP" + data_id: "rock-config" +``` diff --git a/examples/agents/swe_agent/rock_agent_config.yaml b/examples/agents/swe_agent/rock_agent_config.yaml index 4ee112e8e..005cf8e02 100644 --- a/examples/agents/swe_agent/rock_agent_config.yaml +++ b/examples/agents/swe_agent/rock_agent_config.yaml @@ -12,7 +12,7 @@ run_cmd: > --agent.model.api_base --agent.model.api_key # replace ; keep others as-is -# if repo not at workspace root: --env.repo.type preexisting --env.repo.repo_name testbed +# if repo at workspace root, change env.repo's config like: --env.repo.type preexisting --env.repo.repo_name testbed # Python Agent often meets python conflicts, so we need to avoid add RuntimeEnv to PATH in agent_run skip_wrap_run_cmd: true diff --git a/pyproject.toml b/pyproject.toml index 74f874a63..739f26f68 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -48,6 +48,8 @@ admin = [ "pip", "cryptography==39.0.1", "fakeredis[json]", + "kubernetes>=35.0.0", # For K8S operator support + "aiolimiter>=1.2.1", # For rate limiting ] rocklet = [ diff --git a/requirements_admin.txt b/requirements_admin.txt index 0021aaeed..379fcc1fb 100644 --- a/requirements_admin.txt +++ b/requirements_admin.txt @@ -17,6 +17,8 @@ aiohttp==3.13.2 # rl-rock aiohttp-cors==0.8.1 # via ray +aiolimiter==1.2.1 + # via rl-rock aiosignal==1.4.0 # via # aiohttp @@ -135,6 +137,7 @@ certifi==2025.10.5 # via # httpcore # httpx + # kubernetes # requests cffi==2.0.0 # via cryptography @@ -176,8 +179,12 @@ distlib==0.4.0 # via virtualenv drawsvg==2.4.0 # via arckit +durationpy==0.10 + # via kubernetes exceptiongroup==1.3.0 ; python_full_version < '3.11' # via anyio +fakeredis==2.33.0 + # via rl-rock fastapi==0.121.0 # via rl-rock filelock==3.20.0 @@ -239,12 +246,16 @@ jmespath==0.10.0 # botocore joblib==1.5.2 # via nltk +jsonpath-ng==1.7.0 + # via fakeredis jsonschema==4.25.1 # via ray 
jsonschema-specifications==2025.9.1 # via jsonschema kiwisolver==1.4.9 # via matplotlib +kubernetes==35.0.0 + # via rl-rock latex2sympy2-extended==1.10.2 # via math-verify magiccube==0.3.0 @@ -285,6 +296,8 @@ numpy==2.3.4 ; python_full_version >= '3.11' # contourpy # magiccube # matplotlib +oauthlib==3.3.1 + # via requests-oauthlib opencensus==0.11.4 # via ray opencensus-context==0.1.3 @@ -337,6 +350,8 @@ pip==25.3 # via rl-rock platformdirs==4.5.0 # via virtualenv +ply==3.11 + # via jsonpath-ng prometheus-client==0.23.1 # via # opentelemetry-exporter-prometheus @@ -397,6 +412,7 @@ pyproject-hooks==1.2.0 python-dateutil==2.9.0.post0 # via # botocore + # kubernetes # matplotlib python-multipart==0.0.20 # via rl-rock @@ -404,6 +420,7 @@ pytz==2025.2 # via reasoning-gym pyyaml==6.0.3 # via + # kubernetes # ray # reasoning-gym # rl-rock @@ -412,7 +429,9 @@ ray==2.43.0 reasoning-gym==0.1.23 # via gem-llm redis==7.0.1 - # via rl-rock + # via + # fakeredis + # rl-rock referencing==0.37.0 # via # jsonschema @@ -424,10 +443,14 @@ requests==2.32.5 # alibabacloud-tea # darabonba-core # google-api-core + # kubernetes # opentelemetry-exporter-otlp-proto-http # oss2 # ray + # requests-oauthlib # rl-rock +requests-oauthlib==2.0.0 + # via kubernetes rich==14.2.0 # via # arckit @@ -444,6 +467,7 @@ setuptools==80.9.0 # via incremental six==1.17.0 # via + # kubernetes # opencensus # oss2 # python-dateutil @@ -452,6 +476,8 @@ smart-open==7.4.4 # via ray sniffio==1.3.1 # via anyio +sortedcontainers==2.4.0 + # via fakeredis sqlalchemy==2.0.44 # via sqlmodel sqlmodel==0.0.27 @@ -478,6 +504,7 @@ typing-extensions==4.15.0 # aiosqlite # anyio # exceptiongroup + # fakeredis # fastapi # grpcio # multidict @@ -506,6 +533,7 @@ tzlocal==5.3.1 urllib3==2.5.0 # via # botocore + # kubernetes # requests uuid==1.30 # via rl-rock @@ -513,6 +541,8 @@ uvicorn==0.38.0 # via rl-rock virtualenv==20.35.4 # via ray +websocket-client==1.9.0 + # via kubernetes websockets==15.0.1 # via rl-rock wrapt==2.0.0 diff 
--git a/rock-conf/rock-dev.yml b/rock-conf/rock-dev.yml new file mode 100644 index 000000000..2014d5673 --- /dev/null +++ b/rock-conf/rock-dev.yml @@ -0,0 +1,50 @@ +# K8S deployment configuration +k8s: + # Kubeconfig path (None for in-cluster config) + kubeconfig_path: "" + + # Global namespace for all sandboxes + namespace: "rock" + + # API client configuration + # Rate limiting + api_qps: 20.0 + + # Watch configuration + watch_timeout_seconds: 60 + watch_reconnect_delay_seconds: 5 + + # Template definitions - corresponds to spec.template in BatchSandbox CRD + # Top-level fields (apiVersion, kind, metadata, spec.replicas) are hardcoded in code + templates: + # Default template - basic pod template + default: + # Enable resource speedup label when building BatchSandbox CR + enable_resource_speedup: true + # Port configuration for the sandbox service + ports: + proxy: 8000 # HTTP API port (rocklet/swerex-remote) + server: 8080 # WebSocket server port (same service) + ssh: 22 # SSH service port + # Pod template (corresponds to spec.template in BatchSandbox) + template: + metadata: + labels: + example.app: rock-sandbox + spec: + tolerations: + - operator: "Exists" + containers: + - name: main + image: python:3.11 + command: + - "/bin/sh" + - "-c" + - | + pip install rl-rock[rocklet] + rocklet + +# Runtime configuration +runtime: + # Operator type: 'ray' or 'k8s' + operator_type: "k8s" \ No newline at end of file diff --git a/rock/admin/main.py b/rock/admin/main.py index 0ad079fd7..2cb242833 100644 --- a/rock/admin/main.py +++ b/rock/admin/main.py @@ -18,7 +18,7 @@ from rock.admin.entrypoints.sandbox_proxy_api import sandbox_proxy_router, set_sandbox_proxy_service from rock.admin.entrypoints.warmup_api import set_warmup_service, warmup_router from rock.admin.gem.api import gem_router, set_env_service -from rock.admin.scheduler.scheduler import SchedulerProcess +from rock.admin.scheduler.scheduler import SchedulerThread from rock.config import RockConfig from 
rock.logger import init_logger from rock.sandbox.gem_manager import GemManager @@ -52,7 +52,7 @@ async def lifespan(app: FastAPI): env_vars.ROCK_ADMIN_ROLE = args.role # init redis provider - if args.env in ["local", "test"]: + if args.env in ["local", "test", "dev"]: from fakeredis import aioredis redis_provider = RedisProvider(host=None, port=None, password="") @@ -65,8 +65,8 @@ async def lifespan(app: FastAPI): ) await redis_provider.init_pool() - # init scheduler process - scheduler_process = None + # init scheduler thread + scheduler_thread = None # init sandbox service if args.role == "admin": @@ -79,6 +79,7 @@ async def lifespan(app: FastAPI): runtime_config=rock_config.runtime, ray_service=ray_service, nacos_provider=rock_config.nacos_provider, + k8s_config=rock_config.k8s, ) operator = OperatorFactory.create_operator(operator_context) @@ -108,15 +109,13 @@ async def lifespan(app: FastAPI): set_env_service(sandbox_manager) if rock_config.scheduler.enabled and is_primary_pod(): - scheduler_process = SchedulerProcess( + scheduler_thread = SchedulerThread( scheduler_config=rock_config.scheduler, - ray_address=rock_config.ray.address, - ray_namespace=rock_config.ray.namespace, ) - scheduler_process.start() - logger.info("Scheduler process started on primary pod") + scheduler_thread.start() + logger.info("Scheduler thread started on primary pod") elif rock_config.scheduler.enabled: - logger.info("Scheduler process skipped on non-primary pod") + logger.info("Scheduler thread skipped on non-primary pod") else: sandbox_manager = SandboxProxyService(rock_config=rock_config, redis_provider=redis_provider) @@ -126,10 +125,10 @@ async def lifespan(app: FastAPI): yield - # stop scheduler process - if scheduler_process: - scheduler_process.stop() - logger.info("Scheduler process stopped") + # stop scheduler thread + if scheduler_thread: + scheduler_thread.stop() + logger.info("Scheduler thread stopped") if redis_provider: await redis_provider.close_pool() diff --git 
a/rock/admin/proto/request.py b/rock/admin/proto/request.py index acbb75d70..eb54b96e6 100644 --- a/rock/admin/proto/request.py +++ b/rock/admin/proto/request.py @@ -27,6 +27,10 @@ class SandboxStartRequest(BaseModel): """The amount of CPUs to allocate for the container.""" sandbox_id: str | None = Field(default=None) """The id of the sandbox.""" + registry_username: str | None = None + """Username for Docker registry authentication. When both username and password are provided, docker login will be performed before pulling the image.""" + registry_password: str | None = None + """Password for Docker registry authentication. When both username and password are provided, docker login will be performed before pulling the image.""" class SandboxCommand(Command): diff --git a/rock/admin/scheduler/scheduler.py b/rock/admin/scheduler/scheduler.py index fbf328950..d890b800c 100644 --- a/rock/admin/scheduler/scheduler.py +++ b/rock/admin/scheduler/scheduler.py @@ -1,10 +1,8 @@ # rock/admin/scheduler/scheduler.py import asyncio -import multiprocessing as mp -import signal +import threading import time from datetime import datetime, timedelta -from multiprocessing import Process import pytz import ray @@ -23,9 +21,7 @@ class WorkerIPCache: """Manages Ray worker IP caching with TTL-based expiration.""" - def __init__(self, ray_address: str, ray_namespace: str, cache_ttl: int = 60): - self.ray_address = ray_address - self.ray_namespace = ray_namespace + def __init__(self, cache_ttl: int = 60): self.cache_ttl = cache_ttl self._cached_ips: list[str] = [] self._cache_time: float = 0.0 @@ -35,30 +31,16 @@ def _is_cache_expired(self) -> bool: return (time.time() - self._cache_time) > self.cache_ttl def _fetch_worker_ips_from_ray(self) -> list[str]: - """Connect to Ray and fetch alive worker IPs.""" + """Fetch alive worker IPs from the already-initialized Ray cluster.""" logger.info("Refreshing worker IP cache from Ray cluster") - - should_shutdown = False - if not 
ray.is_initialized(): - logger.info(f"Ray start init with address[{self.ray_address}] and namespace[{self.ray_namespace}]") - ray.init(address=self.ray_address, namespace=self.ray_namespace) - should_shutdown = True - else: - logger.info("Ray has already initialized") - - try: - nodes = ray.nodes() - alive_ips = [] - for node in nodes: - if node.get("Alive", False) and node.get("Resources", {}).get("CPU", 0) > 0: - ip = node.get("NodeManagerAddress", "").split(":")[0] - if ip: - alive_ips.append(ip) - return alive_ips - finally: - if should_shutdown: - ray.shutdown() - logger.debug("Ray connection closed after fetching worker IPs") + nodes = ray.nodes() + alive_ips = [] + for node in nodes: + if node.get("Alive", False) and node.get("Resources", {}).get("CPU", 0) > 0: + ip = node.get("NodeManagerAddress", "").split(":")[0] + if ip: + alive_ips.append(ip) + return alive_ips def refresh(self) -> list[str]: """Force refresh the worker IP cache.""" @@ -69,11 +51,6 @@ def refresh(self) -> list[str]: return self._cached_ips except Exception as e: logger.error(f"Failed to refresh worker cache: {e}") - if ray.is_initialized(): - try: - ray.shutdown() - except Exception: - pass return self._cached_ips def get_alive_workers(self, force_refresh: bool = False) -> list[str]: @@ -90,25 +67,17 @@ def get_alive_workers(self, force_refresh: bool = False) -> list[str]: class TaskScheduler: """Manages task scheduling using APScheduler.""" - def __init__( - self, - scheduler_config: SchedulerConfig, - ray_address: str, - ray_namespace: str, - ): + def __init__(self, scheduler_config: SchedulerConfig): self.scheduler_config = scheduler_config - self.ray_address = ray_address - self.ray_namespace = ray_namespace self.local_tz = pytz.timezone(env_vars.ROCK_TIME_ZONE) self._scheduler: AsyncIOScheduler | None = None self._stop_event: asyncio.Event | None = None self._worker_cache: WorkerIPCache | None = None + self._loop: asyncio.AbstractEventLoop | None = None def _init_worker_cache(self) 
-> None: """Initialize the worker IP cache.""" self._worker_cache = WorkerIPCache( - ray_address=self.ray_address, - ray_namespace=self.ray_namespace, cache_ttl=self.scheduler_config.worker_cache_ttl, ) @@ -145,17 +114,6 @@ def _add_jobs(self) -> None: ) logger.info(f"Added job '{task.type}' with interval {task.interval_seconds}s") - def _setup_signal_handlers(self) -> None: - """Setup signal handlers for graceful shutdown.""" - - def signal_handler(signum, frame): - logger.info("Received signal, shutting down scheduler") - if self._stop_event: - self._stop_event.set() - - signal.signal(signal.SIGTERM, signal_handler) - signal.signal(signal.SIGINT, signal_handler) - async def run(self) -> None: """Run the scheduler until stopped.""" self._init_worker_cache() @@ -171,7 +129,7 @@ async def run(self) -> None: logger.info("Scheduler started") self._stop_event = asyncio.Event() - self._setup_signal_handlers() + self._loop = asyncio.get_event_loop() try: await self._stop_event.wait() @@ -181,53 +139,50 @@ async def run(self) -> None: self._scheduler.shutdown(wait=False) logger.info("Scheduler stopped") -class SchedulerProcess: - """Scheduler process manager - runs APScheduler in a separate process.""" + def stop(self) -> None: + """Thread-safe stop: signal the scheduler to shut down.""" + if self._stop_event and self._loop: + self._loop.call_soon_threadsafe(self._stop_event.set) + - def __init__(self, scheduler_config: SchedulerConfig, ray_address: str, ray_namespace: str): +class SchedulerThread: + """Scheduler thread manager - runs APScheduler in a daemon thread with its own event loop.""" + + def __init__(self, scheduler_config: SchedulerConfig): self.scheduler_config = scheduler_config - self.ray_address = ray_address - self.ray_namespace = ray_namespace - self._process: Process | None = None - self._ctx = mp.get_context("spawn") - - @staticmethod - def _run_scheduler_in_process( - scheduler_config: SchedulerConfig, - ray_address: str, - ray_namespace: str, - ) -> 
None: - """Entry point for running scheduler in a separate process.""" + self._thread: threading.Thread | None = None + self._task_scheduler: TaskScheduler | None = None + + def _run_scheduler_in_thread(self) -> None: + """Entry point for running scheduler in a thread with a dedicated event loop.""" try: - task_scheduler = TaskScheduler(scheduler_config, ray_address, ray_namespace) - asyncio.run(task_scheduler.run()) - except (KeyboardInterrupt, SystemExit): - logger.info("Scheduler process interrupted") + self._task_scheduler = TaskScheduler(self.scheduler_config) + asyncio.run(self._task_scheduler.run()) + except Exception: + logger.exception("Scheduler thread encountered an error") def start(self) -> None: - """Start the scheduler process.""" - if self._process and self._process.is_alive(): - logger.warning("Scheduler process is already running") + """Start the scheduler thread.""" + if self._thread and self._thread.is_alive(): + logger.warning("Scheduler thread is already running") return - self._process = self._ctx.Process( - target=self._run_scheduler_in_process, - args=(self.scheduler_config, self.ray_address, self.ray_namespace), + self._thread = threading.Thread( + target=self._run_scheduler_in_thread, + name="scheduler-thread", daemon=True, ) - self._process.start() - logger.info(f"Scheduler process started with PID: {self._process.pid}") + self._thread.start() + logger.info("Scheduler thread started") def stop(self) -> None: - """Stop the scheduler process.""" - if self._process and self._process.is_alive(): - self._process.terminate() - self._process.join(timeout=5) - if self._process.is_alive(): - self._process.kill() - self._process.join(timeout=2) - logger.info("Scheduler process stopped") + """Stop the scheduler thread gracefully.""" + if self._task_scheduler: + self._task_scheduler.stop() + if self._thread and self._thread.is_alive(): + self._thread.join(timeout=5) + logger.info("Scheduler thread stopped") def is_alive(self) -> bool: - """Check if 
the scheduler process is alive.""" - return self._process is not None and self._process.is_alive() + """Check if the scheduler thread is alive.""" + return self._thread is not None and self._thread.is_alive() diff --git a/rock/config.py b/rock/config.py index ad67170ac..50db6062f 100644 --- a/rock/config.py +++ b/rock/config.py @@ -105,6 +105,21 @@ def __post_init__(self) -> None: self.tasks = [TaskConfig(**task) for task in self.tasks] +@dataclass +class K8sConfig: + """Kubernetes configuration for K8s operator.""" + kubeconfig_path: str | None = None + namespace: str = 'rock' + templates: dict[str, dict] = field(default_factory=dict) + + # API client rate limiting + api_qps: float = 20.0 # Queries per second + + # Watch configuration + watch_timeout_seconds: int = 60 # Watch timeout before reconnect + watch_reconnect_delay_seconds: int = 5 # Delay after watch failure + + @dataclass class RuntimeConfig: enable_auto_clear: bool = False @@ -141,6 +156,7 @@ def __post_init__(self) -> None: @dataclass class RockConfig: ray: RayConfig = field(default_factory=RayConfig) + k8s: K8sConfig = field(default_factory=K8sConfig) warmup: WarmupConfig = field(default_factory=WarmupConfig) nacos: NacosConfig = field(default_factory=NacosConfig) redis: RedisConfig = field(default_factory=RedisConfig) @@ -172,6 +188,8 @@ def from_env(cls, config_path: str | None = None): kwargs = {} if "ray" in config: kwargs["ray"] = RayConfig(**config["ray"]) + if "k8s" in config: + kwargs["k8s"] = K8sConfig(**config["k8s"]) if "warmup" in config: kwargs["warmup"] = WarmupConfig(**config["warmup"]) if "nacos" in config: diff --git a/rock/deployments/config.py b/rock/deployments/config.py index e7ee384c1..fcf79cf0c 100644 --- a/rock/deployments/config.py +++ b/rock/deployments/config.py @@ -55,6 +55,8 @@ class DockerDeploymentConfig(DeploymentConfig): image: str = "python:3.11" """Docker image name to use for the container.""" + image_os: str = "linux" + port: int | None = None """Port number for 
container communication. If None, an available port will be automatically assigned.""" @@ -107,9 +109,18 @@ class DockerDeploymentConfig(DeploymentConfig): actor_resource_num: float = 1 """Number of actor resources to allocate (to be refined).""" + registry_username: str | None = None + """Username for Docker registry authentication. When both username and password are provided, docker login will be performed before pulling the image.""" + + registry_password: str | None = None + """Password for Docker registry authentication. When both username and password are provided, docker login will be performed before pulling the image.""" + runtime_config: RuntimeConfig = Field(default_factory=RuntimeConfig) """Runtime configuration settings.""" + extended_params: dict[str, str] = Field(default_factory=dict) + """Generic extension field for storing custom string key-value pairs.""" + @model_validator(mode="before") def validate_platform_args(cls, data: dict) -> dict: """Validate and extract platform arguments from docker_args. 
diff --git a/rock/deployments/docker.py b/rock/deployments/docker.py index 0cb8c98b6..bdfd7ce28 100644 --- a/rock/deployments/docker.py +++ b/rock/deployments/docker.py @@ -18,6 +18,7 @@ from rock.deployments.config import DockerDeploymentConfig from rock.deployments.constants import Port, Status from rock.deployments.hooks.abstract import CombinedDeploymentHook, DeploymentHook +from rock.deployments.hooks.docker_login import DockerLoginHook from rock.deployments.runtime_env import DockerRuntimeEnv, LocalRuntimeEnv, PipRuntimeEnv, UvRuntimeEnv from rock.deployments.sandbox_validator import DockerSandboxValidator from rock.deployments.status import PersistedServiceStatus, ServiceStatus @@ -76,6 +77,11 @@ def __init__( else: raise Exception(f"Invalid ROCK_WORKER_ENV_TYPE: {env_vars.ROCK_WORKER_ENV_TYPE}") + if self._config.registry_username is not None and self._config.registry_password is not None: + self.add_hook( + DockerLoginHook(self._config.image, self._config.registry_username, self._config.registry_password) + ) + self.sandbox_validator: DockerSandboxValidator | None = DockerSandboxValidator() def add_hook(self, hook: DeploymentHook): diff --git a/rock/deployments/hooks/docker_login.py b/rock/deployments/hooks/docker_login.py new file mode 100644 index 000000000..d14b07ea6 --- /dev/null +++ b/rock/deployments/hooks/docker_login.py @@ -0,0 +1,37 @@ +import asyncio + +from rock.deployments.hooks.abstract import DeploymentHook +from rock.logger import init_logger +from rock.utils import DockerUtil, ImageUtil + +logger = init_logger(__name__) + + +class DockerLoginHook(DeploymentHook): + """Hook that performs Docker registry authentication before pulling images. + + When triggered by the "Pulling docker image" step, this hook parses the + registry from the image name and logs in using the provided credentials. 
+ """ + + _PULL_STEP_MESSAGE = "Pulling docker image" + + def __init__(self, image: str, username: str, password: str): + self._image = image + self._username = username + self._password = password + + def on_custom_step(self, message: str): + if message != self._PULL_STEP_MESSAGE: + return + + loop = asyncio.new_event_loop() + try: + registry, _ = loop.run_until_complete(ImageUtil.parse_registry_and_others(self._image)) + finally: + loop.close() + if registry: + logger.info(f"Authenticating to registry {registry!r} before pulling image") + DockerUtil.login(registry, self._username, self._password) + else: + logger.warning(f"No registry found in image name {self._image!r}, skipping docker login") diff --git a/rock/env_vars.py b/rock/env_vars.py index 9ac4da3f8..be1f98899 100644 --- a/rock/env_vars.py +++ b/rock/env_vars.py @@ -66,7 +66,7 @@ "ROCK_LOGGING_PATH": lambda: os.getenv("ROCK_LOGGING_PATH"), "ROCK_LOGGING_FILE_NAME": lambda: os.getenv("ROCK_LOGGING_FILE_NAME", "rocklet.log"), "ROCK_LOGGING_LEVEL": lambda: os.getenv("ROCK_LOGGING_LEVEL", "INFO"), - "ROCK_SERVICE_STATUS_DIR": lambda: os.getenv("ROCK_SERVICE_STATUS_DIR", "/data/service_status"), + "ROCK_SERVICE_STATUS_DIR": lambda: os.getenv("ROCK_SERVICE_STATUS_DIR", "/tmp"), "ROCK_SCHEDULER_STATUS_DIR": lambda: os.getenv("ROCK_SCHEDULER_STATUS_DIR", "/data/scheduler_status"), "ROCK_CONFIG": lambda: os.getenv("ROCK_CONFIG"), "ROCK_CONFIG_DIR_NAME": lambda: os.getenv("ROCK_CONFIG_DIR_NAME", "rock-conf"), @@ -107,7 +107,7 @@ ), "ROCK_RTENV_PYTHON_V31212_INSTALL_CMD": lambda: os.getenv( "ROCK_RTENV_PYTHON_V31212_INSTALL_CMD", - "[ -f cpython-3.12.12.tar.gz ] && rm cpython-3.12.12.tar.gz; [ -d python ] && rm -rf python; wget -q -O cpython-3.12.12.tar.gz https://github.com/astral-sh/python-build-standalone/releases/download/20251217/cpython-3.12.12+20251217-x86_64-unknown-linux-gnu-install_only.tar.gz && tar -xzf cpython-3.12.12.tar.gz && mv python runtime-env", + "[ -f cpython-31212.tar.gz ] && rm 
cpython-31212.tar.gz; [ -d python ] && rm -rf python; wget -q -O cpython-31212.tar.gz https://github.com/astral-sh/python-build-standalone/releases/download/20251217/cpython-3.12.12+20251217-x86_64-unknown-linux-gnu-install_only.tar.gz && tar -xzf cpython-31212.tar.gz && mv python runtime-env", ), "ROCK_RTENV_NODE_V22180_INSTALL_CMD": lambda: os.getenv( "ROCK_RTENV_NODE_V22180_INSTALL_CMD", diff --git a/rock/sandbox/operator/factory.py b/rock/sandbox/operator/factory.py index 92e274c24..38ea810ba 100644 --- a/rock/sandbox/operator/factory.py +++ b/rock/sandbox/operator/factory.py @@ -4,9 +4,10 @@ from typing import Any from rock.admin.core.ray_service import RayService -from rock.config import RuntimeConfig +from rock.config import RuntimeConfig, K8sConfig from rock.logger import init_logger from rock.sandbox.operator.abstract import AbstractOperator +from rock.sandbox.operator.k8s.operator import K8sOperator from rock.sandbox.operator.ray import RayOperator from rock.utils.providers.nacos_provider import NacosConfigProvider @@ -24,10 +25,10 @@ class OperatorContext: runtime_config: RuntimeConfig ray_service: RayService | None = None + # K8s operator dependencies + k8s_config: K8sConfig | None = None nacos_provider: NacosConfigProvider | None = None # Future operator dependencies can be added here without breaking existing code - # kubernetes_client: Any | None = None - # docker_client: Any | None = None extra_params: dict[str, Any] = field(default_factory=dict) @@ -61,5 +62,10 @@ def create_operator(context: OperatorContext) -> AbstractOperator: if context.nacos_provider is not None: ray_operator.set_nacos_provider(context.nacos_provider) return ray_operator + elif operator_type == "k8s": + if context.k8s_config is None: + raise ValueError("K8sConfig is required for K8sOperator") + logger.info("Creating K8sOperator") + return K8sOperator(k8s_config=context.k8s_config) else: raise ValueError(f"Unsupported operator type: {operator_type}. 
" f"Supported types: ray, kubernetes") diff --git a/rock/sandbox/operator/k8s/__init__.py b/rock/sandbox/operator/k8s/__init__.py new file mode 100644 index 000000000..fb3e68474 --- /dev/null +++ b/rock/sandbox/operator/k8s/__init__.py @@ -0,0 +1,14 @@ +"""K8S Operator implementation and related components.""" + +from rock.sandbox.operator.k8s.constants import K8sConstants +from rock.sandbox.operator.k8s.operator import K8sOperator +from rock.sandbox.operator.k8s.provider import BatchSandboxProvider, K8sProvider +from rock.sandbox.operator.k8s.template_loader import K8sTemplateLoader + +__all__ = [ + "K8sConstants", + "K8sOperator", + "K8sProvider", + "BatchSandboxProvider", + "K8sTemplateLoader", +] diff --git a/rock/sandbox/operator/k8s/api_client.py b/rock/sandbox/operator/k8s/api_client.py new file mode 100644 index 000000000..200e9efa3 --- /dev/null +++ b/rock/sandbox/operator/k8s/api_client.py @@ -0,0 +1,253 @@ +"""K8s API Client with rate limiting and local cache. + +This module provides a wrapper around Kubernetes CustomObjectsApi with: +- Rate limiting using aiolimiter (configurable QPS) +- Local cache with watch-based sync (Informer pattern) +- Consistent error handling +- Simple CRUD interface for K8s CR operations + +The rate limiter uses token bucket algorithm to prevent API Server overload. +The Informer pattern reduces API Server load by maintaining a local cache. +""" + +import asyncio +from typing import Any + +from aiolimiter import AsyncLimiter +from kubernetes import client, watch + +from rock.logger import init_logger + + +logger = init_logger(__name__) + + +class K8sApiClient: + """K8s API client wrapper with rate limiting and Informer cache. 
+ + Centralizes K8s API Server access with: + - Rate limiting via aiolimiter to prevent API Server overload + - Local cache with watch-based sync (Informer pattern) to reduce API calls + - Consistent error handling + - Simple CRUD interface for K8s custom resources + """ + + def __init__( + self, + api_client: client.ApiClient, + group: str, + version: str, + plural: str, + namespace: str, + qps: float = 5.0, + watch_timeout_seconds: int = 60, + watch_reconnect_delay_seconds: int = 5, + ): + """Initialize K8s API client. + + Args: + api_client: Kubernetes ApiClient instance + group: CRD API group + version: CRD API version + plural: CRD resource plural name + namespace: Namespace for operations + qps: Queries per second limit (default: 5 for small clusters) + watch_timeout_seconds: Watch timeout before reconnect (default: 60) + watch_reconnect_delay_seconds: Delay after watch failure (default: 5) + """ + self._api_client = api_client + self._group = group + self._version = version + self._plural = plural + self._namespace = namespace + self._custom_api = client.CustomObjectsApi(api_client) + + # Rate limiting + self._rate_limiter = AsyncLimiter(max_rate=qps, time_period=1.0) + + # Watch configuration + self._watch_timeout_seconds = watch_timeout_seconds + self._watch_reconnect_delay_seconds = watch_reconnect_delay_seconds + + # Local cache for resources (Informer pattern) + self._cache: dict[str, dict] = {} + self._cache_lock = asyncio.Lock() + self._watch_task = None + self._initialized = False + + async def start(self): + """Start the API client and initialize cache watch.""" + if self._initialized: + return + + self._watch_task = asyncio.create_task(self._watch_resources()) + self._initialized = True + logger.info(f"Started K8sApiClient watch for {self._plural} in namespace {self._namespace}") + + async def _list_and_sync_cache(self) -> str: + """List all resources and sync to cache. 
+ + Returns: + resourceVersion for next watch + """ + async with self._rate_limiter: + resources = await asyncio.to_thread( + self._custom_api.list_namespaced_custom_object, + group=self._group, + version=self._version, + namespace=self._namespace, + plural=self._plural, + ) + + resource_version = resources.get('metadata', {}).get('resourceVersion') + async with self._cache_lock: + self._cache.clear() + for item in resources.get('items', []): + name = item.get('metadata', {}).get('name') + if name: + self._cache[name] = item + return resource_version + + async def _watch_resources(self): + """Background task to watch resources and maintain cache. + + Implements Kubernetes Informer pattern: + 1. Initial list-and-sync to populate cache + 2. Continuous watch for ADDED/MODIFIED/DELETED events + 3. Auto-reconnect on watch timeout or network failures + 4. Re-sync on reconnect to avoid event loss + """ + resource_version = None + try: + resource_version = await self._list_and_sync_cache() + logger.info(f"Initial cache populated with {len(self._cache)} resources, resourceVersion={resource_version}") + except Exception as e: + logger.error(f"Failed to populate initial cache: {e}") + + while True: + try: + def _watch_in_thread(): + w = watch.Watch() + stream = w.stream( + self._custom_api.list_namespaced_custom_object, + group=self._group, + version=self._version, + namespace=self._namespace, + plural=self._plural, + resource_version=resource_version, + timeout_seconds=self._watch_timeout_seconds, + ) + events = [] + for event in stream: + events.append(event) + return events + + events = await asyncio.to_thread(_watch_in_thread) + + async with self._cache_lock: + for event in events: + event_type = event['type'] + obj = event['object'] + name = obj.get('metadata', {}).get('name') + new_rv = obj.get('metadata', {}).get('resourceVersion') + + if new_rv: + resource_version = new_rv + + if not name: + continue + + if event_type in ['ADDED', 'MODIFIED']: + self._cache[name] = 
obj + elif event_type == 'DELETED': + self._cache.pop(name, None) + + except asyncio.CancelledError: + logger.info("Watch task cancelled") + raise + except Exception as e: + logger.warning(f"Watch stream disconnected: {e}, reconnecting immediately...") + try: + resource_version = await self._list_and_sync_cache() + logger.info(f"Re-synced cache with {len(self._cache)} resources, resourceVersion={resource_version}") + except Exception as list_err: + logger.error(f"Failed to re-list resources: {list_err}, retrying in {self._watch_reconnect_delay_seconds}s...") + await asyncio.sleep(self._watch_reconnect_delay_seconds) + + async def create_custom_object( + self, + body: dict[str, Any], + ) -> dict[str, Any]: + """Create a custom resource. + + Args: + body: Resource manifest + + Returns: + Created resource + """ + async with self._rate_limiter: + return await asyncio.to_thread( + self._custom_api.create_namespaced_custom_object, + group=self._group, + version=self._version, + namespace=self._namespace, + plural=self._plural, + body=body, + ) + + async def get_custom_object( + self, + name: str, + ) -> dict[str, Any]: + """Get a custom resource (from cache with fallback to API Server). + + Args: + name: Resource name + + Returns: + Resource object + """ + async with self._cache_lock: + resource = self._cache.get(name) + + if resource: + return resource + + logger.debug(f"Cache miss for {name}, querying API Server") + async with self._rate_limiter: + resource = await asyncio.to_thread( + self._custom_api.get_namespaced_custom_object, + group=self._group, + version=self._version, + namespace=self._namespace, + plural=self._plural, + name=name, + ) + + async with self._cache_lock: + self._cache[name] = resource + + return resource + + async def delete_custom_object( + self, + name: str, + ) -> dict[str, Any]: + """Delete a custom resource. 
+ + Args: + name: Resource name + + Returns: + Delete status + """ + async with self._rate_limiter: + return await asyncio.to_thread( + self._custom_api.delete_namespaced_custom_object, + group=self._group, + version=self._version, + namespace=self._namespace, + plural=self._plural, + name=name, + ) diff --git a/rock/sandbox/operator/k8s/constants.py b/rock/sandbox/operator/k8s/constants.py new file mode 100644 index 000000000..f30c715ad --- /dev/null +++ b/rock/sandbox/operator/k8s/constants.py @@ -0,0 +1,25 @@ +"""K8S operator constants for labels and annotations.""" + + +class K8sConstants: + """Constants for K8S BatchSandbox labels and annotations.""" + + # CRD configuration + CRD_GROUP = "sandbox.opensandbox.io" + CRD_VERSION = "v1alpha1" + CRD_PLURAL = "batchsandboxes" + CRD_KIND = "BatchSandbox" + CRD_API_VERSION = f"{CRD_GROUP}/{CRD_VERSION}" # sandbox.opensandbox.io/v1alpha1 + + # Annotation keys + ANNOTATION_ENDPOINTS = "sandbox.opensandbox.io/endpoints" + ANNOTATION_PORTS = "rock.sandbox/ports" + + # Label keys + LABEL_SANDBOX_ID = "rock.sandbox/sandbox-id" + LABEL_RESOURCE_SPEEDUP = "batchsandbox.alibabacloud.com/resource-speedup" + LABEL_TEMPLATE = "rock.sandbox/template" + + # Extension keys for DockerDeploymentConfig.extended_params + EXT_POOL_NAME = "pool_name" + EXT_TEMPLATE_NAME = "template_name" diff --git a/rock/sandbox/operator/k8s/operator.py b/rock/sandbox/operator/k8s/operator.py new file mode 100644 index 000000000..fbf02335d --- /dev/null +++ b/rock/sandbox/operator/k8s/operator.py @@ -0,0 +1,90 @@ +"""K8s Operator implementation for managing sandboxes via Kubernetes.""" + +from rock.config import K8sConfig +from rock.actions.sandbox.sandbox_info import SandboxInfo +from rock.deployments.config import DockerDeploymentConfig +from rock.logger import init_logger +from rock.sandbox.operator.abstract import AbstractOperator +from rock.sandbox.operator.k8s.provider import BatchSandboxProvider + +logger = init_logger(__name__) + + +class 
K8sOperator(AbstractOperator): + """Operator for managing sandboxes via Kubernetes BatchSandbox CRD.""" + + def __init__(self, k8s_config: K8sConfig, redis_provider=None): + """Initialize K8s operator. + + Args: + k8s_config: K8sConfig object containing kubeconfig and templates + redis_provider: Optional Redis provider for caching sandbox info + """ + self._provider = BatchSandboxProvider(k8s_config=k8s_config) + self._redis_provider = redis_provider + logger.info("Initialized K8sOperator") + + async def submit(self, config: DockerDeploymentConfig, user_info: dict = {}) -> SandboxInfo: + """Submit a new sandbox deployment to Kubernetes. + + Args: + config: Docker deployment configuration + user_info: User metadata (user_id, experiment_id, namespace, rock_authorization) + + Returns: + SandboxInfo with sandbox metadata + """ + return await self._provider.submit(config, user_info) + + async def get_status(self, sandbox_id: str) -> SandboxInfo: + """Get sandbox status with user info from Redis. + + This method first gets status from provider (IP, port_mapping, is_alive), + then merges it with user info from Redis if available. + + Args: + sandbox_id: Sandbox identifier + + Returns: + SandboxInfo with current status and user info + """ + # Get sandbox info from provider (includes is_alive check) + sandbox_info = await self._provider.get_status(sandbox_id) + + # Get user info from redis if available + if self._redis_provider: + redis_info = await self._get_sandbox_info_from_redis(sandbox_id) + if redis_info: + redis_info.update(sandbox_info) + return redis_info + + return sandbox_info + + async def _get_sandbox_info_from_redis(self, sandbox_id: str) -> dict | None: + """Get sandbox user info from Redis. 
+ + Args: + sandbox_id: Sandbox identifier + + Returns: + Sandbox info dict from Redis or None if not found + """ + from rock.admin.core.redis_key import alive_sandbox_key + try: + sandbox_status = await self._redis_provider.json_get(alive_sandbox_key(sandbox_id), "$") + if sandbox_status and len(sandbox_status) > 0: + return sandbox_status[0] + except Exception as e: + logger.debug(f"Failed to get sandbox info from redis for {sandbox_id}: {e}") + return None + + async def stop(self, sandbox_id: str) -> bool: + """Stop and delete a sandbox. + + Args: + sandbox_id: Sandbox identifier + + Returns: + True if successful, False otherwise + """ + return await self._provider.stop(sandbox_id) diff --git a/rock/sandbox/operator/k8s/provider.py b/rock/sandbox/operator/k8s/provider.py new file mode 100644 index 000000000..04e5f8863 --- /dev/null +++ b/rock/sandbox/operator/k8s/provider.py @@ -0,0 +1,506 @@ +"""K8s provider implementations for managing sandbox resources.""" + +from abc import abstractmethod +from typing import Any, Optional, Protocol + +from kubernetes import client, config as k8s_config +import json +import asyncio + +from rock.config import K8sConfig +from rock.deployments.config import DeploymentConfig, DockerDeploymentConfig +from rock.deployments.constants import Port +from rock.sandbox.operator.k8s.constants import K8sConstants +from rock.sandbox.operator.k8s.api_client import K8sApiClient +from rock.sandbox.operator.k8s.template_loader import K8sTemplateLoader +from rock.sandbox.remote_sandbox import RemoteSandboxRuntime +from rock.actions.sandbox.config import RemoteSandboxRuntimeConfig +from rock.actions.sandbox.sandbox_info import SandboxInfo +from rock.actions.sandbox.response import State +from rock.logger import init_logger + + +logger = init_logger(__name__) + + +class K8sProvider(Protocol): + """Base K8s provider interface. + + This protocol defines the standard interface for K8s sandbox providers. 
+ All provider implementations must support these three core operations: + - submit: Create and initialize a new sandbox + - get_status: Retrieve current sandbox status and check if alive + - stop: Stop and delete a sandbox + """ + + def __init__(self, k8s_config: K8sConfig): + """Initialize provider with K8s configuration. + + Args: + k8s_config: K8sConfig object containing kubeconfig and templates + """ + ... + + async def _ensure_initialized(self): + """Ensure K8s client is initialized.""" + ... + + @abstractmethod + async def submit(self, config: DeploymentConfig, user_info: dict = {}) -> SandboxInfo: + """Submit a sandbox deployment. + + Args: + config: Deployment configuration + user_info: User metadata + + Returns: + SandboxInfo with sandbox metadata + """ + pass + + @abstractmethod + async def get_status(self, sandbox_id: str) -> SandboxInfo: + """Get sandbox status. + + Args: + sandbox_id: ID of the sandbox + + Returns: + SandboxInfo with current status + """ + pass + + @abstractmethod + async def stop(self, sandbox_id: str) -> bool: + """Stop and delete a sandbox. + + Args: + sandbox_id: ID of the sandbox to delete + + Returns: + True if successful, False otherwise + """ + pass + + +class BatchSandboxProvider(K8sProvider): + """Provider for BatchSandbox CRD with Informer-based local cache. + + This provider uses Kubernetes watch API to maintain a local cache of BatchSandbox + resources. All get_status queries read from this cache instead of querying API Server, + which significantly improves performance and reduces API Server load. + + The watch task runs in the background and automatically reconnects on network failures. + """ + + def __init__(self, k8s_config: K8sConfig): + """Initialize BatchSandbox provider. 
+
+        Args:
+            k8s_config: K8sConfig object containing kubeconfig and templates
+        """
+        self.kubeconfig_path = k8s_config.kubeconfig_path
+        self.namespace = k8s_config.namespace
+        self._k8s_config = k8s_config
+        self._api_client = None
+        self._k8s_api: K8sApiClient | None = None
+        self._initialized = False
+
+        # Initialize template loader with config templates
+        self._template_loader = K8sTemplateLoader(
+            templates=k8s_config.templates,
+            default_namespace=k8s_config.namespace,
+        )
+        logger.info(f"Available K8S templates: {', '.join(self._template_loader.available_templates)}")
+
+    async def submit(self, config: DockerDeploymentConfig, user_info: dict = {}) -> SandboxInfo:
+        """Create a sandbox and return sandbox info immediately without waiting for IP.
+
+        Args:
+            config: Docker deployment configuration
+            user_info: User metadata (user_id, experiment_id, namespace, rock_authorization)
+
+        Returns:
+            SandboxInfo with sandbox metadata (state=PENDING, no IP/ports yet)
+        """
+        sandbox_id = config.container_name
+        logger.info(f"Creating sandbox {sandbox_id}")
+
+        try:
+            # Create the sandbox without waiting for IP
+            created_sandbox_id = await self._create(config)
+
+            # Extract and set user info
+            user_id = user_info.get("user_id", "default")
+            experiment_id = user_info.get("experiment_id", "default")
+            namespace = user_info.get("namespace", "default")
+            rock_authorization = user_info.get("rock_authorization", "default")
+
+            # Build sandbox info with empty IP and port_mapping
+            sandbox_info = SandboxInfo(
+                sandbox_id=created_sandbox_id,
+                host_name=created_sandbox_id,
+                host_ip="",
+                user_id=user_id,
+                experiment_id=experiment_id,
+                namespace=namespace,
+                rock_authorization=rock_authorization,
+                image=config.image,
+                cpus=config.cpus,
+                memory=config.memory,
+                port_mapping={},
+                state=State.PENDING,
+                phases={},
+            )
+
+            logger.info(f"Sandbox {sandbox_id} submitted")
+            return sandbox_info
+
+        except Exception as e:
+
logger.error(f"Failed to create sandbox {sandbox_id}: {e}", exc_info=True)
+            # Clean up on failure
+            try:
+                if 'created_sandbox_id' in locals():
+                    await self.stop(created_sandbox_id)
+                    logger.info(f"Cleaned up failed sandbox {created_sandbox_id}")
+            except Exception as cleanup_err:
+                logger.debug(f"Cleanup of failed sandbox {sandbox_id} also failed: {cleanup_err}")
+            raise
+
+    async def get_status(self, sandbox_id: str) -> SandboxInfo:
+        """Get sandbox status and check if alive.
+
+        This method fetches the sandbox resource from K8s and checks if the sandbox
+        is alive by calling its is_alive endpoint. The state is determined by:
+        - RUNNING: IP allocated AND is_alive returns true
+        - PENDING: IP not allocated OR is_alive returns false
+
+        Args:
+            sandbox_id: Sandbox identifier
+
+        Returns:
+            SandboxInfo with current status (without user_info fields)
+        """
+        # Get host_ip and port_mapping
+        host_ip, port_mapping = await self._get_sandbox_runtime_info(sandbox_id)
+
+        # Check is_alive through runtime
+        is_alive = False
+        if host_ip:
+            runtime = self._build_runtime(host_ip, port_mapping)
+            try:
+                is_alive_response = await runtime.is_alive()
+                is_alive = is_alive_response.is_alive
+            except Exception as e:
+                logger.debug(f"Failed to check is_alive for {sandbox_id}: {e}")
+
+        # Build sandbox info with current state
+        sandbox_info = SandboxInfo(
+            sandbox_id=sandbox_id,
+            host_name=sandbox_id,
+            host_ip=host_ip,
+            port_mapping=port_mapping,
+            state=State.RUNNING if is_alive else State.PENDING,
+            phases={},
+        )
+
+        return sandbox_info
+
+    async def stop(self, sandbox_id: str) -> bool:
+        """Delete a BatchSandbox resource."""
+        await self._ensure_initialized()
+
+        logger.info(f"Deleting BatchSandbox: {sandbox_id}")
+
+        try:
+            await self._k8s_api.delete_custom_object(name=sandbox_id)
+            logger.info(f"Deleted BatchSandbox: {sandbox_id} from namespace: {self.namespace}")
+            return True
+
+        except client.exceptions.ApiException as e:
+            if e.status == 404:
+                logger.warning(f"BatchSandbox {sandbox_id} not found, already deleted")
+
return True
+            logger.error(f"Failed to delete sandbox {sandbox_id}: {e}", exc_info=True)
+            return False
+        except Exception as e:
+            logger.error(f"Unexpected error deleting sandbox {sandbox_id}: {e}", exc_info=True)
+            return False
+
+    async def _ensure_initialized(self):
+        """Ensure K8s client is initialized and start watch task.
+
+        Lazy initialization of K8S client and API abstraction layer:
+        1. Load kubeconfig (from file, in-cluster, or default)
+        2. Create K8sApiClient with rate limiting and caching
+        3. Start background watch task for cache synchronization
+
+        Guarded by the _initialized flag so repeat calls return immediately.
+        Note: there is no lock, so concurrent first calls may both run initialization.
+        """
+        if self._initialized:
+            return
+
+        try:
+            if self.kubeconfig_path:
+                k8s_config.load_kube_config(config_file=self.kubeconfig_path)
+            else:
+                # Try in-cluster config first, fallback to default kubeconfig
+                try:
+                    k8s_config.load_incluster_config()
+                except k8s_config.ConfigException:
+                    k8s_config.load_kube_config()
+
+            self._api_client = client.ApiClient()
+            self._k8s_api = K8sApiClient(
+                api_client=self._api_client,
+                group=K8sConstants.CRD_GROUP,
+                version=K8sConstants.CRD_VERSION,
+                plural=K8sConstants.CRD_PLURAL,
+                namespace=self.namespace,
+                qps=self._k8s_config.api_qps,
+                watch_timeout_seconds=self._k8s_config.watch_timeout_seconds,
+                watch_reconnect_delay_seconds=self._k8s_config.watch_reconnect_delay_seconds,
+            )
+            await self._k8s_api.start()
+            self._initialized = True
+
+            logger.info("Initialized K8s provider with informer")
+        except Exception as e:
+            logger.error(f"Failed to initialize K8s client: {e}", exc_info=True)
+            raise
+
+    def _normalize_memory(self, memory: str) -> str:
+        """Normalize memory format to Kubernetes standard.
+
+        Convert formats like '2g', '2G', '2048m' to K8s format like '2Gi', '2048Mi'.
+        Decimal values keep two places, e.g. '1.5g' -> '1.50Gi'.
+        """
+        import re
+
+        # Already in K8s format
+        if re.match(r'^\d+(\.\d+)?(Ei|Pi|Ti|Gi|Mi|Ki)$', memory):
+            return memory
+
+        # Parse value and unit
+        match = re.match(r'^(\d+(\.\d+)?)([a-zA-Z]*)$', memory)
+        if not match:
+            # Fallback: assume it's bytes and convert to Mi
+            try:
+                bytes_val = int(memory)
+                return f"{bytes_val // (1024 * 1024)}Mi"
+            except ValueError:
+                return memory  # Return as-is if we can't parse
+
+        value = float(match.group(1))
+        unit = match.group(3).lower()
+
+        def fmt(v: float) -> str:
+            # Render whole numbers without a decimal part, keep two decimals otherwise
+            return str(int(v)) if v == int(v) else f"{v:.2f}"
+
+        # Convert to K8s binary units
+        if unit in ('', 'b'):
+            return f"{fmt(value / (1024 * 1024))}Mi"
+        elif unit in ('k', 'kb'):
+            return f"{fmt(value)}Ki"
+        elif unit in ('m', 'mb'):
+            return f"{fmt(value)}Mi"
+        elif unit in ('g', 'gb'):
+            return f"{fmt(value)}Gi"
+        elif unit in ('t', 'tb'):
+            return f"{fmt(value)}Ti"
+        else:
+            return memory
+
+    def _build_pool_manifest(self, sandbox_id: str, pool_name: str, ports_config: dict[str, int]) -> dict[str, Any]:
+        """Build BatchSandbox manifest for pool mode.
+
+        Args:
+            sandbox_id: Sandbox identifier
+            pool_name: Pool name to reference
+            ports_config: Port configuration dictionary
+
+        Returns:
+            Manifest dictionary
+        """
+        manifest = {
+            'apiVersion': K8sConstants.CRD_API_VERSION,
+            'kind': K8sConstants.CRD_KIND,
+            'metadata': {
+                'name': sandbox_id,
+                'namespace': self.namespace,
+                'labels': {
+                    K8sConstants.LABEL_SANDBOX_ID: sandbox_id,
+                },
+                'annotations': {
+                    K8sConstants.ANNOTATION_PORTS: json.dumps(ports_config),
+                },
+            },
+            'spec': {
+                'poolRef': pool_name,
+                'replicas': 1,
+            }
+        }
+
+        return manifest
+
+    def _build_batchsandbox_manifest(self, config: DockerDeploymentConfig) -> dict[str, Any]:
+        """Build BatchSandbox manifest from template and deployment config.
+ + This method uses the template loader to get a base template and only sets + the user-configurable parameters from SandboxStartRequest: + - image + - cpus + - memory + + All other fields (command, volumes, tolerations, etc.) come from the template. + + Returns: + Manifest dictionary + """ + sandbox_id = config.container_name + + # Check if using pool mode + pool_name = config.extended_params.get(K8sConstants.EXT_POOL_NAME) + if pool_name: + # Hardcode ports for pool mode + ports_config = {'proxy': 8000, 'server': 8080, 'ssh': 22} + manifest = self._build_pool_manifest(sandbox_id, pool_name, ports_config) + + logger.debug(f"Built BatchSandbox manifest for {sandbox_id} using pool '{pool_name}' in namespace '{self.namespace}'") + return manifest + + # Template mode: build from template + # Get template name from extended_params or use 'default' + template_name = config.extended_params.get(K8sConstants.EXT_TEMPLATE_NAME, 'default') + + # Build manifest using template loader + manifest = self._template_loader.build_manifest( + template_name=template_name, + sandbox_id=sandbox_id, + image=config.image, + cpus=config.cpus, + memory=self._normalize_memory(config.memory), + ) + + logger.debug(f"Built BatchSandbox manifest for {sandbox_id} in namespace '{self.namespace}' using template '{template_name}'") + return manifest + + async def _create(self, config: DockerDeploymentConfig) -> str: + """Create a BatchSandbox resource without waiting for IP allocation. 
+ + Args: + config: Docker deployment configuration + + Returns: + sandbox_id (same as config.container_name) + + Raises: + Exception: If creation fails or sandbox already exists + """ + await self._ensure_initialized() + + sandbox_id = config.container_name + + try: + manifest = self._build_batchsandbox_manifest(config) + + # Create BatchSandbox resource + await self._k8s_api.create_custom_object(body=manifest) + + logger.info(f"Created BatchSandbox: {sandbox_id} in namespace: {self.namespace}") + return sandbox_id + + except client.exceptions.ApiException as e: + if e.status == 409: + logger.warning(f"BatchSandbox {sandbox_id} already exists") + raise Exception(f"Sandbox {sandbox_id} already exists") + logger.error(f"Failed to create BatchSandbox: {e}", exc_info=True) + raise Exception(f"Failed to create sandbox: {e.reason}") + except Exception as e: + logger.error(f"Unexpected error creating sandbox: {e}", exc_info=True) + raise + + async def _get_sandbox_runtime_info(self, sandbox_id: str) -> tuple[str, dict[int, int]]: + """Get sandbox runtime info (host_ip and port_mapping). 
+
+        Args:
+            sandbox_id: ID of the sandbox
+
+        Returns:
+            tuple: (host_ip, port_mapping)
+                - host_ip: Pod IP from endpoints annotation (empty string if not allocated)
+                - port_mapping: Port configuration from annotations
+
+        Raises:
+            Exception: If the sandbox resource cannot be fetched
+            ValueError: If the ports annotation is missing or invalid
+        """
+        await self._ensure_initialized()
+
+        try:
+            # Get from cache or API Server (handled by api_client)
+            resource = await self._k8s_api.get_custom_object(name=sandbox_id)
+        except Exception as e:
+            logger.error(f"Failed to fetch resource from cache for {sandbox_id}: {e}", exc_info=True)
+            raise
+
+        # Extract metadata
+        metadata = resource.get("metadata", {})
+        annotations = metadata.get("annotations", {})
+
+        # Parse endpoints from annotations
+        endpoints_str = annotations.get(K8sConstants.ANNOTATION_ENDPOINTS)
+        pod_ips = []
+        if endpoints_str:
+            try:
+                pod_ips = json.loads(endpoints_str)
+            except (json.JSONDecodeError, TypeError):
+                logger.warning(f"Failed to parse endpoints for {sandbox_id}: {endpoints_str}")
+
+        # Use the first pod IP as host_ip if one has been allocated
+        host_ip = pod_ips[0] if pod_ips else ""
+
+        # Get port configuration from annotations
+        ports_str = annotations.get(K8sConstants.ANNOTATION_PORTS)
+        if not ports_str:
+            raise ValueError(
+                f"Sandbox '{sandbox_id}' is missing required '{K8sConstants.ANNOTATION_PORTS}' annotation. "
+                f"This sandbox may have been created with an older version."
+            )
+
+        try:
+            ports_config = json.loads(ports_str)
+        except (json.JSONDecodeError, TypeError) as e:
+            raise ValueError(
+                f"Failed to parse ports annotation for sandbox '{sandbox_id}': {ports_str}. Error: {e}"
+            )
+
+        # Build port_mapping; surface missing keys as a clear error
+        try:
+            port_mapping = {
+                Port.PROXY: ports_config['proxy'],
+                Port.SERVER: ports_config['server'],
+                Port.SSH: ports_config['ssh'],
+            }
+        except KeyError as e:
+            raise ValueError(f"Ports annotation for sandbox '{sandbox_id}' is missing key {e}: {ports_str}")
+
+        return host_ip, port_mapping
+
+    def _build_runtime(self, host_ip: str, port_mapping: dict[int, int]) -> RemoteSandboxRuntime:
+        """Build runtime for communicating with the sandbox.
+
+        Args:
+            host_ip: Pod IP address
+            port_mapping: Port mapping configuration
+
+        Returns:
+            RemoteSandboxRuntime instance
+        """
+        proxy_port = port_mapping.get(Port.PROXY, 8000)
+        runtime_config = RemoteSandboxRuntimeConfig(
+            host=f"http://{host_ip}",
+            port=proxy_port,
+        )
+        return RemoteSandboxRuntime.from_config(runtime_config)
+
diff --git a/rock/sandbox/operator/k8s/template_loader.py b/rock/sandbox/operator/k8s/template_loader.py
new file mode 100644
index 000000000..26790e2c2
--- /dev/null
+++ b/rock/sandbox/operator/k8s/template_loader.py
@@ -0,0 +1,181 @@
+"""K8S template loader for BatchSandbox manifests."""
+
+import copy
+import json
+from typing import Any, Dict
+
+from rock.logger import init_logger
+from rock.sandbox.operator.k8s.constants import K8sConstants
+
+logger = init_logger(__name__)
+
+
+class K8sTemplateLoader:
+    """Loader for K8S BatchSandbox templates."""
+
+    def __init__(self, templates: Dict[str, Dict[str, Any]], default_namespace: str = 'rock'):
+        """Initialize template loader.
+
+        Args:
+            templates: Dictionary of template configurations from K8sConfig
+            default_namespace: Default namespace if template doesn't specify one
+        """
+        self._templates: Dict[str, Dict[str, Any]] = templates
+        self._default_namespace = default_namespace
+
+        if not self._templates:
+            raise ValueError(
+                "No templates provided. At least one template must be defined in K8sConfig.templates."
+            )
+
+        logger.info(f"Loaded {len(self._templates)} K8S templates from config")
+        logger.debug(f"Available templates: {', '.join(self._templates.keys())}")
+
+    def get_template(self, template_name: str = 'default') -> Dict[str, Any]:
+        """Get a template by name.
+
+        Args:
+            template_name: Name of the template
+
+        Returns:
+            Deep copy of the template dictionary
+
+        Raises:
+            ValueError: If template not found
+        """
+        if template_name not in self._templates:
+            available = ', '.join(self._templates.keys())
+            raise ValueError(f"Template '{template_name}' not found. Available: {available}")
+
+        return copy.deepcopy(self._templates[template_name])
+
+    def build_manifest(
+        self,
+        template_name: str = 'default',
+        sandbox_id: str | None = None,
+        image: str | None = None,
+        cpus: float | None = None,
+        memory: str | None = None,
+    ) -> Dict[str, Any]:
+        """Build a complete BatchSandbox manifest from template.
+
+        Template structure:
+        - namespace: K8S namespace for the sandbox (the loader's default namespace is applied at build time)
+        - ports: custom port configuration (not part of K8S manifest)
+        - template: corresponds to spec.template in BatchSandbox CRD
+            - template.metadata -> spec.template.metadata
+            - template.spec -> spec.template.spec (Pod spec)
+
+        Top-level fields are hardcoded:
+        - apiVersion: sandbox.opensandbox.io/v1alpha1
+        - kind: BatchSandbox
+        - metadata: constructed from parameters
+        - spec.replicas: always 1
+
+        Args:
+            template_name: Name of the template to use
+            sandbox_id: Sandbox identifier (generated if not provided)
+            image: Container image
+            cpus: CPU resource limit
+            memory: Memory resource limit (normalized format like '2Gi')
+
+        Returns:
+            Complete BatchSandbox manifest
+        """
+        import uuid
+
+        # Get template configuration
+        config = self.get_template(template_name)
+
+        # Use default namespace (configured at startup)
+        namespace = self._default_namespace
+
+        # Get enable_resource_speedup from template (default to True)
+        enable_resource_speedup = config.get('enable_resource_speedup', True)
+
+        # Get port configuration from template (required)
+        ports_config = config.get('ports')
+        if not ports_config:
+            raise ValueError(
+                f"Template '{template_name}' is missing required 'ports' configuration. "
+                f"Each template must define ports (proxy, server, ssh)."
+            )
+
+        # Extract template (corresponds to spec.template in BatchSandbox)
+        pod_template = config.get('template', {})
+        template_metadata = copy.deepcopy(pod_template.get('metadata', {}))
+        pod_spec = copy.deepcopy(pod_template.get('spec', {}))
+
+        # Generate sandbox_id if not provided
+        if not sandbox_id:
+            sandbox_id = f"sandbox-{uuid.uuid4().hex[:8]}"
+
+        # Build top-level BatchSandbox manifest (hardcoded structure)
+        manifest = {
+            'apiVersion': K8sConstants.CRD_API_VERSION,
+            'kind': K8sConstants.CRD_KIND,
+            'metadata': {
+                'name': sandbox_id,
+                'namespace': namespace,
+                'labels': {
+                    K8sConstants.LABEL_SANDBOX_ID: sandbox_id,
+                    K8sConstants.LABEL_TEMPLATE: template_name,
+                },
+                'annotations': {
+                    K8sConstants.ANNOTATION_PORTS: json.dumps(ports_config),
+                }
+            },
+            'spec': {
+                'replicas': 1,  # Always 1 for sandbox
+                'template': {
+                    'metadata': template_metadata,
+                    'spec': pod_spec
+                }
+            }
+        }
+
+        # Add resource speedup label if enabled
+        if enable_resource_speedup:
+            manifest['metadata']['labels'][K8sConstants.LABEL_RESOURCE_SPEEDUP] = 'true'
+
+        # Add sandbox-id label to template metadata
+        if 'labels' not in manifest['spec']['template']['metadata']:
+            manifest['spec']['template']['metadata']['labels'] = {}
+        manifest['spec']['template']['metadata']['labels'][K8sConstants.LABEL_SANDBOX_ID] = sandbox_id
+
+        # Set container image
+        if image:
+            containers = pod_spec.get('containers', [])
+            if containers:
+                containers[0]['image'] = image
+
+        # Set resources if provided, preserving any template-provided entries
+        if cpus is not None or memory is not None:
+            containers = pod_spec.get('containers', [])
+            if containers:
+                resources = containers[0].setdefault('resources', {})
+                resources.setdefault('requests', {})
+                resources.setdefault('limits', {})
+
+                if cpus is not None:
+                    containers[0]['resources']['requests']['cpu'] = str(cpus)
+                    containers[0]['resources']['limits']['cpu'] = 
str(cpus) + + if memory is not None: + containers[0]['resources']['requests']['memory'] = memory + containers[0]['resources']['limits']['memory'] = memory + + return manifest + + @property + def available_templates(self) -> list[str]: + """Get list of available template names.""" + return list(self._templates.keys()) + + diff --git a/rock/sdk/sandbox/client.py b/rock/sdk/sandbox/client.py index 88ea07ea2..2415cd498 100644 --- a/rock/sdk/sandbox/client.py +++ b/rock/sdk/sandbox/client.py @@ -165,11 +165,14 @@ async def start(self): headers = self._build_headers() data = { "image": self.config.image, + "image_os": self.config.image_os, "auto_clear_time": self.config.auto_clear_seconds / 60, "auto_clear_time_minutes": self.config.auto_clear_seconds / 60, "startup_timeout": self.config.startup_timeout, "memory": self.config.memory, "cpus": self.config.cpus, + "registry_username": self.config.registry_username, + "registry_password": self.config.registry_password, } try: response = await HttpUtils.post(url, headers, data) diff --git a/rock/sdk/sandbox/config.py b/rock/sdk/sandbox/config.py index 7109d8c57..27489b017 100644 --- a/rock/sdk/sandbox/config.py +++ b/rock/sdk/sandbox/config.py @@ -29,6 +29,7 @@ def validate_xrl_authorization(cls, v): class SandboxConfig(BaseConfig): image: str = "python:3.11" + image_os: str = "linux" auto_clear_seconds: int = 60 * 5 route_key: str | None = None startup_timeout: float = env_vars.ROCK_SANDBOX_STARTUP_TIMEOUT_SECONDS @@ -38,6 +39,8 @@ class SandboxConfig(BaseConfig): experiment_id: str | None = None cluster: str = "zb" namespace: str | None = None + registry_username: str | None = None + registry_password: str | None = None class SandboxGroupConfig(SandboxConfig): diff --git a/rock/sdk/sandbox/model_service/base.py b/rock/sdk/sandbox/model_service/base.py index 7dbc3906a..ee79cd6a7 100644 --- a/rock/sdk/sandbox/model_service/base.py +++ b/rock/sdk/sandbox/model_service/base.py @@ -52,11 +52,13 @@ class 
ModelServiceConfig(BaseModel): """Command to watch agent with pid placeholder.""" anti_call_llm_cmd: str = Field( - default="rock model-service anti-call-llm --index ${index} --response ${response_payload}" + default="PYTHONWARNINGS=ignore rock model-service anti-call-llm --index ${index} --response ${response_payload}" ) """Command to anti-call LLM with index and response_payload placeholders.""" - anti_call_llm_cmd_no_response: str = Field(default="rock model-service anti-call-llm --index ${index}") + anti_call_llm_cmd_no_response: str = Field( + default="PYTHONWARNINGS=ignore rock model-service anti-call-llm --index ${index}" + ) """Command to anti-call LLM with only index placeholder.""" logging_path: str = Field(default="/data/logs") diff --git a/rock/sdk/sandbox/runtime_env/base.py b/rock/sdk/sandbox/runtime_env/base.py index 6b42515a7..87becee00 100644 --- a/rock/sdk/sandbox/runtime_env/base.py +++ b/rock/sdk/sandbox/runtime_env/base.py @@ -231,7 +231,7 @@ async def _post_init(self) -> None: async def _do_custom_install(self) -> None: """Execute custom install command after _post_init.""" await self.run( - self._custom_install_cmd, + f"cd {shlex.quote(self._workdir)} && {self._custom_install_cmd}", wait_timeout=self._install_timeout, error_msg="custom_install_cmd failed", ) diff --git a/rock/utils/docker.py b/rock/utils/docker.py index c22cca56b..8106c73aa 100644 --- a/rock/utils/docker.py +++ b/rock/utils/docker.py @@ -36,6 +36,39 @@ def pull_image(cls, image: str) -> bytes: # e.stderr contains the error message as bytes raise subprocess.CalledProcessError(e.returncode, e.cmd, e.output, e.stderr) from None + @classmethod + def login(cls, registry: str, username: str, password: str, timeout: int = 30) -> str: + """Login to a Docker registry + + Args: + registry: Docker registry URL (e.g. 
registry.example.com) + username: Registry username + password: Registry password + timeout: Command timeout in seconds + + Returns: + Command output as string on success + + Raises: + subprocess.CalledProcessError: If login fails + """ + try: + result = subprocess.run( + ["docker", "login", registry, "-u", username, "--password-stdin"], + input=password, + capture_output=True, + text=True, + timeout=timeout, + ) + if result.returncode != 0: + logger.error(f"Docker login to {registry} failed: {result.stderr.strip()}") + raise subprocess.CalledProcessError(result.returncode, result.args, result.stdout, result.stderr) + logger.info(f"Successfully logged in to {registry}") + return result.stdout.strip() + except subprocess.TimeoutExpired: + logger.error(f"Docker login to {registry} timed out after {timeout}s") + raise + @classmethod def remove_image(image: str) -> bytes: """Remove a Docker image""" diff --git a/tests/unit/conftest.py b/tests/unit/conftest.py index ea7496123..49b13b202 100644 --- a/tests/unit/conftest.py +++ b/tests/unit/conftest.py @@ -7,10 +7,18 @@ from fakeredis import aioredis from ray.util.state import list_actors +from unittest.mock import MagicMock + +from kubernetes import client + from rock.admin.core.ray_service import RayService -from rock.config import RockConfig -from rock.deployments.config import DockerDeploymentConfig +from rock.config import K8sConfig, RockConfig +from rock.deployments.abstract import AbstractDeployment +from rock.deployments.config import DeploymentConfig, DockerDeploymentConfig from rock.logger import init_logger +from rock.sandbox.operator.k8s.api_client import K8sApiClient +from rock.sandbox.operator.k8s.operator import K8sOperator +from rock.sandbox.operator.k8s.template_loader import K8sTemplateLoader from rock.sandbox.operator.ray import RayOperator from rock.sandbox.sandbox_manager import SandboxManager from rock.sandbox.service.sandbox_proxy_service import SandboxProxyService @@ -19,6 +27,21 @@ logger = 
init_logger(__name__) +class MockDeploymentConfig(DeploymentConfig): + """Mock deployment config for testing.""" + + image: str = "python:3.11" + cpus: float = 2 + memory: str = "4Gi" + container_name: str | None = None + template_name: str = "default" + auto_clear_time_minutes: int = 30 + + def get_deployment(self) -> AbstractDeployment: + """Mock implementation.""" + return MagicMock() + + @pytest.fixture(scope="session", autouse=True) def rock_config(): config_path = Path(__file__).parent.parent.parent / "rock-conf" / "rock-test.yml" @@ -119,3 +142,104 @@ async def check_sandbox_status_until_alive(sandbox_manager: SandboxManager, sand cnt += 1 if cnt > timeout: raise Exception("sandbox not alive") + + +# ========== K8S Fixtures ========== + + +@pytest.fixture +def k8s_config(): + """Create K8sConfig with required templates.""" + return K8sConfig( + kubeconfig_path=None, + templates={ + "default": { + "namespace": "rock-test", + "ports": { + "proxy": 8000, + "server": 8080, + "ssh": 22, + }, + "template": { + "metadata": {"labels": {"app": "test"}}, + "spec": {"containers": [{"name": "main", "image": "python:3.11"}]}, + }, + } + }, + ) + + +@pytest.fixture +def basic_templates(): + """Create basic template configuration.""" + return { + "default": { + "ports": { + "proxy": 8000, + "server": 8080, + "ssh": 22, + }, + "template": { + "metadata": {"labels": {"app": "rock-sandbox"}}, + "spec": {"containers": [{"name": "main", "image": "python:3.11"}]}, + }, + } + } + + +@pytest.fixture +def template_loader(basic_templates): + """Create template loader instance.""" + return K8sTemplateLoader(templates=basic_templates, default_namespace="rock-test") + + +@pytest.fixture +def mock_api_client(): + """Create mock K8S ApiClient.""" + return MagicMock(spec=client.ApiClient) + + +@pytest.fixture +def k8s_api_client(mock_api_client): + """Create K8sApiClient instance.""" + return K8sApiClient( + api_client=mock_api_client, + group="sandbox.opensandbox.io", + 
version="v1alpha1", + plural="batchsandboxes", + namespace="rock-test", + qps=5.0, + watch_timeout_seconds=60, + watch_reconnect_delay_seconds=5, + ) + + +@pytest.fixture +def mock_provider(): + """Create mock BatchSandboxProvider.""" + from unittest.mock import AsyncMock + + return AsyncMock() + + +@pytest.fixture +def k8s_operator(k8s_config, mock_provider): + """Create K8sOperator instance with mock provider.""" + from unittest.mock import patch + + with patch("rock.sandbox.operator.k8s.operator.BatchSandboxProvider", return_value=mock_provider): + operator = K8sOperator(k8s_config=k8s_config) + operator._provider = mock_provider + return operator + + +@pytest.fixture +def deployment_config(): + """Create deployment configuration.""" + return MockDeploymentConfig( + image="python:3.11", + cpus=2, + memory="4Gi", + container_name="test-sandbox", + template_name="default", + ) diff --git a/tests/unit/sandbox/operator/test_k8s_api_client.py b/tests/unit/sandbox/operator/test_k8s_api_client.py new file mode 100644 index 000000000..e206e4422 --- /dev/null +++ b/tests/unit/sandbox/operator/test_k8s_api_client.py @@ -0,0 +1,165 @@ +"""Unit tests for K8sApiClient. + +Tests cover: +- AsyncLimiter rate limiting integration +- Informer pattern cache synchronization +- CRUD operations on K8s custom resources +""" + +from unittest.mock import AsyncMock, patch + +import pytest + +from rock.sandbox.operator.k8s.api_client import K8sApiClient + + +class TestK8sApiClient: + """Test cases for K8sApiClient.""" + + def test_initialization(self, mock_api_client): + """Test K8sApiClient initialization. + + Verifies AsyncLimiter is configured with QPS limit. 
+ """ + api_client = K8sApiClient( + api_client=mock_api_client, + group="sandbox.opensandbox.io", + version="v1alpha1", + plural="batchsandboxes", + namespace="rock-test", + qps=200.0, + watch_timeout_seconds=60, + watch_reconnect_delay_seconds=5, + ) + + assert api_client._group == "sandbox.opensandbox.io" + assert api_client._version == "v1alpha1" + assert api_client._plural == "batchsandboxes" + assert api_client._namespace == "rock-test" + assert api_client._rate_limiter.max_rate == 200.0 + assert api_client._watch_timeout_seconds == 60 + assert api_client._watch_reconnect_delay_seconds == 5 + + @pytest.mark.asyncio + async def test_rate_limiting_with_context_manager(self, k8s_api_client): + """Test AsyncLimiter integration in CRUD operations. + + Verifies that AsyncLimiter is properly used as context manager + to enforce rate limiting on API Server requests. + """ + with patch('asyncio.to_thread', new_callable=AsyncMock) as mock_thread: + mock_thread.return_value = {"created": True} + + result = await k8s_api_client.create_custom_object(body={"test": "data"}) + + assert result == {"created": True} + mock_thread.assert_awaited_once() + + @pytest.mark.asyncio + async def test_create_custom_object(self, k8s_api_client): + """Test creating K8s custom resource with rate limiting.""" + mock_body = { + "apiVersion": "sandbox.opensandbox.io/v1alpha1", + "kind": "BatchSandbox", + "metadata": {"name": "test-sandbox"}, + } + + with patch('asyncio.to_thread', new_callable=AsyncMock) as mock_thread: + mock_thread.return_value = {"created": True} + + result = await k8s_api_client.create_custom_object(body=mock_body) + + assert result == {"created": True} + mock_thread.assert_awaited_once() + + @pytest.mark.asyncio + async def test_get_custom_object_from_cache(self, k8s_api_client): + """Test cache hit scenario (Informer pattern). + + When resource exists in local cache, no API Server request is made. 
+ """ + k8s_api_client._cache = { + "test-sandbox": {"metadata": {"name": "test-sandbox"}} + } + + result = await k8s_api_client.get_custom_object(name="test-sandbox") + + assert result == {"metadata": {"name": "test-sandbox"}} + + @pytest.mark.asyncio + async def test_get_custom_object_cache_miss(self, k8s_api_client): + """Test cache miss scenario with API Server fallback. + + When resource not in cache, queries API Server and updates cache. + """ + k8s_api_client._cache = {} + mock_response = {"metadata": {"name": "test-sandbox"}} + + with patch('asyncio.to_thread', new_callable=AsyncMock) as mock_thread: + mock_thread.return_value = mock_response + + result = await k8s_api_client.get_custom_object(name="test-sandbox") + + assert result == mock_response + assert k8s_api_client._cache["test-sandbox"] == mock_response + + @pytest.mark.asyncio + async def test_delete_custom_object(self, k8s_api_client): + """Test deleting K8s custom resource with rate limiting.""" + with patch('asyncio.to_thread', new_callable=AsyncMock) as mock_thread: + mock_thread.return_value = {"status": "deleted"} + + result = await k8s_api_client.delete_custom_object(name="test-sandbox") + + assert result == {"status": "deleted"} + mock_thread.assert_awaited_once() + + @pytest.mark.asyncio + async def test_start_initializes_watch(self, k8s_api_client): + """Test watch task initialization for Informer pattern. + + start() creates background task to watch K8s resource changes + and sync them to local cache. + """ + with patch('asyncio.create_task') as mock_create_task: + await k8s_api_client.start() + + assert k8s_api_client._initialized is True + mock_create_task.assert_called_once() + + @pytest.mark.asyncio + async def test_start_idempotent(self, k8s_api_client): + """Test start() idempotency. + + Multiple start() calls should only initialize watch once. 
+ """ + with patch('asyncio.create_task') as mock_create_task: + await k8s_api_client.start() + await k8s_api_client.start() + + assert mock_create_task.call_count == 1 + + @pytest.mark.asyncio + async def test_list_and_sync_cache(self, k8s_api_client): + """Test initial cache sync from K8s API Server. + + Populates local cache with all resources and returns resourceVersion + for subsequent watch operations. + """ + mock_resources = { + "metadata": {"resourceVersion": "12345"}, + "items": [ + {"metadata": {"name": "sandbox-1"}}, + {"metadata": {"name": "sandbox-2"}}, + ] + } + + with patch('asyncio.to_thread', new_callable=AsyncMock) as mock_thread: + mock_thread.return_value = mock_resources + + resource_version = await k8s_api_client._list_and_sync_cache() + + assert resource_version == "12345" + assert len(k8s_api_client._cache) == 2 + assert "sandbox-1" in k8s_api_client._cache + assert "sandbox-2" in k8s_api_client._cache diff --git a/tests/unit/sandbox/operator/test_k8s_operator.py b/tests/unit/sandbox/operator/test_k8s_operator.py new file mode 100644 index 000000000..17729b266 --- /dev/null +++ b/tests/unit/sandbox/operator/test_k8s_operator.py @@ -0,0 +1,154 @@ +"""Unit tests for K8sOperator.""" + +from unittest.mock import AsyncMock, patch + +import pytest + +from rock.actions.sandbox.response import State +from rock.actions.sandbox.sandbox_info import SandboxInfo +from rock.config import K8sConfig +from rock.sandbox.operator.k8s.operator import K8sOperator + + +class TestK8sOperator: + """Test cases for K8sOperator.""" + + def test_initialization(self, k8s_config): + """Test K8sOperator initialization.""" + with patch("rock.sandbox.operator.k8s.operator.BatchSandboxProvider"): + operator = K8sOperator(k8s_config=k8s_config) + assert operator._provider is not None + + def test_initialization_without_templates(self): + """Test K8sOperator initialization fails without templates.""" + config = K8sConfig(kubeconfig_path=None, templates={}) + # Validation 
happens in provider now, so operator init succeeds + # but provider creation should fail + with pytest.raises(ValueError, match="No templates provided"): + from rock.sandbox.operator.k8s.provider import BatchSandboxProvider + BatchSandboxProvider(k8s_config=config) + + @pytest.mark.asyncio + async def test_submit_success(self, k8s_operator, mock_provider, deployment_config): + """Test successful sandbox submission.""" + # Mock provider's submit method + mock_sandbox_info = { + "sandbox_id": "test-sandbox", + "host_ip": "10.0.0.1", + "state": State.RUNNING, + "user_id": "test-user", + "image": "python:3.11", + "cpus": 2, + "memory": "4Gi", + "port_mapping": {22555: 8000, 8080: 8080, 22: 22}, + } + mock_provider.submit = AsyncMock(return_value=SandboxInfo(**mock_sandbox_info)) + + result = await k8s_operator.submit(deployment_config, user_info={"user_id": "test-user"}) + + assert result["sandbox_id"] == "test-sandbox" + assert result["host_ip"] == "10.0.0.1" + assert result["state"] == State.RUNNING + assert result["user_id"] == "test-user" + + mock_provider.submit.assert_awaited_once() + + @pytest.mark.asyncio + async def test_submit_no_host_ip(self, k8s_operator, mock_provider, deployment_config): + """Test submission fails when no host IP is allocated.""" + # Mock provider to raise exception + mock_provider.submit = AsyncMock( + side_effect=Exception("Failed to get host IP for sandbox test-sandbox") + ) + + with pytest.raises(Exception, match="Failed to get host IP"): + await k8s_operator.submit(deployment_config) + + @pytest.mark.asyncio + async def test_submit_with_cleanup_on_failure(self, k8s_operator, mock_provider, deployment_config): + """Test that failed submission is handled by provider.""" + mock_provider.submit = AsyncMock(side_effect=Exception("K8S API error")) + + with pytest.raises(Exception, match="K8S API error"): + await k8s_operator.submit(deployment_config) + + @pytest.mark.asyncio + async def test_get_status_success(self, k8s_operator, 
mock_provider): + """Test successful status retrieval from local cache.""" + mock_sandbox_info = { + "sandbox_id": "test-sandbox", + "host_name": "test-sandbox", + "host_ip": "10.0.0.1", + "state": State.RUNNING, + "image": "python:3.11", + "alive": True, + "port_mapping": {}, + } + mock_provider.get_status = AsyncMock(return_value=SandboxInfo(**mock_sandbox_info)) + + result = await k8s_operator.get_status("test-sandbox") + + assert result["sandbox_id"] == "test-sandbox" + assert result["state"] == State.RUNNING + assert result["alive"] is True + + @pytest.mark.asyncio + async def test_get_status_not_alive(self, k8s_operator, mock_provider): + """Test status when sandbox is not alive.""" + mock_sandbox_info = { + "sandbox_id": "test-sandbox", + "host_name": "test-sandbox", + "host_ip": "10.0.0.1", + "state": State.PENDING, + "alive": False, + "port_mapping": {}, + } + mock_provider.get_status = AsyncMock(return_value=SandboxInfo(**mock_sandbox_info)) + + result = await k8s_operator.get_status("test-sandbox") + + assert result["sandbox_id"] == "test-sandbox" + assert result["state"] == State.PENDING + assert result["alive"] is False + + @pytest.mark.asyncio + async def test_get_status_not_found(self, k8s_operator, mock_provider): + """Test status retrieval when sandbox not found in cache.""" + mock_provider.get_status = AsyncMock( + side_effect=Exception("Sandbox test-sandbox not found") + ) + + with pytest.raises(Exception, match="not found"): + await k8s_operator.get_status("test-sandbox") + + @pytest.mark.asyncio + async def test_get_status_missing_ports_annotation(self, k8s_operator, mock_provider): + """Test that missing ports annotation raises error.""" + # Mock get_status to raise ValueError for missing ports + mock_provider.get_status = AsyncMock( + side_effect=ValueError("Sandbox 'test-sandbox' is missing required 'rock.sandbox/ports' annotation") + ) + + with pytest.raises(Exception, match="missing required.*annotation"): + await 
k8s_operator.get_status("test-sandbox") + + @pytest.mark.asyncio + async def test_stop_success(self, k8s_operator, mock_provider): + """Test successful sandbox stop.""" + mock_provider.stop = AsyncMock(return_value=True) + + result = await k8s_operator.stop("test-sandbox") + + assert result is True + mock_provider.stop.assert_awaited_once_with("test-sandbox") + + @pytest.mark.asyncio + async def test_stop_failure(self, k8s_operator, mock_provider): + """Test sandbox stop failure.""" + mock_provider.stop = AsyncMock(return_value=False) + + result = await k8s_operator.stop("test-sandbox") + + assert result is False + + diff --git a/tests/unit/sandbox/operator/test_k8s_template_loader.py b/tests/unit/sandbox/operator/test_k8s_template_loader.py new file mode 100644 index 000000000..67cd1145a --- /dev/null +++ b/tests/unit/sandbox/operator/test_k8s_template_loader.py @@ -0,0 +1,175 @@ +"""Unit tests for K8sTemplateLoader.""" + +import pytest + +from rock.sandbox.operator.k8s.constants import K8sConstants +from rock.sandbox.operator.k8s.template_loader import K8sTemplateLoader + + +class TestK8sTemplateLoader: + """Test cases for K8sTemplateLoader.""" + + def test_initialization_success(self, basic_templates): + """Test successful template loader initialization.""" + loader = K8sTemplateLoader( + templates=basic_templates, + default_namespace="rock-test" + ) + + assert loader._default_namespace == "rock-test" + assert len(loader._templates) == 1 + assert "default" in loader.available_templates + + def test_initialization_without_templates(self): + """Test initialization fails without templates.""" + with pytest.raises(ValueError, match="No templates provided"): + K8sTemplateLoader(templates={}, default_namespace="rock-test") + + def test_get_template_success(self, template_loader): + """Test getting template by name.""" + template = template_loader.get_template("default") + + assert template is not None + assert "ports" in template + assert "template" in template + 
assert template["ports"]["proxy"] == 8000 + + def test_get_template_not_found(self, template_loader): + """Test getting non-existent template.""" + with pytest.raises(ValueError, match="Template 'nonexistent' not found"): + template_loader.get_template("nonexistent") + + def test_get_template_returns_copy(self, template_loader): + """Test that get_template returns a deep copy.""" + template1 = template_loader.get_template("default") + template2 = template_loader.get_template("default") + + # Modify first template + template1["ports"]["proxy"] = 9999 + + # Second template should not be affected + assert template2["ports"]["proxy"] == 8000 + + def test_build_manifest_basic(self, template_loader): + """Test building basic manifest.""" + manifest = template_loader.build_manifest( + template_name="default", + sandbox_id="test-sandbox", + image="python:3.11", + cpus=2.0, + memory="4Gi" + ) + + # Verify top-level structure + assert manifest["apiVersion"] == K8sConstants.CRD_API_VERSION + assert manifest["kind"] == K8sConstants.CRD_KIND + assert manifest["metadata"]["name"] == "test-sandbox" + assert manifest["metadata"]["namespace"] == "rock-test" + + # Verify labels + assert manifest["metadata"]["labels"][K8sConstants.LABEL_SANDBOX_ID] == "test-sandbox" + assert manifest["metadata"]["labels"][K8sConstants.LABEL_TEMPLATE] == "default" + + # Verify annotations (ports stored as JSON) + assert K8sConstants.ANNOTATION_PORTS in manifest["metadata"]["annotations"] + + # Verify spec + assert manifest["spec"]["replicas"] == 1 + assert "template" in manifest["spec"] + + def test_build_manifest_with_resources(self, template_loader): + """Test building manifest with CPU and memory resources.""" + manifest = template_loader.build_manifest( + template_name="default", + sandbox_id="test-sandbox", + cpus=4.0, + memory="8Gi" + ) + + container = manifest["spec"]["template"]["spec"]["containers"][0] + + # Verify resource requests and limits + assert 
container["resources"]["requests"]["cpu"] == "4.0" + assert container["resources"]["limits"]["cpu"] == "4.0" + assert container["resources"]["requests"]["memory"] == "8Gi" + assert container["resources"]["limits"]["memory"] == "8Gi" + + def test_build_manifest_without_resources(self, template_loader): + """Test building manifest without specifying resources.""" + manifest = template_loader.build_manifest( + template_name="default", + sandbox_id="test-sandbox", + ) + + container = manifest["spec"]["template"]["spec"]["containers"][0] + + # Should not have resources section if not specified + assert "resources" not in container or not container.get("resources") + + def test_build_manifest_with_custom_image(self, template_loader): + """Test building manifest with custom image.""" + manifest = template_loader.build_manifest( + template_name="default", + sandbox_id="test-sandbox", + image="ubuntu:22.04" + ) + + container = manifest["spec"]["template"]["spec"]["containers"][0] + assert container["image"] == "ubuntu:22.04" + + def test_build_manifest_missing_ports_in_template(self): + """Test building manifest fails when template lacks ports config.""" + templates = { + "no-ports": { + "template": { + "spec": {"containers": [{"name": "main"}]} + } + } + } + + loader = K8sTemplateLoader(templates=templates, default_namespace="rock-test") + + with pytest.raises(ValueError, match="missing required 'ports' configuration"): + loader.build_manifest(template_name="no-ports", sandbox_id="test") + + def test_build_manifest_with_resource_speedup(self): + """Test building manifest with resource speedup label.""" + templates = { + "speedup": { + "enable_resource_speedup": True, + "ports": {"proxy": 8000, "server": 8080, "ssh": 22}, + "template": { + "spec": {"containers": [{"name": "main"}]} + } + } + } + + loader = K8sTemplateLoader(templates=templates, default_namespace="rock-test") + manifest = loader.build_manifest(template_name="speedup", sandbox_id="test") + + assert 
manifest["metadata"]["labels"][K8sConstants.LABEL_RESOURCE_SPEEDUP] == "true" + + def test_build_manifest_auto_generate_sandbox_id(self, template_loader): + """Test building manifest auto-generates sandbox_id if not provided.""" + manifest = template_loader.build_manifest(template_name="default") + + sandbox_id = manifest["metadata"]["name"] + assert sandbox_id.startswith("sandbox-") + assert len(sandbox_id) > 8 # Should have UUID suffix + + def test_available_templates_property(self, template_loader): + """Test available_templates property.""" + templates = template_loader.available_templates + + assert isinstance(templates, list) + assert "default" in templates + + def test_build_manifest_adds_sandbox_id_to_pod_labels(self, template_loader): + """Test that sandbox-id label is added to pod template.""" + manifest = template_loader.build_manifest( + template_name="default", + sandbox_id="test-sandbox" + ) + + pod_labels = manifest["spec"]["template"]["metadata"]["labels"] + assert pod_labels[K8sConstants.LABEL_SANDBOX_ID] == "test-sandbox" diff --git a/tests/unit/test_envs.py b/tests/unit/test_envs.py index 842ae4837..5cb692625 100644 --- a/tests/unit/test_envs.py +++ b/tests/unit/test_envs.py @@ -1,3 +1,5 @@ +import os + from rock import env_vars @@ -10,3 +12,15 @@ def test_default_envs(): def test_envs_project_root(): project_root = env_vars.ROCK_PROJECT_ROOT assert project_root is not None + + +def test_service_status_dir_default(): + """ROCK_SERVICE_STATUS_DIR should default to /tmp.""" + # Clear the env var in case it is already set + original = os.environ.pop("ROCK_SERVICE_STATUS_DIR", None) + try: + status_dir = env_vars.ROCK_SERVICE_STATUS_DIR + assert status_dir == "/tmp", f"Expected /tmp, got {status_dir}" + finally: + if original is not None: + os.environ["ROCK_SERVICE_STATUS_DIR"] = original diff --git a/uv.lock b/uv.lock index 86b59acd9..69e565a1c 100644 --- a/uv.lock +++ b/uv.lock @@ -158,6 +158,15 @@ wheels = [ { url = 
"https://mirrors.aliyun.com/pypi/packages/98/3b/40a68de458904bcc143622015fff2352b6461cd92fd66d3527bf1c6f5716/aiohttp_cors-0.8.1-py3-none-any.whl", hash = "sha256:3180cf304c5c712d626b9162b195b1db7ddf976a2a25172b35bb2448b890a80d" }, ] +[[package]] +name = "aiolimiter" +version = "1.2.1" +source = { registry = "https://mirrors.aliyun.com/pypi/simple/" } +sdist = { url = "https://mirrors.aliyun.com/pypi/packages/f1/23/b52debf471f7a1e42e362d959a3982bdcb4fe13a5d46e63d28868807a79c/aiolimiter-1.2.1.tar.gz", hash = "sha256:e02a37ea1a855d9e832252a105420ad4d15011505512a1a1d814647451b5cca9" } +wheels = [ + { url = "https://mirrors.aliyun.com/pypi/packages/f3/ba/df6e8e1045aebc4778d19b8a3a9bc1808adb1619ba94ca354d9ba17d86c3/aiolimiter-1.2.1-py3-none-any.whl", hash = "sha256:d3f249e9059a20badcb56b61601a83556133655c11d1eb3dd3e04ff069e5f3c7" }, +] + [[package]] name = "aiosignal" version = "1.4.0" @@ -1153,6 +1162,15 @@ wheels = [ { url = "https://mirrors.aliyun.com/pypi/packages/7f/ca/a8e5ae0e13d7c874e138c5d0e8313856b91f97359781fd78b0c9e34ae791/drawsvg-2.4.0-py3-none-any.whl", hash = "sha256:85b54044956390f05053bc2651e2414d54a4939b57a2be0a043e5cec2e04f1bb" }, ] +[[package]] +name = "durationpy" +version = "0.10" +source = { registry = "https://mirrors.aliyun.com/pypi/simple/" } +sdist = { url = "https://mirrors.aliyun.com/pypi/packages/9d/a4/e44218c2b394e31a6dd0d6b095c4e1f32d0be54c2a4b250032d717647bab/durationpy-0.10.tar.gz", hash = "sha256:1fa6893409a6e739c9c72334fc65cca1f355dbdd93405d30f726deb5bde42fba" } +wheels = [ + { url = "https://mirrors.aliyun.com/pypi/packages/b0/0d/9feae160378a3553fa9a339b0e9c1a048e147a4127210e286ef18b730f03/durationpy-0.10-py3-none-any.whl", hash = "sha256:3b41e1b601234296b4fb368338fdcd3e13e0b4fb5b67345948f4f2bf9868b286" }, +] + [[package]] name = "exceptiongroup" version = "1.3.0" @@ -2010,6 +2028,26 @@ wheels = [ { url = 
"https://mirrors.aliyun.com/pypi/packages/da/e9/0d4add7873a73e462aeb45c036a2dead2562b825aa46ba326727b3f31016/kiwisolver-1.4.9-pp311-pypy311_pp73-win_amd64.whl", hash = "sha256:fb940820c63a9590d31d88b815e7a3aa5915cad3ce735ab45f0c730b39547de1" }, ] +[[package]] +name = "kubernetes" +version = "35.0.0" +source = { registry = "https://mirrors.aliyun.com/pypi/simple/" } +dependencies = [ + { name = "certifi" }, + { name = "durationpy" }, + { name = "python-dateutil" }, + { name = "pyyaml" }, + { name = "requests" }, + { name = "requests-oauthlib" }, + { name = "six" }, + { name = "urllib3" }, + { name = "websocket-client" }, +] +sdist = { url = "https://mirrors.aliyun.com/pypi/packages/2c/8f/85bf51ad4150f64e8c665daf0d9dfe9787ae92005efb9a4d1cba592bd79d/kubernetes-35.0.0.tar.gz", hash = "sha256:3d00d344944239821458b9efd484d6df9f011da367ecb155dadf9513f05f09ee" } +wheels = [ + { url = "https://mirrors.aliyun.com/pypi/packages/0c/70/05b685ea2dffcb2adbf3cdcea5d8865b7bc66f67249084cf845012a0ff13/kubernetes-35.0.0-py2.py3-none-any.whl", hash = "sha256:39e2b33b46e5834ef6c3985ebfe2047ab39135d41de51ce7641a7ca5b372a13d" }, +] + [[package]] name = "latex2sympy2-extended" version = "1.10.2" @@ -2641,6 +2679,15 @@ wheels = [ { url = "https://mirrors.aliyun.com/pypi/packages/95/8e/2844c3959ce9a63acc7c8e50881133d86666f0420bcde695e115ced0920f/numpy-2.3.4-pp311-pypy311_pp73-win_amd64.whl", hash = "sha256:81b3a59793523e552c4a96109dde028aa4448ae06ccac5a76ff6532a85558a7f" }, ] +[[package]] +name = "oauthlib" +version = "3.3.1" +source = { registry = "https://mirrors.aliyun.com/pypi/simple/" } +sdist = { url = "https://mirrors.aliyun.com/pypi/packages/0b/5f/19930f824ffeb0ad4372da4812c50edbd1434f678c90c2733e1188edfc63/oauthlib-3.3.1.tar.gz", hash = "sha256:0f0f8aa759826a193cf66c12ea1af1637f87b9b4622d46e866952bb022e538c9" } +wheels = [ + { url = "https://mirrors.aliyun.com/pypi/packages/be/9c/92789c596b8df838baa98fa71844d84283302f7604ed565dafe5a6b5041a/oauthlib-3.3.1-py3-none-any.whl", hash = 
"sha256:88119c938d2b8fb88561af5f6ee0eec8cc8d552b7bb1f712743136eb7523b7a1" }, +] + [[package]] name = "opencensus" version = "0.11.4" @@ -3996,6 +4043,19 @@ wheels = [ { url = "https://mirrors.aliyun.com/pypi/packages/1e/db/4254e3eabe8020b458f1a747140d32277ec7a271daf1d235b70dc0b4e6e3/requests-2.32.5-py3-none-any.whl", hash = "sha256:2462f94637a34fd532264295e186976db0f5d453d1cdd31473c85a6a161affb6" }, ] +[[package]] +name = "requests-oauthlib" +version = "2.0.0" +source = { registry = "https://mirrors.aliyun.com/pypi/simple/" } +dependencies = [ + { name = "oauthlib" }, + { name = "requests" }, +] +sdist = { url = "https://mirrors.aliyun.com/pypi/packages/42/f2/05f29bc3913aea15eb670be136045bf5c5bbf4b99ecb839da9b422bb2c85/requests-oauthlib-2.0.0.tar.gz", hash = "sha256:b3dffaebd884d8cd778494369603a9e7b58d29111bf6b41bdc2dcd87203af4e9" } +wheels = [ + { url = "https://mirrors.aliyun.com/pypi/packages/3b/5d/63d4ae3b9daea098d5d6f5da83984853c1bbacd5dc826764b249fe119d24/requests_oauthlib-2.0.0-py2.py3-none-any.whl", hash = "sha256:7dd8a5c40426b779b0868c404bdef9768deccf22749cde15852df527e6269b36" }, +] + [[package]] name = "rich" version = "14.2.0" @@ -4011,7 +4071,7 @@ wheels = [ [[package]] name = "rl-rock" -version = "1.2.1" +version = "1.3.0" source = { editable = "." 
} dependencies = [ { name = "anyio" }, @@ -4034,14 +4094,17 @@ dependencies = [ [package.optional-dependencies] admin = [ { name = "aiohttp" }, + { name = "aiolimiter" }, { name = "aiosqlite" }, { name = "alibabacloud-cr20181201" }, { name = "apscheduler" }, { name = "bashlex" }, { name = "boto3" }, { name = "cryptography" }, + { name = "fakeredis", extra = ["json"] }, { name = "fastapi" }, { name = "gem-llm" }, + { name = "kubernetes" }, { name = "nacos-sdk-python" }, { name = "pexpect" }, { name = "pip" }, @@ -4055,6 +4118,7 @@ admin = [ ] all = [ { name = "aiohttp" }, + { name = "aiolimiter" }, { name = "aiosqlite" }, { name = "alibabacloud-cr20181201" }, { name = "apscheduler" }, @@ -4062,8 +4126,10 @@ all = [ { name = "boto3" }, { name = "cryptography" }, { name = "docker" }, + { name = "fakeredis", extra = ["json"] }, { name = "fastapi" }, { name = "gem-llm" }, + { name = "kubernetes" }, { name = "nacos-sdk-python" }, { name = "pexpect" }, { name = "pip" }, @@ -4107,7 +4173,6 @@ sandbox-actor = [ [package.dev-dependencies] test = [ - { name = "fakeredis", extra = ["json"] }, { name = "iflow-cli-sdk" }, { name = "pre-commit" }, { name = "pytest" }, @@ -4126,6 +4191,7 @@ test = [ [package.metadata] requires-dist = [ { name = "aiohttp", marker = "extra == 'admin'", specifier = ">=3.12.15" }, + { name = "aiolimiter", marker = "extra == 'admin'", specifier = ">=1.2.1" }, { name = "aiosqlite", marker = "extra == 'admin'" }, { name = "alibabacloud-cr20181201", marker = "extra == 'admin'", specifier = "==2.0.5" }, { name = "alibabacloud-cr20181201", marker = "extra == 'model-service'", specifier = "==2.0.5" }, @@ -4137,12 +4203,14 @@ requires-dist = [ { name = "build" }, { name = "cryptography", marker = "extra == 'admin'", specifier = "==39.0.1" }, { name = "docker", marker = "extra == 'builder'" }, + { name = "fakeredis", extras = ["json"], marker = "extra == 'admin'" }, { name = "fastapi", marker = "extra == 'model-service'" }, { name = "fastapi", marker = "extra 
== 'rocklet'" }, { name = "gem-llm", marker = "extra == 'builder'", specifier = ">=0.1.0" }, { name = "gem-llm", marker = "extra == 'rocklet'", specifier = ">=0.1.0" }, { name = "gem-llm", marker = "extra == 'sandbox-actor'", specifier = ">=0.1.0" }, { name = "httpx" }, + { name = "kubernetes", marker = "extra == 'admin'", specifier = ">=35.0.0" }, { name = "nacos-sdk-python", marker = "extra == 'admin'", specifier = ">=0.1.14" }, { name = "nacos-sdk-python", marker = "extra == 'sandbox-actor'", specifier = ">=0.1.14" }, { name = "opentelemetry-api" }, @@ -4181,7 +4249,6 @@ provides-extras = ["admin", "rocklet", "sandbox-actor", "builder", "model-servic [package.metadata.requires-dev] test = [ - { name = "fakeredis", extras = ["json"] }, { name = "iflow-cli-sdk", specifier = ">=0.1.1" }, { name = "pre-commit", specifier = ">=4.3.0" }, { name = "pytest" }, @@ -4964,6 +5031,15 @@ wheels = [ { url = "https://mirrors.aliyun.com/pypi/packages/6e/d4/ed38dd3b1767193de971e694aa544356e63353c33a85d948166b5ff58b9e/watchfiles-1.1.1-pp311-pypy311_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:3e6f39af2eab0118338902798b5aa6664f46ff66bc0280de76fca67a7f262a49" }, ] +[[package]] +name = "websocket-client" +version = "1.9.0" +source = { registry = "https://mirrors.aliyun.com/pypi/simple/" } +sdist = { url = "https://mirrors.aliyun.com/pypi/packages/2c/41/aa4bf9664e4cda14c3b39865b12251e8e7d239f4cd0e3cc1b6c2ccde25c1/websocket_client-1.9.0.tar.gz", hash = "sha256:9e813624b6eb619999a97dc7958469217c3176312b3a16a4bd1bc7e08a46ec98" } +wheels = [ + { url = "https://mirrors.aliyun.com/pypi/packages/34/db/b10e48aa8fff7407e67470363eac595018441cf32d5e1001567a7aeba5d2/websocket_client-1.9.0-py3-none-any.whl", hash = "sha256:af248a825037ef591efbf6ed20cc5faa03d3b47b9e5a2230a529eeee1c1fc3ef" }, +] + [[package]] name = "websockets" version = "15.0.1"
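
The K8s API client tests above (`test_start_idempotent`, `test_get_custom_object_cache_miss`, `test_list_and_sync_cache`) all exercise the same Informer-style pattern: list once to fill a local cache (recording the `resourceVersion` as the watch bookmark), make `start()` idempotent, and fall back to the API server only on cache misses. A minimal sketch of that pattern, with hypothetical names (`InformerCache`, `fetch_all`, `fetch_one` stand in for the real `K8sApiClient` and its API-server calls; this is not the rock implementation):

```python
# Illustrative sketch of the list-then-cache (Informer) pattern the tests
# exercise; `fetch_all` / `fetch_one` are stand-ins for K8s API calls.
import asyncio


class InformerCache:
    def __init__(self, fetch_all, fetch_one):
        self._fetch_all = fetch_all    # async () -> {"metadata": {...}, "items": [...]}
        self._fetch_one = fetch_one    # async (name) -> resource dict
        self._cache = {}
        self._initialized = False

    async def start(self):
        # Idempotent: repeated calls sync the cache only once.
        if self._initialized:
            return
        await self._list_and_sync_cache()
        self._initialized = True

    async def _list_and_sync_cache(self):
        listing = await self._fetch_all()
        for item in listing["items"]:
            self._cache[item["metadata"]["name"]] = item
        # resourceVersion is the bookmark a subsequent watch would resume from.
        return listing["metadata"]["resourceVersion"]

    async def get(self, name):
        if name in self._cache:              # cache hit: no API round trip
            return self._cache[name]
        obj = await self._fetch_one(name)    # cache miss: query, then backfill
        self._cache[name] = obj
        return obj
```

In the real client a background watch task (created in `start()`) would keep `_cache` in sync with resource changes; the sketch omits it to keep the hit/miss and idempotency behavior visible.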