Skip to content

Commit 92fc6c0

Browse files
committed
Fix bug
1 parent 9762340 commit 92fc6c0

File tree

1 file changed

+12
-7
lines changed

1 file changed

+12
-7
lines changed

doreisa/_scheduler.py

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from collections import Counter
44

55
import ray
6+
import ray.actor
67
from dask.core import get_dependencies
78

89
from doreisa._scheduling_actor import ChunkRef, ScheduledByOtherActor
@@ -123,13 +124,8 @@ def explore(k) -> int:
123124

124125
res_ref = scheduling_actors[partition[key]].get_value.remote(graph_id, key)
125126

126-
# At this point, get_value will not be called anymore for this graph.
127-
ray.get(
128-
[
129-
actor.clear_graph.options(enable_task_events=False).remote(graph_id)
130-
for id, actor in enumerate(scheduling_actors)
131-
if partitionned_graphs[id]
132-
]
127+
clear_graph.remote(
128+
[actor for id, actor in enumerate(scheduling_actors) if partitionned_graphs[id]], res_ref, graph_id
133129
)
134130

135131
if kwargs.get("ray_persist"):
@@ -144,3 +140,12 @@ def explore(k) -> int:
144140
if isinstance(keys[0], list):
145141
return [[res]]
146142
return [res]
143+
144+
145+
@ray.remote(max_retries=0, num_cpus=0)
146+
def clear_graph(scheduling_actors: list[ray.actor.ActorHandle], res: ray.ObjectRef, graph_id: int) -> None:
147+
# Wait until the result is ready
148+
ray.wait([res], fetch_local=False)
149+
150+
# Clear the graph
151+
ray.get([actor.clear_graph.remote(graph_id) for actor in scheduling_actors])

0 commit comments

Comments
 (0)