|
@@ -8,7 +8,6 @@
 
 """
 
-import asyncio
 import logging
 from collections.abc import Callable, Iterable
 from dataclasses import dataclass
@@ -241,9 +240,6 @@ def _comp_sidecar_fct( |
                 )
                 # NOTE: the callback is running in a secondary thread, and takes a future as arg
                 task_future.add_done_callback(lambda _: callback())
-                await distributed.Variable(job_id, client=self.backend.client).set(
-                    task_future
-                )
 
                 await dask_utils.wrap_client_async_routine(
                     self.backend.client.publish_dataset(task_future, name=job_id)
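
The hunk above drops the `distributed.Variable` that mirrored the task future and keeps only the existing `publish_dataset` call, so the scheduler holds the future under the job id as a named dataset. Below is a minimal sketch of that pattern, assuming a throwaway in-process cluster, a trivial `_square` task, and a hypothetical `job_id`; the real code awaits these calls through `dask_utils.wrap_client_async_routine` on the backend's client.

```python
import distributed


def _square(x: int) -> int:
    return x * x


def main() -> None:
    # Throwaway in-process cluster; the real code connects to an existing scheduler.
    with distributed.Client(processes=False) as client:
        job_id = "example-job"  # hypothetical identifier standing in for the real job_id
        task_future = client.submit(_square, 4)

        # Publishing the future under the job id stores a named reference on the
        # scheduler, so the result outlives the submitting client.
        client.publish_dataset(task_future, name=job_id)

        # Any client connected to the same scheduler can retrieve the future by name.
        retrieved = client.get_dataset(name=job_id)
        assert retrieved.result() == 16


if __name__ == "__main__":
    main()
```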
@@ -560,12 +556,6 @@ async def get_task_result(self, job_id: str) -> TaskOutputData: |
     async def release_task_result(self, job_id: str) -> None:
         _logger.debug("releasing results for %s", f"{job_id=}")
         try:
-            # NOTE: The distributed Variable holds the future of the tasks in the dask-scheduler
-            # Alas, deleting the variable is done asynchronously and there is no way to ensure
-            # the variable was effectively deleted.
-            # This is annoying as one can re-create the variable without error.
-            var = distributed.Variable(job_id, client=self.backend.client)
-            await asyncio.get_event_loop().run_in_executor(None, var.delete)
             # first check if the key exists
             await dask_utils.wrap_client_async_routine(
                 self.backend.client.get_dataset(name=job_id)
|
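On the release side, the method now only checks that the dataset exists before dropping it, instead of first deleting a `distributed.Variable` whose deletion could not be confirmed. A synchronous sketch of that flow follows; the `unpublish_dataset` call and the `KeyError` handling are assumptions about the part of the method cut off by the truncated hunk, and the real code awaits these calls through `dask_utils.wrap_client_async_routine`.

```python
import logging

import distributed

_logger = logging.getLogger(__name__)


def release_result(client: distributed.Client, job_id: str) -> None:
    """Drop the published future for job_id, logging jobs that are not found."""
    try:
        # First check that the key exists; get_dataset raises KeyError otherwise.
        client.get_dataset(name=job_id)
        # Remove the named reference so the scheduler may release the task result.
        client.unpublish_dataset(name=job_id)
    except KeyError:
        _logger.warning("no published dataset found for %s", f"{job_id=}")
```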