Skip to content

Commit bffee2d

Browse files
committed
chore: peel off indexer launcher follow-up
Signed-off-by: PeaBrane <yanrpei@gmail.com>
1 parent 9fba762 commit bffee2d

File tree

8 files changed

+24
-144
lines changed

8 files changed

+24
-144
lines changed

.github/filters.yaml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,6 @@ core:
8888
- 'lib/**'
8989
- 'tests/**'
9090
- 'components/src/dynamo/router/**'
91-
- 'components/src/dynamo/indexer/**'
9291
- 'components/src/dynamo/mocker/**'
9392
- 'components/src/dynamo/frontend/**'
9493
- 'components/src/dynamo/common/**'
@@ -158,7 +157,6 @@ frontend:
158157
- 'container/deps/*'
159158
- 'container/compliance/**'
160159
- 'components/src/dynamo/router/**'
161-
- 'components/src/dynamo/indexer/**'
162160
- 'components/src/dynamo/mocker/**'
163161
- 'components/src/dynamo/frontend/**'
164162
- 'components/src/dynamo/common/**'

components/src/dynamo/indexer/__init__.py

Lines changed: 0 additions & 2 deletions
This file was deleted.

components/src/dynamo/indexer/__main__.py

Lines changed: 0 additions & 7 deletions
This file was deleted.

components/src/dynamo/indexer/main.py

Lines changed: 0 additions & 22 deletions
This file was deleted.

