-
Notifications
You must be signed in to change notification settings - Fork 26
Expand file tree
/
Copy pathfdb.py
More file actions
80 lines (61 loc) · 2.35 KB
/
fdb.py
File metadata and controls
80 lines (61 loc) · 2.35 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# (C) Copyright 2020 ECMWF.
#
# This software is licensed under the terms of the Apache Licence Version 2.0
# which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
# In applying this licence, ECMWF does not waive the privileges and immunities
# granted to it by virtue of its status as an intergovernmental organisation
# nor does it submit to any jurisdiction.
#
import logging
import os
import shutil
try:
import pyfdb
except ImportError:
raise ImportError("FDB access requires 'pyfdb' to be installed")
from earthkit.data.sources.file import FileSource
from earthkit.data.sources.stream import StreamSource
from . import Source
LOG = logging.getLogger(__name__)
class FDBSource(Source):
def __init__(self, *args, stream=True, **kwargs):
super().__init__()
for k in ["group_by", "batch_size"]:
if k in kwargs:
raise ValueError(f"Invalid argument '{k}' for FDBSource. Deprecated since 0.8.0.")
self._stream_kwargs = dict()
for k in ["read_all"]:
if k in kwargs:
self._stream_kwargs[k] = kwargs.pop(k)
self.stream = stream
self.request = {}
for a in args:
self.request.update(a)
self.request.update(kwargs)
fdb5_home = os.environ.get("FDB5_HOME", None)
fdb5_conf = os.environ.get("FDB5_CONFIG", None)
fdb5_config_file = os.environ.get("FDB5_CONFIG_FILE", None)
if fdb5_home is None or (fdb5_conf is None and fdb5_config_file is None):
raise ValueError(
"""FDB5_HOME and either FDB5_CONFIG or FDB5_CONFIG_FILE need to be set.
See: https://fields-database.readthedocs.io for details about FDB."""
)
def mutate(self):
if self.stream:
stream = pyfdb.retrieve(self.request)
return StreamSource(stream, **self._stream_kwargs)
else:
return FDBFileSource(self.request)
class FDBFileSource(FileSource):
def __init__(self, request):
super().__init__()
self.path = self._retrieve(request)
def _retrieve(self, request):
def retrieve(target, request):
with open(target, "wb") as o, pyfdb.retrieve(request) as i:
shutil.copyfileobj(i, o)
return self.cache_file(
retrieve,
request,
)
source = FDBSource