Skip to content

Overwriting metadata and multiple process FDB retrieval #646

@jinmannwong

Description

@jinmannwong

What happened?

There are two issues, which I think are related to the wrapping of eccodes. The first concerns overriding the metadata with edition = 2, and the second concerns retrieving data from the FDB from both parent and child processes. There is also a small bug, noted in a comment in the tests below: calling print on the output of earthkit.data.from_source("fdb", req, stream=True, read_all=True) raises an error, which I think is because ds._load() is not called in __repr__.

What are the steps to reproduce the bug?

The two test cases are test_metadata and test_fdb.

import concurrent.futures as fut
import os
import shutil
import tempfile

import earthkit.data
import pyfdb

def temp_fdb() -> tuple[pyfdb.FDB, str]:
    """Create a throwaway local FDB rooted in a fresh temporary directory.

    The directory receives an ``etc/fdb`` folder holding a copy of the schema
    found at ``/home/fdbprod/etc/fdb/schema`` together with a generated
    ``config.yaml``, and an ``fdb`` folder that is configured as the only root.
    As a side effect, ``FDB_HOME`` and ``FDB_HANDLE_LUSTRE_STRIPE`` are set in
    ``os.environ`` before the ``pyfdb.FDB`` object is constructed.

    Returns:
        A ``(fdb, tmpdir)`` tuple: the ``pyfdb.FDB`` instance and the path of
        the temporary directory. This function does not delete the directory;
        the caller has to remove it when done.
    """
    tmpdir = tempfile.mkdtemp()
    os.makedirs(f"{tmpdir}/etc/fdb")
    os.mkdir(f"{tmpdir}/fdb")
    shutil.copyfile("/home/fdbprod/etc/fdb/schema", f"{tmpdir}/etc/fdb/schema")
    with open(f"{tmpdir}/etc/fdb/config.yaml", "w") as f:
        f.write(
            f"""
---
type: local
engine: toc
schema: "{tmpdir}/etc/fdb/schema"
spaces:
- roots:
    - path: {tmpdir}/fdb
"""
        )
    # The environment must point at the new directory before FDB() is created.
    os.environ["FDB_HOME"] = tmpdir
    os.environ["FDB_HANDLE_LUSTRE_STRIPE"] = "0"
    return pyfdb.FDB(), tmpdir


def load_input(param: int, step: int):
    """Read one surface field from the FDB and rebuild it as an array field list.

    Args:
        param: Value placed under the ``"param"`` key of the FDB request.
        step: Value placed under the ``"step"`` key of the FDB request.

    Returns:
        The result of ``earthkit.data.FieldList.from_array`` built from the
        values and metadata of the retrieved data.

    Raises:
        ValueError: If the retrieved data has length zero.
    """
    request = {
        "class": "od",
        "expver": "0001",
        "stream": "enfo",
        "date": 20250309,
        "time": 0,
        "domain": "g",
        "type": "cf",
        "levtype": "sfc",
        "step": step,
        "param": param,
    }
    source = earthkit.data.from_source(
        "fdb",
        request,
        stream=True,
        read_all=True,
    )
    # NOTE: calling print(source) at this point also raises an error,
    # reportedly because ds._load() is not called in __repr__.
    if not len(source):
        raise ValueError(f"No data found for step {step}")

    values = source.values
    metadata = source.metadata()
    return earthkit.data.FieldList.from_array(values, metadata)


def test_metadata():
    """
    Problems overriding the metadata multiple times with edition = 2. Tried with
    other keys and didn't have a problem. The error does not appear to occur if I
    replace the second override with a direct set on the underlying eccodes handle.
    The test uses a test data file from the earthkit-data repo.
    """
    fields = earthkit.data.from_source("file", "./tests/data/u_pl.grib")
    u = fields.sel(param="u")[0]
    # First override: metadata of the original field with edition set to 2.
    template = u.metadata().override({"edition": 2})
    fields2 = earthkit.data.FieldList.from_array(u.values, template)
    # Second override, applied to metadata that already went through one override.
    template2 = fields2[0].metadata().override({"edition": 2})
    # This is the call shown failing with KeyError: 'values' in the reported log.
    template2._handle.set_array("values", u.values)

        