tests/router/common.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
send_request_with_retry,
2222
verify_response_timing,
2323
wait_for_frontend_ready,
24-
wait_for_indexer_workers_active,
2524
wait_for_workers_ready,
2625
)
2726
from tests.router.router_process import FrontendRouterProcess, KVRouterProcess
@@ -981,9 +980,6 @@ async def send_requests_to_router(router, num_requests, router_name, endpoint):
981980
# so ZMQ sockets connect before recovery, avoiding the slow-joiner problem.
982981
if standalone_indexer_b_url:
983982
engine_workers.launch_indexer()
984-
await wait_for_indexer_workers_active(
985-
standalone_indexer_b_url, engine_workers.worker_id_to_zmq_ports
986-
)
987983
logger.info(
988984
f"Launched Indexer B at {standalone_indexer_b_url} "
989985
f"(P2P recovery from Indexer A)"

tests/router/helper.py

Lines changed: 0 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
import os
88
import random
99
import string
10-
import sys
1110
from typing import Any, Optional
1211

1312
import aiohttp
@@ -32,11 +31,6 @@ def generate_random_suffix() -> str:
3231
return "".join(random.choices(string.ascii_lowercase, k=10)) # noqa: S311
3332

3433

35-
def get_kv_indexer_command() -> list[str]:
36-
"""Return the preferred standalone indexer command for the current Python env."""
37-
return [sys.executable, "-m", "dynamo.indexer"]
38-
39-
4034
def assert_event_dumps_equal(
4135
expected: list[dict],
4236
actual: list[dict],
@@ -309,70 +303,6 @@ async def wait_for_workers_ready(
309303
return sorted(instance_ids)
310304

311305

312-
async def wait_for_indexer_workers_active(
313-
indexer_url: str,
314-
expected_workers: dict[int, dict[int, str]],
315-
timeout_s: float = 30.0,
316-
) -> None:
317-
"""Wait until the standalone indexer reports all ZMQ listeners as active."""
318-
if not expected_workers:
319-
return
320-
321-
loop = asyncio.get_running_loop()
322-
deadline = loop.time() + timeout_s
323-
workers_url = f"{indexer_url}/workers"
324-
325-
async with aiohttp.ClientSession() as session:
326-
while loop.time() < deadline:
327-
try:
328-
async with session.get(workers_url) as resp:
329-
if resp.status != 200:
330-
await asyncio.sleep(0.5)
331-
continue
332-
workers = await resp.json()
333-
except aiohttp.ClientError:
334-
await asyncio.sleep(0.5)
335-
continue
336-
337-
workers_by_id = {
338-
worker["instance_id"]: worker
339-
for worker in workers
340-
if worker.get("source") == "zmq"
341-
}
342-
343-
all_active = True
344-
for worker_id, endpoints in expected_workers.items():
345-
worker = workers_by_id.get(worker_id)
346-
if worker is None:
347-
all_active = False
348-
break
349-
350-
listeners = worker.get("listeners", {})
351-
for dp_rank, endpoint in endpoints.items():
352-
listener = listeners.get(str(dp_rank))
353-
if listener is None:
354-
all_active = False
355-
break
356-
if listener.get("endpoint") != endpoint:
357-
all_active = False
358-
break
359-
if listener.get("status") != "active":
360-
all_active = False
361-
break
362-
363-
if not all_active:
364-
break
365-
366-
if all_active:
367-
return
368-
369-
await asyncio.sleep(0.5)
370-
371-
raise RuntimeError(
372-
f"Timed out waiting for indexer listeners to become active at {workers_url}"
373-
)
374-
375-
376306
async def send_request_with_retry(url: str, payload: dict, max_retries: int = 8):
377307
"""Send a single request with exponential backoff retry"""
378308
wait_time = 1 # Start with 1 second

tests/router/test_router_e2e_with_mockers.py

Lines changed: 23 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,7 @@
3232
_test_router_query_instance_id,
3333
_test_router_two_routers,
3434
)
35-
from tests.router.helper import (
36-
generate_random_suffix,
37-
get_kv_indexer_command,
38-
get_runtime,
39-
wait_for_indexer_workers_active,
40-
)
35+
from tests.router.helper import generate_random_suffix, get_runtime
4136
from tests.utils.constants import ROUTER_MODEL_NAME
4237
from tests.utils.managed_process import ManagedProcess
4338
from tests.utils.port_utils import allocate_ports, deallocate_ports
@@ -310,7 +305,15 @@ def __enter__(self):
310305
# Launch the standalone indexer binary
311306
block_size = self._mocker_args_orig.get("block_size", BLOCK_SIZE)
312307
indexer_cmd = [
313-
*get_kv_indexer_command(),
308+
"cargo",
309+
"run",
310+
"-p",
311+
"dynamo-kv-router",
312+
"--features",
313+
"indexer-bin,test-endpoints",
314+
"--bin",
315+
"dynamo-kv-indexer",
316+
"--",
314317
"--block-size",
315318
str(block_size),
316319
"--port",
@@ -342,7 +345,7 @@ async def launch_mockers_with_indexer(self, endpoint):
342345
For each mocker:
343346
1. Launch a mocker process with --num-workers 1
344347
2. Poll endpoint.client().instance_ids() until a new worker_id appears
345-
3. POST /register to the indexer with the worker_id and its ZMQ addresses
348+
3. POST /workers to the indexer with the worker_id and its ZMQ addresses
346349
347350
Args:
348351
endpoint: The dynamo endpoint object to discover worker IDs.
@@ -408,7 +411,7 @@ async def launch_mockers_with_indexer(self, endpoint):
408411
for dp_rank in range(dp_size):
409412
port = self._zmq_kv_events_ports[i * dp_size + dp_rank]
410413
endpoint = f"tcp://127.0.0.1:{port}"
411-
zmq_addresses[dp_rank] = endpoint
414+
zmq_addresses[str(dp_rank)] = endpoint
412415

413416
payload = {
414417
"instance_id": new_worker_id,
@@ -437,9 +440,6 @@ async def launch_mockers_with_indexer(self, endpoint):
437440
f"zmq_addresses={zmq_addresses}"
438441
)
439442

440-
await wait_for_indexer_workers_active(
441-
self.standalone_indexer_url, self.worker_id_to_zmq_ports
442-
)
443443
logger.info(
444444
f"All {self.num_workers} mockers launched and registered with indexer"
445445
)
@@ -461,12 +461,20 @@ def launch_indexer(self):
461461
# Build --workers arg: "worker_id:dp_rank=zmq_addr,..."
462462
worker_entries = []
463463
for worker_id, zmq_addresses in self.worker_id_to_zmq_ports.items():
464-
for dp_rank, zmq_endpoint in zmq_addresses.items():
465-
worker_entries.append(f"{worker_id}:{dp_rank}={zmq_endpoint}")
464+
for dp_rank_str, zmq_endpoint in zmq_addresses.items():
465+
worker_entries.append(f"{worker_id}:{dp_rank_str}={zmq_endpoint}")
466466
workers_arg = ",".join(worker_entries)
467467

468468
indexer_b_cmd = [
469-
*get_kv_indexer_command(),
469+
"cargo",
470+
"run",
471+
"-p",
472+
"dynamo-kv-router",
473+
"--features",
474+
"indexer-bin,test-endpoints",
475+
"--bin",
476+
"dynamo-kv-indexer",
477+
"--",
470478
"--block-size",
471479
str(block_size),
472480
"--port",

tests/utils/managed_process.py

Lines changed: 1 addition & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -68,15 +68,7 @@ def terminate_process_tree(
6868
return
6969

7070
# 1. Snapshot children before signaling parent
71-
try:
72-
children = parent.children(recursive=True)
73-
except (PermissionError, psutil.AccessDenied) as exc:
74-
logger.warning(
75-
"Unable to enumerate child processes for PID %s; falling back to parent-only termination: %s",
76-
pid,
77-
exc,
78-
)
79-
children = []
71+
children = parent.children(recursive=True)
8072

8173
# 2. Terminate parent first (graceful)
8274
terminate_process(parent, logger, immediate_kill=immediate_kill)
@@ -540,12 +532,6 @@ def _terminate_process_group(self, timeout: float = 8.0):
540532
all_pgids.add(os.getpgid(child.pid))
541533
except (ProcessLookupError, OSError):
542534
pass
543-
except (PermissionError, psutil.AccessDenied) as exc:
544-
self._logger.warning(
545-
"Unable to enumerate child processes for PID %s; falling back to the main process group only: %s",
546-
self.proc.pid,
547-
exc,
548-
)
549535
except psutil.NoSuchProcess:
550536
pass
551537

@@ -890,13 +876,6 @@ def subprocesses(self) -> list[psutil.Process]:
890876
try:
891877
parent = psutil.Process(self.proc.pid)
892878
return parent.children(recursive=True)
893-
except (PermissionError, psutil.AccessDenied) as exc:
894-
self._logger.warning(
895-
"Unable to enumerate child processes for PID %s: %s",
896-
self.proc.pid,
897-
exc,
898-
)
899-
return []
900879
except psutil.NoSuchProcess:
901880
return []
902881

0 commit comments

Comments
 (0)