Skip to content

Commit 87347a7

Browse files
committed
Fix build
1 parent fea29f0 commit 87347a7

File tree

5 files changed

+124
-55
lines changed

5 files changed

+124
-55
lines changed

.github/workflows/ci.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,8 @@ jobs:
3737
runs-on: ubuntu-latest
3838
steps:
3939
- uses: actions/checkout@v2
40-
- uses: actions/setup-python@v2
41-
- uses: pre-commit/action@v2.0.0
40+
- uses: actions/setup-python@v5
41+
- uses: pre-commit/action@v3.0.1
4242

4343
imports:
4444
runs-on: ubuntu-latest

dask_cloudprovider/aws/ec2.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ async def create_vm(self):
165165
vm_kwargs["InstanceMarketOptions"] = {
166166
"MarketType": "spot",
167167
"SpotOptions": {"SpotInstanceType": "one-time"},
168-
}
168+
}
169169

170170
response = await client.run_instances(**vm_kwargs)
171171
[self.instance] = response["Instances"]

dask_cloudprovider/gcp/instances.py

Lines changed: 34 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -635,37 +635,61 @@ def __init__(
635635
)
636636
self.machine_type = machine_type or self.config.get("machine_type")
637637
if machine_type is None:
638-
self.scheduler_machine_type = scheduler_machine_type or self.config.get("scheduler_machine_type")
639-
self.worker_machine_type = worker_machine_type or self.config.get("worker_machine_type")
638+
self.scheduler_machine_type = scheduler_machine_type or self.config.get(
639+
"scheduler_machine_type"
640+
)
641+
self.worker_machine_type = worker_machine_type or self.config.get(
642+
"worker_machine_type"
643+
)
640644
if self.scheduler_machine_type is None or self.worker_machine_type is None:
641645
raise ValueError("machine_type and scheduler_machine_type must be set")
642646
else:
643647
if scheduler_machine_type is not None or worker_machine_type is not None:
644-
raise ValueError("If you specify machine_type, you may not specify scheduler_machine_type or worker_machine_type")
648+
raise ValueError(
649+
"If you specify machine_type, you may not specify scheduler_machine_type or worker_machine_type"
650+
)
645651
self.scheduler_machine_type = machine_type
646652
self.worker_machine_type = machine_type
647653

648654
self.ngpus = ngpus or self.config.get("ngpus")
649655
if not self.ngpus:
650-
self.scheduler_ngpus = scheduler_ngpus if scheduler_ngpus is not None else self.config.get("scheduler_ngpus", 0)
651-
self.worker_ngpus = worker_ngpus if worker_ngpus is not None else self.config.get("worker_ngpus", 0)
656+
self.scheduler_ngpus = (
657+
scheduler_ngpus
658+
if scheduler_ngpus is not None
659+
else self.config.get("scheduler_ngpus", 0)
660+
)
661+
self.worker_ngpus = (
662+
worker_ngpus
663+
if worker_ngpus is not None
664+
else self.config.get("worker_ngpus", 0)
665+
)
652666
else:
653667
if scheduler_ngpus is not None or worker_ngpus is not None:
654-
raise ValueError("If you specify ngpus, you may not specify scheduler_ngpus or worker_ngpus")
668+
raise ValueError(
669+
"If you specify ngpus, you may not specify scheduler_ngpus or worker_ngpus"
670+
)
655671
self.scheduler_ngpus = self.ngpus
656672
self.worker_ngpus = self.ngpus
657673

