Skip to content

Commit 08caffd

Browse files
committed
♻️ add Python interface
1 parent 636587b commit 08caffd

File tree

5 files changed

+112
-49
lines changed

5 files changed

+112
-49
lines changed

downsample_rs/src/lttb/scalar.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -104,18 +104,22 @@ pub fn lttb_without_x<Ty: Num>(y: ArrayView1<Ty>, n_out: usize) -> Array1<usize>
104104
// Slicing seems to be a lot slower
105105
// let avg_x: Tx = x.slice(s![avg_range_start..avg_range_end]).sum();
106106
let avg_y: f64 = avg_y.to_f64() / (avg_range_end - avg_range_start) as f64;
107+
let avg_x: f64 = (avg_range_start + avg_range_end - 1) as f64 / 2.0;
107108

108109
// Get the range for this bucket
109110
let range_offs = (every * i as f64) as usize + 1;
110111
let range_to = (every * (i + 1) as f64) as usize + 1;
111112

112113
// Point a
113114
let point_ay = y[a].to_f64();
115+
let point_ax = a as f64;
114116

115117
let mut max_area = -1.0;
116118
for i in range_offs..range_to {
117119
// Calculate triangle area over three buckets
118-
let area = ((y[i].to_f64() - point_ay) - (avg_y - point_ay)).abs();
120+
let area = ((point_ax - avg_x) * (y[i].to_f64() - point_ay)
121+
- (point_ax - i as f64) * (avg_y - point_ay))
122+
.abs();
119123
if area > max_area {
120124
max_area = area;
121125
a = i;
@@ -167,13 +171,9 @@ mod tests {
167171
let n = 5_000;
168172
let x: Array1<i32> = Array1::from((0..n).map(|i| i as i32).collect::<Vec<i32>>());
169173
let y = utils::get_random_array(n, f32::MIN, f32::MAX);
170-
let sampled_indices = lttb(x.view(), y.view(), 200);
174+
let sampled_indices1 = lttb(x.view(), y.view(), 200);
171175
let sampled_indices2 = lttb_without_x(y.view(), 200);
172-
// TODO: for some reason the second last point is off..
173-
assert_eq!(
174-
sampled_indices.slice(s![0..198]),
175-
sampled_indices2.slice(s![0..198])
176-
);
176+
assert_eq!(sampled_indices1, sampled_indices2);
177177
}
178178
}
179179
}

src/lib.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ use downsample_rs::minmax as minmax_mod;
122122

