Skip to content

Commit 3cea822

Browse files
committed
add named filters usage
1 parent 27f463c commit 3cea822

File tree

4 files changed

+192
-29
lines changed

4 files changed

+192
-29
lines changed

GEECS-Data-Utils/geecs_data_utils/database/database.py

Lines changed: 119 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -25,43 +25,139 @@
2525

2626
from __future__ import annotations
2727
from pathlib import Path
28-
from typing import Union, List, Tuple, Callable, Optional
28+
from typing import Union, List, Tuple, Callable, Optional, Any, Mapping
2929
import pandas as pd
3030
import json
31+
import yaml
3132

3233

3334
class ScanDatabase:
3435
"""Partition-aware Parquet reader with fast date pruning and composable filters."""
3536

36-
def __init__(self, parquet_root: Union[str, Path]):
37+
def __init__(
    self, parquet_root: Union[str, Path], *, autoload_presets: bool = True
):
    """Bind the database to the root directory of the Hive-partitioned dataset.

    Parameters
    ----------
    parquet_root : str or Path
        Root directory of the partitioned Parquet dataset.
    autoload_presets : bool, keyword-only
        When true (the default), ``_autoload_presets()`` is called once the
        instance state has been set up.

    Raises
    ------
    FileNotFoundError
        If ``parquet_root`` does not exist.
    """
    self.root = Path(parquet_root)
    if not self.root.exists():
        raise FileNotFoundError(f"Parquet root does not exist: {self.root}")

    # Start with no date pruning and no queued DataFrame filters.
    self._date_range: Optional[Tuple[pd.Timestamp, pd.Timestamp]] = None
    self._df_filters: List[Callable[[pd.DataFrame], pd.DataFrame]] = []
    # Raw named-filter specs keyed by name (from YAML or registered at runtime).
    self._named_specs: dict[str, dict[str, Any]] = {}

    if not autoload_presets:
        return
    self._autoload_presets()
4353

4454
# -----------------------
45-
# Filters
55+
# Preset filter loading
56+
# -----------------------
57+
def _preset_default_path(self) -> Path:
    """Return the default named-filter YAML path next to this module.

    Returns
    -------
    Path
        ``<directory of this file>/filters/scan_filters.yml``. The path is
        only computed here; whether it exists is not checked.
    """
    module_dir = Path(__file__).resolve().parent
    return module_dir.joinpath("filters", "scan_filters.yml")
60+
61+
def _autoload_presets(self) -> None:
    """Best-effort load of the default preset file, if it is present.

    Nothing happens when the default preset path does not exist. If loading
    raises, the error is reported as a ``[WARN]`` line on stdout and then
    dropped, so the instance keeps working without presets.
    """
    preset_path = self._preset_default_path()
    if not preset_path.exists():
        return
    try:
        self.load_named_filters(preset_path)
    except Exception as err:
        # Deliberately non-fatal: a broken preset file must not block usage.
        print(f"[WARN] Failed to load presets from {preset_path}: {err}")
70+
71+
def load_named_filters(self, path: Union[str, Path]) -> "ScanDatabase":
    """Load named filter specs (including composites) from a YAML file.

    The document is expected to hold a top-level ``filters`` mapping of
    ``name -> spec``. The loaded mapping replaces whatever is currently held
    in ``self._named_specs``. An empty document, or a missing/empty
    ``filters`` entry, results in no named filters.

    Parameters
    ----------
    path : str or Path
        Location of the YAML file.

    Returns
    -------
    ScanDatabase
        Self, for chaining.

    Raises
    ------
    ValueError
        If the document root or its ``filters`` entry is not a mapping.

    Notes
    -----
    Errors from reading the file or parsing the YAML are not caught here and
    propagate to the caller.
    """
    # Decode explicitly as UTF-8 so parsing does not depend on the platform's
    # locale encoding.
    text = Path(path).read_text(encoding="utf-8")
    data = yaml.safe_load(text) or {}
    # A list/scalar document root has no ``.get``; report it as the same
    # ValueError used for a malformed ``filters`` entry instead of letting an
    # AttributeError escape.
    if not isinstance(data, Mapping):
        raise ValueError("YAML must contain a top-level 'filters' mapping")
    specs = data.get("filters") or {}
    if not isinstance(specs, Mapping):
        raise ValueError("YAML must contain a top-level 'filters' mapping")
    self._named_specs = dict(specs)
    return self
