-
Notifications
You must be signed in to change notification settings - Fork 26
Description
What happened?
I am looking for mechanisms to speed up selection operations on field lists that are backed by large numbers of grib files loaded via ekd.from_source('file', '/my/path/*.grib').
I discovered the Grib Indexing feature that would fit my use-case pretty well.
I changed my code to load my fields using ekd.from_source('file', '/my/path/*.grib', indexing=True) and then perform various selection operations before finally converting the field list to an xarray dataset.
Although the selection operations (.sel) seem to work, converting the fieldlist to xarray raises an error.
Please let me know if this is officially not supported yet or if I'm somehow using this feature wrong.
What are the steps to reproduce the bug?
import earthkit.data as ekd
ekd.download_example_file("tuv_pl.grib")
fs = ekd.from_source("file", "tuv_pl.grib", indexing=True)
fs.to_xarray() # Raises OperationalError
fs.sel(param='t').to_xarray() # Raises OperationalError
I attached the stack trace of the raised error message below.
The error OperationalError: no such column: i_number indicates that an expected column does not exist in the cache (sqlite database). For this reason, I also attached below the schema of the created cache db (which indeed does not contain the i_number column):
$ sqlite3 /my/cache/path/grib-index-b41c500.db
SQLite version 3.26.0 2018-12-01 12:34:55
Enter ".help" for usage hints.
sqlite> .tables
entries paths
sqlite> .schema entries
CREATE TABLE entries (i_domain TEXT,i_levtype TEXT,i_levelist INTEGER,i_date INTEGER,i_time INTEGER,i_step INTEGER,i_param TEXT,i_class TEXT,i_type TEXT,i_stream TEXT,i_expver TEXT,i_valid_datetime TEXT,i_param_level TEXT,mean FLOAT,std FLOAT,min FLOAT,max FLOAT,shape TEXT,path TEXT,offset INTEGER,length INTEGER,param_id TEXT,i_md5_grid_section TEXT);
Version
develop (55032df)
Platform (OS and architecture)
Linux (Atos)
Relevant log output
OperationalError Traceback (most recent call last)
Cell In[7], line 1
----> 1 fs.sel(param='t').to_xarray()
File [CENSORED_PATH]/xarray.py:426, in XarrayMixIn.to_xarray(self, engine, xarray_open_dataset_kwargs, **kwargs)
423 backend_kwargs[key] = user_xarray_open_dataset_kwargs.pop(key)
424 user_xarray_open_dataset_kwargs["backend_kwargs"] = backend_kwargs
--> 426 return engines[engine](user_xarray_open_dataset_kwargs)
File [CENSORED_PATH]/xarray.py:467, in XarrayMixIn.to_xarray_earthkit(self, user_kwargs)
463 other_kwargs = xarray_open_dataset_kwargs
465 from earthkit.data.utils.xarray.builder import from_earthkit
--> 467 return from_earthkit(self, backend_kwargs=backend_kwargs, other_kwargs=other_kwargs)
File [CENSORED_PATH]/builder.py:726, in from_earthkit(ds, backend_kwargs, other_kwargs)
724 for k in NON_XR_OPEN_DS_KWARGS:
725 backend_kwargs.pop(k, None)
--> 726 return xarray.open_dataset(ds, backend_kwargs=backend_kwargs, **other_kwargs)
727 else:
728 return SplitDatasetBuilder(ds, profile, other_kwargs=other_kwargs).build()
File [CENSORED_PATH]/api.py:687, in open_dataset(filename_or_obj, engine, chunks, cache, decode_cf, mask_and_scale, decode_times, decode_timedelta, use_cftime, concat_characters, decode_coords, drop_variables, inline_array, chunked_array_type, from_array_kwargs, backend_kwargs, **kwargs)
675 decoders = _resolve_decoders_kwargs(
676 decode_cf,
677 open_backend_dataset_parameters=backend.open_dataset_parameters,
(...) 683 decode_coords=decode_coords,
684 )
686 overwrite_encoded_chunks = kwargs.pop("overwrite_encoded_chunks", None)
--> 687 backend_ds = backend.open_dataset(
688 filename_or_obj,
689 drop_variables=drop_variables,
690 **decoders,
691 **kwargs,
692 )
693 ds = _dataset_from_backend_dataset(
694 backend_ds,
695 filename_or_obj,
(...) 705 **kwargs,
706 )
707 return ds
File [CENSORED_PATH]/engine.py:358, in EarthkitBackendEntrypoint.open_dataset(self, filename_or_obj, source_type, profile, variable_key, drop_variables, rename_variables, mono_variable, extra_dims, drop_dims, ensure_dims, fixed_dims, dim_roles, dim_name_from_role_name, rename_dims, dims_as_attrs, time_dim_mode, level_dim_mode, squeeze, add_valid_time_coord, decode_times, decode_timedelta, add_geo_coords, attrs_mode, attrs, variable_attrs, global_attrs, coord_attrs, add_earthkit_attrs, rename_attrs, fill, remapping, flatten_values, lazy_load, release_source, strict, dtype, array_module, errors)
318 from .builder import SingleDatasetBuilder
320 _kwargs = dict(
321 variable_key=variable_key,
322 drop_variables=drop_variables,
(...) 355 errors=errors,
356 )
--> 358 return SingleDatasetBuilder(fieldlist, profile, from_xr=True, backend_kwargs=_kwargs).build()
File [CENSORED_PATH]/builder.py:615, in SingleDatasetBuilder.build(self)
614 def build(self):
--> 615 ds_sorted, _ = self.parse(self.ds, self.profile)
616 dims = self.profile.dims.to_list()
617 LOG.debug(f"{dims=}")
File [CENSORED_PATH]/builder.py:576, in DatasetBuilder.parse(self, ds, profile, full)
571 profile.update(ds_xr)
572 # LOG.debug(f"after update: {profile.dim_keys=}")
573
574 # LOG.debug(f"{profile.sort_keys=}")
575 # the data is only sorted once
--> 576 ds_xr = ds_xr.order_by(profile.sort_keys)
578 if not profile.lazy_load and profile.release_source:
579 ds_xr.make_releasable()
File [CENSORED_PATH]/fieldlist.py:182, in XArrayInputFieldList.order_by(self, *args, **kwargs)
180 return ds
181 else:
--> 182 ds = self.ds.order_by(*args, remapping=self.remapping, **kwargs)
183 ds = XArrayInputFieldList(
184 ds,
185 db=self.db,
186 remapping=self.remapping,
187 )
188 return ds
File [CENSORED_PATH]/sql.py:102, in FieldListInFilesWithSqlIndex.order_by(self, remapping, *args, **kwargs)
99 out = out.filter(SqlRemapping(remapping=remapping))
101 if kwargs:
--> 102 out = out.filter(SqlOrder(kwargs))
104 return out
File [CENSORED_PATH]/sql.py:82, in FieldListInFilesWithSqlIndex.filter(self, filter)
80 return self
81 db = self.db.filter(filter)
---> 82 return self.__class__(db=db)
File [CENSORED_PATH]/__init__.py:22, in MetaBase.__call__(cls, *args, **kwargs)
20 obj = cls.__new__(cls, *args, **kwargs)
21 args, kwargs = cls.patch(obj, *args, **kwargs)
---> 22 obj.__init__(*args, **kwargs)
23 return obj
File [CENSORED_PATH]/db.py:34, in FieldListInFilesWithDBIndex.__init__(self, db, **kwargs)
31 self._cache = None
32 self._dict_cache = None
---> 34 super().__init__(**kwargs)
File [CENSORED_PATH]/__init__.py:372, in GribFieldListInFiles.__init__(self, grib_field_policy, grib_handle_policy, grib_handle_cache_size, use_grib_metadata_cache, *args, **kwargs)
369 def _get_opt(v, name):
370 return v if v is not None else CONFIG.get(name)
--> 372 self._field_manager = GribFieldManager(_get_opt(grib_field_policy, "grib-field-policy"), self)
373 self._handle_manager = GribHandleManager(
374 _get_opt(grib_handle_policy, "grib-handle-policy"),
375 _get_opt(grib_handle_cache_size, "grib-handle-cache-size"),
376 )
378 self._use_metadata_cache = _get_opt(use_grib_metadata_cache, "use-grib-metadata-cache")
File [CENSORED_PATH]/__init__.py:255, in GribFieldManager.__init__(self, policy, owner)
251 from lru import LRU
253 # TODO: the number of fields might only be available only later (e.g. fieldlists with
254 # an SQL index). Consider making cache a cached property.
--> 255 n = len(owner)
256 if n > 0:
257 self.cache = LRU(n)
File [CENSORED_PATH]/__init__.py:403, in GribFieldListInFiles.__len__(self)
402 def __len__(self):
--> 403 return self.number_of_parts()
File [CENSORED_PATH]/decorators.py:322, in cached_method.<locals>.wrapped(self)
319 @functools.wraps(method)
320 def wrapped(self):
321 if getattr(self, name, None) is None:
--> 322 setattr(self, name, method(self))
323 return getattr(self, name)
File [CENSORED_PATH]/sql.py:132, in FieldListInFilesWithSqlIndex.number_of_parts(self)
130 @cached_method
131 def number_of_parts(self):
--> 132 return self.db.count()
File [CENSORED_PATH]/sql.py:582, in SqlDatabase.count(self)
580 def count(self):
581 statement = f"SELECT COUNT(*) FROM {self.view};"
--> 582 for result in execute(self.connection, statement):
583 return result[0]
584 assert False, statement
File [CENSORED_PATH]/sql.py:50, in execute(connection, statement, *arg, **kwargs)
48 assert False
49 dump_sql(statement)
---> 50 return connection.execute(statement, *arg, **kwargs)
OperationalError: no such column: i_number
Accompanying data
No response
Organisation
No response