From 4d303126c89e4f408e3ef9b6722d0f49f4306e0b Mon Sep 17 00:00:00 2001 From: Davis Vann Bennett Date: Tue, 11 Feb 2025 13:17:02 +0100 Subject: [PATCH 1/6] add failing multiprocessing test --- tests/test_array.py | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/tests/test_array.py b/tests/test_array.py index e458ba106e..c05185c00a 100644 --- a/tests/test_array.py +++ b/tests/test_array.py @@ -1382,3 +1382,22 @@ def test_roundtrip_numcodecs() -> None: metadata = root["test"].metadata.to_dict() expected = (*filters, BYTES_CODEC, *compressors) assert metadata["codecs"] == expected + + +def _index_array(arr: Array, index: Any) -> Any: + return arr[index] + + +@pytest.mark.parametrize("store", ["local"], indirect=True) +def test_multiprocessing(store: Store) -> None: + """ + Test that arrays can be pickled and indexed in child processes + """ + data = np.arange(100) + arr = zarr.create_array(store=store, data=data) + from multiprocessing import Pool + + pool = Pool() + + results = pool.starmap(_index_array, [(arr, slice(len(data)))] * 3) + assert all(np.array_equal(r, data) for r in results) From 5dceb045f12ef6fd73a38da1838b160f47d9a44e Mon Sep 17 00:00:00 2001 From: Davis Vann Bennett Date: Tue, 11 Feb 2025 16:30:53 +0100 Subject: [PATCH 2/6] add hook to reset global vars after fork --- src/zarr/core/sync.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/src/zarr/core/sync.py b/src/zarr/core/sync.py index 6a2de855e8..bbb7ebf554 100644 --- a/src/zarr/core/sync.py +++ b/src/zarr/core/sync.py @@ -3,6 +3,7 @@ import asyncio import atexit import logging +import os import threading from concurrent.futures import ThreadPoolExecutor, wait from typing import TYPE_CHECKING, TypeVar @@ -89,6 +90,20 @@ def cleanup_resources() -> None: atexit.register(cleanup_resources) +def reset_resources_after_fork() -> None: + """ + Ensure that global resources are reset after a fork. Without this function, + forked processes will retain invalid references to the parent process's resources. + """ + global loop, iothread, _executor + loop[0] = None + iothread[0] = None + _executor = None + + +os.register_at_fork(after_in_child=reset_resources_after_fork) + + async def _runner(coro: Coroutine[Any, Any, T]) -> T | BaseException: """ Await a coroutine and return the result of running it. If awaiting the coroutine raises an From b1378a793889cb2b83889a74a20a5214b1cc44a3 Mon Sep 17 00:00:00 2001 From: Davis Vann Bennett Date: Tue, 11 Feb 2025 16:32:00 +0100 Subject: [PATCH 3/6] parametrize multiprocessing test over different methods --- tests/test_array.py | 29 ++++++++++++++++++++++++----- 1 file changed, 24 insertions(+), 5 deletions(-) diff --git a/tests/test_array.py b/tests/test_array.py index c05185c00a..b254263fa5 100644 --- a/tests/test_array.py +++ b/tests/test_array.py @@ -1,8 +1,10 @@ import dataclasses import json import math +import multiprocessing as mp import pickle import re +import sys from itertools import accumulate from typing import TYPE_CHECKING, Any, Literal from unittest import mock @@ -1388,16 +1390,33 @@ def _index_array(arr: Array, index: Any) -> Any: return arr[index] +@pytest.mark.parametrize( + "method", + [ + pytest.param( + "fork", + marks=pytest.mark.skipif( + sys.platform in ("win32", "darwin"), reason="fork not supported on Windows or OSX" + ), + ), + "spawn", + pytest.param( + "forkserver", + marks=pytest.mark.skipif( + sys.platform == "win32", reason="forkserver not supported on Windows" + ), + ), + ], +) @pytest.mark.parametrize("store", ["local"], indirect=True) -def test_multiprocessing(store: Store) -> None: +def test_multiprocessing(store: Store, method: Literal["fork", "spawn"]) -> None: """ Test that arrays can be pickled and indexed in child processes """ data = np.arange(100) arr = zarr.create_array(store=store, data=data) - from multiprocessing import Pool - - pool = Pool() + ctx = mp.get_context(method) + pool = ctx.Pool() - results = pool.starmap(_index_array, [(arr, slice(len(data)))] * 3) + results = pool.starmap(_index_array, [(arr, slice(len(data)))]) assert all(np.array_equal(r, data) for r in results) From b558f7f46c3756704f217f328ea47ee0d4067a03 Mon Sep 17 00:00:00 2001 From: Davis Vann Bennett Date: Tue, 11 Feb 2025 16:46:27 +0100 Subject: [PATCH 4/6] guard execution of register_at_fork with a hasattr check --- src/zarr/core/sync.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/zarr/core/sync.py b/src/zarr/core/sync.py index bbb7ebf554..c4b3595a95 100644 --- a/src/zarr/core/sync.py +++ b/src/zarr/core/sync.py @@ -101,7 +101,9 @@ def reset_resources_after_fork() -> None: _executor = None -os.register_at_fork(after_in_child=reset_resources_after_fork) +# this is only available on certain operating systems +if hasattr(os, "register_at_fork"): + os.register_at_fork(after_in_child=reset_resources_after_fork) async def _runner(coro: Coroutine[Any, Any, T]) -> T | BaseException: From bf6c79d60dde253d10f25d6181af277aeca01153 Mon Sep 17 00:00:00 2001 From: Davis Vann Bennett Date: Tue, 11 Feb 2025 16:56:09 +0100 Subject: [PATCH 5/6] exempt runs-in-a-forked-process code from coverage --- src/zarr/core/sync.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/src/zarr/core/sync.py b/src/zarr/core/sync.py index c4b3595a95..2bb5f24802 100644 --- a/src/zarr/core/sync.py +++ b/src/zarr/core/sync.py @@ -96,9 +96,13 @@ def reset_resources_after_fork() -> None: forked processes will retain invalid references to the parent process's resources. """ global loop, iothread, _executor - loop[0] = None - iothread[0] = None - _executor = None + # These lines are excluded from coverage because this function only runs in a child process, + # which is not observed by the test coverage instrumentation. Despite the apparent lack of + # test coverage, this function should be adequately tested by any test that uses Zarr IO with + # multiprocessing. + loop[0] = None # pragma: no cover + iothread[0] = None # pragma: no cover + _executor = None # pragma: no cover # this is only available on certain operating systems From 4f6f433fd5e4c933855a7f71d61570ac09677fbb Mon Sep 17 00:00:00 2001 From: Davis Vann Bennett Date: Tue, 11 Feb 2025 16:57:25 +0100 Subject: [PATCH 6/6] update literal type --- tests/test_array.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_array.py b/tests/test_array.py index b254263fa5..1b84d1d061 100644 --- a/tests/test_array.py +++ b/tests/test_array.py @@ -1409,7 +1409,7 @@ def _index_array(arr: Array, index: Any) -> Any: ], ) @pytest.mark.parametrize("store", ["local"], indirect=True) -def test_multiprocessing(store: Store, method: Literal["fork", "spawn"]) -> None: +def test_multiprocessing(store: Store, method: Literal["fork", "spawn", "forkserver"]) -> None: """ Test that arrays can be pickled and indexed in child processes """