diff --git a/changes/3452.feature.rst b/changes/3452.feature.rst new file mode 100644 index 0000000000..ed2f416ada --- /dev/null +++ b/changes/3452.feature.rst @@ -0,0 +1,6 @@ +Add optional uvloop support for improved async performance. + +When uvloop is available, Zarr will automatically use it as the event loop implementation +for better I/O performance. This can be controlled via the ``async.use_uvloop`` configuration +setting or the ``ZARR_ASYNC__USE_UVLOOP`` environment variable. uvloop can be installed +with ``pip install 'zarr[optional]'`` (Unix/Linux/macOS only). \ No newline at end of file diff --git a/docs/user-guide/config.rst b/docs/user-guide/config.rst index 76210da791..71d5bf686f 100644 --- a/docs/user-guide/config.rst +++ b/docs/user-guide/config.rst @@ -42,7 +42,7 @@ This is the current default configuration:: >>> zarr.config.pprint() {'array': {'order': 'C', 'write_empty_chunks': False}, - 'async': {'concurrency': 10, 'timeout': None}, + 'async': {'concurrency': 10, 'timeout': None, 'use_uvloop': True}, 'buffer': 'zarr.buffer.cpu.Buffer', 'codec_pipeline': {'batch_size': 1, 'path': 'zarr.core.codec_pipeline.BatchedCodecPipeline'}, diff --git a/docs/user-guide/performance.rst b/docs/user-guide/performance.rst index 0f31e5d7be..2f5e19eb30 100644 --- a/docs/user-guide/performance.rst +++ b/docs/user-guide/performance.rst @@ -270,6 +270,81 @@ E.g., pickle/unpickle an local store array:: >>> np.all(z1[:] == z2[:]) np.True_ +.. _user-guide-uvloop: + +Event loop optimization with uvloop +----------------------------------- + +Zarr can optionally use `uvloop <https://github.com/MagicStack/uvloop>`_, a fast, +drop-in replacement for the default Python asyncio event loop implementation. +uvloop is written in Cython and built on top of libuv, providing significantly +better performance for I/O-intensive operations. + +When uvloop is available, Zarr will use it by default for better performance. 
+This is particularly beneficial when working with remote storage backends or +performing many concurrent operations. + +Installation +~~~~~~~~~~~~ + +To enable uvloop support, install it as an optional dependency:: + + pip install 'zarr[optional]' + +Or install uvloop directly (Unix/Linux/macOS only):: + + pip install uvloop + +.. note:: + uvloop is automatically included in the ``optional`` dependency group, but only + installed on supported platforms (Unix/Linux/macOS). On Windows, the installation + will succeed but uvloop will be skipped. + +Configuration +~~~~~~~~~~~~~ + +uvloop usage can be controlled via Zarr's configuration system. The setting is read once, when Zarr creates its event loop; changing it afterwards does not replace a loop that already exists: + +.. code-block:: python + + import zarr + + # Enable uvloop (default when available) + zarr.config.set({"async.use_uvloop": True}) + + # Disable uvloop (use standard asyncio) + zarr.config.set({"async.use_uvloop": False}) + +You can also control this via an environment variable:: + + # Disable uvloop + export ZARR_ASYNC__USE_UVLOOP=false + +Platform Support +~~~~~~~~~~~~~~~~~ + +uvloop is supported on: + +- Linux +- macOS +- Other Unix-like systems + +uvloop is **not** supported on Windows. On Windows, Zarr will automatically +fall back to the standard asyncio event loop regardless of the configuration setting. + +Performance Benefits +~~~~~~~~~~~~~~~~~~~~ + +uvloop can provide performance improvements for: + +- Remote storage operations (S3, GCS, etc.) +- Concurrent array operations +- Large numbers of small I/O operations +- Network-bound workloads + +The performance improvement varies depending on the workload, but can be +substantial for I/O-intensive operations. + .. 
_user-guide-tips-blosc: Configuring Blosc diff --git a/pyproject.toml b/pyproject.toml index 11d91944d5..c94fdcbf8b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -93,7 +93,11 @@ remote_tests = [ "moto[s3,server]", "requests", ] -optional = ["rich", "universal-pathlib"] +optional = [ + "rich", + "universal-pathlib", + "uvloop; sys_platform != 'win32'" +] docs = [ # Doc building 'sphinx==8.1.3', @@ -234,6 +238,7 @@ dependencies = [ 'typing_extensions==4.9.*', 'donfig==0.8.*', 'obstore==0.5.*', + 'uvloop==0.20.0; sys_platform != "win32"', # test deps 'zarr[test]', 'zarr[remote_tests]', diff --git a/src/zarr/core/config.py b/src/zarr/core/config.py index a918b789dd..b6ea723938 100644 --- a/src/zarr/core/config.py +++ b/src/zarr/core/config.py @@ -107,7 +107,7 @@ def enable_gpu(self) -> ConfigSet: "order": "C", "write_empty_chunks": False, }, - "async": {"concurrency": 10, "timeout": None}, + "async": {"concurrency": 10, "timeout": None, "use_uvloop": True}, "threading": {"max_workers": None}, "json_indent": 2, "codec_pipeline": { diff --git a/src/zarr/core/sync.py b/src/zarr/core/sync.py index ffb04e764d..a093b5b644 100644 --- a/src/zarr/core/sync.py +++ b/src/zarr/core/sync.py @@ -4,6 +4,7 @@ import atexit import logging import os +import sys import threading from concurrent.futures import ThreadPoolExecutor, wait from typing import TYPE_CHECKING, TypeVar @@ -165,6 +166,31 @@ def sync( return return_result +def _create_event_loop() -> asyncio.AbstractEventLoop: + """Create a new event loop, optionally using uvloop if available and enabled.""" + use_uvloop = config.get("async.use_uvloop", True) + + if use_uvloop and sys.platform != "win32": + try: + import uvloop + + logger.debug("Creating Zarr event loop with uvloop") + # uvloop.new_event_loop() returns a loop compatible with AbstractEventLoop + loop: asyncio.AbstractEventLoop = uvloop.new_event_loop() + except ImportError: + logger.debug("uvloop not available, falling back to asyncio") + else: + return loop + 
else: + if not use_uvloop: + logger.debug("uvloop disabled via config, using asyncio") + else: + logger.debug("uvloop not supported on Windows, using asyncio") + + logger.debug("Creating Zarr event loop with asyncio") + return asyncio.new_event_loop() + + def _get_loop() -> asyncio.AbstractEventLoop: """Create or return the default fsspec IO loop @@ -175,8 +201,7 @@ def _get_loop() -> asyncio.AbstractEventLoop: # repeat the check just in case the loop got filled between the # previous two calls from another thread if loop[0] is None: - logger.debug("Creating Zarr event loop") - new_loop = asyncio.new_event_loop() + new_loop = _create_event_loop() loop[0] = new_loop iothread[0] = threading.Thread(target=new_loop.run_forever, name="zarr_io") assert iothread[0] is not None diff --git a/tests/test_config.py b/tests/test_config.py index c7a2eb7394..390a2c84e6 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -55,7 +55,7 @@ def test_config_defaults_set() -> None: "order": "C", "write_empty_chunks": False, }, - "async": {"concurrency": 10, "timeout": None}, + "async": {"concurrency": 10, "timeout": None, "use_uvloop": True}, "threading": {"max_workers": None}, "json_indent": 2, "codec_pipeline": { diff --git a/tests/test_sync.py b/tests/test_sync.py index c5eadb0f4f..f471e25343 100644 --- a/tests/test_sync.py +++ b/tests/test_sync.py @@ -1,4 +1,5 @@ import asyncio +import sys from collections.abc import AsyncGenerator from unittest.mock import AsyncMock, patch @@ -8,6 +9,7 @@ from zarr.core.sync import ( SyncError, SyncMixin, + _create_event_loop, _get_executor, _get_lock, _get_loop, @@ -66,7 +68,7 @@ def test_sync_timeout() -> None: async def foo() -> None: await asyncio.sleep(duration) - with pytest.raises(asyncio.TimeoutError): + with pytest.raises(TimeoutError): sync(foo(), timeout=duration / 10) @@ -150,16 +152,88 @@ def test_threadpool_executor(clean_state, workers: int | None) -> None: if workers is None: # confirm no executor was created if no 
workers were specified # (this is the default behavior) - assert loop[0]._default_executor is None + # Note: uvloop doesn't expose _default_executor attribute, so we skip this check for uvloop + if hasattr(loop[0], "_default_executor"): + assert loop[0]._default_executor is None else: # confirm executor was created and attached to loop as the default executor # note: python doesn't have a direct way to get the default executor so we - # use the private attribute - assert _get_executor() is loop[0]._default_executor + # use the private attribute (when available) assert _get_executor()._max_workers == workers + if hasattr(loop[0], "_default_executor"): + assert _get_executor() is loop[0]._default_executor def test_cleanup_resources_idempotent() -> None: _get_executor() # trigger resource creation (iothread, loop, thread-pool) cleanup_resources() cleanup_resources() + + +def test_create_event_loop_default_config() -> None: + """Test that _create_event_loop respects the default config.""" + # Reset config to default + with zarr.config.set({"async.use_uvloop": True}): + loop = _create_event_loop() + if sys.platform != "win32": + try: + import uvloop + + assert isinstance(loop, uvloop.Loop) + except ImportError: + # uvloop not available, should use asyncio + assert isinstance(loop, asyncio.AbstractEventLoop) + assert "uvloop" not in str(type(loop)) + else: + # Windows doesn't support uvloop + assert isinstance(loop, asyncio.AbstractEventLoop) + assert "uvloop" not in str(type(loop)) + + loop.close() + + +def test_create_event_loop_uvloop_disabled() -> None: + """Test that uvloop can be disabled via config.""" + with zarr.config.set({"async.use_uvloop": False}): + loop = _create_event_loop() + # Should always use asyncio when disabled + assert isinstance(loop, asyncio.AbstractEventLoop) + assert "uvloop" not in str(type(loop)) + loop.close() + + +@pytest.mark.skipif(sys.platform == "win32", reason="uvloop is not supported on Windows") +def 
test_create_event_loop_uvloop_enabled_non_windows() -> None: + """Test uvloop usage on non-Windows platforms when uvloop is installed.""" + uvloop = pytest.importorskip("uvloop") + + with zarr.config.set({"async.use_uvloop": True}): + loop = _create_event_loop() + assert isinstance(loop, uvloop.Loop) + loop.close() + + +@pytest.mark.skipif(sys.platform != "win32", reason="This test is specific to Windows behavior") +def test_create_event_loop_windows_no_uvloop() -> None: + """Test that uvloop is never used on Windows.""" + with zarr.config.set({"async.use_uvloop": True}): + loop = _create_event_loop() + # Should use asyncio even when uvloop is requested on Windows + assert isinstance(loop, asyncio.AbstractEventLoop) + assert "uvloop" not in str(type(loop)) + loop.close() + + +@pytest.mark.skipif(sys.platform == "win32", reason="uvloop is not supported on Windows") +def test_uvloop_mock_import_error(clean_state) -> None: + """Test graceful handling when uvloop import fails.""" + with zarr.config.set({"async.use_uvloop": True}): + # Mock uvloop import failure by putting None in sys.modules + # This simulates the module being unavailable/corrupted + with patch.dict("sys.modules", {"uvloop": None}): + # When Python tries to import uvloop, it will get None and treat it as ImportError + loop = _create_event_loop() + # Should fall back to asyncio + assert isinstance(loop, asyncio.AbstractEventLoop) + assert "uvloop" not in str(type(loop)) + loop.close()