def test_fdb():
    """
    An error occurs when reading from the FDB from both parent and child
    processes. I tested reading the FDB stream directly with the
    eccodes.reader.StreamReader and didn't seem to encounter the error.

    The test fills a temporary FDB with data retrieved from MARS, then, for
    each step, reads param 47 in the parent process while submitting a read of
    param 165 to a process pool. The temporary directory is always removed,
    whether the test passes or fails.
    """
    fdb, tmpdir = temp_fdb()
    try:
        ds = earthkit.data.from_source("mars", {
            "class": "od",
            "type": "cf",
            "stream": "enfo",
            "expver": "0001",
            "levtype": "sfc",
            "param": "47.128/165.128",
            "date": "20250309",
            "time": "0000",
            "step": "0/1/2/3/4/5/6/7/8/9",
            "domain": "g",
        })
        fdb_target = earthkit.data.targets.create_target("fdb", fdb)
        fdb_target.write(ds)
        fdb.flush()

        with fut.ProcessPoolExecutor(max_workers=3) as executor:
            futures = []
            for step in range(0, 10, 1):
                # Read in the parent while the children are reading as well.
                load_input(47, step)
                futures.append(executor.submit(
                    load_input,
                    165,
                    step,
                ))

            # NOTE(review): wait() only blocks until the futures finish; it
            # does not re-raise exceptions raised in the worker processes.
            fut.wait(futures, return_when=fut.ALL_COMPLETED)
    finally:
        # Clean up on success as well as on failure so the directory is not
        # leaked; any exception keeps propagating with its original traceback.
        shutil.rmtree(tmpdir)

Version

0.13.1

Platform (OS and architecture)

Linux x86_64

Relevant log output

test_metadata output:
    ECCODES ERROR   :  Creating (section_8)7777 of ascii at offset 181-185 over message boundary (181)
ECCODES ERROR   :  grib_handle_new_from_message: No final 7777 in message!
Traceback (most recent call last):
  File "env/lib/python3.10/site-packages/eccodes/highlevel/message.py", line 19, in raise_keyerror
    yield
  File "env/lib/python3.10/site-packages/eccodes/highlevel/message.py", line 162, in set_array
    return eccodes.codes_set_array(self._handle, name, value)
  File "env/lib/python3.10/site-packages/gribapi/gribapi.py", line 2187, in grib_set_array
    grib_set_double_array(msgid, key, value)
  File "env/lib/python3.10/site-packages/gribapi/gribapi.py", line 1200, in grib_set_double_array
    GRIB_CHECK(lib.grib_set_double_array(h, key.encode(ENC), a, length))
  File "env/lib/python3.10/site-packages/gribapi/gribapi.py", line 232, in GRIB_CHECK
    errors.raise_grib_error(errid)
  File "env/lib/python3.10/site-packages/gribapi/errors.py", line 381, in raise_grib_error
    raise ERROR_MAP[errid](errid)
gribapi.errors.KeyValueNotFoundError: Key/value not found

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "test.py", line 101, in <module>
    test_metadata()
  File "test.py", line 61, in test_metadata
    template2._handle.set_array("values", u.values)
  File "env/lib/python3.10/site-packages/eccodes/highlevel/message.py", line 161, in set_array
    with raise_keyerror(name):
  File "/usr/local/apps/python3/3.10.10-01/lib/python3.10/contextlib.py", line 153, in __exit__
    self.gen.throw(typ, value, traceback)
  File "env/lib/python3.10/site-packages/eccodes/highlevel/message.py", line 21, in raise_keyerror
    raise KeyError(name)
KeyError: 'values'

test_fdb output:
    Traceback (most recent call last):
  File "test.py", line 141, in <module>
    test_fdb()
  File "test.py", line 136, in test_fdb
    raise e
  File "test.py", line 126, in test_fdb
    load_input(47, step)
  File "test.py", line 46, in load_input
    if len(ds) == 0:
  File "earthkit-data/src/earthkit/data/readers/grib/memory.py", line 193, in __len__
    self._load()
  File "earthkit/data/readers/grib/memory.py", line 202, in _load
    self.fields = [f for f in self._reader]
  File "earthkit-data/src/earthkit/data/readers/grib/memory.py", line 202, in <listcomp>
    self.fields = [f for f in self._reader]
  File "earthkit-data/src/earthkit/data/readers/grib/memory.py", line 45, in __next__
    handle = self._next_handle()
  File "earthkit-data/src/earthkit/data/readers/grib/memory.py", line 123, in _next_handle
    return self._reader._next_handle()
  File "env/lib/python3.10/site-packages/eccodes/highlevel/reader.py", line 142, in _next_handle
    return codes_new_from_stream(self.stream)
  File "env/lib/python3.10/site-packages/eccodes/highlevel/reader.py", line 117, in codes_new_from_stream
    gribapi.GRIB_CHECK(err)
  File "env/lib/python3.10/site-packages/gribapi/gribapi.py", line 232, in GRIB_CHECK
    errors.raise_grib_error(errid)
  File "env/lib/python3.10/site-packages/gribapi/errors.py", line 381, in raise_grib_error
    raise ERROR_MAP[errid](errid)
gribapi.errors.WrongLengthError: Wrong message length

Accompanying data

The test_metadata test uses:
https://github.com/ecmwf/earthkit-data/blob/develop/tests/data/u_pl.grib

Organisation

ECMWF

Metadata

Metadata

Assignees

Labels

bug: Something isn't working

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions