Skip to content

Commit acf60c9

Browse files
authored
[chore]: Move run_async to _internal/utils (#2057)
Part of an upcoming refactoring, moved to a separate PR for a smaller diff and to avoid merge conflicts. `run_async` will be used both in `_internal/server` and `_internal/proxy`.
1 parent c10b1fe commit acf60c9

File tree

18 files changed

+37
-51
lines changed

18 files changed

+37
-51
lines changed

src/dstack/_internal/core/services/ssh/tunnel.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
from dstack._internal.core.models.instances import SSHConnectionParams
1212
from dstack._internal.core.services.ssh import get_ssh_error
1313
from dstack._internal.core.services.ssh.client import get_ssh_client_info
14+
from dstack._internal.utils.common import run_async
1415
from dstack._internal.utils.logging import get_logger
1516
from dstack._internal.utils.path import FilePath, FilePathOrContent, PathLike
1617
from dstack._internal.utils.ssh import normalize_path
@@ -200,8 +201,7 @@ def open(self) -> None:
200201
raise get_ssh_error(stderr)
201202

202203
async def aopen(self) -> None:
203-
loop = asyncio.get_event_loop()
204-
await loop.run_in_executor(None, self._remove_log_file)
204+
await run_async(self._remove_log_file)
205205
proc = await asyncio.create_subprocess_exec(
206206
*self.open_command(), stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
207207
)
@@ -214,7 +214,7 @@ async def aopen(self) -> None:
214214
raise SSHError(msg) from e
215215
if proc.returncode == 0:
216216
return
217-
stderr = await loop.run_in_executor(None, self._read_log_file)
217+
stderr = await run_async(self._read_log_file)
218218
logger.debug("SSH tunnel failed: %s", stderr)
219219
raise get_ssh_error(stderr)
220220

src/dstack/_internal/server/background/tasks/process_instances.py

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -89,8 +89,7 @@
8989
from dstack._internal.server.services.runner import client as runner_client
9090
from dstack._internal.server.services.runner.client import HealthStatus
9191
from dstack._internal.server.services.runner.ssh import runner_ssh_tunnel
92-
from dstack._internal.server.utils.common import run_async
93-
from dstack._internal.utils.common import get_current_datetime
92+
from dstack._internal.utils.common import get_current_datetime, run_async
9493
from dstack._internal.utils.logging import get_logger
9594
from dstack._internal.utils.network import get_ip_from_network, is_ip_among_addresses
9695
from dstack._internal.utils.ssh import (
@@ -262,9 +261,7 @@ async def _add_remote(instance: InstanceModel) -> None:
262261
authorized_keys.append(instance.project.ssh_public_key.strip())
263262

264263
try:
265-
future = asyncio.get_running_loop().run_in_executor(
266-
None, _deploy_instance, remote_details, pkeys, authorized_keys
267-
)
264+
future = run_async(_deploy_instance, remote_details, pkeys, authorized_keys)
268265
deploy_timeout = 20 * 60 # 20 minutes
269266
result = await asyncio.wait_for(future, timeout=deploy_timeout)
270267
health, host_info = result

src/dstack/_internal/server/background/tasks/process_metrics.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,7 @@
1313
from dstack._internal.server.services.jobs import get_job_provisioning_data
1414
from dstack._internal.server.services.runner import client
1515
from dstack._internal.server.services.runner.ssh import runner_ssh_tunnel
16-
from dstack._internal.server.utils.common import run_async
17-
from dstack._internal.utils.common import batched, get_current_datetime
16+
from dstack._internal.utils.common import batched, get_current_datetime, run_async
1817
from dstack._internal.utils.logging import get_logger
1918

2019
logger = get_logger(__name__)

src/dstack/_internal/server/background/tasks/process_placement_groups.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,7 @@
1111
from dstack._internal.server.services import backends as backends_services
1212
from dstack._internal.server.services.locking import get_locker
1313
from dstack._internal.server.services.placement import placement_group_model_to_placement_group
14-
from dstack._internal.server.utils.common import run_async
15-
from dstack._internal.utils.common import get_current_datetime
14+
from dstack._internal.utils.common import get_current_datetime, run_async
1615
from dstack._internal.utils.logging import get_logger
1716

1817
logger = get_logger(__name__)

src/dstack/_internal/server/background/tasks/process_running_jobs.py

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@
4747
run_model_to_run,
4848
)
4949
from dstack._internal.server.services.storage import get_default_storage
50-
from dstack._internal.server.utils.common import run_async
5150
from dstack._internal.utils import common as common_utils
5251
from dstack._internal.utils.interpolator import VariablesInterpolator
5352
from dstack._internal.utils.logging import get_logger
@@ -188,7 +187,7 @@ async def _process_running_job(session: AsyncSession, job_model: JobModel):
188187
if job_provisioning_data.backend == BackendType.LOCAL:
189188
# No need to update ~/.ssh/authorized_keys when running shim localy
190189
user_ssh_key = ""
191-
success = await run_async(
190+
success = await common_utils.run_async(
192191
_process_provisioning_with_shim,
193192
server_ssh_private_key,
194193
job_provisioning_data,
@@ -213,7 +212,7 @@ async def _process_running_job(session: AsyncSession, job_model: JobModel):
213212
repo=repo_model,
214213
code_hash=run.run_spec.repo_code_hash,
215214
)
216-
success = await run_async(
215+
success = await common_utils.run_async(
217216
_process_provisioning_no_shim,
218217
server_ssh_private_key,
219218
job_provisioning_data,
@@ -254,7 +253,7 @@ async def _process_running_job(session: AsyncSession, job_model: JobModel):
254253
repo=repo_model,
255254
code_hash=run.run_spec.repo_code_hash,
256255
)
257-
success = await run_async(
256+
success = await common_utils.run_async(
258257
_process_pulling_with_shim,
259258
server_ssh_private_key,
260259
job_provisioning_data,
@@ -268,7 +267,7 @@ async def _process_running_job(session: AsyncSession, job_model: JobModel):
268267
)
269268
elif initial_status == JobStatus.RUNNING:
270269
logger.debug("%s: process running job, age=%s", fmt(job_model), job_submission.age)
271-
success = await run_async(
270+
success = await common_utils.run_async(
272271
_process_running,
273272
server_ssh_private_key,
274273
job_provisioning_data,
@@ -601,7 +600,7 @@ async def _get_job_code(
601600
storage = get_default_storage()
602601
if storage is None or code_model.blob is not None:
603602
return code_model.blob
604-
blob = await run_async(
603+
blob = await common_utils.run_async(
605604
storage.get_code,
606605
project.name,
607606
repo.name,

src/dstack/_internal/server/background/tasks/process_submitted_jobs.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,6 @@
6666
from dstack._internal.server.services.volumes import (
6767
volume_model_to_volume,
6868
)
69-
from dstack._internal.server.utils.common import run_async
7069
from dstack._internal.utils import common as common_utils
7170
from dstack._internal.utils.logging import get_logger
7271

@@ -406,7 +405,7 @@ async def _run_job_on_new_instance(
406405
)
407406
offer_volumes = get_offer_volumes(volumes, offer)
408407
try:
409-
job_provisioning_data = await run_async(
408+
job_provisioning_data = await common_utils.run_async(
410409
backend.compute().run_job,
411410
run,
412411
job,
@@ -587,7 +586,7 @@ async def _attach_volume(
587586
if volume_model.deleted:
588587
raise ServerClientError("Cannot attach a deleted volume")
589588
volume = volume_model_to_volume(volume_model)
590-
attachment_data = await run_async(
589+
attachment_data = await common_utils.run_async(
591590
backend.compute().attach_volume,
592591
volume=volume,
593592
instance_id=instance_id,

src/dstack/_internal/server/background/tasks/process_volumes.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,7 @@
99
from dstack._internal.server.services import backends as backends_services
1010
from dstack._internal.server.services import volumes as volumes_services
1111
from dstack._internal.server.services.locking import get_locker
12-
from dstack._internal.server.utils.common import run_async
13-
from dstack._internal.utils.common import get_current_datetime
12+
from dstack._internal.utils.common import get_current_datetime, run_async
1413
from dstack._internal.utils.logging import get_logger
1514

1615
logger = get_logger(__name__)

src/dstack/_internal/server/services/backends/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
from dstack._internal.server.models import BackendModel, ProjectModel
2929
from dstack._internal.server.services.backends.configurators.base import Configurator
3030
from dstack._internal.server.settings import LOCAL_BACKEND_ENABLED
31-
from dstack._internal.server.utils.common import run_async
31+
from dstack._internal.utils.common import run_async
3232
from dstack._internal.utils.logging import get_logger
3333

3434
logger = get_logger(__name__)

src/dstack/_internal/server/services/config.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
DefaultPermissions,
3535
set_default_permissions,
3636
)
37-
from dstack._internal.server.utils.common import run_async
37+
from dstack._internal.utils.common import run_async
3838
from dstack._internal.utils.logging import get_logger
3939

4040
logger = get_logger(__name__)

src/dstack/_internal/server/services/gateways/__init__.py

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -66,11 +66,8 @@
6666
string_to_lock_id,
6767
)
6868
from dstack._internal.server.services.logging import fmt
69-
from dstack._internal.server.utils.common import (
70-
gather_map_async,
71-
run_async,
72-
)
73-
from dstack._internal.utils.common import get_current_datetime
69+
from dstack._internal.server.utils.common import gather_map_async
70+
from dstack._internal.utils.common import get_current_datetime, run_async
7471
from dstack._internal.utils.crypto import generate_rsa_key_pair_bytes
7572
from dstack._internal.utils.logging import get_logger
7673

0 commit comments

Comments
 (0)