Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions dask_cloudprovider/cloudprovider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,9 @@ cloudprovider:
external_network_id: null # The ID of the external network used for assigning floating IPs. List available external networks using: `openstack network list --external`
create_floating_ip: false # Specifies whether to assign a floating IP to each instance, enabling external access. Set to `True` if external connectivity is needed.
docker_image: "daskdev/dask:latest" # docker image to use
worker_threads: 2 # The number of threads to use on each worker.
worker_command: null # str (optional) The command workers should run when starting, for example ``dask-cuda-worker`` on GPU-enabled instances.


nebius:
token: null # iam token for interacting with the Nebius AI Cloud
Expand Down
63 changes: 63 additions & 0 deletions dask_cloudprovider/openstack/instances.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ def __init__(
docker_image: str = None,
env_vars: str = None,
extra_bootstrap: str = None,
worker_threads: int = None,
worker_command: str = None,
**kwargs,
):
super().__init__(**kwargs)
Expand All @@ -42,6 +44,8 @@ def __init__(
self.size = size
self.image = image
self.env_vars = env_vars
self.worker_threads = worker_threads
self.worker_command = worker_command
self.bootstrap = True
self.docker_image = docker_image
self.extra_bootstrap = extra_bootstrap
Expand Down Expand Up @@ -226,6 +230,44 @@ async def start_scheduler(self):
class OpenStackWorker(WorkerMixin, OpenStackInstance):
    """Worker running on an OpenStack instance.

    Parameters
    ----------
    scheduler : str
        Scheduler address, forwarded to the parent initializer.
    *args
        Extra positional arguments, forwarded to the parent initializer.
    worker_module : str, optional
        Forwarded to the parent initializer.
    worker_class : str, optional
        Forwarded to the parent initializer.
    worker_options : dict, optional
        Forwarded to the parent initializer. ``None`` (the default) is
        replaced by a fresh empty dict for every instance.
    **kwargs
        Remaining keyword arguments, forwarded to the parent initializer.

    Notes
    -----
    When ``self.worker_command`` is set (a string or a sequence of strings),
    ``self.command`` is replaced by
    ``"<set_env> <worker_command ...> <scheduler address>"``; otherwise the
    command prepared by the parent classes is left untouched.
    """

    def __init__(
        self,
        scheduler: str,
        *args,
        worker_module: str = None,
        worker_class: str = None,
        worker_options: dict = None,
        **kwargs,
    ):
        # A ``{}`` default would be one dict object shared by every call;
        # build a fresh one per instance instead.
        if worker_options is None:
            worker_options = {}

        # NOTE(review): ``scheduler`` is passed by keyword, exactly as before.
        # Positional ``*args`` are bound first, so a non-empty ``args`` may
        # collide with ``scheduler`` in the parent signature - confirm
        # against WorkerMixin.
        super().__init__(
            *args,
            scheduler=scheduler,
            worker_module=worker_module,
            worker_class=worker_class,
            worker_options=worker_options,
            **kwargs,
        )

        # Without a custom worker command there is nothing to override.
        if not self.worker_command:
            return

        # Select scheduler address (external or internal).
        # NOTE(review): the fallback here is True while the packaged YAML
        # default for ``create_floating_ip`` is false - confirm intended.
        if self.config.get("create_floating_ip", True):
            scheduler_ip = self.cluster.scheduler_external_ip
        else:
            scheduler_ip = self.cluster.scheduler_internal_ip
        scheduler_address = (
            f"{self.cluster.protocol}://{scheduler_ip}:{self.cluster.scheduler_port}"
        )

        # A string is split on whitespace; any other sequence (list, tuple)
        # is used element by element. The parts are re-joined with spaces,
        # so quoting inside a string command is carried through unchanged.
        if isinstance(self.worker_command, str):
            cmd = self.worker_command.split()
        else:
            cmd = list(self.worker_command)
        self.command = " ".join([self.set_env] + cmd + [scheduler_address])

    async def start(self):
        """Log the creation, create the VM and mark the worker as running."""
        self.cluster._log(f"Creating worker instance {self.name}")
        await self.create_vm()
        self.status = Status.running

class OpenStackCluster(VMCluster):
"""Cluster running on Openstack VM Instances
Expand Down Expand Up @@ -298,6 +340,21 @@ class OpenStackCluster(VMCluster):
Params to be passed to the worker class.
See :class:`distributed.worker.Worker` for default worker class.
If you set ``worker_module`` then refer to the docstring for the custom worker class.
worker_threads: int
The number of threads to use on each worker.
worker_command : str or list of str (optional)
The command workers should run when starting. By default this will be
``python -m distributed.cli.dask_spec``, but you can override it—for example, to
``dask-cuda-worker`` on GPU-enabled instances.
Example
-------
worker_command=[
"dask worker",
"--nthreads", "4",
"--memory-limit", "16GB",
]
scheduler_options: dict
Params to be passed to the scheduler class.
See :class:`distributed.scheduler.Scheduler`.
Expand Down Expand Up @@ -355,6 +412,8 @@ def __init__(
docker_image: str = None,
debug: bool = False,
bootstrap: bool = True,
worker_threads: int = 2,
worker_command: str = None,
**kwargs,
):
self.config = dask.config.get("cloudprovider.openstack", {})
Expand All @@ -364,13 +423,17 @@ def __init__(
self.bootstrap = (
bootstrap if bootstrap is not None else self.config.get("bootstrap")
)
self.worker_threads = worker_threads or self.config.get("worker_threads")
self.worker_command = worker_command or self.config.get("worker_command")
self.options = {
"cluster": self,
"config": self.config,
"region": region if region is not None else self.config.get("region"),
"size": size if size is not None else self.config.get("size"),
"image": image if image is not None else self.config.get("image"),
"docker_image": docker_image or self.config.get("docker_image"),
"worker_command": self.worker_command,
"worker_threads": self.worker_threads,
}
self.scheduler_options = {**self.options}
self.worker_options = {**self.options}
Expand Down
4 changes: 2 additions & 2 deletions examples/OpenstackCluster-scorepredict.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,14 @@
},
{
"cell_type": "code",
"execution_count": 49,
"execution_count": null,
"id": "5b745083-bb26-4fe9-a4f6-f7d049628a79",
"metadata": {},
"outputs": [],
"source": [
"import dask\n",
"import dask_cloudprovider\n",
"from instances import OpenStackCluster\n",
"from dask_cloudprovider.openstack import OpenStackCluster\n",
"from dask.distributed import Client, progress"
]
},
Expand Down
Loading