Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
ccdbfa9
Group fdb requests when lazy loading is used
sandorkertesz Sep 4, 2025
a158b6e
Merge branch 'develop' into feature/group-fbd-requests-for-lazy-loading
sandorkertesz Sep 29, 2025
c0316c1
Group fdb requests
sandorkertesz Sep 29, 2025
e98f02f
Group fdb
sandorkertesz Sep 29, 2025
233d9e5
Merge branch 'develop' into feature/group-fbd-requests-for-lazy-loading
sandorkertesz Sep 29, 2025
a4c6db9
Group
sandorkertesz Sep 30, 2025
d6c9fb0
Merge branch 'develop' into feature/group-fbd-requests-for-lazy-loading
sandorkertesz Oct 8, 2025
292915a
Fdb group
sandorkertesz Oct 8, 2025
3687742
Log
sandorkertesz Oct 8, 2025
43f4cfd
Log
sandorkertesz Oct 8, 2025
eda88fe
Log
sandorkertesz Oct 8, 2025
dd3e0f0
Log
sandorkertesz Oct 8, 2025
9a5ffe0
Fdb group
sandorkertesz Oct 8, 2025
f161118
Merge branch 'develop' into feature/group-fdb-requests-for-lazy-loading
sandorkertesz Oct 15, 2025
5758add
Merge from develop
sandorkertesz Oct 22, 2025
2936d39
Merge branch 'develop' into feature/group-fdb-requests-for-lazy-loading
sandorkertesz Nov 14, 2025
ba12d36
Merge branch 'develop' into feature/group-fdb-requests-for-lazy-loading
sandorkertesz Nov 14, 2025
a410c3d
Merge branch 'develop' into feature/group-fdb-requests-for-lazy-loading
sandorkertesz Nov 17, 2025
078d5a3
Merge from develop
sandorkertesz Nov 18, 2025
def843f
Merge branch 'develop' into feature/group-fdb-requests-for-lazy-loading
sandorkertesz Nov 19, 2025
f9683ea
Merge branch 'develop' into feature/group-fdb-requests-for-lazy-loading
sandorkertesz Nov 20, 2025
fe2ac81
Merge from develop
sandorkertesz Dec 1, 2025
c2f5969
Merge branch 'develop' into feature/group-fdb-requests-for-lazy-loading
sandorkertesz Jan 6, 2026
94d6d1d
Merge branch 'develop' into feature/group-fdb-requests-for-lazy-loading
sandorkertesz Jan 19, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 20 additions & 5 deletions src/earthkit/data/core/fieldlist.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ def array_namespace(self):
return eku_array_namespace(self._values())

@abstractmethod
def _values(self, dtype=None):
def _values(self, dtype=None, context=None):
r"""Return the raw values extracted from the underlying storage format
of the field.

Expand Down Expand Up @@ -136,7 +136,7 @@ def _metadata(self):
r"""Metadata: Get the object representing the field's metadata."""
self._not_implemented()

def to_numpy(self, flatten=False, dtype=None, index=None):
def to_numpy(self, flatten=False, dtype=None, index=None, context=None):
r"""Return the values stored in the field as an ndarray.

Parameters
Expand All @@ -157,7 +157,11 @@ def to_numpy(self, flatten=False, dtype=None, index=None):
Field values

"""
v = convert_array(self._values(dtype=dtype), array_namespace="numpy")
if context is not None:
v = self._values(dtype=dtype, context=context)
else:
v = self._values(dtype=dtype)
v = convert_array(v, array_namespace="numpy")
shape = self._required_shape(flatten)
if shape != v.shape:
v = v.reshape(shape)
Expand All @@ -166,7 +170,14 @@ def to_numpy(self, flatten=False, dtype=None, index=None):
return v

