|
| 1 | +"""Property-based tests for lease fencing logic.""" |
| 2 | + |
| 3 | +from __future__ import annotations |
| 4 | + |
| 5 | +import tempfile |
| 6 | +from collections import defaultdict, deque |
| 7 | +from contextlib import asynccontextmanager |
| 8 | +from datetime import datetime, timezone, timedelta |
| 9 | +from pathlib import Path |
| 10 | + |
| 11 | +import pytest |
| 12 | +from hypothesis import given, settings, strategies as st |
| 13 | +from sqlalchemy import update |
| 14 | + |
| 15 | +from src.nimbus.common.schemas import JobAssignment, GitHubRepository, RunnerRegistrationToken |
| 16 | +from src.nimbus.control_plane.jobs import QUEUE_KEY, enqueue_job, lease_job_with_fence |
| 17 | +from src.nimbus.control_plane.db import ( |
| 18 | + metadata, |
| 19 | + job_leases_table, |
| 20 | + jobs_table, |
| 21 | + record_job_queued, |
| 22 | + create_engine, |
| 23 | + session_factory, |
| 24 | +) |
| 25 | + |
| 26 | + |
| 27 | +class FakeRedis: |
| 28 | + def __init__(self) -> None: |
| 29 | + self._lists = defaultdict(deque) |
| 30 | + |
| 31 | + async def lpush(self, key: str, value: str) -> None: |
| 32 | + self._lists[key].appendleft(value) |
| 33 | + |
| 34 | + async def rpop(self, key: str): |
| 35 | + if key not in self._lists or not self._lists[key]: |
| 36 | + return None |
| 37 | + return self._lists[key].pop() |
| 38 | + |
| 39 | + async def delete(self, key: str) -> None: |
| 40 | + self._lists.pop(key, None) |
| 41 | + |
| 42 | + |
| 43 | +@asynccontextmanager |
| 44 | +async def session_context(): |
| 45 | + with tempfile.TemporaryDirectory() as tmp_dir: |
| 46 | + db_path = Path(tmp_dir) / "nimbus.db" |
| 47 | + engine = create_engine(f"sqlite+aiosqlite:///{db_path}") |
| 48 | + async with engine.begin() as conn: |
| 49 | + await conn.run_sync(metadata.create_all) |
| 50 | + Session = session_factory(engine) |
| 51 | + async with Session() as session: |
| 52 | + yield session |
| 53 | + await engine.dispose() |
| 54 | + |
| 55 | + |
| 56 | +async def _prepare_job(session, job_id: int, executor: str = "firecracker") -> JobAssignment: |
| 57 | + repo = GitHubRepository( |
| 58 | + id=1, |
| 59 | + name="repo", |
| 60 | + full_name="org/repo", |
| 61 | + private=False, |
| 62 | + owner_id=1, |
| 63 | + ) |
| 64 | + token = RunnerRegistrationToken( |
| 65 | + token="token", |
| 66 | + expires_at=datetime.now(timezone.utc) + timedelta(hours=1), |
| 67 | + ) |
| 68 | + assignment = JobAssignment( |
| 69 | + job_id=job_id, |
| 70 | + run_id=job_id, |
| 71 | + run_attempt=1, |
| 72 | + repository=repo, |
| 73 | + labels=["nimbus", executor], |
| 74 | + runner_registration=token, |
| 75 | + executor=executor, |
| 76 | + ) |
| 77 | + await record_job_queued(session, assignment) |
| 78 | + await session.commit() |
| 79 | + return assignment |
| 80 | + |
| 81 | + |
| 82 | +@settings(max_examples=25, deadline=None) |
| 83 | +@given( |
| 84 | + st.integers(min_value=1000, max_value=1000000), |
| 85 | + st.integers(min_value=1, max_value=5), |
| 86 | +) |
| 87 | +@pytest.mark.asyncio |
| 88 | +async def test_lease_fencing_monotonic(job_id: int, attempts: int): |
| 89 | + """Ensure fence tokens never decrease across acquisitions.""" |
| 90 | + fake_redis = FakeRedis() |
| 91 | + async with session_context() as session: |
| 92 | + assignment = await _prepare_job(session, job_id) |
| 93 | + await enqueue_job(fake_redis, assignment) |
| 94 | + |
| 95 | + seen_fences: list[int] = [] |
| 96 | + for idx in range(attempts): |
| 97 | + result = await lease_job_with_fence( |
| 98 | + fake_redis, |
| 99 | + session, |
| 100 | + agent_id=f"agent-{idx}", |
| 101 | + ttl_seconds=10, |
| 102 | + capabilities=[assignment.executor], |
| 103 | + ) |
| 104 | + if result is None: |
| 105 | + continue |
| 106 | + _, fence = result |
| 107 | + if seen_fences: |
| 108 | + assert fence > seen_fences[-1] |
| 109 | + seen_fences.append(fence) |
| 110 | + |
| 111 | + # Expire lease and reset job state for next iteration |
| 112 | + await session.execute( |
| 113 | + update(job_leases_table) |
| 114 | + .where(job_leases_table.c.job_id == job_id) |
| 115 | + .values(lease_expires_at=datetime.now(timezone.utc) - timedelta(seconds=1)) |
| 116 | + ) |
| 117 | + await session.execute( |
| 118 | + update(jobs_table) |
| 119 | + .where(jobs_table.c.job_id == job_id) |
| 120 | + .values( |
| 121 | + status="queued", |
| 122 | + agent_id=None, |
| 123 | + leased_at=None, |
| 124 | + updated_at=datetime.now(timezone.utc), |
| 125 | + ) |
| 126 | + ) |
| 127 | + await session.commit() |
| 128 | + await fake_redis.lpush(QUEUE_KEY, assignment.model_dump_json()) |
| 129 | + |
| 130 | + assert seen_fences, "Expected at least one successful lease acquisition" |
| 131 | + |
| 132 | + |
| 133 | +@settings(max_examples=10, deadline=None) |
| 134 | +@given( |
| 135 | + st.sampled_from(["firecracker", "docker", "gpu"]), |
| 136 | + st.sampled_from(["firecracker", "docker", "gpu"]), |
| 137 | +) |
| 138 | +@pytest.mark.asyncio |
| 139 | +async def test_capability_matching(job_executor: str, agent_capability: str): |
| 140 | + """Jobs should only lease to agents with matching capability.""" |
| 141 | + fake_redis = FakeRedis() |
| 142 | + async with session_context() as session: |
| 143 | + assignment = await _prepare_job(session, job_id=42, executor=job_executor) |
| 144 | + await enqueue_job(fake_redis, assignment) |
| 145 | + |
| 146 | + result = await lease_job_with_fence( |
| 147 | + fake_redis, |
| 148 | + session, |
| 149 | + agent_id="agent-1", |
| 150 | + ttl_seconds=60, |
| 151 | + capabilities=[agent_capability], |
| 152 | + ) |
| 153 | + |
| 154 | + if job_executor == agent_capability: |
| 155 | + assert result is not None |
| 156 | + leased_job, fence = result |
| 157 | + assert leased_job.executor == job_executor |
| 158 | + assert fence > 0 |
| 159 | + else: |
| 160 | + assert result is None |
0 commit comments