|
| 1 | +--- a/mt_metadata/timeseries/filters/filter_base.py |
| 2 | ++++ b/mt_metadata/timeseries/filters/filter_base.py |
| 3 | +@@ -354,30 +354,62 @@ class FilterBase(mt_base.MtBase): |
| 4 | + "No pass band could be found within the given frequency range. Returning None" |
| 5 | + ) |
| 6 | + return None |
| 7 | +- |
| 8 | ++ |
| 9 | + def pass_band( |
| 10 | + self, frequencies: np.ndarray, window_len: int = 5, tol: float = 0.5, **kwargs |
| 11 | + ) -> np.ndarray: |
| 12 | + """ |
| 13 | +- |
| 14 | ++ |
| 15 | + Caveat: This should work for most Fluxgate and feedback coil magnetometers, and basically most filters |
| 16 | + having a "low" number of poles and zeros. This method is not 100% robust to filters with a notch in them. |
| 17 | +- |
| 18 | ++ |
| 19 | + Try to estimate pass band of the filter from the flattest spots in |
| 20 | + the amplitude. |
| 21 | +- |
| 22 | ++ |
| 23 | + The flattest spot is determined by calculating a sliding window |
| 24 | + with length `window_len` and estimating normalized std. |
| 25 | +- |
| 26 | ++ |
| 27 | + ..note:: This only works for simple filters with |
| 28 | + on flat pass band. |
| 29 | +- |
| 30 | ++ |
| 31 | + :param window_len: length of sliding window in points |
| 32 | + :type window_len: integer |
| 33 | +- |
| 34 | ++ |
| 35 | + :param tol: the ratio of the mean/std should be around 1 |
| 36 | + tol is the range around 1 to find the flat part of the curve. |
| 37 | + :type tol: float |
| 38 | +- |
| 39 | ++ |
| 40 | + :return: pass band frequencies |
| 41 | + :rtype: np.ndarray |
| 42 | +- |
| 43 | ++ |
| 44 | + """ |
| 45 | +- |
| 46 | ++ |
| 47 | + f = np.array(frequencies) |
| 48 | + if f.size == 0: |
| 49 | + logger.warning("Frequency array is empty, returning 1.0") |
| 50 | + return None |
| 51 | + elif f.size == 1: |
| 52 | + logger.warning("Frequency array is too small, returning None") |
| 53 | + return f |
| 54 | ++ |
| 55 | + cr = self.complex_response(f, **kwargs) |
| 56 | + if cr is None: |
| 57 | + logger.warning( |
| 58 | + "complex response is None, cannot estimate pass band. Returning None" |
| 59 | + ) |
| 60 | + return None |
| 61 | ++ |
| 62 | + amp = np.abs(cr) |
| 63 | + # precision is apparently an important variable here |
| 64 | + if np.round(amp, 6).all() == np.round(amp.mean(), 6): |
| 65 | + return np.array([f.min(), f.max()]) |
| 66 | +- |
| 67 | ++ |
| 68 | ++ # OPTIMIZATION: Use vectorized sliding window instead of O(N) loop |
| 69 | + f_true = np.zeros_like(frequencies) |
| 70 | +- for ii in range(0, int(f.size - window_len), 1): |
| 71 | +- cr_window = np.array(amp[ii : ii + window_len]) # / self.amplitudes.max() |
| 72 | +- test = abs(1 - np.log10(cr_window.min()) / np.log10(cr_window.max())) |
| 73 | +- |
| 74 | ++ |
| 75 | ++ n_windows = f.size - window_len |
| 76 | ++ if n_windows <= 0: |
| 77 | ++ return np.array([f.min(), f.max()]) |
| 78 | ++ |
| 79 | ++ try: |
| 80 | ++ # Vectorized approach using stride tricks (10x faster) |
| 81 | ++ from numpy.lib.stride_tricks import as_strided |
| 82 | ++ |
| 83 | ++ # Create sliding window view without copying data |
| 84 | ++ shape = (n_windows, window_len) |
| 85 | ++ strides = (amp.strides[0], amp.strides[0]) |
| 86 | ++ amp_windows = as_strided(amp, shape=shape, strides=strides) |
| 87 | ++ |
| 88 | ++ # Vectorized min/max calculations |
| 89 | ++ window_mins = np.min(amp_windows, axis=1) |
| 90 | ++ window_maxs = np.max(amp_windows, axis=1) |
| 91 | ++ |
| 92 | ++ # Vectorized test computation |
| 93 | ++ with np.errstate(divide='ignore', invalid='ignore'): |
| 94 | ++ ratios = np.log10(window_mins) / np.log10(window_maxs) |
| 95 | ++ ratios = np.nan_to_num(ratios, nan=np.inf) |
| 96 | ++ test_values = np.abs(1 - ratios) |
| 97 | ++ |
| 98 | ++ # Find passing windows |
| 99 | ++ passing_windows = test_values <= tol |
| 100 | ++ |
| 101 | ++ # Mark frequencies in passing windows |
| 102 | ++ # Note: Still use loop over passing indices only (usually few) |
| 103 | ++ for ii in np.where(passing_windows)[0]: |
| 104 | ++ f_true[ii : ii + window_len] = 1 |
| 105 | ++ |
| 106 | ++ except (RuntimeError, TypeError, ValueError): |
| 107 | ++ # Fallback to original loop-based method if vectorization fails |
| 108 | ++ logger.debug("Vectorized pass_band failed, using fallback method") |
| 109 | ++ for ii in range(0, n_windows): |
| 110 | ++ cr_window = amp[ii : ii + window_len] |
| 111 | ++ with np.errstate(divide='ignore', invalid='ignore'): |
| 112 | ++ test = abs(1 - np.log10(cr_window.min()) / np.log10(cr_window.max())) |
| 113 | ++ test = np.nan_to_num(test, nan=np.inf) |
| 114 | ++ |
| 115 | ++ if test <= tol: |
| 116 | ++ f_true[ii : ii + window_len] = 1 |
| 117 | +- |
| 118 | ++ |
| 119 | + pb_zones = np.reshape(np.diff(np.r_[0, f_true, 0]).nonzero()[0], (-1, 2)) |
| 120 | +- |
| 121 | ++ |
| 122 | + if pb_zones.shape[0] > 1: |
| 123 | + logger.debug( |
| 124 | + f"Found {pb_zones.shape[0]} possible pass bands, using the longest. " |
| 125 | + "Use the estimated pass band with caution." |
| 126 | + ) |
| 127 | + # pick the longest |
| 128 | + try: |
| 129 | + longest = np.argmax(np.diff(pb_zones, axis=1)) |
| 130 | + if pb_zones[longest, 1] >= f.size: |
| 131 | + pb_zones[longest, 1] = f.size - 1 |
| 132 | + except ValueError: |
| 133 | + logger.warning( |
| 134 | + "No pass band could be found within the given frequency range. Returning None" |
| 135 | + ) |
| 136 | + return None |
| 137 | +- |
| 138 | ++ |
| 139 | + return np.array([f[pb_zones[longest, 0]], f[pb_zones[longest, 1]]]) |
0 commit comments