-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathvalidate.py
More file actions
97 lines (82 loc) · 3.13 KB
/
validate.py
File metadata and controls
97 lines (82 loc) · 3.13 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
import time
from typing import Any
import xarray as xr
from xrlint.config import ConfigObject
from xrlint.result import Message, Result
from ..constants import NODE_ROOT_NAME
from .apply import apply_rule
from .rulectx import RuleContextImpl
def validate_dataset(config_obj: ConfigObject, dataset: Any, file_path: str):
    """Validate ``dataset`` against the rules in ``config_obj``.

    An already-open ``xr.Dataset`` is validated directly; any other
    source (e.g. a path or store) is opened first and validated by
    ``_open_and_validate_dataset``.

    Returns a ``Result`` combining the configuration, the collected
    messages, and the file path.
    """
    assert isinstance(config_obj, ConfigObject)
    assert dataset is not None
    assert isinstance(file_path, str)
    collected = (
        _validate_dataset(config_obj, dataset, file_path, None, None)
        if isinstance(dataset, xr.Dataset)
        else _open_and_validate_dataset(config_obj, dataset, file_path)
    )
    return Result.new(
        config_object=config_obj, messages=collected, file_path=file_path
    )
def _validate_dataset(
    config_obj: ConfigObject,
    dataset: xr.Dataset,
    file_path: str,
    file_index: int | None,
    access_latency: float | None,
) -> list[Message]:
    """Apply every configured rule to an open dataset.

    A single ``RuleContextImpl`` accumulates messages across all rules;
    each rule runs with the context's state scoped to its rule id.

    Returns the list of messages gathered by the context.
    """
    assert isinstance(config_obj, ConfigObject)
    assert isinstance(dataset, xr.Dataset)
    assert isinstance(file_path, str)
    ctx = RuleContextImpl(config_obj, dataset, file_path, file_index, access_latency)
    for rule_id, rule_config in config_obj.rules.items():
        # Scope the context to the current rule while it is applied.
        with ctx.use_state(rule_id=rule_id):
            apply_rule(ctx, rule_id, rule_config)
    return ctx.messages
def _open_and_validate_dataset(
    config_obj: ConfigObject, ds_source: Any, file_path: str
) -> list[Message]:
    """Open a dataset from ``ds_source`` and validate it.

    If the configuration names a processor, its ``preprocess`` step may
    yield several ``(dataset, path)`` pairs; each is validated
    individually and the per-dataset message lists are merged by the
    processor's ``postprocess``. Otherwise the source is opened as a
    single dataset and validated directly.

    Opening time is measured and forwarded to the rule context as the
    access latency. Open failures are reported as a single fatal
    message rather than raised.
    """
    assert isinstance(config_obj, ConfigObject)
    assert ds_source is not None
    assert isinstance(file_path, str)
    # Copy the options: _open_dataset pops "engine" from the mapping it
    # receives, and the shared configuration object must not be mutated
    # (otherwise a second validation run would lose its engine setting).
    opener_options = dict(config_obj.opener_options or {})
    if config_obj.processor is not None:
        processor_op = config_obj.get_processor_op(config_obj.processor)
        t0 = time.time()
        try:
            ds_path_list = processor_op.preprocess(file_path, opener_options)
        except (OSError, ValueError, TypeError) as e:
            return [new_fatal_message(str(e))]
        access_latency = time.time() - t0
        # NOTE(review): datasets yielded by preprocess() are not closed
        # here — presumably the processor owns their lifetime; confirm.
        return processor_op.postprocess(
            [
                _validate_dataset(config_obj, ds, path, i, access_latency)
                for i, (ds, path) in enumerate(ds_path_list)
            ],
            file_path,
        )
    else:
        t0 = time.time()
        try:
            dataset = _open_dataset(ds_source, opener_options, file_path)
        except (OSError, ValueError, TypeError) as e:
            return [new_fatal_message(str(e))]
        access_latency = time.time() - t0
        # Ensure the dataset we opened is closed after validation.
        with dataset:
            return _validate_dataset(
                config_obj, dataset, file_path, None, access_latency
            )
def _open_dataset(
    ds_source: Any, opener_options: dict[str, Any] | None, file_path: str
) -> xr.Dataset:
    """Open a dataset from ``ds_source`` via ``xr.open_dataset``.

    ``opener_options`` are passed through as keyword arguments, except
    for ``"engine"``, which is extracted; when no engine is given and
    ``file_path`` looks like a Zarr store (".zarr" or ".zarr/" suffix),
    the "zarr" engine is selected.

    The caller's mapping is never mutated, and ``opener_options=None``
    is accepted (previously ``.pop`` on ``None`` raised AttributeError
    before the ``or {}`` fallback could apply).
    """
    # Work on a private copy so popping "engine" cannot leak back into
    # a shared/reused options mapping.
    options = dict(opener_options) if opener_options else {}
    engine = options.pop("engine", None)
    if engine is None and file_path.endswith((".zarr", ".zarr/")):
        engine = "zarr"
    return xr.open_dataset(ds_source, engine=engine, **options)
def new_fatal_message(message: str) -> Message:
    """Wrap ``message`` in a fatal, severity-2 ``Message`` at the dataset root."""
    return Message(
        message=message, fatal=True, severity=2, node_path=NODE_ROOT_NAME
    )