Skip to content

Commit e749fb4

Browse files
committed
improve mmap backend
1 parent 2442251 commit e749fb4

File tree

5 files changed

+619
-22
lines changed

5 files changed

+619
-22
lines changed

python/themachinethatgoesping/pingprocessing/watercolumn/echograms/backends/__init__.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,15 @@
33
from .base import EchogramDataBackend
44
from .ping_backend import PingDataBackend
55
from .zarr_backend import ZarrDataBackend
6+
from .mmap_backend import MmapDataBackend
7+
8+
# Keep old name as alias for backwards compatibility
9+
MmapDataBackend = MmapDataBackend
610

711
__all__ = [
812
"EchogramDataBackend",
9-
"PingDataBackend",
13+
"PingDataBackend",
1014
"ZarrDataBackend",
15+
"MmapDataBackend",
16+
"MmapDataBackend", # alias for backwards compatibility
1117
]

python/themachinethatgoesping/pingprocessing/watercolumn/echograms/backends/base.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,32 @@ def iterate_raw_columns(
148148
for idx in ping_indices:
149149
yield idx, self.get_raw_column(idx)
150150

151+
def get_chunk(self, start_ping: int, end_ping: int) -> np.ndarray:
152+
"""Get a chunk of WCI data for multiple consecutive pings.
153+
154+
Returns a 2D array of shape (n_pings, max_samples) where n_pings
155+
is end_ping - start_ping. Backends can override this for faster
156+
bulk access.
157+
158+
Default implementation uses get_column for each ping.
159+
160+
Args:
161+
start_ping: First ping index (inclusive).
162+
end_ping: Last ping index (exclusive).
163+
164+
Returns:
165+
2D array of shape (end_ping - start_ping, max_samples).
166+
"""
167+
n_pings = end_ping - start_ping
168+
max_samples = int(self.max_sample_counts[start_ping:end_ping].max()) + 1
169+
170+
chunk = np.full((n_pings, max_samples), np.nan, dtype=np.float32)
171+
for i, ping_idx in enumerate(range(start_ping, end_ping)):
172+
col = self.get_column(ping_idx)
173+
chunk[i, :len(col)] = col
174+
175+
return chunk
176+
151177
# =========================================================================
152178
# Optional: beam sample selection access (for advanced use cases)
153179
# =========================================================================
Lines changed: 337 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,337 @@
1+
"""Backend for ultra-fast echogram data access using memory-mapped files.
2+
3+
Memory-mapped files provide near-instantaneous random access by letting the
4+
OS handle paging. This is ideal for interactive visualization where scattered
5+
ping access patterns are common.
6+
7+
Trade-off vs Zarr:
8+
- Mmap: Uncompressed (larger files), but 10-100x faster random access
9+
- Zarr: Compressed (smaller files), but slower due to decompression
10+
11+
Memory efficiency:
12+
- WCI data is truly lazy-loaded via OS page cache
13+
- get_image processes in chunks to avoid loading entire dataset
14+
- Only the requested pages are loaded from disk
15+
"""
16+
17+
from typing import Dict, Optional, Tuple
18+
from pathlib import Path
19+
import json
20+
21+
import numpy as np
22+
23+
from .base import EchogramDataBackend
24+
from ..indexers import EchogramImageRequest
25+
26+
27+
# Mmap store format version (2.0 = binary .npy metadata instead of JSON)
28+
MMAP_FORMAT_VERSION = "2.0"
29+
30+
31+
class MmapDataBackend(EchogramDataBackend):
32+
"""Backend using memory-mapped files for ultra-fast random access.
33+
34+
This backend stores data in a simple directory structure:
35+
- wci_data.bin: Raw float32 array (n_pings × max_samples)
36+
- metadata.json: All metadata and ping parameters
37+
38+
Memory-mapped access means:
39+
- Zero decompression overhead
40+
- OS handles caching via page cache (truly lazy loading)
41+
- Instant random access (10-100x faster than Zarr for scattered reads)
42+
- File size = raw data size (no compression)
43+
- Supports larger-than-memory files (only accessed pages are loaded)
44+
"""
45+
46+
def __init__(
47+
self,
48+
store_path: str,
49+
wci_mmap: np.memmap,
50+
metadata: Dict,
51+
):
52+
"""Initialize MmapDataBackend.
53+
54+
Prefer using the `from_path` factory method or `EchogramBuilder.from_mmap()`.
55+
56+
Args:
57+
store_path: Path to the mmap store directory.
58+
wci_mmap: Memory-mapped WCI data array (lazy-loaded by OS).
59+
metadata: Dictionary containing all metadata.
60+
"""
61+
self._store_path = store_path
62+
self._wci_mmap = wci_mmap # np.memmap is lazy - OS loads pages on demand
63+
self._metadata = metadata
64+
65+
# Cache frequently accessed values (small scalars, not the data)
66+
self._n_pings = wci_mmap.shape[0]
67+
self._n_samples = wci_mmap.shape[1]
68+
69+
# =========================================================================
70+
# Factory methods
71+
# =========================================================================
72+
73+
@classmethod
74+
def from_path(cls, path: str) -> "MmapDataBackend":
75+
"""Load a MmapDataBackend from a store directory.
76+
77+
The WCI data is memory-mapped with mode='r' (read-only), meaning:
78+
- No data is loaded into memory until accessed
79+
- OS page cache handles all caching efficiently
80+
- Supports files larger than available RAM
81+
82+
Args:
83+
path: Path to the mmap store directory.
84+
85+
Returns:
86+
MmapDataBackend instance with lazy-loaded data.
87+
"""
88+
path = Path(path)
89+
90+
# Load scalar metadata (small JSON)
91+
metadata_file = path / "metadata.json"
92+
if not metadata_file.exists():
93+
raise FileNotFoundError(f"Metadata file not found: {metadata_file}")
94+
95+
with open(metadata_file, "r") as f:
96+
metadata = json.load(f)
97+
98+
# Verify format version
99+
version = metadata.get("format_version", "unknown")
100+
if not version.startswith("2."):
101+
raise ValueError(
102+
f"Unsupported mmap store format version: {version}. "
103+
f"Expected version {MMAP_FORMAT_VERSION}"
104+
)
105+
106+
# Load array metadata from binary .npy files
107+
metadata["ping_times"] = np.load(path / "ping_times.npy")
108+
metadata["max_sample_counts"] = np.load(path / "max_sample_counts.npy")
109+
metadata["sample_nr_min"] = np.load(path / "sample_nr_min.npy")
110+
metadata["sample_nr_max"] = np.load(path / "sample_nr_max.npy")
111+
112+
if (path / "range_min.npy").exists():
113+
metadata["range_min"] = np.load(path / "range_min.npy")
114+
metadata["range_max"] = np.load(path / "range_max.npy")
115+
116+
if (path / "depth_min.npy").exists():
117+
metadata["depth_min"] = np.load(path / "depth_min.npy")
118+
metadata["depth_max"] = np.load(path / "depth_max.npy")
119+
120+
# Ping parameters
121+
ping_params = {}
122+
for name in metadata.get("ping_param_names", []):
123+
timestamps = np.load(path / f"ping_param_{name}_times.npy")
124+
values = np.load(path / f"ping_param_{name}_values.npy")
125+
y_ref = metadata["ping_params_meta"][name]
126+
ping_params[name] = {
127+
"y_reference": y_ref,
128+
"timestamps": timestamps,
129+
"values": values,
130+
}
131+
metadata["ping_params"] = ping_params
132+
133+
# Open memory-mapped data (lazy - no data loaded yet)
134+
wci_file = path / "wci_data.bin"
135+
if not wci_file.exists():
136+
raise FileNotFoundError(f"WCI data file not found: {wci_file}")
137+
138+
shape = (metadata["n_pings"], metadata["n_samples"])
139+
# mode='r' = read-only, lazy-loaded via OS page cache
140+
wci_mmap = np.memmap(wci_file, dtype=np.float32, mode="r", shape=shape)
141+
142+
return cls(str(path), wci_mmap, metadata)
143+
144+
145+
146+
# =========================================================================
147+
# Metadata properties
148+
# =========================================================================
149+
150+
@property
151+
def n_pings(self) -> int:
152+
return self._n_pings
153+
154+
@property
155+
def ping_times(self) -> np.ndarray:
156+
return self._metadata["ping_times"]
157+
158+
@property
159+
def max_sample_counts(self) -> np.ndarray:
160+
return self._metadata["max_sample_counts"]
161+
162+
@property
163+
def sample_nr_extents(self) -> Tuple[np.ndarray, np.ndarray]:
164+
return self._metadata["sample_nr_min"], self._metadata["sample_nr_max"]
165+
166+
@property
167+
def range_extents(self) -> Optional[Tuple[np.ndarray, np.ndarray]]:
168+
if "range_min" not in self._metadata:
169+
return None
170+
return self._metadata["range_min"], self._metadata["range_max"]
171+
172+
@property
173+
def depth_extents(self) -> Optional[Tuple[np.ndarray, np.ndarray]]:
174+
if "depth_min" not in self._metadata:
175+
return None
176+
return self._metadata["depth_min"], self._metadata["depth_max"]
177+
178+
@property
179+
def has_navigation(self) -> bool:
180+
return self._metadata.get("has_navigation", False)
181+
182+
@property
183+
def wci_value(self) -> str:
184+
return self._metadata.get("wci_value", "sv")
185+
186+
@property
187+
def linear_mean(self) -> bool:
188+
return self._metadata.get("linear_mean", True)
189+
190+
# =========================================================================
191+
# Ping parameters
192+
# =========================================================================
193+
194+
def get_ping_params(self) -> Dict[str, Tuple[str, Tuple[np.ndarray, np.ndarray]]]:
195+
"""Return pre-computed ping parameters.
196+
197+
Returns:
198+
Dict mapping param name to (y_reference, (timestamps, values)).
199+
"""
200+
params = {}
201+
for name, data in self._metadata.get("ping_params", {}).items():
202+
params[name] = (data["y_reference"], (data["timestamps"], data["values"]))
203+
return params
204+
205+
# =========================================================================
206+
# Data access
207+
# =========================================================================
208+
209+
def get_column(self, ping_index: int) -> np.ndarray:
210+
"""Get beam-averaged water column data for a single ping."""
211+
return self._wci_mmap[ping_index, :].copy()
212+
213+
def get_raw_column(self, ping_index: int) -> np.ndarray:
214+
"""Get raw water column data (same as get_column for mmap)."""
215+
return self.get_column(ping_index)
216+
217+
def get_chunk(self, start_ping: int, end_ping: int) -> np.ndarray:
218+
"""Get a chunk of WCI data for multiple consecutive pings.
219+
220+
Optimized for MmapDataBackend - direct slice from memory-mapped file.
221+
222+
Args:
223+
start_ping: First ping index (inclusive).
224+
end_ping: Last ping index (exclusive).
225+
226+
Returns:
227+
2D array of shape (end_ping - start_ping, n_samples).
228+
"""
229+
# Direct slice from mmap - OS handles page loading efficiently
230+
return self._wci_mmap[start_ping:end_ping, :]
231+
232+
# =========================================================================
233+
# Image generation (memory-efficient with chunked mmap access)
234+
# =========================================================================
235+
236+
def get_image(self, request: EchogramImageRequest) -> np.ndarray:
237+
"""Build a complete echogram image from a request.
238+
239+
Uses memory-mapped scattered access for ultra-fast loading.
240+
Processes in chunks to limit memory usage for large datasets.
241+
242+
Memory usage: O(chunk_size * ny) + O(nx * ny) for output
243+
244+
Args:
245+
request: Image request with ping mapping and affine parameters.
246+
247+
Returns:
248+
2D array of shape (nx, ny) with echogram data (ping, sample).
249+
"""
250+
# Create output array
251+
image = np.full((request.nx, request.ny), request.fill_value, dtype=np.float32)
252+
253+
# Find valid x indices (where ping_indexer >= 0)
254+
valid_x_mask = request.ping_indexer >= 0
255+
valid_x_indices = np.where(valid_x_mask)[0]
256+
257+
if len(valid_x_indices) == 0:
258+
return image
259+
260+
# Get unique pings and mapping
261+
unique_pings, inverse_indices = np.unique(
262+
request.ping_indexer[valid_x_mask], return_inverse=True
263+
)
264+
265+
n_unique = len(unique_pings)
266+
ny = request.ny
267+
y_coords = request.y_coordinates
268+
269+
# Process in chunks to limit memory usage
270+
# Each chunk loads at most chunk_size pings worth of data
271+
chunk_size = 1000 # ~4MB per chunk at 1000 samples float32
272+
273+
# Pre-allocate result array for unique pings
274+
unique_results = np.empty((n_unique, ny), dtype=np.float32)
275+
276+
for chunk_start in range(0, n_unique, chunk_size):
277+
chunk_end = min(chunk_start + chunk_size, n_unique)
278+
chunk_indices = slice(chunk_start, chunk_end)
279+
chunk_pings = unique_pings[chunk_indices]
280+
n_chunk = chunk_end - chunk_start
281+
282+
# Load chunk of ping data from mmap (only these pages loaded)
283+
ping_data = self._wci_mmap[chunk_pings, :]
284+
285+
# Compute sample indices for this chunk
286+
a_chunk = request.affine_a[chunk_pings]
287+
b_chunk = request.affine_b[chunk_pings]
288+
max_samples_chunk = request.max_sample_indices[chunk_pings]
289+
290+
# Sample indices: shape (n_chunk, ny)
291+
sample_indices = np.rint(
292+
a_chunk[:, np.newaxis] + b_chunk[:, np.newaxis] * y_coords
293+
).astype(np.int32)
294+
295+
# Bounds checking
296+
valid_samples = (
297+
(sample_indices >= 0) &
298+
(sample_indices < max_samples_chunk[:, np.newaxis])
299+
)
300+
301+
# Clip for safe indexing
302+
sample_indices_clipped = np.clip(sample_indices, 0, ping_data.shape[1] - 1)
303+
304+
# Gather values using advanced indexing
305+
ping_idx_flat = np.repeat(np.arange(n_chunk), ny)
306+
sample_idx_flat = sample_indices_clipped.ravel()
307+
chunk_values = ping_data[ping_idx_flat, sample_idx_flat].reshape(n_chunk, ny)
308+
309+
# Apply mask
310+
chunk_values = np.where(valid_samples, chunk_values, request.fill_value)
311+
312+
# Store in result array
313+
unique_results[chunk_indices] = chunk_values
314+
315+
# Explicitly release chunk data to help garbage collector
316+
del ping_data, chunk_values
317+
318+
# Map unique results back to output image
319+
image[valid_x_indices] = unique_results[inverse_indices]
320+
321+
return image
322+
323+
# =========================================================================
324+
# Store info
325+
# =========================================================================
326+
327+
@property
328+
def store_path(self) -> str:
329+
"""Path to the mmap store directory."""
330+
return self._store_path
331+
332+
def __repr__(self) -> str:
333+
return (
334+
f"MmapDataBackend(n_pings={self.n_pings}, "
335+
f"wci_value='{self.wci_value}', "
336+
f"store='{self._store_path}')"
337+
)

0 commit comments

Comments
 (0)