658674
self.gpu_type = gpu_type or self.config.get("gpu_type")
659675
if not self.gpu_type:
660-
self.scheduler_gpu_type = scheduler_gpu_type or self.config.get("scheduler_gpu_type")
676+
self.scheduler_gpu_type = scheduler_gpu_type or self.config.get(
677+
"scheduler_gpu_type"
678+
)
661679
self.worker_gpu_type = worker_gpu_type or self.config.get("worker_gpu_type")
662680
if self.scheduler_ngpus > 0 and self.scheduler_gpu_type is None:
663-
raise ValueError("scheduler_gpu_type must be specified when scheduler_ngpus > 0")
681+
raise ValueError(
682+
"scheduler_gpu_type must be specified when scheduler_ngpus > 0"
683+
)
664684
if self.worker_ngpus > 0 and self.worker_gpu_type is None:
665-
raise ValueError("worker_gpu_type must be specified when worker_ngpus > 0")
685+
raise ValueError(
686+
"worker_gpu_type must be specified when worker_ngpus > 0"
687+
)
666688
else:
667689
if scheduler_gpu_type is not None or worker_gpu_type is not None:
668-
raise ValueError("If you specify gpu_type, you may not specify scheduler_gpu_type or worker_gpu_type")
690+
raise ValueError(
691+
"If you specify gpu_type, you may not specify scheduler_gpu_type or worker_gpu_type"
692+
)
669693
self.scheduler_gpu_type = self.gpu_type
670694
self.worker_gpu_type = self.gpu_type
671695

@@ -730,7 +754,6 @@ def __init__(self, service_account_credentials: Optional[dict[str, Any]] = None)
730754
self._compute = self.refresh_client()
731755

732756
def refresh_client(self):
733-
734757
scopes = ["https://www.googleapis.com/auth/cloud-platform"]
735758

736759
if self.service_account_credentials:

dask_cloudprovider/ibm/code_engine.py

