
Commit 13531dc

Authored by dengwx2026claude (with co-authors)

feat: validate and sync experiment_id/namespace in JobConfig (#716) (#717)

* feat: refine expr_id and namespace
* feat(sdk): add JobConfig enhancements and fix linting issues. Add job configuration improvements including experiment_id support, OSS mirror config updates, and port validation changes. Fix ruff lint and format issues across the codebase.
* fix: use urlparse for URL hostname validation in speedup tests. Replace substring-based URL checks with urlparse().hostname to satisfy CodeQL's "Incomplete URL substring sanitization" rule.
* fix: extract domain constants to avoid CodeQL URL substring warnings. CodeQL flags `"domain" in var` as incomplete URL sanitization regardless of variable type. Extract hostnames to constants and use helper functions to eliminate the flagged pattern entirely.

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

1 parent df10981 · commit 13531dc


47 files changed: +786 −557 lines

docs/dev/agent/JobConfig.md

Lines changed: 111 additions & 0 deletions
@@ -0,0 +1,111 @@
# JobConfig Field Analysis: `namespace` and `experiment_id`

## 1. Current State

### 1.1 Where the Fields Are Defined

| Field | Defined in | Type |
|------|---------|------|
| `namespace` | `JobConfig` (`models/job/config.py:143`) | `str \| None = None` |
| `experiment_id` | `JobConfig` (`models/job/config.py:147`) | `str \| None = None` |
| `experiment_id` | `SandboxConfig` (`sdk/sandbox/config.py:40`) | `str \| None = None` |
| `namespace` | `SandboxConfig` (`sdk/sandbox/config.py:42`) | `str \| None = None` |
| `_namespace` | `Sandbox` (`sdk/sandbox/client.py:78`) | `str \| None = None` |
| `_experiment_id` | `Sandbox` (`sdk/sandbox/client.py:79`) | `str \| None = None` |

Note: `JobConfig.environment` is typed as `RockEnvironmentConfig`, which inherits from `SandboxConfig`, so `self.environment.experiment_id` comes from `SandboxConfig`.
### 1.2 Current Data Flow

```
User constructs JobConfig(experiment_id="exp-1", environment=RockEnvironmentConfig(experiment_id=?))
                                                                                  ↑ comes from SandboxConfig

Sandbox.start()
  └─ get_status() fetches sandbox_info
       ├─ sandbox_info.namespace → Sandbox._namespace (client.py:203)
       └─ sandbox_info.experiment_id → Sandbox._experiment_id (client.py:205)

Job.submit()
  └─ _prepare_and_start()
       └─ _autofill_sandbox_info() (job.py:268-269)
            └─ self._config.namespace = self._sandbox._namespace
            └─ (experiment_id is not handled)
```
### 1.3 `to_harbor_yaml()` Serialization

`JobConfig.to_harbor_yaml()` serializes `namespace` and `experiment_id` into the Harbor YAML (`exclude={"environment"}, exclude_none=True`), which is ultimately passed to `harbor jobs start -c`.

---
## 2. Problems

### 2.1 The Two `experiment_id` Definitions Are Not Synchronized

- `JobConfig.experiment_id` — the experiment identifier at the Harbor YAML level
- `SandboxConfig.experiment_id` (via `JobConfig.environment`) — the experiment identifier passed when the sandbox is created
- **Problem**: the two `experiment_id` fields are independent, with no synchronization or validation logic. A user may set different values in the two places, causing sandbox creation and Harbor execution to use different experiment_ids.

### 2.2 `namespace` Lacks a Consistency Check

- `_autofill_sandbox_info()` unconditionally overwrites `self._config.namespace = self._sandbox._namespace`
- **Problem**: if the user has already set a non-None `namespace`, the current logic silently overwrites it without any validation.

---
## 3. Proposed Improvements

### 3.1 `experiment_id`: Add a model_validator (post init) to JobConfig

**Keep** `JobConfig.experiment_id` as the single source of truth, and use a `model_validator(mode="after")` to do three things:

1. **Non-empty check**: `JobConfig.experiment_id` must not be None or an empty string
2. **Consistency check**: if `environment.experiment_id` (i.e. the SandboxConfig one) already has a value, it must match `JobConfig.experiment_id`; otherwise raise an exception
3. **Downward sync**: copy `JobConfig.experiment_id` into `environment.experiment_id`

```python
@model_validator(mode="after")
def _sync_experiment_id(self):
    if not self.experiment_id:
        raise ValueError("experiment_id must not be empty")
    env_exp = self.environment.experiment_id
    if env_exp is not None and env_exp != self.experiment_id:
        raise ValueError(
            f"experiment_id mismatch: JobConfig has '{self.experiment_id}', "
            f"but environment (SandboxConfig) has '{env_exp}'"
        )
    self.environment.experiment_id = self.experiment_id
    return self
```

**Behavior matrix**:

| JobConfig.experiment_id | environment.experiment_id | Behavior |
|------------------------|--------------------------|------|
| `None` or `""` | any | **raises ValueError**: experiment_id must not be empty |
| `"exp-1"` | `None` | synced: `environment.experiment_id = "exp-1"` |
| `"exp-1"` | `"exp-1"` | passes, consistent |
| `"exp-1"` | `"exp-2"` | **raises ValueError**: mismatch |
### 3.2 `namespace`: Keep It Set from the Sandbox Return Value (Runtime Backfill)

Unlike `experiment_id`, the source of truth for `namespace` is the value returned by the sandbox at runtime, and users normally do not need to set it. Keep handling it in `_autofill_sandbox_info()`, but add a consistency check:

| User-set | Sandbox returns | Behavior |
|---------|-------------|------|
| `None` | a value | backfill the sandbox return value |
| `None` | `None` | stays `None` |
| a value | matching value | unchanged |
| a value | conflicting value | **raises ValueError** |
| a value | `None` | keep the user-set value |
---
## 4. Files Involved

| File | Change |
|------|---------|
| `rock/sdk/agent/models/job/config.py` | Add the `_sync_experiment_id` model_validator |
| `rock/sdk/agent/job.py` | Update `_autofill_sandbox_info()` to add a consistency check for namespace |
| `tests/unit/sdk/agent/` | Add tests: validator logic, mismatch exception, empty-value exception |

examples/evaluation/swe_bench/common.py

Lines changed: 3 additions & 1 deletion
```diff
@@ -1,7 +1,8 @@
 import re
-import yaml
 from pathlib import Path
 
+import yaml
+
 from rock.logger import init_logger
 from rock.sdk.sandbox.client import RunMode, Sandbox
 from rock.sdk.sandbox.config import SandboxConfig
@@ -15,6 +16,7 @@
 
 logger = init_logger(__name__)
 
+
 def load_task_config(task_dir: Path) -> dict:
     """Load task configuration from task.yaml."""
     task_yaml_path = task_dir / "task.yaml"
```

examples/evaluation/swe_bench/swe_bench_verified_demo.py

Lines changed: 2 additions & 1 deletion
```diff
@@ -18,8 +18,8 @@
     python -m examples.evaluation.swe_bench.swe_bench_verified_demo
 """
 
-import sys
 import asyncio
+import sys
 from pathlib import Path
 
 from examples.evaluation.swe_bench.common import load_task_config, parse_swebench_result, setup_test_env, start_sandbox
@@ -31,6 +31,7 @@
 test_timeout_sec = 3600
 logger = init_logger(__name__)
 
+
 async def run_swe_evaluation(sandbox: Sandbox, task_dir: Path, instruction: str, agent_config_path: str) -> bool:
     """Run SWE evaluation on the sandbox."""
     task_name = task_dir.name
```

rock/admin/metrics/gc_view_instrument_match.py

Lines changed: 14 additions & 19 deletions
```diff
@@ -1,5 +1,5 @@
+from collections.abc import Sequence
 from time import time_ns
-from typing import Dict, List, Optional, Sequence
 
 from opentelemetry.sdk.metrics._internal._view_instrument_match import (
     _ViewInstrumentMatch as _OrigViewInstrumentMatch,
@@ -15,28 +15,23 @@ class _GcViewInstrumentMatch(_OrigViewInstrumentMatch):
     metric series (based on attributes). This is useful for preventing memory
     leaks when dealing with high-cardinality metrics.
     """
+
     # Idle metric series are cleaned up after 20 minutes. This can be adjusted.
     _IDLE_TIMEOUT_NS = 20 * 60 * 1_000_000_000
 
     def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)
-        self._last_used_ns: Dict[frozenset, int] = {}
+        self._last_used_ns: dict[frozenset, int] = {}
 
-    def consume_measurement(
-        self, measurement: Measurement, should_sample_exemplar: bool = True
-    ) -> None:
+    def consume_measurement(self, measurement: Measurement, should_sample_exemplar: bool = True) -> None:
         """
         Consumes a measurement, aggregates it, and tracks its usage for GC.
         """
         attributes = measurement.attributes or {}
         measurement_for_aggregation = measurement
 
         if self._view._attribute_keys is not None:
-            filtered_attributes = {
-                key: value
-                for key, value in attributes.items()
-                if key in self._view._attribute_keys
-            }
+            filtered_attributes = {key: value for key, value in attributes.items() if key in self._view._attribute_keys}
 
             # If attributes were filtered, a new Measurement object must be used
             # for aggregation. This ensures that if an exemplar is recorded, it
@@ -61,22 +56,20 @@ def consume_measurement(
                 should_sample_exemplar,
             )
         else:
-            self._attributes_aggregation[aggr_key].aggregate(
-                measurement_for_aggregation, should_sample_exemplar
-            )
+            self._attributes_aggregation[aggr_key].aggregate(measurement_for_aggregation, should_sample_exemplar)
         self._last_used_ns[aggr_key] = now_ns
 
     def collect(
-        self,
-        collection_aggregation_temporality: AggregationTemporality,
-        collection_start_nanos: int,
-    ) -> Optional[Sequence[DataPointT]]:
+        self,
+        collection_aggregation_temporality: AggregationTemporality,
+        collection_start_nanos: int,
+    ) -> Sequence[DataPointT] | None:
         """
         Collects all data points for the metric, and garbage collects idle series.
         """
-        data_points: List[DataPointT] = []
+        data_points: list[DataPointT] = []
         now_ns = time_ns()
-        to_delete: List[frozenset] = []
+        to_delete: list[frozenset] = []
 
         with self._lock:
             # First, collect data points and identify idle series
@@ -104,7 +97,9 @@ def patch_view_instrument_match() -> None:
     # Call this once at application startup, before initializing any metric
     # readers or providers, to replace the SDK's internal class.
     import opentelemetry.sdk.metrics._internal._view_instrument_match as vim_mod
+
     vim_mod._ViewInstrumentMatch = _GcViewInstrumentMatch
 
     from opentelemetry.sdk.metrics._internal import metric_reader_storage as mrs
+
     mrs._ViewInstrumentMatch = _GcViewInstrumentMatch
```
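The bookkeeping this patch adds — a last-used timestamp per series key, swept at collection time — can be sketched in isolation. `touch` and `gc_idle` below are hypothetical names for illustration, not functions from the patch:

```python
IDLE_TIMEOUT_NS = 20 * 60 * 1_000_000_000  # 20 minutes, as in the patch

# Maps a series key (frozenset of attribute items) to its last-use timestamp.
last_used: dict[frozenset, int] = {}


def touch(key: frozenset, now_ns: int) -> None:
    """Record that a series was used, like consume_measurement() does."""
    last_used[key] = now_ns


def gc_idle(now_ns: int) -> list[frozenset]:
    """Drop and return series idle longer than the timeout, like collect()."""
    stale = [k for k, ts in last_used.items() if now_ns - ts > IDLE_TIMEOUT_NS]
    for k in stale:
        del last_used[k]
    return stale
```

The real class does the same inside `collect()` while holding `self._lock`, so sweeping and aggregation cannot race.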

rock/admin/scheduler/task_factory.py

Lines changed: 1 addition & 3 deletions
```diff
@@ -62,8 +62,6 @@ def register_all_tasks(cls, scheduler_config: SchedulerConfig):
             try:
                 task = cls.create_task(task_config)
                 TaskRegistry.register(task)
-                logger.info(
-                    f"Registered task '{task.type}' with interval {task.interval_seconds}s"
-                )
+                logger.info(f"Registered task '{task.type}' with interval {task.interval_seconds}s")
             except Exception as e:
                 logger.error(f"Failed to create task '{task_config.task_class}': {e}")
```

rock/common/port_validation.py

Lines changed: 1 addition & 0 deletions
```diff
@@ -1,4 +1,5 @@
 """Port validation utilities for port forwarding."""
+
 from rock.logger import init_logger
 
 logger = init_logger(__name__)
```

rock/rocklet/local_api.py

Lines changed: 7 additions & 19 deletions
```diff
@@ -180,19 +180,13 @@ async def portforward(websocket: WebSocket, port: int):
 
     try:
         # Connect to local TCP port
-        reader, writer = await asyncio.wait_for(
-            asyncio.open_connection("127.0.0.1", port),
-            timeout=TCP_CONNECT_TIMEOUT
-        )
+        reader, writer = await asyncio.wait_for(asyncio.open_connection("127.0.0.1", port), timeout=TCP_CONNECT_TIMEOUT)
         logger.info(
             f"[Portforward] TCP connection established: target_port={port}, "
             f"local_addr={writer.get_extra_info('sockname')}"
         )
     except asyncio.TimeoutError:
-        logger.error(
-            f"[Portforward] TCP connection timeout: target_port={port}, "
-            f"timeout={TCP_CONNECT_TIMEOUT}s"
-        )
+        logger.error(f"[Portforward] TCP connection timeout: target_port={port}, " f"timeout={TCP_CONNECT_TIMEOUT}s")
         await websocket.close(code=1011, reason=f"Connection to port {port} timed out")
         return
     except OSError as e:
@@ -204,8 +198,7 @@ async def portforward(websocket: WebSocket, port: int):
         return
     except Exception as e:
         logger.error(
-            f"[Portforward] Unexpected TCP error: target_port={port}, "
-            f"error_type={type(e).__name__}, error={e}"
+            f"[Portforward] Unexpected TCP error: target_port={port}, " f"error_type={type(e).__name__}, error={e}"
         )
         await websocket.close(code=1011, reason=f"Unexpected error: {e}")
         return
@@ -232,13 +225,10 @@ async def ws_to_tcp():
                     f"bytes={len(data)}, total_msgs={ws_to_tcp_msgs}, total_bytes={ws_to_tcp_bytes}"
                 )
         except WebSocketDisconnect as e:
-            logger.info(
-                f"[Portforward] ws->tcp: client disconnected: target_port={port}, code={e.code}"
-            )
+            logger.info(f"[Portforward] ws->tcp: client disconnected: target_port={port}, code={e.code}")
         except Exception as e:
             logger.debug(
-                f"[Portforward] ws->tcp error: target_port={port}, "
-                f"error_type={type(e).__name__}, error={e}"
+                f"[Portforward] ws->tcp error: target_port={port}, " f"error_type={type(e).__name__}, error={e}"
             )
         finally:
             writer.close()
@@ -261,8 +251,7 @@ async def tcp_to_ws():
             )
         except Exception as e:
             logger.debug(
-                f"[Portforward] tcp->ws error: target_port={port}, "
-                f"error_type={type(e).__name__}, error={e}"
+                f"[Portforward] tcp->ws error: target_port={port}, " f"error_type={type(e).__name__}, error={e}"
             )
         finally:
             try:
@@ -275,8 +264,7 @@ async def tcp_to_ws():
         await asyncio.gather(ws_to_tcp(), tcp_to_ws())
     except Exception as e:
         logger.debug(
-            f"[Portforward] Forwarding error: target_port={port}, "
-            f"error_type={type(e).__name__}, error={e}"
+            f"[Portforward] Forwarding error: target_port={port}, " f"error_type={type(e).__name__}, error={e}"
        )
     finally:
         writer.close()
```
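For context, the reflowed log calls above live inside a two-pump forwarding loop (`ws_to_tcp` and `tcp_to_ws` gathered together). A minimal standalone sketch of that pattern over plain asyncio streams — `pipe` and `bridge` are hypothetical helpers for illustration, not the rocklet implementation, which bridges a WebSocket to a TCP socket:

```python
import asyncio


async def pipe(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
    """One direction of forwarding: copy until EOF, then close the far side."""
    try:
        while data := await reader.read(4096):
            writer.write(data)
            await writer.drain()
    finally:
        writer.close()


async def bridge(a_reader, a_writer, b_reader, b_writer) -> None:
    # Mirrors `await asyncio.gather(ws_to_tcp(), tcp_to_ws())`: both
    # directions run concurrently; when one side closes, its pump exits.
    await asyncio.gather(
        pipe(a_reader, b_writer),
        pipe(b_reader, a_writer),
        return_exceptions=True,
    )
```

The `finally: writer.close()` in each pump matches the cleanup pattern visible in the diff above.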