123123
// Create a sub module for the minmax algorithm
124124
#[pymodule]
125-
fn min_max(_py: Python<'_>, m: &PyModule) -> PyResult<()> {
125+
fn minmax(_py: Python<'_>, m: &PyModule) -> PyResult<()> {
126126
// ----------------- SCALAR
127127

128128
let scalar_mod = PyModule::new(_py, "scalar")?;
@@ -320,15 +320,15 @@ fn minmaxlttb(_py: Python, m: &PyModule) -> PyResult<()> {
320320

321321
#[pymodule] // The super module
322322
fn tsdownsample_rs(_py: Python<'_>, m: &PyModule) -> PyResult<()> {
323-
m.add_wrapped(wrap_pymodule!(min_max))?;
323+
m.add_wrapped(wrap_pymodule!(minmax))?;
324324
m.add_wrapped(wrap_pymodule!(m4))?;
325325
m.add_wrapped(wrap_pymodule!(lttb))?;
326326
m.add_wrapped(wrap_pymodule!(minmaxlttb))?;
327327

328328
_py.run(
329329
"\
330330
import sys
331-
sys.modules['tsdownsample_rs.min_max'] = min_max
331+
sys.modules['tsdownsample_rs.minmax'] = minmax
332332
sys.modules['tsdownsample_rs.m4'] = m4
333333
sys.modules['tsdownsample_rs.lttb'] = lttb
334334
sys.modules['tsdownsample_rs.minmaxlttb'] = minmaxlttb

tsdownsample/__init__.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
MinMaxAggregator = RustDownsamplingInterface(resampling_rs.minmax)
2-
M4Aggregator = RustDownsamplingInterface(resampling_rs.m4)
3-
LTTBAggregator = RustDownsamplingInterface(resampling_rs.lttb)
4-
MinMaxLTTBAggregator = RustDownsamplingInterface(resampling_rs.minmax_lttb)
1+
__version__ = "0.1.0a1"
2+
__author__ = "Jeroen Van Der Donckt"
3+
4+
from .downsamplers import *

tsdownsample/downsamplers.py

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,30 @@
1+
# ------------------ Rust Downsamplers ------------------
12
import tsdownsample._rust.tsdownsample_rs as tsdownsample_rs
23
from .downsampling_interface import RustDownsamplingInterface
34

4-
# ------------------ Rust Downsamplers ------------------
5+
MinMaxDownsampler = RustDownsamplingInterface("MinMax", tsdownsample_rs.minmax)
6+
M4Downsampler = RustDownsamplingInterface("M4", tsdownsample_rs.m4)
7+
LTTBDownsampler = RustDownsamplingInterface("LTTB", tsdownsample_rs.lttb)
8+
MinMaxLTTBDownsampler = RustDownsamplingInterface("MinMaxLTTB", tsdownsample_rs.minmaxlttb)
9+
10+
# ------------------ Function Downsamplers ------------------
11+
import numpy as np
12+
from .downsampling_interface import FuncDownsamplingInterface
13+
14+
MeanDownsampler = FuncDownsamplingInterface("Mean", np.mean)
15+
MedianDownsampler = FuncDownsamplingInterface("Median", np.median)
16+
17+
# ------------------ EveryNth Downsampler ------------------
18+
import math
19+
import pandas as pd
20+
from .downsampling_interface import DownsampleInterface
521

6-
MinMaxDownsampler = RustDownsamplingInterface(tsdownsample_rs.minmax)
7-
M4Downsampler = RustDownsamplingInterface(tsdownsample_rs.m4)
8-
LTTBDownsampler = RustDownsamplingInterface(tsdownsample_rs.lttb)
9-
MinMaxLTTBDownsampler = RustDownsamplingInterface(tsdownsample_rs.minmax_lttb)
22+
class _EveryNthDownsampler(DownsampleInterface):
1023

11-
# ------------------ Python Downsamplers ------------------
24+
def __init__(self) -> None:
25+
super().__init__(f"EveryNth")
26+
27+
def downsample(self, s: pd.Series, n_out: int, parallel: bool = False) -> pd.Series:
28+
return s[:: max(1, math.ceil(len(s) / n_out))]
1229

13-
MeanDownsampler = PythonDownsamplingInterface(np.mean)
30+
EveryNthDownsampler = _EveryNthDownsampler()

tsdownsample/downsampling_interface.py

Lines changed: 74 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@
99

1010
class DownsampleInterface(ABC):
1111

12-
def __init__(self) -> None:
13-
super().__init__()
12+
def __init__(self, name: str) -> None:
13+
self.name = name
1414

1515
@staticmethod
1616
def _construct_output_series(s: pd.Series, idxs: np.ndarray) -> pd.Series:
@@ -29,7 +29,7 @@ def _supports_dtype(self, s: pd.Series):
2929
f"{s.dtype} doesn't match with any regex in {self.dtype_regex_list}"
3030
)
3131

32-
def downsample(self, s: pd.Series, n_out: int, parallel: bool = False) -> pd.Series
32+
def downsample(self, s: pd.Series, n_out: int, parallel: bool = False) -> pd.Series:
3333
"""Downsample a pandas series to n_out samples.
3434
3535
Parameters
@@ -47,6 +47,9 @@ def downsample(self, s: pd.Series, n_out: int, parallel: bool = False) -> pd.Ser
4747
The downsampled series.
4848
"""
4949
raise NotImplementedError
50+
51+
def __repr__(self) -> str:
52+
return f"{self.name}"
5053

5154
# ------------------- Rust Downsample Interface -------------------
5255

@@ -59,14 +62,14 @@ def _switch_mod_with_y(y_dtype: np.dtype, mod: ModuleType, downsample_func: str
5962
----------
6063
y_dtype : np.dtype
6164
The dtype of the y-data
62-
mod : Module
65+
mod : ModuleType
6366
The module to select the appropriate function from
6467
downsample_func : str, optional
6568
The name of the function to use, by default DOWNSAMPLE_FUNC.
6669
"""
6770
# FLOATS
6871
if np.issubdtype(y_dtype, np.floating):
69-
if y.dtype == np.float16:
72+
if y_dtype == np.float16:
7073
return getattr(mod, downsample_func + '_f16')
7174
elif y_dtype == np.float32:
7275
return getattr(mod, downsample_func + '_f32')
@@ -105,33 +108,33 @@ def _switch_mod_with_x_and_y(x_dtype: np.dtype, y_dtype: np.dtype, mod: ModuleTy
105108
The dtype of the x-data
106109
y_dtype : np.dtype
107110
The dtype of the y-data
108-
mod : Module
111+
mod : ModuleType
109112
The module to select the appropriate function from
110113
"""
111114
# FLOATS
112115
if np.issubdtype(x_dtype, np.floating):
113116
if x_dtype == np.float16:
114-
return switch_mod_with_y(y_dtype, mod, f'{DOWNSAMPLE_F}_f16')
117+
return _switch_mod_with_y(y_dtype, mod, f'{DOWNSAMPLE_F}_f16')
115118
elif x_dtype == np.float32:
116-
return switch_mod_with_y(y_dtype, mod, f'{DOWNSAMPLE_F}_f32')
119+
return _switch_mod_with_y(y_dtype, mod, f'{DOWNSAMPLE_F}_f32')
117120
elif x_dtype == np.float64:
118-
return switch_mod_with_y(y_dtype, mod, f'{DOWNSAMPLE_F}_f64')
121+
return _switch_mod_with_y(y_dtype, mod, f'{DOWNSAMPLE_F}_f64')
119122
# INTS
120123
elif np.issubdtype(x_dtype, np.integer):
121124
if x_dtype == np.int16:
122-
return switch_mod_with_y(y_dtype, mod, f'{DOWNSAMPLE_F}_i16')
125+
return _switch_mod_with_y(y_dtype, mod, f'{DOWNSAMPLE_F}_i16')
123126
elif x_dtype == np.int32:
124-
return switch_mod_with_y(y_dtype, mod, f'{DOWNSAMPLE_F}_i32')
127+
return _switch_mod_with_y(y_dtype, mod, f'{DOWNSAMPLE_F}_i32')
125128
elif x_dtype == np.int64:
126-
return switch_mod_with_y(y_dtype, mod, f'{DOWNSAMPLE_F}_i64')
129+
return _switch_mod_with_y(y_dtype, mod, f'{DOWNSAMPLE_F}_i64')
127130
# UINTS
128131
elif np.issubdtype(x_dtype, np.unsignedinteger):
129132
if x_dtype == np.uint16:
130-
return switch_mod_with_y(y_dtype, mod, f'{DOWNSAMPLE_F}_u16')
133+
return _switch_mod_with_y(y_dtype, mod, f'{DOWNSAMPLE_F}_u16')
131134
elif x_dtype == np.uint32:
132-
return switch_mod_with_y(y_dtype, mod, f'{DOWNSAMPLE_F}_u32')
135+
return _switch_mod_with_y(y_dtype, mod, f'{DOWNSAMPLE_F}_u32')
133136
elif x_dtype == np.uint64:
134-
return switch_mod_with_y(y_dtype, mod, f'{DOWNSAMPLE_F}_u64')
137+
return _switch_mod_with_y(y_dtype, mod, f'{DOWNSAMPLE_F}_u64')
135138
# BOOLS
136139
# TODO: support bools
137140
# elif data_dtype == np.bool:
@@ -140,14 +143,25 @@ def _switch_mod_with_x_and_y(x_dtype: np.dtype, y_dtype: np.dtype, mod: ModuleTy
140143

141144
class RustDownsamplingInterface(DownsampleInterface):
142145

143-
def __init__(self, resampling_mod: Module) -> None:
144-
self._mod = resampling_mod
145-
if hasattr(self.mod, 'simd'):
146-
self.mod_single_core = self._mod.simd
147-
self.mod_multi_core = self._mod.simd_parallel
148-
else:
149-
self.mod_single_core = self._mod.scalar
150-
self.mod_multi_core = self._mod.scalar_parallel
146+
def __init__(self, name: str, resampling_mod: ModuleType) -> None:
147+
super().__init__(name + " [tsdownsample_rs]")
148+
self.rust_mod = resampling_mod
149+
150+
# Store the single core sub module
151+
self.mod_single_core = self.rust_mod.scalar
152+
if hasattr(self.rust_mod, "simd"):
153+
# use SIMD implementation if available
154+
self.mod_single_core = self.rust_mod.simd
155+
156+
# Store the multi-core sub module (if present)
157+
self.mod_multi_core = None # no multi-core implementation (default)
158+
if hasattr(self.rust_mod, "simd_parallel"):
159+
# use SIMD implementation if available
160+
self.mod_multi_core = self.rust_mod.simd_parallel
161+
elif hasattr(self.rust_mod, "scalar_parallel"):
162+
# use scalar implementation if available (when no SIMD available)
163+
self.mod_multi_core = self.rust_mod.scalar_parallel
164+
151165

152166
def _downsample_without_x(self, s: pd.Series, n_out: int) -> pd.Series:
153167
downsample_method = _switch_mod_with_y(s.dtype, self.mod_single_core)
@@ -170,7 +184,10 @@ def _downsample_with_x_parallel(self, s: pd.Series, n_out: int) -> pd.Series:
170184
return self._construct_output_series(s, idxs)
171185

172186
def downsample(self, s: pd.Series, n_out: int, parallel: bool = False) -> pd.Series:
173-
if s.index.freq is None: # TODO: or the other way around??
187+
fixed_sr = False
188+
if isinstance(s.index, pd.RangeIndex) or s.index.freq is not None:
189+
fixed_sr = True
190+
if fixed_sr: # TODO: or the other way around??
174191
if parallel:
175192
return self._downsample_without_x_parallel(s, n_out)
176193
else:
@@ -183,10 +200,39 @@ def downsample(self, s: pd.Series, n_out: int, parallel: bool = False) -> pd.Ser
183200

184201
# ------------------ Numpy Downsample Interface ------------------
185202

186-
class NumpyDownsamplingInterface():
203+
class FuncDownsamplingInterface(DownsampleInterface):
187204

188-
def __init__(self, resampling_func: Callable) -> None:
189-
self._func = resampling_func
205+
def __init__(self, name: str, downsample_func: Callable) -> None:
206+
super().__init__("[Func]_" + name)
207+
self.downsample_func = downsample_func
190208

191209
def downsample(self, s: pd.Series, n_out: int, parallel: bool = False) -> pd.Series:
192-
210+
if isinstance(s.index, pd.DatetimeIndex):
211+
t_start, t_end = s.index[:: len(s) - 1]
212+
rate = (t_end - t_start) / n_out
213+
return s.resample(rate).apply(self.downsample_func).dropna()
214+
215+
# no time index -> use the every nth heuristic
216+
group_size = max(1, np.ceil(len(s) / n_out))
217+
s_out = (
218+
s.groupby(
219+
# create an array of [0, 0, 0, ...., n_out, n_out]
220+
# where each value is repeated based $len(s)/n_out$ times
221+
by=np.repeat(np.arange(n_out), group_size)[: len(s)]
222+
)
223+
.agg(self.downsample_func)
224+
.dropna()
225+
)
226+
# Create an index-estimation for real-time data
227+
# Add one to the index so it's pointed at the end of the window
228+
# Note: this can be adjusted to .5 to center the data
229+
# Multiply it with the group size to get the real index-position
230+
# TODO: add option to select start / middle / end as index
231+
idx_locs = (np.arange(len(s_out)) + 1) * group_size
232+
idx_locs[-1] = len(s) - 1
233+
return pd.Series(
234+
index=s.iloc[idx_locs.astype(s.index.dtype)].index.astype(s.index.dtype),
235+
data=s_out.values,
236+
name=str(s.name),
237+
copy=False,
238+
)

0 commit comments

Comments
 (0)