
feat(kv-indexer): add python launcher follow-up #7395

Open

PeaBrane wants to merge 25 commits into main from rupei/kv-indexer-python-launcher-pr

Conversation


@PeaBrane PeaBrane commented Mar 15, 2026

Adds the thin Python module launcher, the CI filter update, and the router E2E wiring that depends on `python -m dynamo.indexer`.

Summary by CodeRabbit

  • New Features

    • Standalone indexer CLI entry point now available for independent execution
    • Support for multi-indexer configurations and coordination
  • Improvements

    • Enhanced process cleanup resilience with graceful error handling during shutdown

PeaBrane added 12 commits March 12, 2026 21:37
Signed-off-by: PeaBrane <yanrpei@gmail.com>
…er routes
@PeaBrane PeaBrane requested a review from a team as a code owner March 15, 2026 06:28
@PeaBrane PeaBrane requested a review from a team as a code owner March 15, 2026 06:46
…xer-python-launcher-pr


# Conflicts:
#	tests/router/test_router_e2e_with_mockers.py
Base automatically changed from rupei/nuke-native-kv-indexer-binary to main March 15, 2026 08:11
…on-launcher-pr


# Conflicts:
#	lib/bindings/python/Cargo.toml
#	lib/bindings/python/rust/lib.rs
#	tests/router/test_router_e2e_with_mockers.py

coderabbitai bot commented Mar 15, 2026

Walkthrough

This pull request introduces a KV indexer CLI module for the Dynamo component with Rust bindings and comprehensive test infrastructure. Changes include new Python entry points, Rust FFI exposures, test helpers for indexer synchronization, and process management improvements for robustness.

Changes

Dynamo Indexer Module (.github/filters.yaml, components/src/dynamo/indexer/__init__.py, components/src/dynamo/indexer/__main__.py, components/src/dynamo/indexer/main.py)
Establishes the new Python indexer module with SPDX headers, a CLI entry point, and a main function that invokes run_kv_indexer with error handling for help requests.

Rust Bindings and FFI (lib/bindings/python/Cargo.toml, lib/bindings/python/rust/lib.rs, lib/bindings/python/rust/llm/kv.rs)
Adds the optional kv-indexer feature, exposes the run_kv_indexer Python binding, and implements run_kv_indexer_cli with conditional logging initialization and CLI argument parsing for standalone and runtime modes.

Python Package Integration (lib/bindings/python/src/dynamo/llm/__init__.py)
Exports run_kv_indexer from the _core module so it is accessible at the dynamo.llm package level.

Test Infrastructure and Helpers (tests/router/helper.py, tests/router/common.py)
Adds a get_kv_indexer_command utility and an async wait_for_indexer_workers_active polling function, converts send_request_with_retry to async, and synchronizes indexer workers before proceeding.

End-to-End Test Suite (tests/router/test_router_e2e_with_mockers.py)
Introduces the standalone_indexer workflow with per-mocker indexer integration, dual-indexer peer support, worker registration via HTTP, a new test_indexers_sync case covering multiple backends and DP-rank configurations, and extended cleanup logic.

Process Management Resilience (tests/utils/managed_process.py)
Adds defensive exception handling for process enumeration in terminate_process_tree, _terminate_process_group, and subprocesses, with graceful fallback behavior on permission errors.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes

Poem

🐰 Hop, skip, and CLI jump!
Indexers now bind with Rust's trusty thump,
Workers sync in a careful dance,
Tests keep processes on chance,
A component's chance to advance!

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 inconclusive)

Description check: ❓ Inconclusive. The description is minimal and vague, missing key template sections such as an 'Overview' with proper context, a detailed 'Details' section, 'Where should the reviewer start' guidance, and 'Related Issues'. Resolution: expand the description to follow the template structure: add an overview explaining the purpose, detail all file changes, specify reviewer focus areas, and reference related issues if applicable.
✅ Passed checks (2 passed)
Title check: ✅ Passed. The title clearly describes the main change: adding a Python launcher for the KV indexer as a follow-up, which aligns with the file changes, including new Python modules and test wiring.
Docstring coverage: ✅ Passed. Docstring coverage is 80.00%, which meets the required threshold of 80.00%.


coderabbitai bot left a comment

Actionable comments posted: 5

🧹 Nitpick comments (1)
components/src/dynamo/indexer/main.py (1)

8-10: Keep the launcher module import-side-effect free.

Line 8 mutates os.environ before Line 10 imports dynamo.llm, so merely importing this module changes process-wide launcher/logging behavior even if main() is never called. Moving that flag-setting into the actual CLI execution path would avoid the global side effect and keep the import block together.

As per coding guidelines, "keep imports at module top".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@components/src/dynamo/indexer/main.py` around lines 8 - 10, Remove the
top-level os.environ.setdefault("DYNAMO_SKIP_PYTHON_LOG_INIT", "1") so importing
components.src.dynamo.indexer.main no longer mutates process-wide state; keep
the import block (from dynamo.llm import run_kv_indexer) at module top, and
instead set os.environ["DYNAMO_SKIP_PYTHON_LOG_INIT"] = "1" inside the CLI
execution path (e.g., at the start of main() or under if __name__ ==
"__main__":) before invoking run_kv_indexer(), ensuring the environment flag is
applied only when the launcher actually runs.
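The restructuring this prompt describes can be sketched as follows; `run_kv_indexer` is stubbed here for illustration, and the module and flag names come from the comment above. Importing the module no longer touches the environment; only calling `main()` does:

```python
# Hypothetical sketch: the env flag moves from module import time into main(),
# so importing the launcher module has no process-wide side effects.
import os
import sys


def run_kv_indexer(argv):
    """Stand-in for dynamo.llm.run_kv_indexer (illustration only)."""
    return 0


def main(argv=None):
    # Flag is applied only when the launcher actually runs, not on import.
    os.environ.setdefault("DYNAMO_SKIP_PYTHON_LOG_INIT", "1")
    return run_kv_indexer(argv if argv is not None else sys.argv[1:])
```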

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: cbf4eab8-4b8a-45fd-82e1-c9c3127a5bad

📥 Commits

Reviewing files that changed from the base of the PR and between b6596c5 and 237a365.

⛔ Files ignored due to path filters (1)
  • lib/bindings/python/Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (12)
  • .github/filters.yaml
  • components/src/dynamo/indexer/__init__.py
  • components/src/dynamo/indexer/__main__.py
  • components/src/dynamo/indexer/main.py
  • lib/bindings/python/Cargo.toml
  • lib/bindings/python/rust/lib.rs
  • lib/bindings/python/rust/llm/kv.rs
  • lib/bindings/python/src/dynamo/llm/__init__.py
  • tests/router/common.py
  • tests/router/helper.py
  • tests/router/test_router_e2e_with_mockers.py
  • tests/utils/managed_process.py

Comment on lines +205 to +210

```rust
#[pyfunction(name = "run_kv_indexer")]
#[pyo3(signature = (argv=None))]
fn run_kv_indexer(py: Python<'_>, argv: Option<Vec<String>>) -> PyResult<()> {
    let argv = argv.unwrap_or_default();
    py.allow_threads(move || llm::kv::run_kv_indexer_cli(argv))
        .map_err(to_pyerr)
```

⚠️ Potential issue | 🟠 Major

Preserve CLI parse errors when converting them to Python.

lib/bindings/python/rust/llm/kv.rs Lines 99-102 parse argv with KvIndexerCli::try_parse_from(...). Line 209 then converts every resulting error into a generic PyException, which drops clap’s error kind and exit-code information. The launcher in components/src/dynamo/indexer/main.py can no longer distinguish usage errors from real startup failures, so invalid flags end up as Python tracebacks instead of normal CLI output/status. Preserve the structured clap error across the FFI boundary, or handle parse errors entirely on the Rust side before returning to Python.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@lib/bindings/python/rust/lib.rs` around lines 205 - 210, The CLI parse errors
from clap are being turned into a generic PyException in run_kv_indexer (via
map_err(to_pyerr)), losing clap::Error's kind and exit_code; update
run_kv_indexer (and/or to_pyerr) to special-case clap::Error returned by
llm::kv::run_kv_indexer_cli (and by KvIndexerCli::try_parse_from inside
llm::kv::run_kv_indexer_cli) and preserve the structured info: extract
clap::Error::kind() and clap::Error::exit_code() and the formatted error message
(e.g., error.to_string() or error.display()) and surface them across the FFI
boundary (either by raising a Python exception with attributes "clap_kind" and
"exit_code" plus the message, or by handling parse errors fully in Rust by
printing the clap error to stdout/stderr and returning an appropriate nonzero
status before converting to Python), so callers can distinguish usage/help
errors from real failures.
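A minimal Python-side sketch of what "surface them across the FFI boundary" could look like in the launcher, assuming the binding raised an exception carrying clap's metadata. The `CliParseError` class and its `clap_kind`/`exit_code` attributes are hypothetical stand-ins, not an existing dynamo API:

```python
import sys


class CliParseError(Exception):
    """Hypothetical enriched FFI exception carrying clap's structured info."""

    def __init__(self, message, clap_kind, exit_code):
        super().__init__(message)
        self.clap_kind = clap_kind
        self.exit_code = exit_code


def launch(run, argv):
    """Run the indexer; turn usage/help errors into a normal CLI exit status."""
    try:
        run(argv)
        return 0
    except CliParseError as exc:
        # Usage errors: print clap's formatted message instead of a traceback,
        # and exit with clap's own status code (0 for --help, 2 for bad flags).
        print(exc, file=sys.stderr)
        return exc.exit_code
```

The caller would then pass the return value to `sys.exit()`, so invalid flags produce normal CLI output and status rather than a Python traceback.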

Comment on lines +325 to +335
async with aiohttp.ClientSession() as session:
while loop.time() < deadline:
try:
async with session.get(workers_url) as resp:
if resp.status != 200:
await asyncio.sleep(0.5)
continue
workers = await resp.json()
except aiohttp.ClientError:
await asyncio.sleep(0.5)
continue
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Scripts executed:

```shell
# Find helper.py and read the code around the flagged lines
fd -name "helper.py" -path "*/router/*"
cat -n tests/router/helper.py | sed -n '310,350p'
cat -n tests/router/helper.py | sed -n '300,370p'

# Check existing tests that exercise this function
rg -A 5 "wait_for_indexer_workers_active" tests/

# Check how aiohttp is imported and used in the file
rg "import aiohttp" tests/router/helper.py
rg -B 2 -A 2 "aiohttp\." tests/router/helper.py | head -40

# Look for existing timeout configurations and ClientTimeout usage
rg "ClientTimeout\|aiohttp.*timeout" tests/
rg "ClientTimeout" .
rg -B 3 -A 3 "asyncio.TimeoutError|asyncio.timeout" tests/ --type py | head -50

# Verify the deadline enforcement logic in the full function body
cat -n tests/router/helper.py | sed -n '312,390p'

# Check how other polling loops handle this pattern
rg -B 5 -A 10 "while.*deadline\|while.*time\(\)" tests/router/helper.py | head -50
```

(Repository: ai-dynamo/dynamo)

Findings from the verification scripts:
  • With aiohttp.ClientSession(timeout=...), timeouts surface as aiohttp.ClientError subclasses, so the existing except aiohttp.ClientError handler covers them; adding asyncio.TimeoutError would be redundant but harmless.
  • The while loop.time() < deadline: check only runs at the start of each iteration. If session.get() blocks (the server accepts the connection but never responds) or resp.json() blocks on an incomplete response, the deadline check is bypassed and the loop can hang indefinitely.
  • No per-request timeout is configured anywhere in the polling path, so a ClientTimeout must be added to enforce one.


Add per-request timeout to prevent indefinite blocking on stalled HTTP responses.

The while loop.time() < deadline: check only guards loop iteration start. If the HTTP server accepts the connection but never responds, session.get() can block indefinitely, preventing the deadline from ever being checked again. Add a per-request timeout to the ClientSession and handle the resulting exception:

Suggested fix:

```diff
-    async with aiohttp.ClientSession() as session:
+    request_timeout = aiohttp.ClientTimeout(total=2.0)
+    async with aiohttp.ClientSession(timeout=request_timeout) as session:
         while loop.time() < deadline:
             try:
                 async with session.get(workers_url) as resp:
```
The aiohttp.ClientError exception handler already covers timeout cases (aiohttp wraps timeouts internally), so no additional exception type is needed.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

```python
    request_timeout = aiohttp.ClientTimeout(total=2.0)
    async with aiohttp.ClientSession(timeout=request_timeout) as session:
        while loop.time() < deadline:
            try:
                async with session.get(workers_url) as resp:
                    if resp.status != 200:
                        await asyncio.sleep(0.5)
                        continue
                    workers = await resp.json()
            except aiohttp.ClientError:
                await asyncio.sleep(0.5)
                continue
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/router/helper.py` around lines 325 - 335, The current loop can hang if
session.get(workers_url) stalls; add a per-request timeout using
aiohttp.ClientTimeout and pass it when creating the request (or when
constructing ClientSession) so each GET will error out instead of blocking the
loop; compute remaining = max(min_timeout, deadline - loop.time()) before the
request and use that as the timeout for session.get(workers_url,
timeout=ClientTimeout(total=remaining)) (the existing aiohttp.ClientError except
block will handle the timeout error).
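The remaining-deadline computation this prompt describes can be sketched as a small pure function; the name `remaining_timeout` and the 0.5 s floor are illustrative:

```python
def remaining_timeout(deadline, now, min_timeout=0.5):
    """Seconds to allow the next request: time left before the deadline,
    but never below a small floor so a nearly-expired deadline still gets
    a sane per-request timeout."""
    return max(min_timeout, deadline - now)
```

The result would then be passed per request, e.g. `session.get(workers_url, timeout=aiohttp.ClientTimeout(total=remaining_timeout(deadline, loop.time())))`.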

Comment on lines 497 to +519
def __exit__(self, exc_type, exc_val, exc_tb):
logger.info("Stopping mocker process")
logger.info("Stopping mocker process(es)")
# Stop individual mocker processes (standalone_indexer mode)
for proc in self._mocker_processes:
try:
proc.__exit__(exc_type, exc_val, exc_tb)
except Exception as e:
logger.warning(f"Error stopping mocker process: {e}")
self._mocker_processes.clear()
# Stop standalone indexer B (P2P recovery peer)
if self._indexer_b_process is not None:
try:
self._indexer_b_process.__exit__(exc_type, exc_val, exc_tb)
except Exception as e:
logger.warning(f"Error stopping indexer B process: {e}")
self._indexer_b_process = None
# Stop standalone indexer A
if self._indexer_process is not None:
try:
self._indexer_process.__exit__(exc_type, exc_val, exc_tb)
except Exception as e:
logger.warning(f"Error stopping indexer process: {e}")
self._indexer_process = None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Don't swallow shutdown failures in MockerProcess.__exit__.

ManagedProcess.__exit__ already raises ManagedProcessStopError / ManagedProcessStopTimeoutError when a child does not shut down cleanly. Catching broad Exception and only logging it here turns leaked mockers/indexers into green tests and pushes the fallout into later cases. Please catch only the specific cleanup errors you want to aggregate, then re-raise after best-effort teardown.

As per coding guidelines, "In test error handling, do not use blanket except Exception; catch only specific exceptions you can actually handle and let failures propagate."

🧰 Tools
🪛 Ruff (0.15.5)

[warning] 503: Do not catch blind exception: Exception (BLE001)
[warning] 510: Do not catch blind exception: Exception (BLE001)
[warning] 517: Do not catch blind exception: Exception (BLE001)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/router/test_router_e2e_with_mockers.py` around lines 497 - 519, The
__exit__ method currently catches broad Exception for each child shutdown
(looping over self._mocker_processes and the _indexer_b_process and
_indexer_process), hiding ManagedProcess stop failures; change the handlers to
catch only the specific cleanup exceptions raised by ManagedProcess.__exit__
(ManagedProcessStopError and ManagedProcessStopTimeoutError), log them, perform
the remaining best-effort teardowns, and then re-raise the first caught specific
exception (or chain them) so failures propagate instead of being swallowed.
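The aggregate-and-re-raise pattern this prompt asks for might look like the sketch below; `ManagedProcessStopError` is a stand-in for the real exception classes in tests/utils/managed_process.py:

```python
class ManagedProcessStopError(Exception):
    """Stand-in for the specific cleanup errors raised by ManagedProcess.__exit__."""


def stop_all(procs, exc_info=(None, None, None)):
    """Best-effort teardown: stop every child, then re-raise the first failure
    so leaked processes cannot turn into green tests."""
    first_error = None
    for proc in procs:
        try:
            proc.__exit__(*exc_info)
        except ManagedProcessStopError as exc:
            if first_error is None:
                first_error = exc  # remember it, but keep tearing down the rest
    if first_error is not None:
        raise first_error
```

Catching only the specific stop errors keeps genuinely unexpected exceptions (bugs) propagating immediately, while still guaranteeing every child gets a shutdown attempt.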

Comment on lines +839 to +923

```python
@pytest.mark.parametrize(
    "store_backend,durable_kv_events,request_plane",
    [
        ("etcd", True, "nats"),  # JetStream mode - uses JetStream
        ("etcd", False, "tcp"),  # NATS core mode (with gap detection) - no JetStream
        ("file", True, "nats"),  # File backend - uses JetStream
    ],
    ids=[
        "jetstream",
        "nats_core",
        "file",
    ],
    indirect=["request_plane", "durable_kv_events"],
)
@pytest.mark.timeout(300)
def test_indexers_sync(
    request,
    runtime_services_dynamic_ports,
    predownload_tokenizers,
    file_storage_backend,
    store_backend,
    durable_kv_events,
    request_plane,
):
    """
    Test that two KV routers have synchronized indexer states after processing requests.
    This test verifies that both routers converge to the same internal state.

    Tests with three configurations:
    - jetstream: etcd backend, JetStream for KV events, NATS request plane
    - nats_core: etcd backend, NATS Core with gap detection, TCP request plane
    - file: file backend, JetStream for KV events, NATS request plane
    """
    logger.info(
        f"Starting indexers sync test: store_backend={store_backend}, "
        f"durable_kv_events={durable_kv_events}, request_plane={request_plane}"
    )

    # Use the dynamic-port fixture to avoid hardcoded localhost:4222/2379 in parallel runs.
    nats_process, _etcd_process = runtime_services_dynamic_ports

    # Create mocker args dictionary
    # Use 2 DP ranks to test per-dp_rank event ID tracking and recovery
    mocker_args = {
        "speedup_ratio": SPEEDUP_RATIO,
        "block_size": BLOCK_SIZE,
        "durable_kv_events": durable_kv_events,
        "dp_size": 2,
    }

    with MockerProcess(
        request,
        mocker_args=mocker_args,
        num_mockers=NUM_MOCKERS,
        store_backend=store_backend,
        request_plane=request_plane,
        zmq_kv_events=True,
        zmq_replay=True,
        standalone_indexer=True,
        model_name=MODEL_NAME,
    ) as mockers:
        # Start mocker instances (2 workers x 2 DP ranks = 4 independent event streams)
        logger.info(f"Starting {NUM_MOCKERS} mocker instances with dp_size=2")
        logger.info(f"All mockers using endpoint: {mockers.endpoint}")

        # Use the common test implementation (creates its own runtimes for each router)
        # Note: Consumer verification is done inside _test_router_indexers_sync while routers are alive
        # When using durable_kv_events=True, use JetStream mode for the router
        _test_router_indexers_sync(
            engine_workers=mockers,
            block_size=BLOCK_SIZE,
            model_name=MODEL_NAME,
            num_workers=NUM_MOCKERS,
            store_backend=store_backend,
            request_plane=request_plane,
            test_nats_interruption=not durable_kv_events,
            nats_server=nats_process if not durable_kv_events else None,
            durable_kv_events=durable_kv_events,
            standalone_indexer_url=mockers.standalone_indexer_url,
            standalone_indexer_b_url=mockers.standalone_indexer_b_url,
            test_zmq_replay=True,
        )

    logger.info("Indexers sync test completed successfully")
```

⚠️ Potential issue | 🟠 Major

This new sync suite is too heavy for the pre-merge lane.

This test runs three backends/configurations, starts multiple mockers, two routers, and two standalone indexers, and carries a 300s timeout. Because the file inherits pytest.mark.pre_merge, this lands a long-running E2E-style flow directly in the critical path. Please move it to post_merge, or split out a much smaller smoke case if pre-merge coverage is required.

As per coding guidelines, "Only use pre_merge for absolutely critical tests; prefer post_merge for tests averaging over 60 seconds unless they guard a critical path, and prefer post_merge for E2E tests unless they guard a critical path."

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/router/test_router_e2e_with_mockers.py` around lines 839 - 923, The
test_indexers_sync E2E is too heavy for pre-merge; either move it to the
post-merge suite or add a small smoke variant: update the pytest marker for
test_indexers_sync from pre_merge to post_merge (or add `@pytest.mark.post_merge`
above def test_indexers_sync) and keep the heavy parametrized version only under
post_merge, or alternatively create a lightweight function (e.g.,
test_indexers_sync_smoke) that reduces params, num_mockers, timeout, and
resource usage for pre-merge while leaving the full test as test_indexers_sync
decorated with `@pytest.mark.post_merge`; target the test_indexers_sync symbol and
any module-level pytest.mark.pre_merge to make the change.
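Both options reduce to a marker change. A sketch using standard pytest marker syntax, with test bodies elided and the marker names taken from the comment above:

```python
import pytest


@pytest.mark.post_merge  # option 1: move the heavy parametrized suite out of pre-merge
def test_indexers_sync():
    ...


@pytest.mark.pre_merge  # option 2: keep a slimmed-down smoke case in the critical path
def test_indexers_sync_smoke():
    ...
```

With option 2, the smoke variant would reduce the parametrization, num_mockers, and timeout, while the full `test_indexers_sync` stays post-merge only.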

Comment on lines +893 to +899
except (PermissionError, psutil.AccessDenied) as exc:
self._logger.warning(
"Unable to enumerate child processes for PID %s: %s",
self.proc.pid,
exc,
)
return []
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Don't treat child-enumeration failure as "no subprocesses".

tests/fault_tolerance/test_vllm_health_check.py:164-174 uses worker.subprocesses() to find and kill VLLM::EngineCore. Returning [] here turns an access failure into a silent no-op, so that kill loop never runs and the test fails later with a misleading symptom. Please raise a clear error instead of pretending there are no children.
