|
1 | | -from typing import Union |
| 1 | +from enum import Enum |
| 2 | +from typing import NamedTuple, Union |
2 | 3 |
|
3 | 4 | import numpy as np |
4 | 5 |
|
@@ -255,3 +256,76 @@ def _downsample( |
255 | 256 |
|
256 | 257 | # NOTE: we do not use the np.unique so that all indices are retained |
257 | 258 | return np.array(sorted(rel_idxs)) |
| 259 | + |
| 260 | + |
| 261 | +class FPCS_py(AbstractDownsampler): |
| 262 | + def _downsample( |
| 263 | + self, x: Union[np.ndarray, None], y: np.ndarray, n_out: int, **kwargs |
| 264 | + ) -> np.ndarray: |
| 265 | + # fmt: off |
| 266 | + # ------------------------- Helper datastructures ------------------------- |
| 267 | + class Flag(Enum): |
| 268 | + NONE = -1 # -1: no data points have been retained |
| 269 | + MAX = 0 # 0: a max has been retained |
| 270 | + MIN = 1 # 1: a min has been retained |
| 271 | + |
| 272 | + Point = NamedTuple("point", [("x", int), ("val", y.dtype)]) |
| 273 | + # ------------------------------------------------------------------------ |
| 274 | + |
| 275 | + # 0. Downsample the data using the MinMax algorithm |
| 276 | + MINMAX_FACTOR = 2 |
| 277 | + n_1 = len(x) - 1 |
| 278 | + # NOTE: as we include the first and last point, we reduce the number of points |
| 279 | + downsampled_idxs = MinMax_py().downsample( |
| 280 | + x[1:n_1], y[1:n_1], n_out=(n_out - 2) * MINMAX_FACTOR |
| 281 | + ) |
| 282 | + downsampled_idxs += 1 |
| 283 | + |
| 284 | + previous_min_flag: Flag = Flag.NONE |
| 285 | + potential_point = Point(0, 0) |
| 286 | + max_point = Point(0, y[0]) |
| 287 | + min_point = Point(0, y[0]) |
| 288 | + |
| 289 | + sampled_indices = [] |
| 290 | + sampled_indices.append(0) # prepend the first point |
| 291 | + for i in range(0, len(downsampled_idxs), 2): |
| 292 | + # get the min and max indices and convert them to the correct order |
| 293 | + min_idx, max_idxs = downsampled_idxs[i], downsampled_idxs[i + 1] |
| 294 | + if y[min_idx] > y[max_idxs]: |
| 295 | + min_idx, max_idxs = max_idxs, min_idx |
| 296 | + bin_min = Point(min_idx, y[min_idx]) |
| 297 | + bin_max = Point(max_idxs, y[max_idxs]) |
| 298 | + |
| 299 | + # update the max and min points based on the extrema of the current bin |
| 300 | + if max_point.val < bin_max.val: |
| 301 | + max_point = bin_max |
| 302 | + if min_point.val > bin_min.val: |
| 303 | + min_point = bin_min |
| 304 | + |
| 305 | + # if the min is to the left of the max |
| 306 | + if min_point.x < max_point.x: |
| 307 | + # if the min was not selected in the previous bin |
| 308 | + if previous_min_flag == Flag.MIN and min_point.x != potential_point.x: |
| 309 | + # Both adjacent samplings retain MinPoint, and PotentialPoint and |
| 310 | + # MinPoint are not the same point |
| 311 | + sampled_indices.append(potential_point.x) |
| 312 | + |
| 313 | + sampled_indices.append(min_point.x) # receiving min_point b4 max_point -> retain min_point |
| 314 | + potential_point = (max_point) # update potential point to unselected max_point |
| 315 | + min_point = max_point # update min_point to unselected max_point |
| 316 | + previous_min_flag = Flag.MIN # min_point has been selected |
| 317 | + |
| 318 | + else: |
| 319 | + if previous_min_flag == Flag.MAX and max_point.x != potential_point.x: |
| 320 | + # # Both adjacent samplings retain MaxPoint, and PotentialPoint and |
| 321 | + # MaxPoint are not the same point |
| 322 | + sampled_indices.append(potential_point.x) |
| 323 | + |
| 324 | + sampled_indices.append(max_point.x) # receiving max_point b4 min_point -> retain max_point |
| 325 | + potential_point = (min_point) # update potential point to unselected min_point |
| 326 | + max_point = min_point # update max_point to unselected min_point |
| 327 | + previous_min_flag = Flag.MAX # max_point has been selected |
| 328 | + |
| 329 | + sampled_indices.append(len(y) - 1) # append the last point |
| 330 | + # fmt: on |
| 331 | + return np.array(sampled_indices, dtype=np.int64) |
0 commit comments