Lines changed: 75 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,9 @@ def __init__(
8282
self.docker_password = docker_password
8383
self.docker_registry_name = docker_registry_name
8484

85-
self.authenticator = IAMAuthenticator(self.api_key, url="https://iam.cloud.ibm.com")
85+
self.authenticator = IAMAuthenticator(
86+
self.api_key, url="https://iam.cloud.ibm.com"
87+
)
8688
self.authenticator.set_disable_ssl_verification(
8789
True
8890
) # Disable SSL verification for the authenticator
@@ -97,9 +99,11 @@ def __init__(
9799

98100
def _extract_k8s_config_details(self, project_id):
99101
delegated_refresh_token_payload = {
100-
'grant_type': 'urn:ibm:params:oauth:grant-type:apikey', 'apikey': self.api_key,
101-
'response_type': 'delegated_refresh_token', 'receiver_client_ids': 'ce',
102-
'delegated_refresh_token_expiry': '3600'
102+
"grant_type": "urn:ibm:params:oauth:grant-type:apikey",
103+
"apikey": self.api_key,
104+
"response_type": "delegated_refresh_token",
105+
"receiver_client_ids": "ce",
106+
"delegated_refresh_token_expiry": "3600",
103107
}
104108
token_manager = self.code_engine_service.authenticator.token_manager
105109
original_request_payload = token_manager.request_payload
@@ -109,24 +113,40 @@ def _extract_k8s_config_details(self, project_id):
109113
finally:
110114
token_manager.request_payload = original_request_payload
111115

112-
kc_resp = self.code_engine_service_v1.get_kubeconfig(iam_response['delegated_refresh_token'], project_id)
116+
kc_resp = self.code_engine_service_v1.get_kubeconfig(
117+
iam_response["delegated_refresh_token"], project_id
118+
)
113119
kubeconfig_data = kc_resp.get_result()
114120

115-
current_context_name = kubeconfig_data['current-context']
116-
context_details = next(c['context'] for c in kubeconfig_data['contexts'] if c['name'] == current_context_name)
121+
current_context_name = kubeconfig_data["current-context"]
122+
context_details = next(
123+
c["context"]
124+
for c in kubeconfig_data["contexts"]
125+
if c["name"] == current_context_name
126+
)
117127

118-
namespace = context_details.get('namespace', 'default')
119-
server_url = next(c['cluster'] for c in kubeconfig_data['clusters'] if c['name'] == context_details['cluster'])['server']
128+
namespace = context_details.get("namespace", "default")
129+
server_url = next(
130+
c["cluster"]
131+
for c in kubeconfig_data["clusters"]
132+
if c["name"] == context_details["cluster"]
133+
)["server"]
120134
return namespace, server_url
121135

122136
def create_registry_secret(self):
123137
# Set up the authenticator and service instance
124-
self.code_engine_service_v1 = IbmCloudCodeEngineV1(authenticator=self.authenticator)
125-
self.code_engine_service_v1.set_service_url("https://api." + self.region + ".codeengine.cloud.ibm.com/api/v1")
138+
self.code_engine_service_v1 = IbmCloudCodeEngineV1(
139+
authenticator=self.authenticator
140+
)
141+
self.code_engine_service_v1.set_service_url(
142+
"https://api." + self.region + ".codeengine.cloud.ibm.com/api/v1"
143+
)
126144
token = self.authenticator.token_manager.get_token()
127145

128146
# Fetch K8s config details
129-
namespace, k8s_api_server_url = self._extract_k8s_config_details(self.project_id)
147+
namespace, k8s_api_server_url = self._extract_k8s_config_details(
148+
self.project_id
149+
)
130150

131151
# Create a new configuration instance
132152
configuration = client.Configuration()
@@ -136,35 +156,51 @@ def create_registry_secret(self):
136156
core_api = client.CoreV1Api(api_client_instance)
137157

138158
secret = client.V1Secret(
139-
metadata=client.V1ObjectMeta(name=self.docker_registry_name, namespace=namespace),
159+
metadata=client.V1ObjectMeta(
160+
name=self.docker_registry_name, namespace=namespace
161+
),
140162
type="kubernetes.io/dockerconfigjson",
141-
string_data={".dockerconfigjson": json.dumps({
142-
"auths": {
143-
self.docker_server: {
144-
"username": self.docker_username,
145-
"password": self.docker_password,
163+
string_data={
164+
".dockerconfigjson": json.dumps(
165+
{
166+
"auths": {
167+
self.docker_server: {
168+
"username": self.docker_username,
169+
"password": self.docker_password,
170+
}
171+
}
146172
}
147-
}
148-
})}
173+
)
174+
},
149175
)
150176

151177
try:
152-
core_api.delete_namespaced_secret(self.docker_registry_name, namespace=namespace)
178+
core_api.delete_namespaced_secret(
179+
self.docker_registry_name, namespace=namespace
180+
)
153181
except ApiException as e:
154-
if e.status == 404: # Not Found, which is fine
182+
if e.status == 404: # Not Found, which is fine
155183
pass
156184
else:
157-
self.cluster._log(f"Error deleting existing registry secret {self.docker_registry_name} in {namespace}: {e}")
185+
self.cluster._log(
186+
f"Error deleting existing registry secret {self.docker_registry_name} in {namespace}: {e}"
187+
)
158188
pass
159189

160190
try:
161191
core_api.create_namespaced_secret(namespace, secret)
162-
self.cluster._log(f"Successfully created registry secret '{self.docker_registry_name}'.")
192+
self.cluster._log(
193+
f"Successfully created registry secret '{self.docker_registry_name}'."
194+
)
163195
except ApiException as e:
164-
if e.status == 409: # Conflict, secret already exists
165-
self.cluster._log(f"Registry secret '{self.docker_registry_name}' already exists.")
196+
if e.status == 409: # Conflict, secret already exists
197+
self.cluster._log(
198+
f"Registry secret '{self.docker_registry_name}' already exists."
199+
)
166200
else:
167-
self.cluster._log(f"Error creating registry secret '{self.docker_registry_name}': {e}")
201+
self.cluster._log(
202+
f"Error creating registry secret '{self.docker_registry_name}': {e}"
203+
)
168204
raise e
169205

170206
async def create_vm(self):
@@ -307,7 +343,9 @@ def __init__(self, *args, **kwargs):
307343
async def start(self):
308344
if self.docker_server and self.docker_username and self.docker_password:
309345
self.docker_registry_name = "dask-" + self.docker_server.split(".")[0]
310-
self.cluster._log(f"Creating registry secret for {self.docker_registry_name}")
346+
self.cluster._log(
347+
f"Creating registry secret for {self.docker_registry_name}"
348+
)
311349
self.create_registry_secret()
312350

313351
self.cluster._log(
@@ -375,7 +413,7 @@ def __init__(
375413
),
376414
]
377415

378-
# To work with Code Engine, we need to use the extra arguments
416+
# To work with Code Engine, we need to use the extra arguments
379417
if self.worker_command:
380418
custom_command_prefix = self.worker_command.split()
381419
original_command_suffix = self.command[3:]
@@ -445,7 +483,8 @@ class IBMCodeEngineCluster(VMCluster):
445483
docker_username: str
446484
The username for authenticating with the Docker registry. Required if using private Docker images.
447485
docker_password: str
448-
The password or access token for authenticating with the Docker registry. Required if using private Docker images.
486+
The password or access token for authenticating with the Docker registry.
487+
Required if using private Docker images.
449488
debug: bool, optional
450489
More information will be printed when constructing clusters to enable debugging.
451490
@@ -572,8 +611,12 @@ def __init__(
572611
self.scheduler_cpu = scheduler_cpu or self.config.get("scheduler_cpu")
573612
self.scheduler_mem = scheduler_mem or self.config.get("scheduler_mem")
574613
self.scheduler_disk = scheduler_disk or self.config.get("scheduler_disk")
575-
self.scheduler_timeout = scheduler_timeout or self.config.get("scheduler_timeout")
576-
self.scheduler_command = scheduler_command or self.config.get("scheduler_command")
614+
self.scheduler_timeout = scheduler_timeout or self.config.get(
615+
"scheduler_timeout"
616+
)
617+
self.scheduler_command = scheduler_command or self.config.get(
618+
"scheduler_command"
619+
)
577620
self.worker_cpu = worker_cpu or self.config.get("worker_cpu")
578621
self.worker_mem = worker_mem or self.config.get("worker_mem")
579622
self.worker_disk = worker_disk or self.config.get("worker_disk")

dask_cloudprovider/openstack/instances.py

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -122,18 +122,17 @@ async def create_and_assign_floating_ip(self, conn):
122122

123123
# Find the first port of the instance
124124
ports = await self.call_async(
125-
conn.network.ports,
126-
device_id=self.instance.id
125+
conn.network.ports, device_id=self.instance.id
127126
)
128127
ports = list(ports)
129128
if not ports:
130-
raise RuntimeError(f"No network ports found for instance {self.instance.id}")
129+
raise RuntimeError(
130+
f"No network ports found for instance {self.instance.id}"
131+
)
131132

132133
# Assign the floating IP to the instance's port
133134
await self.call_async(
134-
conn.network.update_ip,
135-
floating_ip,
136-
port_id=ports[0].id
135+
conn.network.update_ip, floating_ip, port_id=ports[0].id
137136
)
138137

139138
return floating_ip.floating_ip_address
@@ -253,13 +252,16 @@ def __init__(
253252
scheduler_ip = self.cluster.scheduler_external_ip
254253
else:
255254
scheduler_ip = self.cluster.scheduler_internal_ip
256-
scheduler_address = f"{self.cluster.protocol}://{scheduler_ip}:{self.cluster.scheduler_port}"
255+
scheduler_address = (
256+
f"{self.cluster.protocol}://{scheduler_ip}:{self.cluster.scheduler_port}"
257+
)
257258

258259
# If user provides worker_command, override the start of the command
259260
if self.worker_command:
260261
# This is only for custom worker_command overrides
261262
cmd = (
262-
self.worker_command if isinstance(self.worker_command, list)
263+
self.worker_command
264+
if isinstance(self.worker_command, list)
263265
else self.worker_command.split()
264266
)
265267
self.command = " ".join([self.set_env] + cmd + [scheduler_address])
@@ -269,6 +271,7 @@ async def start(self):
269271
await self.create_vm()
270272
self.status = Status.running
271273

274+
272275
class OpenStackCluster(VMCluster):
273276
"""Cluster running on Openstack VM Instances
274277
@@ -354,7 +357,7 @@ class OpenStackCluster(VMCluster):
354357
"dask worker",
355358
"--nthreads", "4",
356359
"--memory-limit", "16GB",
357-
]
360+
]
358361
scheduler_options: dict
359362
Params to be passed to the scheduler class.
360363
See :class:`distributed.scheduler.Scheduler`.

0 commit comments

Comments
 (0)