diff --git a/src/dstack/_internal/core/models/routers.py b/src/dstack/_internal/core/models/routers.py index b7950369c2..b1f189c522 100644 --- a/src/dstack/_internal/core/models/routers.py +++ b/src/dstack/_internal/core/models/routers.py @@ -9,6 +9,7 @@ class RouterType(str, Enum): SGLANG = "sglang" + DYNAMO = "dynamo" class SGLangGatewayRouterConfig(CoreModel): @@ -45,8 +46,15 @@ class SGLangServiceRouterConfig(CoreModel): class ReplicaGroupRouterConfig(CoreModel): type: Annotated[ - Literal["sglang"], - Field(description="The router implementation for this replica group."), + Literal["sglang", "dynamo"], + Field( + description=( + "The router implementation for this replica group. " + "`sglang` runs the SGLang router and dstack syncs worker URLs to it. " + "`dynamo` runs the NVIDIA Dynamo frontend, which discovers workers " + "itself via etcd/NATS." + ), + ), ] = "sglang" diff --git a/src/dstack/_internal/core/models/runs.py b/src/dstack/_internal/core/models/runs.py index 9748aa46f8..e7b40c8a0b 100644 --- a/src/dstack/_internal/core/models/runs.py +++ b/src/dstack/_internal/core/models/runs.py @@ -43,6 +43,7 @@ ) from dstack._internal.core.models.repos import AnyRunRepoData from dstack._internal.core.models.resources import Memory, ResourcesSpec +from dstack._internal.core.models.routers import RouterType from dstack._internal.core.models.unix import UnixUser from dstack._internal.core.models.volumes import MountPoint from dstack._internal.utils import common as common_utils @@ -603,6 +604,31 @@ def _merged_profile(cls, values) -> Dict: values["merged_profile"] = merged_profile return values + @root_validator + def _validate_dynamo_no_retry(cls, values) -> Dict: + """Reject `retry` for services with a Dynamo router replica group. + Dynamo workers cache the router's internal IP at provisioning time. A + retry would produce a new router and likely a new internal_ip, leaving workers bound + to a router that no longer exists. 
+ """ + merged_profile = values.get("merged_profile") + cfg = values.get("configuration") + if merged_profile is None or merged_profile.retry is None: + return values + if not isinstance(cfg, ServiceConfiguration): + return values + for g in cfg.replica_groups: + if g.router is not None and g.router.type == RouterType.DYNAMO: + raise ValueError( + "Retry cannot be configured for services with a Dynamo " + "router replica group. The router's address must remain " + "stable for the life of the run; allowing retry would " + "leave workers bound to a router that no longer exists. " + "Remove `retry` from the profile/configuration and " + "re-apply." + ) + return values + class ServiceModelSpec(CoreModel): name: str diff --git a/src/dstack/_internal/proxy/gateway/services/registry.py b/src/dstack/_internal/proxy/gateway/services/registry.py index 84fbce8711..f190523a39 100644 --- a/src/dstack/_internal/proxy/gateway/services/registry.py +++ b/src/dstack/_internal/proxy/gateway/services/registry.py @@ -20,7 +20,7 @@ ServiceConfig, ) from dstack._internal.proxy.lib import models -from dstack._internal.proxy.lib.const import SGLANG_WHITELISTED_PATHS +from dstack._internal.proxy.lib.const import ROUTER_WHITELISTED_PATHS from dstack._internal.proxy.lib.errors import ProxyError, UnexpectedProxyError from dstack._internal.proxy.lib.repo import BaseProxyRepo from dstack._internal.proxy.lib.services.service_connection import ( @@ -344,7 +344,7 @@ async def get_nginx_service_config( ) -> ServiceConfig: limit_req_zones: list[LimitReqZoneConfig] = [] locations: list[LocationConfig] = [] - is_sglang = ( + is_router = ( service.router is not None and service.router.type == RouterType.SGLANG ) or service.has_router_replica sglang_limits: dict[str, LimitReqConfig] = {} @@ -361,8 +361,8 @@ async def get_nginx_service_config( limit_req_zones.append( LimitReqZoneConfig(name=zone_name, key=key, rpm=round(rate_limit.rps * 60)) ) - if is_sglang: - for path in SGLANG_WHITELISTED_PATHS: + if 
is_router: + for path in ROUTER_WHITELISTED_PATHS: if rate_limit.prefix == path or path.startswith(rate_limit.prefix): # Use the longest prefix if multiple prefixes match the same path current_prefix_len = len(rate_limit.prefix) @@ -381,9 +381,9 @@ async def get_nginx_service_config( ) ) - # Add SGLang whitelisted paths as locations - if is_sglang: - for path in SGLANG_WHITELISTED_PATHS: + # Add router whitelisted paths as locations + if is_router: + for path in ROUTER_WHITELISTED_PATHS: # Use prefix match for paths that end with a slash and exact match for paths that don't if path.endswith("/"): locations.append(LocationConfig(prefix=path, limit_req=sglang_limits.get(path))) @@ -392,8 +392,8 @@ async def get_nginx_service_config( LocationConfig(prefix=f"= {path}", limit_req=sglang_limits.get(path)) ) - # Don't auto-add / location for SGLang routers (catch-all 403 handles it) - if not any(location.prefix == "/" for location in locations) and not is_sglang: + # Don't auto-add / location for router-based services (catch-all 403 handles it) + if not any(location.prefix == "/" for location in locations) and not is_router: locations.append(LocationConfig(prefix="/", limit_req=None)) return ServiceConfig( domain=service.domain_safe, diff --git a/src/dstack/_internal/proxy/lib/const.py b/src/dstack/_internal/proxy/lib/const.py index a307c22756..43ede03ac8 100644 --- a/src/dstack/_internal/proxy/lib/const.py +++ b/src/dstack/_internal/proxy/lib/const.py @@ -2,7 +2,10 @@ Shared constants for proxy components (gateway + in-server proxy). """ -SGLANG_WHITELISTED_PATHS: tuple[str, ...] = ( +# Inference endpoints exposed by the in-replica HTTP router. Applies to both +# SGLang's router and Dynamo's `dynamo.frontend` — they share the +# OpenAI-compatible endpoint surface. +ROUTER_WHITELISTED_PATHS: tuple[str, ...] 
= ( "/generate", "/v1/", "/chat/completions", diff --git a/src/dstack/_internal/server/background/pipeline_tasks/jobs_running.py b/src/dstack/_internal/server/background/pipeline_tasks/jobs_running.py index d441a9e2d3..45bd39a2d5 100644 --- a/src/dstack/_internal/server/background/pipeline_tasks/jobs_running.py +++ b/src/dstack/_internal/server/background/pipeline_tasks/jobs_running.py @@ -6,7 +6,7 @@ from typing import Dict, Iterable, Literal, Optional, Sequence, Union import httpx -from sqlalchemy import and_, exists, func, or_, select, update +from sqlalchemy import and_, exists, false, func, or_, select, true, update from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import aliased, contains_eager, joinedload, load_only @@ -23,6 +23,7 @@ from dstack._internal.core.models.metrics import Metric from dstack._internal.core.models.profiles import StartupOrder from dstack._internal.core.models.repos import RemoteRepoCreds +from dstack._internal.core.models.routers import RouterType from dstack._internal.core.models.runs import ( ClusterInfo, ImagePullProgress, @@ -102,6 +103,11 @@ from dstack._internal.server.services.runner import client from dstack._internal.server.services.runner.ssh import runner_ssh_tunnel from dstack._internal.server.services.runs import is_job_ready, run_model_to_run +from dstack._internal.server.services.runs.replicas import ( + RouterEnvStatus, + get_router_env_for_job, + get_router_replica_group, +) from dstack._internal.server.services.secrets import get_project_secrets_mapping from dstack._internal.server.services.storage import get_default_storage from dstack._internal.server.utils import sentry_utils @@ -114,6 +120,8 @@ JOB_STATUSES_WITH_MIN_PROCESSING_INTERVAL = [JobStatus.PROVISIONING, JobStatus.PULLING] +ROUTER_PROVISIONING_WAIT_TIMEOUT_SECONDS = 30 * 60 + JOB_DISCONNECTED_RETRY_TIMEOUT = timedelta(minutes=2) """`The minimum time before terminating active job in case of connectivity issues.""" @@ -368,6 +376,12 @@ 
class _StartupContext: volumes: list[Volume] secrets: dict[str, str] repo_creds: Optional[RemoteRepoCreds] + router_env: Optional[Dict[str, str]] = None + """Dynamo-specific env (e.g. DSTACK_ROUTER_INTERNAL_IP) computed from the + router replica's state. Passed through to RunnerClient.submit_job, which + merges it into a deep-copied job_spec.env so the shared job_spec is not + mutated. None for SGLang services, non-router runs, and the router + replica itself.""" async def _load_process_context(item: JobRunningPipelineItem) -> Optional[_ProcessContext]: @@ -384,10 +398,18 @@ async def _load_process_context(item: JobRunningPipelineItem) -> Optional[_Proce job_submissions=[job_model_to_job_submission(job_model)], ) else: - # PROVISIONING/PULLING jobs need same-replica siblings for cluster coordination. - # All sibling access is replica-scoped, so only load jobs for this replica. + # PROVISIONING/PULLING jobs need same-replica siblings for cluster + # coordination, plus — when the run has a router replica group — + # the router replica's job (cross-replica) so the env-injection + # gate in _prepare_startup_context can read its status / IP. + # _fetch_run_model handles both: same-replica jobs always, plus + # all non-terminated jobs when one exists. + run_spec = RunSpec.__response__.parse_raw(job_model.run.run_spec) run_model = await _fetch_run_model( - session=session, run_id=job_model.run_id, replica_num=item.replica_num + session=session, + run_id=job_model.run_id, + replica_num=item.replica_num, + run_spec=run_spec, ) run = run_model_to_run(run_model, include_sensitive=True) job = find_job(run.jobs, job_model.replica_num, job_model.job_num) @@ -477,6 +499,58 @@ async def _prepare_startup_context( ) return None + # If this run has a router replica group and this job is a worker, gate + # startup on the router replica's state. The helper returns None for the + # router itself and for runs without a router group, so this whole block + # is a no-op in those cases. 
+ router_env_outcome = get_router_env_for_job( + run_model=context.run_model, + run_spec=context.run.run_spec, + job_model=context.job_model, + ) + if router_env_outcome is RouterEnvStatus.FAILED: + # Router has reached a terminal state — the worker cannot recover by + # waiting. Terminate it now with a clear reason instead of letting it + # idle until the run-level reconciler tears the whole run down. + _terminate_job( + job_model=context.job_model, + job_update_map=result.job_update_map, + termination_reason=JobTerminationReason.TERMINATED_BY_SERVER, + termination_reason_message=( + "Router replica is in a terminal state; cannot provision worker " + "without a running router." + ), + ) + return None + if router_env_outcome is RouterEnvStatus.NOT_PROVISIONED: + # Router is alive but its internal_ip is not yet known. Defer this + # worker — the next pipeline tick will re-check. Bound the wait so a + # router that is genuinely stuck can't burn worker instance-hours + # forever; see ROUTER_PROVISIONING_WAIT_TIMEOUT_SECONDS. + waited_seconds = (get_current_datetime() - context.job_model.submitted_at).total_seconds() + if waited_seconds > ROUTER_PROVISIONING_WAIT_TIMEOUT_SECONDS: + _terminate_job( + job_model=context.job_model, + job_update_map=result.job_update_map, + termination_reason=JobTerminationReason.TERMINATED_BY_SERVER, + termination_reason_message=( + f"Router replica did not acquire an internal IP within " + f"{ROUTER_PROVISIONING_WAIT_TIMEOUT_SECONDS}s; terminating worker." + ), + ) + return None + logger.debug( + "%s: waiting for router replica to be provisioned", + fmt(context.job_model), + ) + return None + # Past the enum branches, router_env_outcome is either None or a Dict. + # We don't mutate job_spec.env here — RunnerClient.submit_job merges it + # into a deep-copied spec, mirroring how instance_env is handled. 
+ router_env: Optional[Dict[str, str]] = ( + router_env_outcome if isinstance(router_env_outcome, dict) else None + ) + cluster_info = _get_cluster_info( jobs=context.run.jobs, replica_num=context.job.job_spec.replica_num, @@ -520,6 +594,7 @@ async def _prepare_startup_context( volumes=volumes, secrets=secrets, repo_creds=repo_creds, + router_env=router_env, ) @@ -534,6 +609,7 @@ async def _refetch_locked_job_model( ) .options(joinedload(JobModel.instance).joinedload(InstanceModel.project)) .options(joinedload(JobModel.probes).load_only(ProbeModel.success_streak)) + .options(joinedload(JobModel.run).load_only(RunModel.id, RunModel.run_spec)) .execution_options(populate_existing=True) ) return res.unique().scalar_one_or_none() @@ -543,13 +619,22 @@ async def _fetch_run_model( session: AsyncSession, run_id: uuid.UUID, replica_num: Optional[int] = None, + run_spec: Optional[RunSpec] = None, ) -> RunModel: """Fetch run model with related project, user, repo, and fleet. Args: replica_num: If None, skip loading jobs (for RUNNING jobs that don't need siblings). If set, load only latest-submission jobs for that replica (for PROVISIONING/PULLING - jobs that need same-replica siblings for cluster coordination). + jobs that need same-replica siblings for cluster coordination). When the run has + a Dynamo router replica group, all non-terminated latest-submission jobs for the + run are loaded so find_router_job can identify the router by replica-group + membership. + run_spec: Required whenever `replica_num` is set. Used only to detect + whether the run has a Dynamo router replica group. The caller is + expected to parse it once from the eager-loaded JobModel.run + (see _refetch_locked_job_model) so we don't issue a separate + query for it here. 
""" query = ( select(RunModel) @@ -560,6 +645,14 @@ async def _fetch_run_model( .options(joinedload(RunModel.fleet).load_only(FleetModel.id, FleetModel.name)) ) if replica_num is not None: + assert run_spec is not None, "run_spec must be provided when replica_num is set" + router_group = get_router_replica_group(run_spec) + is_dynamo = ( + router_group is not None + and router_group.router is not None + and router_group.router.type == RouterType.DYNAMO + ) + latest_submissions_sq = ( select( JobModel.run_id.label("run_id"), @@ -567,7 +660,12 @@ async def _fetch_run_model( JobModel.job_num.label("job_num"), func.max(JobModel.submission_num).label("max_submission_num"), ) - .where(JobModel.run_id == run_id, JobModel.replica_num == replica_num) + .where( + JobModel.run_id == run_id, + # For Service with Dynamo router: load all replicas. For Non-Dynamo: only the worker's + # own replica. + true() if is_dynamo else JobModel.replica_num == replica_num, + ) .group_by(JobModel.run_id, JobModel.replica_num, JobModel.job_num) .subquery() ) @@ -581,6 +679,15 @@ async def _fetch_run_model( job_alias.replica_num == latest_submissions_sq.c.replica_num, job_alias.job_num == latest_submissions_sq.c.job_num, job_alias.submission_num == latest_submissions_sq.c.max_submission_num, + # For Dynamo runs, drop terminated rows so accumulated + # scale-down history doesn't bloat the load. Non-Dynamo + # runs are already restricted to the worker's own + # replica above, so this filter is a no-op for them. 
+ or_( + false() if is_dynamo else true(), + ~job_alias.status.in_(JobStatus.finished_statuses()) + & (job_alias.status != JobStatus.TERMINATING), + ), ), ) .options(contains_eager(RunModel.jobs, alias=job_alias)) @@ -690,6 +797,7 @@ async def _process_provisioning_status( file_archives=file_archives, secrets=startup_context.secrets, repo_credentials=startup_context.repo_creds, + router_env=startup_context.router_env, success_if_not_available=False, ) if submit_result is not False: @@ -800,6 +908,7 @@ async def _process_pulling_status( file_archives=file_archives, secrets=startup_context.secrets, repo_credentials=startup_context.repo_creds, + router_env=startup_context.router_env, success_if_not_available=True, ) if submit_result is not False: @@ -1408,6 +1517,7 @@ def _submit_job_to_runner( file_archives: Iterable[tuple[uuid.UUID, bytes]], secrets: Dict[str, str], repo_credentials: Optional[RemoteRepoCreds], + router_env: Optional[Dict[str, str]], success_if_not_available: bool, ) -> Union[_SubmitJobToRunnerResult, Literal[False]]: logger.debug("%s: submitting job spec", fmt(job_model)) @@ -1435,6 +1545,7 @@ def _submit_job_to_runner( secrets={}, repo_credentials=repo_credentials, instance_env=instance_env, + router_env=router_env, ) for archive_id, archive in file_archives: logger.debug("%s: uploading file archive: %s", fmt(job_model), archive_id) diff --git a/src/dstack/_internal/server/background/pipeline_tasks/service_router_worker_sync.py b/src/dstack/_internal/server/background/pipeline_tasks/service_router_worker_sync.py index 2fa80fc43a..2b416fb823 100644 --- a/src/dstack/_internal/server/background/pipeline_tasks/service_router_worker_sync.py +++ b/src/dstack/_internal/server/background/pipeline_tasks/service_router_worker_sync.py @@ -33,7 +33,7 @@ from dstack._internal.server.services.locking import get_locker from dstack._internal.server.services.pipelines import PipelineHinterProtocol from dstack._internal.server.services.runs.router_worker_sync import 
( - run_model_has_router_replica_group, + run_model_has_sglang_router_replica_group, sync_router_workers_for_run_model, ) from dstack._internal.server.utils import sentry_utils @@ -212,7 +212,7 @@ async def process(self, item: ServiceRouterWorkerSyncPipelineItem) -> None: run_model.deleted or run_model.status.is_finished() or run_model.status != RunStatus.RUNNING - or not run_model_has_router_replica_group(run_model) + or not run_model_has_sglang_router_replica_group(run_model) ): early_cleanup_update_map: _SyncRowUpdateMap = {"deleted": True} set_processed_update_map_fields(early_cleanup_update_map) diff --git a/src/dstack/_internal/server/services/proxy/services/service_proxy.py b/src/dstack/_internal/server/services/proxy/services/service_proxy.py index c75fb23542..85b7150e8c 100644 --- a/src/dstack/_internal/server/services/proxy/services/service_proxy.py +++ b/src/dstack/_internal/server/services/proxy/services/service_proxy.py @@ -6,7 +6,7 @@ from starlette.requests import ClientDisconnect from dstack._internal.core.models.routers import RouterType -from dstack._internal.proxy.lib.const import SGLANG_WHITELISTED_PATHS +from dstack._internal.proxy.lib.const import ROUTER_WHITELISTED_PATHS from dstack._internal.proxy.lib.deps import ProxyAuthContext from dstack._internal.proxy.lib.errors import ProxyError from dstack._internal.proxy.lib.repo import BaseProxyRepo @@ -45,7 +45,7 @@ async def proxy( service.router is not None and service.router.type == RouterType.SGLANG ) or service.has_router_replica: path_for_match = path if path.startswith("/") else f"/{path}" - if not _is_whitelisted_path(path_for_match, SGLANG_WHITELISTED_PATHS): + if not _is_whitelisted_path(path_for_match, ROUTER_WHITELISTED_PATHS): raise ProxyError("Path is not allowed for this service", status.HTTP_403_FORBIDDEN) client = await get_service_replica_client(service, repo, service_conn_pool) diff --git a/src/dstack/_internal/server/services/runner/client.py 
b/src/dstack/_internal/server/services/runner/client.py index 8fd17a3a2d..6a1c541856 100644 --- a/src/dstack/_internal/server/services/runner/client.py +++ b/src/dstack/_internal/server/services/runner/client.py @@ -104,16 +104,23 @@ def submit_job( secrets: Dict[str, str], repo_credentials: Optional[RemoteRepoCreds], instance_env: Optional[Union[Env, Dict[str, str]]] = None, + router_env: Optional[Dict[str, str]] = None, ): - # XXX: This is a quick-and-dirty hack to deliver InstanceModel-specific environment - # variables to the runner without runner API modification. + # XXX: This is a quick-and-dirty hack to deliver InstanceModel-specific + # and Dynamo-router environment variables to the runner without runner + # API modification. Both layers are merged into a deep-copied job_spec + # so the shared spec object held by the caller is not mutated. job_spec = job.job_spec - if instance_env is not None: - if isinstance(instance_env, Env): - merged_env = instance_env.as_dict() - else: - merged_env = instance_env.copy() + if instance_env is not None or router_env is not None: + merged_env: Dict[str, str] = {} + if instance_env is not None: + if isinstance(instance_env, Env): + merged_env.update(instance_env.as_dict()) + else: + merged_env.update(instance_env) merged_env.update(job_spec.env) + if router_env is not None: + merged_env.update(router_env) job_spec = job_spec.copy(deep=True) job_spec.env = merged_env quota = server_settings.SERVER_LOG_QUOTA_PER_JOB_HOUR diff --git a/src/dstack/_internal/server/services/runs/replicas.py b/src/dstack/_internal/server/services/runs/replicas.py index 53c26ca738..9320881522 100644 --- a/src/dstack/_internal/server/services/runs/replicas.py +++ b/src/dstack/_internal/server/services/runs/replicas.py @@ -1,10 +1,13 @@ from dataclasses import dataclass -from typing import List, Optional, Tuple +from enum import Enum +from typing import Dict, List, Optional, Tuple, Union -from dstack._internal.core.models.configurations import 
ReplicaGroup -from dstack._internal.core.models.runs import JobStatus, JobTerminationReason +from dstack._internal.core.models.configurations import ReplicaGroup, ServiceConfiguration +from dstack._internal.core.models.routers import RouterType +from dstack._internal.core.models.runs import JobStatus, JobTerminationReason, RunSpec from dstack._internal.server.models import JobModel, RunModel from dstack._internal.server.services.jobs import ( + get_job_provisioning_data, get_job_spec, group_jobs_by_replica_latest, ) @@ -20,6 +23,31 @@ class GroupRolloutState: registered_non_terminating_replica_count: int +class RouterEnvStatus(str, Enum): + """Outcomes returned from get_router_env_for_job() when no env dict is + appropriate. Each value carries a distinct caller-side action. + + Using an enum (rather than empty-dict sentinels) means callers can rely + on either `is` or `==` to compare — both yield correct, unambiguous + results — and stray dicts from elsewhere can never accidentally match. + + NOT_PROVISIONED — router job exists but its internal_ip is not yet + known. Transient; caller should defer this worker + and retry on the next pipeline tick (subject to + ROUTER_PROVISIONING_WAIT_TIMEOUT_SECONDS in + jobs_running.py). + FAILED — router job has reached a terminal state + (TERMINATING/TERMINATED/FAILED/ABORTED/DONE). + Permanent; caller should stop deferring and + terminate this worker — waiting longer cannot + recover because the router will not come back with + a fresh internal_ip. 
+ """ + + NOT_PROVISIONED = "not_provisioned" + FAILED = "failed" + + def build_replica_lists( run_model: RunModel, group_filter: Optional[str] = None, @@ -124,3 +152,80 @@ def has_out_of_date_replicas(run: RunModel, group_filter: Optional[str] = None) def is_replica_registered(jobs: list[JobModel]) -> bool: # Only job_num=0 is supposed to receive service requests return jobs[0].registered + + +def get_router_replica_group(run_spec: RunSpec) -> Optional[ReplicaGroup]: + """Return the (single) replica group with a `router:` field, or None. + + `validate_at_most_one_router_replica_group` guarantees at most one such + group exists, so we can safely return on the first match. + """ + cfg = run_spec.configuration + if not isinstance(cfg, ServiceConfiguration): + return None + for g in cfg.replica_groups: + if g.router is not None: + return g + return None + + +def find_router_job(run_model: RunModel, router_group_name: str) -> Optional[JobModel]: + for j in run_model.jobs: + if job_belongs_to_group(j, router_group_name): + return j + return None + + +def get_router_env_for_job( + run_model: RunModel, run_spec: RunSpec, job_model: JobModel +) -> Optional[Union[Dict[str, str], RouterEnvStatus]]: + """Compute env vars exposing the router replica's address to a worker job. + + Returns one of four values, each communicating a distinct outcome: + + None -> not applicable. Either the + run has no router replica + group, or this job IS the + router replica. Caller does + nothing. + RouterEnvStatus.NOT_PROVISIONED -> router job exists but has no + internal_ip yet. Caller defers. + RouterEnvStatus.FAILED -> router job has reached a + terminal state and can never + expose an internal_ip. Caller + terminates this worker; + waiting cannot recover. + {"DSTACK_ROUTER_INTERNAL_IP": ...} -> ready-to-merge env dict + containing the router + replica's internal IP. 
+ """ + router_group = get_router_replica_group(run_spec) + if router_group is None or router_group.name is None: + return None + # DSTACK_ROUTER_INTERNAL_IP is Dynamo-specific. SGLang workers + # are registered via the worker-sync pipeline (ServiceRouterWorkerSyncModel) + if router_group.router is None or router_group.router.type != RouterType.DYNAMO: + return None + if job_belongs_to_group(job_model, router_group.name): + # Router replica itself doesn't need to be told its own IP. + return None + + router_job = find_router_job(run_model, router_group.name) + if router_job is None: + # The router's latest submission is in a terminal state and was + # filtered out by _fetch_run_model's not-terminated predicate. + return RouterEnvStatus.FAILED + + # If the router has reached a terminal state, the worker cannot recover + # by waiting — the router will not come back with a fresh internal_ip + # under the same job. Surface this as FAILED so the caller can stop + # the wait loop and terminate the worker with a clear reason. + if router_job.status == JobStatus.TERMINATING or router_job.status.is_finished(): + return RouterEnvStatus.FAILED + + # Router is alive but may not yet have been assigned a machine. 
+ jpd = get_job_provisioning_data(router_job) + if jpd is None or not jpd.internal_ip: + return RouterEnvStatus.NOT_PROVISIONED + + return {"DSTACK_ROUTER_INTERNAL_IP": jpd.internal_ip} diff --git a/src/dstack/_internal/server/services/runs/router_worker_sync.py b/src/dstack/_internal/server/services/runs/router_worker_sync.py index 5087749acf..2fc9add74b 100644 --- a/src/dstack/_internal/server/services/runs/router_worker_sync.py +++ b/src/dstack/_internal/server/services/runs/router_worker_sync.py @@ -19,7 +19,7 @@ from dstack._internal.utils.logging import get_logger from .replicas import job_belongs_to_group -from .service_router_worker_sync import run_spec_has_router_replica_group +from .service_router_worker_sync import run_spec_has_sglang_router_replica_group logger = get_logger(__name__) @@ -93,9 +93,9 @@ class _TargetWorker(TypedDict): bootstrap_port: NotRequired[Optional[int]] -def run_model_has_router_replica_group(run_model: RunModel) -> bool: +def run_model_has_sglang_router_replica_group(run_model: RunModel) -> bool: run_spec = RunSpec.__response__.parse_raw(run_model.run_spec) - return run_spec_has_router_replica_group(run_spec) + return run_spec_has_sglang_router_replica_group(run_spec) def _get_router_job(run_model: RunModel, router_group: ReplicaGroup) -> Optional[JobModel]: diff --git a/src/dstack/_internal/server/services/runs/service_router_worker_sync.py b/src/dstack/_internal/server/services/runs/service_router_worker_sync.py index f23a0d44cd..b251e76f92 100644 --- a/src/dstack/_internal/server/services/runs/service_router_worker_sync.py +++ b/src/dstack/_internal/server/services/runs/service_router_worker_sync.py @@ -9,6 +9,7 @@ import dstack._internal.utils.common as common_utils from dstack._internal.core.models.configurations import ServiceConfiguration +from dstack._internal.core.models.routers import RouterType from dstack._internal.core.models.runs import RunSpec from dstack._internal.server.models import RunModel, 
ServiceRouterWorkerSyncModel @@ -31,13 +32,15 @@ def _reactivate_sync_row_update_map(*, now: datetime) -> _SyncRowUpdateMap: } -def run_spec_has_router_replica_group(run_spec: RunSpec) -> bool: +def run_spec_has_sglang_router_replica_group(run_spec: RunSpec) -> bool: if run_spec.configuration.type != "service": return False cfg = run_spec.configuration if not isinstance(cfg, ServiceConfiguration): return False - return any(g.router is not None for g in cfg.replica_groups) + return any( + g.router is not None and g.router.type == RouterType.SGLANG for g in cfg.replica_groups + ) async def ensure_service_router_worker_sync_row( @@ -45,7 +48,7 @@ async def ensure_service_router_worker_sync_row( run_model: RunModel, run_spec: RunSpec, ) -> None: - if not run_spec_has_router_replica_group(run_spec): + if not run_spec_has_sglang_router_replica_group(run_spec): return res = await session.execute( select(ServiceRouterWorkerSyncModel).where( diff --git a/src/dstack/_internal/server/services/runs/spec.py b/src/dstack/_internal/server/services/runs/spec.py index 403437964b..9e6f1e417f 100644 --- a/src/dstack/_internal/server/services/runs/spec.py +++ b/src/dstack/_internal/server/services/runs/spec.py @@ -5,6 +5,7 @@ ServiceConfiguration, ) from dstack._internal.core.models.repos.virtual import DEFAULT_VIRTUAL_REPO_ID, VirtualRunRepoData +from dstack._internal.core.models.routers import RouterType from dstack._internal.core.models.runs import LEGACY_REPO_DIR, AnyRunConfiguration, RunSpec from dstack._internal.core.models.volumes import InstanceMountPoint from dstack._internal.core.services import validate_dstack_resource_name @@ -214,6 +215,56 @@ def _check_can_update_configuration( "Cannot update router replica groups in-place (adding/removing `router` or changing " "which replica group is the router is not supported). Stop the run and apply again." 
) + current_router_type = ( + current_router_group.router.type + if current_router_group is not None and current_router_group.router is not None + else None + ) + new_router_type = ( + new_router_group.router.type + if new_router_group is not None and new_router_group.router is not None + else None + ) + # Universal rule: changing router.type in place is incompatible with + # the currently-provisioned router and the workers/clients configured + # for it. Applies to any pair of router types (today sglang ↔ dynamo). + if ( + current_router_type is not None + and new_router_type is not None + and current_router_type != new_router_type + ): + raise ServerClientError( + "Cannot change router.type in place. Stop the run with `dstack stop` and re-apply." + ) + # Dynamo-specific: even when router.type stays the same, Dynamo's + # workers cache the router's internal IP at provisioning time, so any + # change that would re-provision the router must be rejected. + if RouterType.DYNAMO in (current_router_type, new_router_type): + # 1. Direct changes to the router replica group's own fields. + if current_router_group != new_router_group: + raise ServerClientError( + "Cannot update a Dynamo router replica group in place. " + "Stop the run with `dstack stop` and re-apply." + ) + # 2. Top-level service fields that cascade to replicas — these + # would re-provision the router via inheritance whenever the + # router group does not override them at the group level. + # Sourced from dstack's rolling-updatable service field list to + # stay in sync if new fields are added later. 
import pytest
from pydantic import ValidationError

from dstack._internal.core.models.runs import RunSpec


def _service_run_spec_dict(router_type=None, retry=None, top_level_extras=None):
    """Construct a minimal RunSpec payload dict for a service run.

    Args:
        router_type: ``None`` | ``"sglang"`` | ``"dynamo"`` — when not
            ``None``, a second replica group carrying a ``router`` field
            is appended to the service's replicas.
        retry: optional dict stored under ``profile.retry``.
        top_level_extras: optional dict merged into the service
            configuration at the top level.
    """
    groups = [{"name": "worker", "commands": ["echo hi"], "count": 1}]
    if router_type is not None:
        groups.append(
            {
                "name": "router",
                "router": {"type": router_type},
                "commands": ["echo router"],
                "count": 1,
            }
        )
    conf = {
        "type": "service",
        "port": 8000,
        "replicas": groups,
        **(top_level_extras or {}),
    }
    prof = {"name": "default"}
    if retry is not None:
        prof["retry"] = retry
    return {
        "run_name": "test-run",
        "repo_id": "test-repo",
        "configuration_path": "dstack.yaml",
        "configuration": conf,
        "profile": prof,
        "ssh_key_pub": "ssh-rsa AAAA...",
        "repo_data": {"repo_type": "virtual"},
    }


class TestDynamoNoRetryValidator:
    def test_dynamo_router_with_retry_at_profile_level_is_rejected(self):
        payload = _service_run_spec_dict(
            router_type="dynamo",
            retry={"on_events": ["error"]},
        )
        with pytest.raises(ValidationError, match="Dynamo"):
            RunSpec.parse_obj(payload)

    def test_dynamo_router_with_retry_in_configuration_is_rejected(self):
        # `retry` given at configuration level is folded into
        # `merged_profile.retry` by `_merged_profile`, so the validator
        # must reject it exactly as it rejects profile-level retry.
        payload = _service_run_spec_dict(
            router_type="dynamo",
            top_level_extras={"retry": {"on_events": ["error"]}},
        )
        with pytest.raises(ValidationError, match="Dynamo"):
            RunSpec.parse_obj(payload)

    def test_dynamo_router_without_retry_is_accepted(self):
        # No retry configured anywhere — parsing must not raise.
        RunSpec.parse_obj(_service_run_spec_dict(router_type="dynamo", retry=None))

    def test_sglang_router_with_retry_is_accepted(self):
        # The validator targets Dynamo only; SGLang services may retry.
        payload = _service_run_spec_dict(
            router_type="sglang",
            retry={"on_events": ["error"]},
        )
        RunSpec.parse_obj(payload)

    def test_service_without_router_with_retry_is_accepted(self):
        payload = _service_run_spec_dict(router_type=None, retry={"on_events": ["error"]})
        RunSpec.parse_obj(payload)

    def test_non_service_run_with_retry_is_accepted(self):
        # The validator is service-only: a task run configured with retry
        # must not be flagged regardless of the rest of the spec.
        payload = {
            "run_name": "test-run",
            "repo_id": "test-repo",
            "configuration_path": "dstack.yaml",
            "configuration": {
                "type": "task",
                "commands": ["echo hi"],
            },
            "profile": {"name": "default", "retry": {"on_events": ["error"]}},
            "ssh_key_pub": "ssh-rsa AAAA...",
            "repo_data": {"repo_type": "virtual"},
        }
        RunSpec.parse_obj(payload)
import pytest

from dstack._internal.core.errors import ServerClientError
from dstack._internal.core.models.configurations import ServiceConfiguration
from dstack._internal.server.services.runs.spec import (
    _check_can_update_configuration,
)


def _service_configuration(
    *,
    router_type=None,
    image=None,
    env=None,
    worker_count_min=None,
    router_commands="echo router",
    worker_commands="echo worker",
):
    """Build a ServiceConfiguration instance for the in-place update tests."""
    worker = {"name": "worker", "commands": [worker_commands]}
    if worker_count_min is None:
        worker["count"] = 1
    else:
        # NOTE(review): the original's indentation here was ambiguous in the
        # patch; scaling is applied only together with a ranged count, since
        # autoscaling with a fixed count would be inconsistent — confirm.
        worker["count"] = {"min": worker_count_min, "max": worker_count_min + 1}
        worker["scaling"] = {"metric": "rps", "target": 4}
    groups = [worker]
    if router_type is not None:
        groups.append(
            {
                "name": "router",
                "router": {"type": router_type},
                "commands": [router_commands],
                "count": 1,
            }
        )
    data = {"type": "service", "port": 8000, "replicas": groups}
    if image is not None:
        data["image"] = image
    if env is not None:
        data["env"] = env
    return ServiceConfiguration.parse_obj(data)


class TestCheckCanUpdateConfigurationRouterType:
    def test_sglang_to_dynamo_router_type_change_is_rejected(self):
        old_conf = _service_configuration(router_type="sglang")
        new_conf = _service_configuration(router_type="dynamo")
        with pytest.raises(ServerClientError, match="router.type"):
            _check_can_update_configuration(old_conf, new_conf, ignore_files=True)

    def test_dynamo_to_sglang_router_type_change_is_rejected(self):
        old_conf = _service_configuration(router_type="dynamo")
        new_conf = _service_configuration(router_type="sglang")
        with pytest.raises(ServerClientError, match="router.type"):
            _check_can_update_configuration(old_conf, new_conf, ignore_files=True)

    def test_same_router_type_no_other_changes_succeeds(self):
        # Identical specs on both sides: the update must be accepted.
        old_conf = _service_configuration(router_type="dynamo")
        new_conf = _service_configuration(router_type="dynamo")
        _check_can_update_configuration(old_conf, new_conf, ignore_files=True)


class TestCheckCanUpdateConfigurationDynamoRouterGroup:
    def test_dynamo_router_group_commands_change_is_rejected(self):
        old_conf = _service_configuration(router_type="dynamo", router_commands="a")
        new_conf = _service_configuration(router_type="dynamo", router_commands="b")
        with pytest.raises(ServerClientError, match="Dynamo router replica group"):
            _check_can_update_configuration(old_conf, new_conf, ignore_files=True)


class TestCheckCanUpdateConfigurationDynamoTopLevel:
    def test_dynamo_top_level_image_change_is_rejected(self):
        old_conf = _service_configuration(router_type="dynamo", image="img:1")
        new_conf = _service_configuration(router_type="dynamo", image="img:2")
        with pytest.raises(ServerClientError, match="image.*Dynamo"):
            _check_can_update_configuration(old_conf, new_conf, ignore_files=True)

    def test_dynamo_top_level_env_change_is_rejected(self):
        old_conf = _service_configuration(router_type="dynamo", env={"FOO": "1"})
        new_conf = _service_configuration(router_type="dynamo", env={"FOO": "2"})
        with pytest.raises(ServerClientError, match="env.*Dynamo"):
            _check_can_update_configuration(old_conf, new_conf, ignore_files=True)


class TestCheckCanUpdateConfigurationWorkerOnlyChangesAllowed:
    def test_dynamo_worker_count_min_change_is_allowed(self):
        # Changing the worker group's count range is allowed on a Dynamo
        # service: only the router group is frozen.
        old_conf = _service_configuration(router_type="dynamo", worker_count_min=1)
        new_conf = _service_configuration(router_type="dynamo", worker_count_min=2)
        _check_can_update_configuration(old_conf, new_conf, ignore_files=True)

    def test_dynamo_worker_commands_change_is_allowed(self):
        # A non-router replica group may change its commands in place.
        old_conf = _service_configuration(router_type="dynamo", worker_commands="x")
        new_conf = _service_configuration(router_type="dynamo", worker_commands="y")
        _check_can_update_configuration(old_conf, new_conf, ignore_files=True)


class TestCheckCanUpdateConfigurationNonDynamoUnchanged:
    def test_sglang_top_level_image_change_is_allowed(self):
        # SGLang services take the existing rolling-deployment path for
        # top-level changes; the Dynamo gate must not fire.
        old_conf = _service_configuration(router_type="sglang", image="img:1")
        new_conf = _service_configuration(router_type="sglang", image="img:2")
        _check_can_update_configuration(old_conf, new_conf, ignore_files=True)

    def test_no_router_top_level_image_change_is_allowed(self):
        old_conf = _service_configuration(router_type=None, image="img:1")
        new_conf = _service_configuration(router_type=None, image="img:2")
        _check_can_update_configuration(old_conf, new_conf, ignore_files=True)