Merged

49 commits
fd5811f
add note on using open_virtual_mfdataset with combine=nested
TomNicholas May 9, 2025
4cfced5
clarification
TomNicholas May 9, 2025
bc31fb6
combine by coords
TomNicholas May 9, 2025
d261257
glob
TomNicholas May 9, 2025
8bf6c3c
add scaling page with pre-requisites
TomNicholas May 9, 2025
426740e
strategy
TomNicholas May 10, 2025
183def5
threadpool executor
TomNicholas May 10, 2025
14d8e84
dask
TomNicholas May 10, 2025
a04ca01
lithops
TomNicholas May 10, 2025
616f33f
custom executor
TomNicholas May 10, 2025
56f8aa0
kerchunk references format
TomNicholas May 10, 2025
683643d
memory usage
TomNicholas May 10, 2025
ff82113
caching
TomNicholas May 10, 2025
e75552f
batching
TomNicholas May 10, 2025
5061170
retries
TomNicholas May 10, 2025
71e71fb
add open_virtual_mfdataset to docs
TomNicholas May 11, 2025
66641ff
improve docstring
TomNicholas May 11, 2025
60eb6de
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] May 11, 2025
b2c39c2
release note
TomNicholas May 11, 2025
8500706
spelling
TomNicholas May 11, 2025
27fde97
Correct syntax in code example
TomNicholas Jun 17, 2025
f880142
comment about Icechunk
TomNicholas Jun 17, 2025
852d434
Merge branch 'develop' into docs-open_virtual_mfdataset
TomNicholas Jun 20, 2025
3c7b507
add executors to API docs
TomNicholas Jun 20, 2025
e6743d9
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jun 20, 2025
a50482d
correct class name
TomNicholas Jun 20, 2025
d2887b4
spelling
TomNicholas Jun 20, 2025
6c390ba
add new page to nav
TomNicholas Jun 20, 2025
6654b31
correct name of class in api docs
TomNicholas Jun 20, 2025
b767b11
Max's APi links suggestions
TomNicholas Jun 24, 2025
e11fc04
reader->parser
TomNicholas Jun 24, 2025
a25dd8f
Chunk's grammaritcal usggestions
TomNicholas Jun 24, 2025
228792e
Merge branch 'docs-open_virtual_mfdataset' of https://github.com/TomN…
TomNicholas Jun 24, 2025
4fd1b30
another cross-reference
TomNicholas Jun 24, 2025
9ddac2b
correct API link in release notes
TomNicholas Jun 24, 2025
3cb440f
try linking to xarray API docs
TomNicholas Jun 24, 2025
247ac37
Relative link within page to caching section
TomNicholas Jun 24, 2025
29256d5
Use adminoitions for TODos
TomNicholas Jun 24, 2025
549c1b0
Correct admonition syntax for note
TomNicholas Jun 24, 2025
a7d9ae1
Merge branch 'develop' into docs-open_virtual_mfdataset
TomNicholas Jun 24, 2025
0db47c5
Merge branch 'docs-open_virtual_mfdataset' of https://github.com/TomN…
TomNicholas Jun 24, 2025
77dc926
add a bunch more cross-linked references
TomNicholas Jun 24, 2025
8f1feac
try removing backticks
TomNicholas Jun 24, 2025
07386cd
remove links to virtualizarr.parallel module
TomNicholas Jun 24, 2025
64fb153
missed one
TomNicholas Jun 24, 2025
3b4235f
add Dockerfile
TomNicholas Jun 24, 2025
3d3a2e1
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jun 24, 2025
07a3f13
add link to Dockerfile
TomNicholas Jun 24, 2025
eef5fe4
Merge branch 'docs-open_virtual_mfdataset' of https://github.com/TomN…
TomNicholas Jun 24, 2025
1 change: 1 addition & 0 deletions docs/api.rst
@@ -19,6 +19,7 @@ Reading
:toctree: generated/

open_virtual_dataset
open_virtual_mfdataset

Serialization
-------------
1 change: 1 addition & 0 deletions docs/index.md
@@ -82,6 +82,7 @@ examples
faq
api
data_structures
scaling
custom_readers
releases
contributing
4 changes: 4 additions & 0 deletions docs/releases.rst
@@ -52,6 +52,10 @@ Documentation

- Added new docs page on how to write a custom reader for bespoke file formats (:issue:`452`, :pull:`580`)
By `Tom Nicholas <https://github.com/TomNicholas>`_.
- Added new docs page on how to scale VirtualiZarr effectively (:pull:`590`)
By `Tom Nicholas <https://github.com/TomNicholas>`_.
- Documented the new :py:func:`open_virtual_mfdataset` function (:pull:`590`)
By `Tom Nicholas <https://github.com/TomNicholas>`_.
- Added MUR SST virtual and zarr icechunk store generation using lithops example.
(:pull:`475`) by `Aimee Barciauskas <https://github.com/abarciauskas-bgse>`_.
- Added FAQ answer about what data can be virtualized (:issue:`430`, :pull:`532`)
262 changes: 262 additions & 0 deletions docs/scaling.md
@@ -0,0 +1,262 @@
(scaling)=

# Scaling

This page explains how to scale up your usage of VirtualiZarr to cloud-optimize large numbers of files.

## Pre-requisites

Before you attempt to use VirtualiZarr on a large number of files at once, you should check that you can successfully use the library on a small subset of your data.

In particular, you should check that:
- You can call `open_virtual_dataset` on one of your files, which requires there to be a parser which can interpret that file format.
- After calling `open_virtual_dataset` on a few files making up a representative subset of your data, you can concatenate them into one logical datacube without errors (see the [FAQ](faq.md#can-my-specific-data-be-virtualized) for possible reasons for errors at this stage).
- You can serialize those virtual references to some format (e.g. Kerchunk/Icechunk) and read the data back.
- The data you read back is exactly what you would have expected to get if you read the data from the original files.

If you don't do these checks now, you might find that you deploy a large amount of resources to run VirtualiZarr on many files, only to hit a problem that you could have found much earlier.

## Strategy

### The need for parallelization

VirtualiZarr is a tool designed for taking a large number of slow-to-access files (i.e. non-cloud-optimized data) and creating a way to make all subsequent accesses much faster (i.e. a cloud-optimized datacube).

Running `open_virtual_dataset` on just one file can take a while (seconds to minutes), because for data sitting in object storage, fetching just the metadata can be almost as time-consuming as fetching the actual data.
(For a full explanation as to why, [see this article](https://earthmover.io/blog/fundamentals-what-is-cloud-optimized-scientific-data).)
In some cases it's easiest to load essentially the entire contents of the file in order to virtualize it.

Therefore we should expect that running VirtualiZarr on all our data files will take a long time - we are paying this cost once up front so that our users do not have to pay it again on subsequent data accesses.

However, the `open_virtual_dataset` calls for each file are completely independent, meaning that part of the computation is "embarrassingly parallelizable".

### Map-reduce

The problem of scaling VirtualiZarr is an example of a classic map-reduce problem, with two parts:

1. We first must apply the `open_virtual_dataset` function over every file we want to virtualize. This is the map step, and can be parallelized.
2. Then we must take all the resultant virtual datasets (one per file), and combine them together into one final virtual dataset. This is the reduce step.

Finally we write this single virtual dataset to some persistent format.
The data has already been reduced by this point, so this serialization is a distinct third step.

In our case the amount of data being reduced is fairly small - each virtual dataset is hopefully only a few kBs in memory, small enough to send over the network.
Even a million such virtual datasets together would only require a few GB of RAM in total to hold in memory at once.
This means that as long as we can get all the virtual datasets to be sent back successfully, the reduce step can generally be performed in memory on a single small machine, such as a laptop.
This avoids the need for more complicated parallelization strategies such as a tree-reduce.

## Parallelization Approaches

There are two ways you can implement a map-reduce approach to virtualization in your code.
The first is to write it yourself, and the second is to use `open_virtual_mfdataset`.

### Manual parallelism

You are free to call `open_virtual_dataset` on your various files however you like, using any method you like to apply the function, including in parallel.

For example you may want to parallelize using the [dask library](https://www.dask.org/), which you can do by wrapping each call using `dask.delayed` like this:

```python
import virtualizarr as vz
import dask

tasks = [dask.delayed(vz.open_virtual_dataset)(path) for path in filepaths]
virtual_datasets = dask.compute(*tasks)  # note the unpacking - dask.compute returns a tuple of results
```

This returns a list of virtual `xr.Dataset` objects, which you can then combine:

```python
import xarray as xr

combined_vds = xr.combine_by_coords(virtual_datasets)
```

### The `parallel` kwarg to `open_virtual_mfdataset`

Alternatively, you can use `virtualizarr.open_virtual_mfdataset`'s `parallel` keyword argument.

This argument allows you to conveniently choose from a range of pre-defined parallel execution frameworks, or even pass your own executor.

The resulting code only takes one function call to generate virtual references in parallel and combine them into one virtual dataset.

```python
combined_vds = vz.open_virtual_mfdataset(filepaths, parallel=<choice_of_executor>)
```

VirtualiZarr's `open_virtual_mfdataset` is designed to mimic the API of Xarray's `open_mfdataset`, and so accepts all the same keyword argument options for combining.

## Executors

VirtualiZarr comes with a small selection of executors you can choose from when using `open_virtual_mfdataset`.

```{note}
If you prefer to do manual parallelism but would like to use one of these executors you can - just import the executor directly from the `virtualizarr.parallel` namespace and use its `.map` method.
```

TODO: auto-generate API docs for all the executors here

### Serial

The simplest executor is the `SerialExecutor`, which executes all the `open_virtual_dataset` calls in serial, not in parallel.
It is the default executor.

### Threads or Processes

One way to parallelize creating virtual references from a single machine is to spread the work across multiple threads or processes.
For this you can use the `ThreadPoolExecutor` or `ProcessPoolExecutor` classes from the `concurrent.futures` module in the Python standard library.
You simply pass the executor class directly via the `parallel` kwarg.

```python
from concurrent.futures import ThreadPoolExecutor

combined_vds = vz.open_virtual_mfdataset(filepaths, parallel=ThreadPoolExecutor)
```

This can work well when virtualizing files in remote object storage because it parallelizes the issuing of HTTP GET requests for each file.
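As a concrete illustration of the map step this performs, here is a minimal, self-contained sketch using a thread pool. The `open_one` function and its return value are stand-in assumptions in place of `vz.open_virtual_dataset`, which in reality would issue HTTP GET requests for each file's metadata:

```python
from concurrent.futures import ThreadPoolExecutor

# Stand-in for vz.open_virtual_dataset: returns a small dict of "references"
# rather than a real virtual xr.Dataset.
def open_one(path: str) -> dict:
    return {"path": path, "n_chunks": 4}

filepaths = ["air1.nc", "air2.nc", "air3.nc"]

# The map step: one task per file, run concurrently across a pool of threads.
with ThreadPoolExecutor(max_workers=4) as pool:
    virtual_datasets = list(pool.map(open_one, filepaths))

# pool.map preserves input order, so the reduce (combine) step sees the
# results in the same order the filepaths were given.
print([v["path"] for v in virtual_datasets])  # ['air1.nc', 'air2.nc', 'air3.nc']
```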

### Dask Delayed

You can parallelize using `dask.delayed` automatically by passing `parallel='dask'`.
This will select the `virtualizarr.parallel.DaskDelayedExecutor`.

```python
combined_vds = vz.open_virtual_mfdataset(filepaths, parallel='dask')
```

This uses the same approach that `xarray.open_mfdataset` does when `parallel=True` is passed to it.
Using `dask.delayed` allows for parallelizing with any type of dask cluster, including a managed [Coiled](http://www.coiled.io) cluster.

### Lithops

As the map step is totally embarrassingly parallel, it can be performed entirely using serverless functions.
This approach allows for virtualizing N files in the same time it takes to virtualize 1 file (assuming you can provision N concurrent serverless functions), avoiding the need to configure, scale, and shut down a cluster.

You can parallelize VirtualiZarr serverlessly by using the [lithops](http://lithops-cloud.github.io) library.
Lithops can run on all the main cloud providers' serverless FaaS platforms.

To run on lithops you need to configure lithops for the relevant compute backend (e.g. AWS Lambda), build a runtime using Docker ([example Dockerfile]() with the required dependencies), and ensure the necessary cloud permissions to run are available.
Member: where is the example Dockerfile?

Member Author: Added in 3b4235f. Will need to be changed to keep up with releases.

Then you can use the `virtualizarr.parallel.LithopsEagerFunctionExecutor` simply via

```python
combined_vds = vz.open_virtual_mfdataset(filepaths, parallel='lithops')
```

### Custom Executors

You can also define your own executor to run in some other way, for example on a different serverless platform such as [Modal](https://modal.com).

Your custom executor must inherit from the `concurrent.futures.Executor` ABC, and must implement the `.map` method.

```python
from collections.abc import Callable, Iterable, Iterator
from concurrent.futures import Executor


class CustomExecutor(Executor):
    def map(
        self,
        fn: Callable,
        *iterables: Iterable,
    ) -> Iterator:
        # Apply fn to each element of the iterables, e.g. by dispatching
        # each call to your serverless platform of choice.
        ...


combined_vds = vz.open_virtual_mfdataset(filepaths, parallel=CustomExecutor)
```

## Memory usage

For the virtualization to succeed you need to ensure that your available memory is not exceeded at any point.
There are 3 points at which this might happen:

1. While generating references
2. While combining references
3. While writing references

While generating references each worker calling `open_virtual_dataset` needs to avoid running out of memory.
This primarily depends on how the file is read - see the section on [Caching remote files](#caching-remote-files) below.

The combine step happens back on the machine on which `vz.open_virtual_mfdataset` was called, so while combining references that machine must have enough memory to hold all the virtual references at once.
You can find the in-memory size of the references for a single virtual dataset by calling the `.nbytes` accessor method on it (not to be confused with xarray's `.nbytes` property, which returns the total size as if all the data were actually loaded into memory).
Do this for one file, and multiply by the number of files you have to estimate the total memory required for this step.
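As a worked example of that estimate (the per-file reference size below is an assumed, illustrative number; measure your own files):

```python
# Rough estimate of peak memory for the combine step.
per_file_reference_bytes = 50 * 1024   # assumed ~50 kB of references per file
n_files = 20_000

combine_step_bytes = per_file_reference_bytes * n_files
print(f"~{combine_step_bytes / 1024**3:.2f} GiB")  # ~0.95 GiB
```

So twenty thousand such files would comfortably fit in memory on a laptop during the combine step.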

Writing the combined virtual references out requires converting them to a different references format, which may have different memory requirements.

## Scalability of references formats

After the map-reduce operation is complete, you will likely still want to persist the virtual references in some format.
Depending on the format, this step may also have scalability concerns.

### Kerchunk

The Kerchunk references specification supports 3 formats - an in-memory (nested) `dict`, JSON, and Parquet.

Both the in-memory Kerchunk `dict` and Kerchunk JSON formats are extremely inefficient ways to represent virtual references.
You may well find that a virtual dataset object that easily fits in memory suddenly uses up many times more memory or space on disk when converted to one of these formats.
Persisting large numbers of references in these formats is therefore not recommended.

The Kerchunk Parquet format is more scalable, but you may want to experiment with the `record_size` and `categorical_threshold` arguments to the virtualizarr `.to_kerchunk` accessor method.

### Icechunk

TODO

## Tips for success

Here are some assorted tips for successfully scaling VirtualiZarr.

### Caching remote files

When you call `open_virtual_dataset` on a remote file, it needs to extract the metadata and store it in memory (the returned virtual dataset).

One way to do this is to issue a separate HTTP range request for each piece of metadata.
This downloads the absolute minimum amount of data in total, but issues a lot of HTTP requests, each of which can take a long time to return from high-latency object storage.
This approach therefore uses the minimum amount of memory on the worker but takes more time.

TODO: Describe how this is the default with obstore
Member Author: Also really requires #625 to be clear what the settings are and how to change them.


The other extreme is to download the entire file up front.
This downloads all the metadata by definition, but also all the actual data, which is likely millions of times more than you need for virtualization.
This approach usually takes a lot less time on the worker but requires the maximum amount of memory - using this approach on every file in the dataset entails downloading the entire dataset across all workers!

TODO: How to enable this by passing `cache=True` to obstore
Member Author: Requires #625


There are various tricks one can use when fetching metadata, such as pre-fetching, minimum fetch sizes, or read-ahead caching strategies.
All of these approaches put your memory requirements somewhere between the two extremes described above, and none of them are strictly necessary for successful execution.

Generally, if you only have access to a limited amount of RAM, you should avoid caching so as not to run out of memory. If instead you can scale out across many workers (e.g. serverlessly using lithops), your job will complete faster if you cache the files.
Caching a file onto a worker requires that the memory available on that worker is greater than the size of the file.

### Batching

You don't need to create and write virtual references for all your files in one go.

Creating virtual references for subsets of files in batches means the memory requirements for combining and serializing each batch are lower.
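Splitting the file list into batches needs nothing beyond the Python standard library. Here is one minimal sketch (the batch size is an arbitrary assumption to tune against your memory budget):

```python
from itertools import islice

def batched(paths, batch_size):
    """Yield successive lists of up to `batch_size` paths."""
    it = iter(paths)
    while batch := list(islice(it, batch_size)):
        yield batch

filepaths = [f"file_{i}.nc" for i in range(10)]
file_batches = list(batched(filepaths, 4))
print([len(b) for b in file_batches])  # [4, 4, 2]
```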

Batching also allows you to pick up where you left off.
This works particularly well with Icechunk, as you can durably commit each batch of references in a separate transaction.

```python
import icechunk as ic

repo = ic.open(<repo_url>)

for i, batch in enumerate(file_batches):
    session = repo.writable_session("main")

    combined_batch_vds = vz.open_virtual_mfdataset(batch)

    combined_batch_vds.virtualize.to_icechunk(session.store, append_dim=...)

    session.commit(f"wrote virtual references for batch {i}")
```

Notice that this workflow could also be used to append new data as it becomes available, e.g. by replacing the for loop with a cron job.

### Retries

Sometimes an `open_virtual_dataset` call might fail for a transient reason, such as a failed HTTP response from a server.
In such a scenario automatically retrying the failed call might be enough to obtain success and keep the computation proceeding.

If you are batching your computation, you could retry an entire loop iteration whenever any `open_virtual_dataset` call within it fails, but that is potentially very inefficient, because it would also re-run the successful calls.

Instead it is more efficient to use per-task retries at the executor level.
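In the meantime, a per-call retry wrapper can be sketched with just the standard library by wrapping the function before handing it to your executor. Everything here, including the backoff policy and the stand-in `flaky` function, is an illustrative assumption rather than VirtualiZarr API:

```python
import time

# Wrap a function so each call is retried on failure, with linear backoff.
def with_retries(fn, max_attempts=3, backoff_seconds=0.0):
    def wrapped(*args, **kwargs):
        for attempt in range(1, max_attempts + 1):
            try:
                return fn(*args, **kwargs)
            except Exception:
                if attempt == max_attempts:
                    raise
                time.sleep(backoff_seconds * attempt)
    return wrapped

# Demonstrate with a flaky stand-in that fails twice then succeeds.
calls = {"n": 0}
def flaky(path):
    calls["n"] += 1
    if calls["n"] < 3:
        raise IOError("transient HTTP failure")
    return {"path": path}

result = with_retries(flaky)("air1.nc")
print(result, calls["n"])  # {'path': 'air1.nc'} 3
```

You would then submit `with_retries(vz.open_virtual_dataset)` to the executor in place of the bare function.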

TODO: We plan to add support for automatic retries to the Lithops and Dask executors (see GitHub PR #575)
31 changes: 30 additions & 1 deletion docs/usage.md
@@ -252,6 +252,14 @@
Without indexes we can avoid loading any data whatsoever from the files.
However, you should first be confident that the archival files actually do have compatible data, as the coordinate values then cannot be efficiently compared for consistency (i.e. aligned).
```

You can achieve both the opening and combining steps for multiple files in one go by using `open_virtual_mfdataset <virtualizarr.open_virtual_mfdataset>`.

```python
combined_vds = vz.open_virtual_mfdataset(['air1.nc', 'air2.nc'], concat_dim='time', combine='nested')
```

We passed `combine='nested'` to specify that we want the datasets to be combined in the order they appear, using `xr.combine_nested` under the hood.

### Ordering by coordinate values

If you're happy to load 1D dimension coordinates into memory, you can use their values to do the ordering for you!
@@ -264,7 +272,8 @@
combined_vds = xr.combine_by_coords([vds2, vds1])
```

Notice we don't have to specify the concatenation dimension explicitly - xarray works out the correct ordering for us.
Even though we actually passed in the virtual datasets in the wrong order just now, the manifest still has the chunks listed in the correct order such that the 1-dimensional `time` coordinate has ascending values:
Even though we actually passed in the virtual datasets in the wrong order just now, they have been combined in the correct order such that the 1-dimensional `time` coordinate has ascending values.
As a result our chunk manifest still has the chunks listed in the expected order:

```python
combined_vds['air'].data.manifest.dict()
@@ -275,10 +284,30 @@
combined_vds['air'].data.manifest.dict()
'1.0.0': {'path': 'file:///work/data/air2.nc', 'offset': 15419, 'length': 3869000}}
```

Again, we can achieve both the opening and combining steps for multiple files in one go by using `open_virtual_mfdataset <virtualizarr.open_virtual_mfdataset>`, this time passing `combine='by_coords'`.

```python
combined_vds = vz.open_virtual_mfdataset(['air2.nc', 'air1.nc'], combine='by_coords')
```

We can even pass in a glob to find all the files we want to automatically combine:

```python
combined_vds = vz.open_virtual_mfdataset('air*.nc', combine='by_coords')
```

### Ordering using metadata

TODO: Use preprocess to create a new index from the metadata. Requires `open_virtual_mfdataset` to be implemented in [PR #349](https://github.com/zarr-developers/VirtualiZarr/pull/349).

### Combining many virtual datasets at once

Combining a large number (e.g. thousands) of virtual datasets at once should be very quick (a few seconds), as we are just manipulating a few kB of metadata in memory.

However, creating thousands of virtual datasets in the first place can take a very long time.
(If that were quick, there would be little need for this library!)
See the page on [Scaling](scaling.md) for tips on how to create large numbers of virtual datasets at once.

## Writing virtual stores to disk

Once we've combined references to all the chunks of all our archival files into one virtual xarray dataset, we still need to write these references out to disk so that they can be read by our analysis code later.
4 changes: 3 additions & 1 deletion virtualizarr/backend.py
@@ -249,6 +249,8 @@ def open_virtual_mfdataset(
"""
Open multiple files as a single virtual dataset.

This function is explicitly modelled after `xarray.open_mfdataset`, and works in the same way.

If combine='by_coords' then the function ``combine_by_coords`` is used to combine
the datasets into one before returning the result, and if combine='nested' then
``combine_nested`` is used. The filepaths must be structured according to which
@@ -295,7 +297,7 @@

Notes
-----
The results of opening each virtual dataset in parallel are sent back to the client process, so must not be too large.
The results of opening each virtual dataset in parallel are sent back to the client process, so must not be too large. See the docs page on Scaling.
"""

# TODO this is practically all just copied from xarray.open_mfdataset - an argument for writing a virtualizarr engine for xarray?