Skip to content

Commit 8c0df97

Browse files
authored
Dask check resources (#967)
1 parent c66ca2d commit 8c0df97

File tree

2 files changed

+30
-0
lines changed

2 files changed

+30
-0
lines changed

cluster_tools/Changelog.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ For upgrade instructions, please check the respective *Breaking Changes* section
1414
### Added
1515

1616
### Changed
17+
- The `DaskExecutor` now checks, when a job is submitted, whether any currently available dask worker has enough resources for that job. [#967](https://github.com/scalableminds/webknossos-libs/pull/967)
1718

1819
### Fixed
1920

cluster_tools/cluster_tools/executors/dask.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,35 @@ def submit( # type: ignore[override]
181181
# with the scheduler regularly.
182182
__fn = partial(_run_with_nanny, __fn)
183183

184+
currently_available_workers = self.client.scheduler_info()["workers"]
185+
186+
def check_resources(
    job_resources: Optional[Dict[str, float]],
    worker_resources: Optional[Dict[str, float]],
) -> bool:
    """Return whether a worker offers every resource a job requests.

    Args:
        job_resources: Mapping of resource name to the amount the job
            requests. ``None`` or an empty mapping means the job has no
            resource requirements.
        worker_resources: Mapping of resource name to the amount the worker
            offers, or ``None`` when the worker reports no resources.

    Returns:
        ``True`` if the job requests nothing, or if no requested amount
        exceeds what the worker offers; ``False`` otherwise.
    """
    # A job without resource requirements fits on any worker.
    if not job_resources:
        return True
    # The job needs something, but this worker declares no resources at all.
    if worker_resources is None:
        return False
    # A resource the worker does not list is treated as an amount of 0.
    return not any(
        worker_resources.get(name, 0) < amount
        for name, amount in job_resources.items()
    )
198+
199+
assert any(
200+
check_resources(self.job_resources, worker.get("resources"))
201+
for worker in currently_available_workers.values()
202+
), (
203+
"Requested resources are not available on any currently available worker. "
204+
+ f"Requested resources: {self.job_resources}. Available workers: "
205+
+ str(
206+
[
207+
f"{w['id']} => {w.get('resources', {})}"
208+
for w in currently_available_workers.values()
209+
]
210+
)
211+
)
212+
184213
fut = self.client.submit(
185214
partial(__fn, *args, **kwargs), pure=False, resources=self.job_resources
186215
)

0 commit comments

Comments
 (0)