80+
81+
def register_named_filter(
82+
self,
83+
name: str,
84+
*,
85+
kind: str | None = None,
86+
args: Mapping[str, Any] | None = None,
87+
subfilters: list[str] | None = None,
88+
) -> "ScanDatabase":
89+
"""Register a single named filter spec at runtime (supports composites)."""
90+
if not name:
91+
raise ValueError("Filter name must be non-empty")
92+
if kind == "composite":
93+
self._named_specs[name] = {
94+
"kind": "composite",
95+
"subfilters": list(subfilters or []),
96+
}
97+
elif kind:
98+
self._named_specs[name] = {"kind": kind, "args": dict(args or {})}
99+
else:
100+
raise ValueError(
101+
"Provide 'kind' (and 'args') or kind='composite' with 'subfilters'"
102+
)
103+
return self
104+
105+
def list_named_filters(self) -> list[str]:
    """Return the names of all known named filters, sorted ascending."""
    names = list(self._named_specs)
    names.sort()
    return names
108+
109+
def describe_named_filter(self, name: str) -> dict[str, Any]:
    """Return the stored raw spec for ``name``.

    The returned object is the stored spec itself, not a copy.

    Raises
    ------
    KeyError
        If no spec is stored under ``name`` (or the stored value is ``None``).
    """
    found = self._named_specs.get(name)
    if found is not None:
        return found
    raise KeyError(f"Unknown named filter: '{name}'")
115+
116+
def apply(self, *names: str) -> "ScanDatabase":
    """Apply one or more named filters (supports composite filters).

    Each requested name is expanded independently, in the order given, via
    ``_apply_named``. Calling with no names is a no-op.

    Parameters
    ----------
    *names : str
        Names of registered/loaded filter specs.

    Returns
    -------
    ScanDatabase
        Self, for chaining.
    """
    for nm in names:
        # Use a fresh tracking set per top-level name: requesting the same
        # filter twice, or two presets that share a sub-filter, is not a
        # cycle and must not be reported as one.
        self._apply_named(nm, set())
    return self
124+
125+
def _apply_named(self, name: str, seen: set[str]) -> None:
    """Expand one named filter, recursing through composite specs.

    Parameters
    ----------
    name : str
        Name of the spec to expand.
    seen : set of str
        Names of the composites currently being expanded (the active
        recursion path). Entries are added on entry to a composite and
        removed again when its expansion finishes, so the set is left as it
        was found.

    Raises
    ------
    ValueError
        If ``name`` is already on the active expansion path (a real cycle).
    KeyError
        If no (non-empty) spec is stored under ``name``.
    """
    if name in seen:
        raise ValueError(f"Cycle detected in composite filters at '{name}'")
    spec = self._named_specs.get(name)
    if not spec:
        raise KeyError(f"Unknown named filter: '{name}'")

    kind = spec.get("kind")
    if kind != "composite":
        # Leaf filter: it cannot recurse, so it never needs cycle tracking.
        self._instantiate_and_enqueue(kind, spec.get("args") or {})
        return

    # Track only the active path. A sub-filter reachable through two
    # different composites (diamond shape) or requested twice is legitimate
    # reuse; only re-entering a composite that is still being expanded is a
    # cycle.
    seen.add(name)
    try:
        for sub in spec.get("subfilters") or []:
            self._apply_named(sub, seen)
    finally:
        seen.discard(name)
138+
139+
def _instantiate_and_enqueue(self, kind: str, args: Mapping[str, Any]) -> None:
    """Dispatch a spec ``kind`` to the matching ``filter_*`` method.

    Parameters
    ----------
    kind : str
        One of the supported filter kinds listed in the table below.
    args : mapping
        Passed to the selected method as keyword arguments.

    Raises
    ------
    ValueError
        If ``kind`` is not one of the supported kinds.
    """
    # (spec kind, name of the method on self that implements it)
    dispatch = (
        ("ecs_value_within", "filter_ecs_value_within"),
        ("ecs_value_contains", "filter_ecs_value_contains"),
        ("scan_parameter_contains", "filter_scan_parameter_contains"),
        ("experiment_equals", "filter_experiment_equals"),
        ("device_contains", "filter_device_contains"),
    )
    for known_kind, method_name in dispatch:
        if kind == known_kind:
            getattr(self, method_name)(**args)
            return
    raise ValueError(f"Unknown filter kind: {kind}")
153+
154+
# -----------------------
155+
# Filters (chainable)
46156
# -----------------------
47157
def date_range(
48158
self, start: Union[str, pd.Timestamp], end: Union[str, pd.Timestamp]
49159
) -> "ScanDatabase":
50-
"""
51-
Set an inclusive date range for partition pruning and day-level trim.
52-
53-
Parameters
54-
----------
55-
start : str or pandas.Timestamp
56-
Start date (inclusive).
57-
end : str or pandas.Timestamp
58-
End date (inclusive).
59-
60-
Returns
61-
-------
62-
ScanDatabase
63-
Self for chaining.
64-
"""
160+
"""Set an inclusive date range for partition pruning and day-level trim."""
65161
s, e = pd.Timestamp(start), pd.Timestamp(end)
66162
if s > e:
67163
s, e = e, s
@@ -110,7 +206,7 @@ def _f(df: pd.DataFrame) -> pd.DataFrame:
110206
def _has(lst) -> bool:
111207
if isinstance(lst, list):
112208
return any(isinstance(x, str) and needle in x.lower() for x in lst)
113-
if isinstance(lst, str): # stray single string
209+
if isinstance(lst, str):
114210
return needle in lst.lower()
115211
return False
116212

@@ -165,7 +261,7 @@ def _ecs_values(row_ecs: object, device_like: str, variable_like: str):
165261
def filter_ecs_value_within(
166262
self, device_like: str, variable_like: str, target: float, tol: float
167263
) -> "ScanDatabase":
168-
"""Keep rows where any matching ECS value is within target±tol (values are parsed as floats)."""
264+
"""Keep rows where any matching ECS value is within target±tol (values parsed as floats)."""
169265
tgt, tol = float(target), float(tol)
170266

171267
def _ok_num(v) -> bool:
@@ -228,7 +324,7 @@ def reset(self) -> "ScanDatabase":
228324
return self
229325

230326
# -----------------------
231-
# Internal helpers
327+
# Partition I/O helpers
232328
# -----------------------
233329
def _all_partitions(self) -> List[Tuple[int, int]]:
234330
"""Return all (year, month) partitions present on disk."""
@@ -250,13 +346,11 @@ def _all_partitions(self) -> List[Tuple[int, int]]:
250346
def _partitions_to_read(self) -> List[Path]:
251347
"""Compute partition directories to load based on the date range."""
252348
if self._date_range is None:
253-
# No pruning → all partitions
254349
return [
255350
self.root / f"year={y}" / f"month={m}"
256351
for (y, m) in self._all_partitions()
257352
if (self.root / f"year={y}" / f"month={m}").exists()
258353
]
259-
260354
s, e = self._date_range
261355
months = pd.period_range(s, e, freq="M")
262356
paths: List[Path] = []
@@ -274,13 +368,11 @@ def _load_partitions(self) -> pd.DataFrame:
274368
dfs.append(pd.read_parquet(folder))
275369
except Exception as e:
276370
print(f"[WARN] Skipping partition {folder}: {e}")
277-
278371
if not dfs:
279372
return pd.DataFrame()
280-
281373
df = pd.concat(dfs, ignore_index=True)
282374

283-
# Day-level trim inside selected months
375+
# day-level trim
284376
if self._date_range is not None and all(
285377
c in df.columns for c in ("year", "month", "day")
286378
):
@@ -291,10 +383,9 @@ def _load_partitions(self) -> pd.DataFrame:
291383
)
292384
df = df[(ts >= s) & (ts <= e)].copy()
293385

294-
# Apply queued pandas filters
386+
# apply pandas filters
295387
for f in self._df_filters:
296388
df = f(df)
297-
# print(f"[DEBUG] filter {getattr(f, '__name__', 'callable')} kept {after}/{before}")
298389
return df
299390

300391
# -----------------------
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
filters:
2+
pmq_in:
3+
kind: ecs_value_within
4+
args:
5+
device_like: "hexapod"
6+
variable_like: "y"
7+
target: 18.5
8+
tol: 0.5

GEECS-Data-Utils/poetry.lock

Lines changed: 64 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

GEECS-Data-Utils/pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ pydantic = "^2.0"
1717
pytest = "^7.0"
1818
numpy = "<2.0"
1919
duckdb = "^1.1.3"
20+
pyyaml = "^6.0.1"
2021

2122
[build-system]
2223
requires = ["poetry-core"]

0 commit comments

Comments
 (0)