
Conversation

@pfackeldey (Collaborator) commented Nov 6, 2025

This is a new implementation of the DaskExecutor scheduling logic. It works as follows (a minimal sketch of the pattern follows this list):

  1. it submits the chunks for processing
  2. it collects them on the fly in batches as they arrive/finish and filters out the failed ones
  3. it schedules the successful ones for per-dataset merging as soon as possible, producing one merged output per dataset
  4. as soon as possible, it starts merging these per-dataset outputs into a single final output
  5. it returns the output of only the successfully processed chunks, plus a list of failed work items (with their failure reasons) for reprocessing
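
For illustration, here is a minimal, self-contained sketch of this submit/collect/merge pattern built from plain dask.distributed primitives. process_chunk and merge are stand-ins (not the actual coffea internals), and the sketch merges client-side, whereas the executor in this PR schedules the per-dataset merging as tasks on the cluster:

from dask.distributed import Client, as_completed

def process_chunk(chunk):
    # stand-in for the per-chunk processing step (may raise)
    if chunk % 5 == 0:
        raise ValueError(f"failure on chunk {chunk}")
    return {"entries": 1}

def merge(a, b):
    # stand-in for the commutative merge of two partial outputs
    return {k: a.get(k, 0) + b.get(k, 0) for k in a.keys() | b.keys()}

if __name__ == "__main__":
    client = Client()
    chunks = list(range(20))

    # 1. submit all chunks for processing
    futures = client.map(process_chunk, chunks)

    merged, failed = None, []
    # 2. collect finished futures in batches as they arrive
    for batch in as_completed(futures, with_results=True, raise_errors=False).batches():
        for future, result in batch:
            if future.status == "error":
                # filter out failed chunks, keeping the failure reason
                failed.append((future.key, future.exception()))
            else:
                # 3./4. merge good results as soon as possible
                merged = result if merged is None else merge(merged, result)

    # 5. hand back the partial output and the failed work items
    print(merged, failed)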

Also:

  • fixes the progress bars and shows proper merging progress bars
  • supports partial result accumulation; errors are raised only for merging failures

Breaks/deprecates:

  • worker_affinity
  • use_dataframes

Usage:

import hist
import awkward as ak
import random
import time

from coffea import processor
from coffea.nanoevents.schemas import NanoAODSchema

from dask.distributed import Client, LocalCluster


NanoAODSchema.warn_missing_crossrefs = False


class MyProcessor(processor.ProcessorABC):
    def process(self, events):
        dataset = events.metadata['dataset']
        muons = events.Muon

        h_mass = (
            hist.Hist.new
            .StrCat(["opposite", "same"], name="sign")
            .Log(1000, 0.2, 200., name="mass", label=r"$m_{\mu\mu}$ [GeV]")
            .Int64()
        )

        # simulate slow, randomly failing chunks to exercise the failure handling
        time.sleep(random.random() + 1.0)

        if random.random() < 0.45:
            raise ValueError("random failure")

        cut = (ak.num(muons) == 2) & (ak.sum(muons.charge, axis=1) == 0)
        # add first and second muon in every event together
        dimuon = muons[cut][:, 0] + muons[cut][:, 1]
        h_mass.fill(sign="opposite", mass=dimuon.mass)

        cut = (ak.num(muons) == 2) & (ak.sum(muons.charge, axis=1) != 0)
        dimuon = muons[cut][:, 0] + muons[cut][:, 1]
        h_mass.fill(sign="same", mass=dimuon.mass)

        return {
            dataset: {
                "entries": ak.num(events, axis=0),
                "mass": h_mass,
            }
        }

    def postprocess(self, accumulator):
        pass


fileset = {
    "Data": {
        "files": {
            "tests/samples/nano_dimuon.root": "Events",
        }
    },
    "DY": {
        "files": {
            "tests/samples/nano_dy.root": "Events"
        }
    },
}


if __name__ == "__main__":
    cluster = LocalCluster()
    client = Client(cluster)

    executor = processor.DaskExecutor(
        client=client,
        status=True,
        retries=1,
    )
    run = processor.Runner(
        executor=executor,
        schema=NanoAODSchema,
        chunksize=10,
        format="root",
    )

    output, failed_items = run(fileset, MyProcessor())
    
    import rich

    rich.print()
    rich.print(f"Output: {output}")
    rich.print()
    rich.print(f"Failed items: {failed_items}")

Example output with a randomly failing coffea processor:
(screenshot of the example output omitted)

Can be used for what #1393 wishes for, i.e.:

from coffea.processor import accumulate

to_process = fileset
partial_outputs = []
while to_process:
    output, failed_items = run(to_process, MyProcessor())
    partial_outputs.append(output)
    # requeue only the failed work items for the next pass
    to_process = [fail.item for fail in failed_items]

# accumulate all partial outputs
out = accumulate(partial_outputs[1:], partial_outputs[0])

which should enable a more robust processing workflow.
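
One caveat with the loop above: a chunk that fails deterministically would keep to_process non-empty forever. A minimal variant with a bounded number of passes (max_passes is an illustrative name, not part of this PR):

from coffea.processor import accumulate

max_passes = 3  # illustrative retry cap, not part of this PR
to_process = fileset
partial_outputs = []
for _ in range(max_passes):
    output, failed_items = run(to_process, MyProcessor())
    partial_outputs.append(output)
    if not failed_items:
        break
    to_process = [fail.item for fail in failed_items]

out = accumulate(partial_outputs[1:], partial_outputs[0])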


Opening this as a draft since it breaks two kwargs, and you probably want to test it yourselves first. I wanted to write this down and have the diff somewhere, as we're going to try this in the integration challenge.

cc @alexander-held @MoAly98 @oshadura @ArturU043

@lgray (Collaborator) commented Nov 6, 2025

I am fine with losing use_dataframes; @nsmith- should chime in on worker_affinity.

use_dataframes lets people build up a distributed dataframe that they can output, but I think it was pretty rarely used. I would be fine with saying this functionality belongs with dask-awkward instead.

@lgray (Collaborator) commented Nov 6, 2025

Also, this is excellent work! Thank you for the contribution!
