Description
Hi there! Right now, async filesystems are explicitly incompatible with `fork`, presumably because fsspec tries to make sure that all of the asyncio operations actually happen on a separate thread. See:
filesystem_spec/fsspec/asyn.py, lines 305 to 307 in 9ea19ba:

```python
def loop(self):
    if self._pid != os.getpid():
        raise RuntimeError("This class is not fork-safe")
```
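For anyone unfamiliar with that guard, here is a minimal, self-contained sketch of the same PID-check pattern (the class name and placeholder loop value are hypothetical, not fsspec API):

```python
import os


class ForkGuarded:
    """Hypothetical stand-in for fsspec's async filesystem: it remembers the
    PID that created it and refuses to hand out its event loop after a fork."""

    def __init__(self):
        self._pid = os.getpid()
        self._loop = "event-loop-placeholder"  # stands in for the real asyncio loop

    @property
    def loop(self):
        # Same check as fsspec: a different PID means we are in a forked child,
        # where the thread driving the loop no longer exists.
        if self._pid != os.getpid():
            raise RuntimeError("This class is not fork-safe")
        return self._loop


obj = ForkGuarded()
print(obj.loop)  # same process: works fine

obj._pid -= 1  # simulate "we are now in a different (forked) process"
try:
    obj.loop
except RuntimeError as err:
    print(err)
```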
I was wondering whether there is some way we could add limited support for forked processes. I understand that fork + threading gets into some tricky technical weeds, but supporting some level of fork would make integration with other projects easier.
In particular, I am trying to use an S3-backed zarr array with PyTorch (in order to train neural networks on some very large arrays). torch.utils.data.DataLoader calls os.fork by default on Linux (see https://pytorch.org/docs/stable/data.html#single-and-multi-process-data-loading), and s3fs is an async filesystem, making this an incompatible combination.
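To see concretely why the combination fails, here is a minimal, Linux-only sketch with no fsspec or torch involved: a background thread runs an asyncio event loop, the process forks, and the child finds the loop's thread gone, because only the thread that called fork survives. Any coroutine submitted from the child would therefore never run.

```python
import asyncio
import os
import threading

# Start an event loop on a background thread, as fsspec does.
loop = asyncio.new_event_loop()
th = threading.Thread(target=loop.run_forever, daemon=True)
th.start()

r, w = os.pipe()
pid = os.fork()
if pid == 0:
    # Child process: the loop object was copied, but the thread driving
    # it was not. Report whether the thread appears alive, then exit.
    os.write(w, b"1" if th.is_alive() else b"0")
    os._exit(0)

os.waitpid(pid, 0)
child_sees_thread = os.read(r, 1) == b"1"
print("parent loop thread alive:", th.is_alive())
print("child loop thread alive:", child_sees_thread)
loop.call_soon_threadsafe(loop.stop)  # tidy up the parent's loop thread
```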
I have been able to hack around this with the following code:
```python
import asyncio
import os
import threading

from fsspec import asyn

fs = ...  # S3 filesystem, but could be any other async filesystem
try:
    do_something(fs)
except RuntimeError as err:
    if "fork-safe" not in str(err):
        raise
    if fs._loop is not asyn.loop[0]:
        raise
    # fs._loop is the default loop, which is running on a thread in the
    # parent process, so spin up a new event-loop thread for this process.
    new_loop = None
    if len(asyn.loop) == 1:
        new_loop = asyncio.new_event_loop()
        asyn.loop.append(new_loop)  # list.append is thread-safe
    if asyn.loop[1] is new_loop:  # we inserted first
        th = threading.Thread(target=new_loop.run_forever, name="fsspecIO")
        th.daemon = True
        th.start()
        asyn.iothread.append(th)
    fs._loop = asyn.loop[1]
    fs._pid = os.getpid()
```
but obviously this reaches fairly deep into the internals of fsspec, which makes me uncomfortable.