Skip to content

Can't use a lambda function in iteritems_options with FuturesExecutor #1408

@ikrommyd

Description

@ikrommyd

iteritems_options cannot contain lambda functions when using FuturesExecutor. Because FuturesExecutor dispatches work to worker processes via multiprocessing, the options dict must be serialized with the standard `pickle` module, which cannot pickle lambdas — resulting in a `PicklingError`.
To reproduce:

from coffea import processor


fileset = {"dy": {"files": {"tests/samples/nano_dy.root": "Events"}}}


class P(processor.ProcessorABC):
    """Minimal do-nothing processor used only to exercise the Runner pipeline."""

    def process(self, events):
        # Return a trivial constant per chunk; the value is irrelevant to the repro.
        return True

    def postprocess(self, accumulator):
        # No post-processing needed for this reproduction.
        return None

if __name__ == "__main__":
    # FuturesExecutor ships work to worker processes, so all arguments must be
    # picklable by the stdlib pickle used by multiprocessing.
    run = processor.Runner(executor=processor.FuturesExecutor())
    # The lambda below is the trigger: pickle cannot serialize a lambda defined
    # in __main__, producing the PicklingError shown in the traceback.
    print(run(fileset=fileset, processor_instance=P(), iteritems_options={"filter_name": lambda name: True}))

gives

concurrent.futures.process._RemoteTraceback:
"""
Traceback (most recent call last):
  File "/Users/iason/micromamba/envs/awkward-coffea/lib/python3.13/multiprocessing/queues.py", line 262, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/Users/iason/micromamba/envs/awkward-coffea/lib/python3.13/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
    ~~~~~~~~~~~~~~~~~~~~~~~^^^^^
_pickle.PicklingError: Can't pickle <function <lambda> at 0x1079ce480>: attribute lookup <lambda> on __main__ failed
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/iason/Dropbox/work/coffea_dev/coffea/src/coffea/processor/executor.py", line 587, in _processwith
    merged = _watcher(FH, self, reducer, pool)
  File "/Users/iason/Dropbox/work/coffea_dev/coffea/src/coffea/processor/executor.py", line 382, in _watcher
    batch = FH.fetch(len(FH.completed))
  File "/Users/iason/Dropbox/work/coffea_dev/coffea/src/coffea/processor/executor.py", line 266, in fetch
    raise bad_futures[0].exception()
  File "/Users/iason/micromamba/envs/awkward-coffea/lib/python3.13/multiprocessing/queues.py", line 262, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/Users/iason/micromamba/envs/awkward-coffea/lib/python3.13/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
    ~~~~~~~~~~~~~~~~~~~~~~~^^^^^
_pickle.PicklingError: Can't pickle <function <lambda> at 0x1079ce480>: attribute lookup <lambda> on __main__ failed
---------------------------------------------------------------------------
PicklingError                             Traceback (most recent call last)
Cell In[1], line 16
     14 if __name__ == "__main__":
     15     run = processor.Runner(executor=processor.FuturesExecutor())
---> 16     print(run(fileset=fileset, processor_instance=P(), iteritems_options={"filter_name": lambda name: True}))

File ~/Dropbox/work/coffea_dev/coffea/src/coffea/processor/executor.py:1495, in Runner.__call__(self, fileset, processor_instance, treename, uproot_options, iteritems_options)
   1469 def __call__(
   1470     self,
   1471     fileset: dict,
   (...)   1475     iteritems_options: Optional[dict] = {},
   1476 ) -> Accumulatable:
   1477     """Run the processor_instance on a given fileset
   1478
   1479     Parameters
   (...)   1493             Any options to pass to ``tree.iteritems``
   1494     """
-> 1495     wrapped_out = self.run(
   1496         fileset, processor_instance, treename, uproot_options, iteritems_options
   1497     )
   1498     if self.use_dataframes:
   1499         return wrapped_out  # not wrapped anymore

File ~/Dropbox/work/coffea_dev/coffea/src/coffea/processor/executor.py:1639, in Runner.run(self, fileset, processor_instance, treename, uproot_options, iteritems_options)
   1634 closure = partial(
   1635     self.automatic_retries, self.retries, self.skipbadfiles, closure
   1636 )
   1638 executor = self.executor.copy(**exe_args)
-> 1639 wrapped_out, e = executor(chunks, closure, None)
   1640 if wrapped_out is None:
   1641     raise ValueError(
   1642         "No chunks returned results, verify ``processor`` instance structure.\n\
   1643         if you used skipbadfiles=True, it is possible all your files are bad."
   1644     )

File ~/Dropbox/work/coffea_dev/coffea/src/coffea/processor/executor.py:618, in FuturesExecutor.__call__(self, items, function, accumulator)
    616 else:
    617     mergepoolinstance = None
--> 618 return _processwith(pool=poolinstance, mergepool=mergepoolinstance)

File ~/Dropbox/work/coffea_dev/coffea/src/coffea/processor/executor.py:602, in FuturesExecutor.__call__.<locals>._processwith(pool, mergepool)
    600     return accumulate([_decompress(merged), accumulator]), e
    601 else:
--> 602     raise e from None

File ~/Dropbox/work/coffea_dev/coffea/src/coffea/processor/executor.py:587, in FuturesExecutor.__call__.<locals>._processwith(pool, mergepool)
    585 try:
    586     if mergepool is None:
--> 587         merged = _watcher(FH, self, reducer, pool)
    588     else:
    589         merged = _watcher(FH, self, reducer, mergepool)

File ~/Dropbox/work/coffea_dev/coffea/src/coffea/processor/executor.py:382, in _watcher(FH, executor, merge_fcn, pool)
    376             progress.update(
    377                 p_idm,
    378                 total=progress._tasks[p_idm].total + 1,
    379                 refresh=True,
    380             )
    381     else:  # Merge within process
--> 382         batch = FH.fetch(len(FH.completed))
    383         merged = _compress(
    384             accumulate(
    385                 progress.track(
   (...)    392             executor.compression,
    393         )
    394 # Add checkpointing

File ~/Dropbox/work/coffea_dev/coffea/src/coffea/processor/executor.py:266, in _FuturesHolder.fetch(self, N)
    264 bad_futures = [future for future in _completed if not _good_future(future)]
    265 self.completed.update(good_futures)
--> 266 raise bad_futures[0].exception()

File ~/micromamba/envs/awkward-coffea/lib/python3.13/multiprocessing/queues.py:262, in Queue._feed(buffer, notempty, send_bytes, writelock, reader_close, writer_close, ignore_epipe, onerror, queue_sem)
    259     return
    261 # serialize the data before acquiring the lock
--> 262 obj = _ForkingPickler.dumps(obj)
    263 if wacquire is None:
    264     send_bytes(obj)

File ~/micromamba/envs/awkward-coffea/lib/python3.13/multiprocessing/reduction.py:51, in ForkingPickler.dumps(cls, obj, protocol)
     48 @classmethod
     49 def dumps(cls, obj, protocol=None):
     50     buf = io.BytesIO()
---> 51     cls(buf, protocol).dump(obj)
     52     return buf.getbuffer()

PicklingError: Can't pickle <function <lambda> at 0x1079ce480>: attribute lookup <lambda> on __main__ failed

Metadata

Metadata

Assignees

Labels

bug — Something isn't working

Type

Projects

Status

In Progress

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions