echopop/ingest/sv.py (160 changes: 143 additions & 17 deletions)
@@ -1,8 +1,9 @@
from pathlib import Path
-from typing import Any, Dict, Literal, Optional, Tuple
+from typing import Any, Dict, Literal, Optional, Tuple, Union

import numpy as np
import pandas as pd
import xarray as xr

from . import nasc

@@ -400,12 +401,19 @@ def aggregate_transects(
"thickness_mean"
].transform("sum")

# Define safe log10 function that handles zero/negative values
def safe_log10_sum(x):
sum_val = x.sum()
if sum_val <= 0:
return -999.0 # Use standard missing data indicator
return 10 * np.log10(sum_val)

    # Aggregate the values over each interval
    data_pvt = data.groupby(["frequency", "transect_num"]).agg(
        {
            "longitude_weight": "sum",
            "latitude_weight": "sum",
-            "sv_L": lambda x: 10 * np.log10(x.sum()),
+            "sv_L": safe_log10_sum,
            "nasc": "sum",
            "thickness_interval": "mean",
        }
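The new `safe_log10_sum` helper guards against groups whose summed linear backscatter is zero or negative, where the previous `10 * np.log10(x.sum())` would emit `-inf` or `nan`. A minimal, standalone sketch of the behavior (the inputs are hypothetical):

import numpy as np
import pandas as pd

def safe_log10_sum(x):
    sum_val = x.sum()
    if sum_val <= 0:
        return -999.0  # dB sentinel for missing/empty data
    return 10 * np.log10(sum_val)

print(safe_log10_sum(pd.Series([0.0, 0.0])))    # -999.0 rather than -inf
print(safe_log10_sum(pd.Series([1e-6, 1e-6])))  # ~ -56.99 dB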
@@ -531,13 +539,114 @@ def integrate_measurements(
return sv_indexed, sv_coordinates


def df_to_xarray(df: pd.DataFrame) -> Optional[Union[xr.DataArray, xr.Dataset]]:
    """
    Convert a pandas DataFrame with a MultiIndex index and/or columns to an xarray object.

    Handles both dense and sparse structures, removes duplicate index level names, and
    returns None when the input is None or empty.
    """

    # Return None if the input is missing or empty
    if df is None or df.empty:
        return None

# Get index names
if isinstance(df.index, pd.MultiIndex):
index_names = list(df.index.names)
else:
index_names = [df.index.name] if df.index.name else ["index"]

    # Check for duplicate names in the index
    if len(index_names) != len(set(index_names)):
        # ---- Keep only the first occurrence of each name
        seen = set()
        unique_indices = []
        for i, name in enumerate(index_names):
            if name not in seen:
                seen.add(name)
                unique_indices.append(i)
        # ---- Drop the duplicated index levels, keeping only the first occurrences
        df = df.copy()
        df.index = df.index.droplevel(
            [i for i in range(len(index_names)) if i not in unique_indices]
        )
        index_names = [index_names[i] for i in unique_indices]

# For MultiIndex columns (dense array)
if isinstance(df.columns, pd.MultiIndex):
# ---- Get MultiIndex column names
column_names = list(df.columns.names)
        # ---- Keep variables that carry a populated second-level (frequency) label;
        # ---- variables whose second level is entirely empty are handled as coordinates below
        numeric_vars = [
            var
            for var in df.columns.get_level_values(0).unique()
            if not all(df.columns[df.columns.get_level_values(0) == var].get_level_values(1) == "")
        ]
        # ---- A multi-level row index is treated as sparse observations
        if len(index_names) > 1:
            # ---- Collapse the MultiIndex rows into a single "point" dimension
mindex_coords = xr.Coordinates.from_pandas_multiindex(df.index, "point")
ds = xr.Dataset(
{var: (["point", column_names[1]], df[var].values) for var in numeric_vars},
coords={**mindex_coords, column_names[1]: df[numeric_vars[0]].columns.values},
)
else:
# ---- For dense data with unique indices, use separate dimensions
ds = xr.Dataset(
{var: (index_names + [column_names[1]], df[var].values) for var in numeric_vars},
coords={
**{name: df.index.get_level_values(name).unique() for name in index_names},
column_names[1]: df[numeric_vars[0]].columns.values,
},
)
        # ---- Add the leftover multi-indexed columns (those without a frequency level)
        # ---- as coordinates
to_add_vars = [
var for var in df.columns.get_level_values(0).unique() if var not in numeric_vars
]
# ---- Add as a coordinate along the index dimension
for var in to_add_vars:
# ---- Get column data
var_data = df[var]
# ---- If it's a DataFrame, get the first column
if isinstance(var_data, pd.DataFrame):
var_values = var_data.iloc[:, 0].values
            # ---- If it is already a Series
else:
var_values = var_data.values
# ---- Assign the coordinates using the extracted values
if len(index_names) > 1:
# ---- Sparse case
ds = ds.assign_coords({var: ("point", var_values)})
else:
# ---- Dense case
ds = ds.assign_coords({var: (index_names, var_values)})
return ds
    # For a MultiIndex row index with single-level columns (sparse array)
    elif isinstance(df.index, pd.MultiIndex):
        # ---- Get the column dimension name
        column_name = df.columns.name if df.columns.name else "column"
        # ---- Create the DataArray, collapsing the row MultiIndex into a "point" dimension
        da = xr.DataArray(
            df.values,
            dims=["point", column_name],
            coords={
                **xr.Coordinates.from_pandas_multiindex(df.index, "point"),
                column_name: df.columns.values,
            },
        )
        return da
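# A minimal usage sketch for df_to_xarray (the index/column names below are
# hypothetical, chosen to mirror the ingestion outputs):
#
#     idx = pd.MultiIndex.from_product(
#         [[1, 2], [0, 1]], names=["transect_num", "interval"]
#     )
#     cols = pd.MultiIndex.from_product(
#         [["nasc", "sv_mean"], [38.0, 120.0]], names=["variable", "frequency"]
#     )
#     df = pd.DataFrame(np.ones((4, 4)), index=idx, columns=cols)
#     ds = df_to_xarray(df)
#     # -> <xarray.Dataset> with dims (point: 4, frequency: 2) and data
#     #    variables "nasc" and "sv_mean"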


def ingest_echoview_sv(
sv_path: Path,
center_frequencies: Optional[Dict[str, float]] = None,
transect_pattern: Optional[str] = None,
aggregate_method: Literal["cells", "interval", "transect"] = "cells",
impute_coordinates: bool = True,
-) -> Tuple[pd.DataFrame, pd.DataFrame]:
+) -> Tuple[Union[xr.DataArray, xr.Dataset], Optional[Union[xr.DataArray, xr.Dataset]]]:
r"""
Complete ingestion pipeline for Echoview Sv export data.

@@ -563,11 +672,13 @@ def ingest_echoview_sv(

Returns
-------
-    tuple[|pd.DataFrame|, |pd.DataFrame| or None]
-        - ``sv_integrated``: Spatially aggregated acoustic data with :class:`pandas.MultiIndex`
-          columns organized by measurement type and frequency.
-        - ``sv_coordinates``: Coordinate reference data for spatial analysis, or None if
-          coordinates unavailable.
+    tuple[xr.Dataset or xr.DataArray, xr.DataArray or None]
+        - ``sv_integrated``: Spatially aggregated acoustic data as an xarray Dataset, with data
+          variables for each measurement type (nasc, sv_mean, thickness_mean) organized along a
+          frequency dimension. For sparse data (cells, intervals), a "point" dimension with
+          MultiIndex coordinates is used.
+        - ``sv_coordinates``: Coordinate reference data as an xarray DataArray with "point" and
+          frequency dimensions, or None if coordinates are unavailable.

Raises
------
@@ -597,7 +708,18 @@ def ingest_echoview_sv(
>>> sv_data, coords = ingest_echoview_sv(
... sv_path, frequencies, r"transect_(\d+)", "interval"
... )
>>> print(sv_data)
<xarray.Dataset>
Dimensions: (point: 11608, frequency: 5)
Coordinates:
* point (point) MultiIndex
* frequency (frequency) float64 18000.0 38000.0 70000.0 120000.0 200000.0
Data variables:
nasc (point, frequency) float64 ...
sv_mean (point, frequency) float64 ...
thickness_mean (point, frequency) float64 ...
"""

# Validate directory existence
if not sv_path.exists():
        raise FileNotFoundError(f"The export file directory ({sv_path.as_posix()}) was not found!")
@@ -611,7 +733,11 @@
# Update the units for `center_frequencies` to match expected values from Echoview
# ---- Hz -> kHz
if center_frequencies is not None:
-        center_frequencies = {freq * 1e-3: value for freq, value in center_frequencies.items()}
+        center_frequencies_thresh = {
+            freq * 1e-3: value for freq, value in center_frequencies.items()
+        }
+    else:
+        center_frequencies_thresh = {}
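    # For example (hypothetical thresholds), keys supplied in Hz such as
    #   {38000.0: {"min": -70.0, "max": -34.0}}
    # become kHz-keyed to match the frequency units in the Echoview exports:
    #   {38.0: {"min": -70.0, "max": -34.0}}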

# Get the target Sv files
sv_filepaths = {"cells": [p for p in sv_path.rglob("*.csv") if p.is_file()]}
@@ -630,29 +756,29 @@
)

# Concatenate the files
-    sv = pd.concat(
+    sv_output = pd.concat(
[
read_echoview_sv(row["file_path"], impute_coordinates, row["transect_num"])
for _, row in transect_num_df.iterrows()
]
)

# Sort
-    nasc.sort_echoview_export_df(sv, inplace=True)
+    nasc.sort_echoview_export_df(sv_output, inplace=True)

# Set min/max threshold for each frequency
-    if center_frequencies:
-        sv_subset = sv[sv["frequency"].isin(center_frequencies)]
+    if center_frequencies_thresh:
+        sv_subset = sv_output[sv_output["frequency"].isin(center_frequencies_thresh)]
else:
-        sv_subset = sv.copy()
-        center_frequencies = {
+        sv_subset = sv_output.copy()
+        center_frequencies_thresh = {
freq: {"min": -999.0, "max": 999.0} for freq in sv_subset["frequency"].unique()
}

# Integrate the backscatter based on the defined aggregation method
sv_integrated, sv_coordinates = integrate_measurements(
-        data=sv_subset, method=aggregate_method, sv_thresholds=center_frequencies
+        data=sv_subset, method=aggregate_method, sv_thresholds=center_frequencies_thresh
)

# Return
-    return sv_integrated, sv_coordinates
+    return df_to_xarray(sv_integrated), df_to_xarray(sv_coordinates)
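Downstream, the returned xarray objects can be consumed with standard selection and unstacking; a hedged sketch reusing the names from the docstring example above (paths, frequencies, and thresholds are illustrative):

sv_data, coords = ingest_echoview_sv(
    sv_path, frequencies, r"transect_(\d+)", "interval"
)

# Pull a single frequency; the sparse "point" dimension is preserved
nasc_38 = sv_data["nasc"].sel(frequency=38000.0)

# Expand the "point" MultiIndex back into separate dimensions when needed
dense = sv_data.unstack("point")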