def to_array(
self, flatten=False, dtype=None, array_backend=None, array_namespace=None, device=None, index=None
self,
flatten=False,
dtype=None,
array_backend=None,
array_namespace=None,
device=None,
index=None,
context=None,
):
r"""Return the values stored in the field.

Expand Down Expand Up @@ -206,7 +217,11 @@ def to_array(
raise ValueError("to_array(): only one of array_backend and array_namespace can be specified")
array_namespace = array_backend

v = self._values(dtype=dtype)
if context is not None:
v = self._values(dtype=dtype, context=context)
else:
v = self._values(dtype=dtype)

if array_namespace is not None:
v = convert_array(v, array_namespace=array_namespace, device=device)

Expand Down
4 changes: 2 additions & 2 deletions src/earthkit/data/indexing/fieldlist.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,9 +131,9 @@ def __init__(self, field, values=None, metadata=None, **kwargs):
else:
self.__metadata = field._metadata

def _values(self, dtype=None):
def _values(self, dtype=None, context=None):
if self.__values is None:
return self._field._values(dtype=dtype)
return self._field._values(dtype=dtype, context=context)
else:
if dtype is None:
return self.__values
Expand Down
6 changes: 4 additions & 2 deletions src/earthkit/data/indexing/tensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -399,11 +399,13 @@ def _prepare_tensor_data(self, source_to_array_func, index=None):
if all(i == slice(None, None, None) for i in index):
index = None

context = self

if index is None:
arr = source_to_array_func()
arr = source_to_array_func(context=context)
current_field_shape = self.field_shape
else:
arr = source_to_array_func(index=index)
arr = source_to_array_func(index=index, context=context)
if len(arr) > 0:
current_field_shape = tuple(arr.shape[1:])
else:
Expand Down
2 changes: 1 addition & 1 deletion src/earthkit/data/readers/geotiff.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ def to_pandas(self):
series.name = self._da.name
return series

def _values(self, dtype=None):
def _values(self, dtype=None, context=None):
return self._da.values.astype(dtype)

def write(self, f):
Expand Down
2 changes: 1 addition & 1 deletion src/earthkit/data/readers/grib/codes.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ def handle(self):
def _create_handle(self):
return GribCodesReader.from_cache(self.path).at_offset(self.offset)

def _values(self, dtype=None):
def _values(self, dtype=None, context=None):
return self.handle.get_values(dtype=dtype)

@property
Expand Down
100 changes: 88 additions & 12 deletions src/earthkit/data/readers/grib/virtual.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
#

import logging
from collections import defaultdict
from threading import Lock

from earthkit.data.core.fieldlist import Field
from earthkit.data.core.metadata import WrappedMetadata
Expand All @@ -23,8 +25,9 @@


class VirtualGribField(Field):
def __init__(self, owner, request, metadata_alias, reference=None):
def __init__(self, owner, index, request, metadata_alias, reference=None):
self.owner = owner
self.index = index
self.request = request
self.metadata_alias = metadata_alias
self.reference = reference
Expand Down Expand Up @@ -85,7 +88,7 @@ def metadata(self, *keys, astype=None, remapping=None, patches=None, **kwargs):
)

def _one_metadata(self, key, remapping=None, patches=None, **kwargs):
# print(f"one_metadata key={key} kwargs={kwargs}")
# print(f"one_metadata key={key} kwargs={kwargs} {self.request=}")
if key in self.extra:
return self.extra[key]
if key in self.request:
Expand Down Expand Up @@ -118,23 +121,19 @@ def _metadata(self):

return WrappedMetadata(self.owner.reference._metadata, extra=r)

def _values(self, dtype=None):
return self._field._values(dtype=dtype)

@property
def _field(self):
if self.reference:
return self.reference
else:
return self.owner.retriever.get(self.request)[0]
def _values(self, dtype=None, context=None):
return self.owner._get_grib_field(self, context=context)._values(dtype=dtype)


class VirtualGribFieldList(GribFieldList):
def __init__(self, request_mapper, retriever):
def __init__(self, request_mapper, retriever, request_grouping=False):
self.request_mapper = request_mapper
self.retriever = retriever
self.request_grouping = request_grouping

self._info_cache = {}
self._group_cache = None
self._group_lock = Lock()

def __len__(self):
return len(self.request_mapper)
Expand All @@ -155,6 +154,7 @@ def _getitem(self, n):

return VirtualGribField(
self,
n,
self.request_mapper.request_at(n),
self.request_mapper.metadata_alias,
reference=self.reference if n == 0 else None,
Expand All @@ -173,3 +173,79 @@ def _get_info(self, param):

self._info_cache[param] = r
return r

def _get_grib_field(self, field, context=None):
if field.reference:
return self.reference
elif self.request_grouping:
if context is not None:
from earthkit.data.core.fieldlist import FieldList

source = None
if context is self:
source = self
elif hasattr(context, "source"):
source = context.source
elif isinstance(context, FieldList):
source = context

if source is not None:
with self._group_lock:
if self._group_cache is not None:
if field.index in self._group_cache.fields:
return self._group_cache.fields[field.index]

self._group_cache = None
self._group_cache = self._create_group(context)
return self._group_cache.fields[field.index]

return self.retriever.get(field.request)[0]

def _create_group(self, context):
    """Build the group of fields to be retrieved with a single request.

    Delegates to ``VirtualGroup.from_source``, passing ``context`` as the
    source to scan and this fieldlist as the owner of the group.
    """
    group = VirtualGroup.from_source(context, self)
    return group


class VirtualGroup:
    """GRIB data retrieved with one request on behalf of several virtual fields."""

    def __init__(self, owner, field_index, request):
        """
        Parameters
        ----------
        owner: object
            The owner of this group. It must provide the ``retriever`` and
            ``request_mapper`` attributes (``VirtualGribFieldList`` passes
            itself).
        field_index: list of int
            The list of field indices in the owner's dataset that correspond to this group.
        request: dict
            The request defining the group. The request is formed as the smallest
            request that includes all the fields in the group. The request of
            every field in ``field_index`` must match it.

        Raises
        ------
        ValueError
            When the request of a field in ``field_index`` cannot be found
            among the field requests generated from ``request``.
        """
        self.owner = owner
        self.request = request

        # get the GRIB data
        ds = self.owner.retriever.get(request)

        # create a mapping from the field indices in the group to the downloaded GRIB data
        request_mapper = owner.request_mapper.clone(request)
        fields = {}
        for index in field_index:
            field_request = owner.request_mapper.request_at(index)
            new_index = request_mapper.index_from_request(field_request)
            if new_index < 0:
                # A negative index means "not found"; using it to index ``ds``
                # would silently pick a field from the end of the data.
                raise ValueError(
                    f"Request {field_request} of field {index} does not match the group request {request}"
                )
            fields[index] = ds[new_index]

        # maps the index of a field in the owner to the retrieved GRIB field
        self.fields = fields

    @classmethod
    def from_source(cls, source, owner):
        """Create the group containing the fields of ``source`` that belong to ``owner``.

        Parameters
        ----------
        source: iterable
            The objects to scan. Only the items whose ``owner`` attribute is
            ``owner`` are taken into account; their ``index`` and ``request``
            attributes are used.
        owner: object
            The owner of the group, passed on to the constructor.

        Returns
        -------
        VirtualGroup
            The group built from the indices of the selected fields and a
            request holding, for each key, the sorted list of the distinct
            values found in the requests of these fields.
        """
        field_index = []
        values = defaultdict(set)
        for f in source:
            if hasattr(f, "owner") and f.owner is owner:
                field_index.append(f.index)
                for k, v in f.request.items():
                    values[k].add(v)

        # Hand over a plain dict: a defaultdict would insert a new key on every
        # lookup of a missing one.
        request = {k: sorted(v) for k, v in values.items()}

        return cls(owner, field_index, request)
2 changes: 1 addition & 1 deletion src/earthkit/data/readers/netcdf/field.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ def _to_numpy(self):
values = self._ds[self.variable].isel(dimensions).values
return values

def _values(self, dtype=None):
def _values(self, dtype=None, context=None):
if dtype is None:
return self._to_numpy()
else:
Expand Down
2 changes: 1 addition & 1 deletion src/earthkit/data/sources/array_list.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def __init__(self, array, metadata):
def _array(self):
return self._array_

def _values(self, dtype=None):
def _values(self, dtype=None, context=None):
"""Native array type"""
if dtype is None:
return self._array
Expand Down
18 changes: 16 additions & 2 deletions src/earthkit/data/sources/fdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,25 @@


class FDBSource(Source):
def __init__(self, *args, request=None, stream=True, config=None, userconfig=None, lazy=False, **kwargs):
def __init__(
self,
*args,
request=None,
stream=True,
config=None,
userconfig=None,
lazy=False,
lazy_request_grouping=False,
**kwargs,
):
super().__init__()

for k in ["group_by", "batch_size"]:
if k in kwargs:
raise ValueError(f"Invalid argument '{k}' for FDBSource. Deprecated since 0.8.0.")

self.lazy = lazy
self.lazy_request_grouping = lazy_request_grouping
self._fdb_kwargs = {}
if config is not None:
self._fdb_kwargs["config"] = config
Expand Down Expand Up @@ -85,7 +96,7 @@ def mutate(self):
retriever = FDBRetriever(self._fdb_kwargs)
from earthkit.data.readers.grib.virtual import VirtualGribFieldList

return VirtualGribFieldList(mapper, retriever)
return VirtualGribFieldList(mapper, retriever, request_grouping=self.lazy_request_grouping)


class FDBFileSource(FileSource):
Expand Down Expand Up @@ -152,5 +163,8 @@ def _convert(data):

return data

def clone(self, request):
    """Return a new ``FDBRequestMapper`` for ``request``.

    The new mapper is created with the ``fdb_kwargs`` of this object.
    """
    fdb_kwargs = self.fdb_kwargs
    return FDBRequestMapper(request, fdb_kwargs=fdb_kwargs)


source = FDBSource
2 changes: 1 addition & 1 deletion src/earthkit/data/sources/forcings.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ def _metadata(self):
)
return ForcingMetadata(d, self.maker.field.metadata().geography)

def _values(self, dtype=None):
def _values(self, dtype=None, context=None):
values = self.proc(self.date)
if dtype is not None:
values = values.astype(dtype)
Expand Down
6 changes: 6 additions & 0 deletions src/earthkit/data/utils/request.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,5 +162,11 @@ def _build(self):
def request_at(self, index):
return self.field_requests[index]

def index_from_request(self, request):
    """Return the position of the first field request equal to ``request``.

    Parameters
    ----------
    request: object
        The request compared with ``==`` against the items of
        ``field_requests``.

    Returns
    -------
    int
        The index of the first matching item, or -1 when there is no match.
        Callers must check for -1 before using the result as an index.
    """
    hits = (pos for pos, candidate in enumerate(self.field_requests) if candidate == request)
    return next(hits, -1)

def __len__(self):
return len(self.field_requests)
Loading
Loading