@btovar thanks for bringing up the dynamic chunking, I now remember you had implemented that at some point in the TaskVine Executor. So then we re-conceptualize Computable not as an iterator but perhaps more as a Generator with a send channel for the Backend to request changes to the iterator in flight. Maybe something like:
```python
from typing import Generator, Protocol, TypeAlias

from coffea.compute.protocol import (
    Computable,
    EmptyResult,
    InputT,
    ResultT,
    WorkElement,
)


class SizedWorkElement(WorkElement[InputT, ResultT], Protocol):
    def __len__(self) -> int:
        "Return the size of this work element in some unit (e.g., number of events)"
        ...


NewSizeRequest: TypeAlias = int


class ResizableComputable(Computable[InputT, ResultT], Protocol):
    def generate(
        self,
    ) -> Generator[SizedWorkElement[InputT, ResultT], NewSizeRequest, None]:
        "Generate work elements, possibly adapting their size based on external factors"
        ...


def compute_now(items: ResizableComputable[InputT, ResultT]) -> ResultT | EmptyResult:
    out = EmptyResult()
    work_gen = items.generate()
    # Let it tell us the initial size
    work_element = next(work_gen, None)
    if work_element is None:
        return out
    while True:
        result = work_element()
        out += result
        # Here we could adapt the size of future work elements based on performance metrics
        # For simplicity, we just request the same size
        try:
            work_element = work_gen.send(len(work_element))
        except StopIteration:
            break
    return out
```

(a real implementation would have to wrap this into a Task)
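On the producer side, a `ResizableComputable` would just need to read the value that comes back from each `yield`. Here is a minimal standalone sketch of that mechanic (`EventChunk` and `RangeComputable` are made-up names for illustration, not anything in coffea):

```python
from typing import Generator


class EventChunk:
    """Hypothetical callable work element covering events [start, stop)."""

    def __init__(self, start: int, stop: int) -> None:
        self.start = start
        self.stop = stop

    def __len__(self) -> int:
        return self.stop - self.start

    def __call__(self) -> int:
        # Stand-in "result": just report how many events were processed
        return len(self)


class RangeComputable:
    """Hypothetical producer: yields chunks over `total` events, resizing on request."""

    def __init__(self, total: int, initial_size: int) -> None:
        self.total = total
        self.initial_size = initial_size

    def generate(self) -> Generator[EventChunk, int, None]:
        size = self.initial_size
        start = 0
        while start < self.total:
            chunk = EventChunk(start, min(start + size, self.total))
            # The value sent back by the consumer (if any) becomes the size of
            # the next chunk; a plain next() resumes with None, so keep the
            # current size in that case.
            requested = yield chunk
            if requested is not None:
                size = max(1, requested)
            start = chunk.stop
```

Each `yield` hands a chunk to the consumer, and whatever the consumer passes back through `send` sets the size of the next chunk, so a backend that tracks per-chunk wall time could grow or shrink its `NewSizeRequest` to hit a target task duration.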
This would require a bit of re-imagining how FailedTaskElement is implemented: it could no longer just keep track of the index in the iterable, but would have to hold the whole materialized WorkElement for later re-computation.
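As a rough sketch of that re-imagined failure record (assuming the SizedWorkElement protocol from the sketch above is in scope; the fields and `retry` method are hypothetical, not coffea's current API):

```python
from dataclasses import dataclass
from typing import Generic

from coffea.compute.protocol import InputT, ResultT


@dataclass
class FailedTaskElement(Generic[InputT, ResultT]):
    # Keep the materialized work element itself (not an index into the
    # iterable) so the failed chunk can simply be called again later.
    work_element: SizedWorkElement[InputT, ResultT]
    exception: BaseException

    def retry(self) -> ResultT:
        # Re-run the original chunk of work for re-computation
        return self.work_element()
```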
Originally posted by @nsmith- in #1470 (comment)