Skip to content

Commit 038fef0

Browse files
authored
Merge pull request #119 from tstenner/resetpostproc
Reset smoothing parameters after disabling postprocessing
2 parents ddfa301 + 80b07ae commit 038fef0

File tree

6 files changed

+222
-63
lines changed

6 files changed

+222
-63
lines changed

src/stream_inlet_impl.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -305,7 +305,11 @@ class stream_inlet_impl {
305305
std::size_t samples_available() { return data_receiver_.samples_available(); }
306306

307307
/// Flush the queue, return the number of dropped samples
308-
uint32_t flush() { return data_receiver_.flush(); }
308+
uint32_t flush() {
309+
int nskipped = data_receiver_.flush();
310+
postprocessor_.skip_samples(nskipped);
311+
return nskipped;
312+
}
309313

310314
/** Query whether the clock was potentially reset since the last call to was_clock_reset().
311315
*

src/time_postprocessor.cpp

Lines changed: 76 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,39 @@
33

44
#include <cmath>
55

6+
#pragma GCC diagnostic ignored "-Wdouble-promotion"
7+
#pragma GCC diagnostic ignored "-Wimplicit-int-float-conversion"
8+
69
// === implementation of the time_postprocessor class ===
710

811
using namespace lsl;
912

13+
/// how many samples have to be seen between clocksyncs?
14+
const uint8_t samples_between_clocksyncs = 50;
15+
1016
time_postprocessor::time_postprocessor(const postproc_callback_t &query_correction,
1117
const postproc_callback_t &query_srate, const reset_callback_t &query_reset)
12-
: samples_seen_(0.0), query_srate_(query_srate), options_(proc_none),
13-
halftime_(api_config::get_instance()->smoothing_halftime()),
18+
: samples_since_last_clocksync(samples_between_clocksyncs), query_srate_(query_srate),
19+
options_(proc_none), halftime_(api_config::get_instance()->smoothing_halftime()),
1420
query_correction_(query_correction), query_reset_(query_reset), next_query_time_(0.0),
15-
last_offset_(0.0), smoothing_initialized_(false),
16-
last_value_(-std::numeric_limits<double>::infinity()) {}
21+
last_offset_(0.0), last_value_(std::numeric_limits<double>::lowest()) {}
22+
23+
void time_postprocessor::set_options(uint32_t options)
24+
{
25+
// bitmask which options actually changed (XOR)
26+
auto changed = options_ ^ options;
27+
28+
// dejitter option changed? -> Reset it
29+
// in case it got enabled, it'll be initialized with the correct t0 when
30+
// the next sample comes in
31+
if(changed & proc_dejitter)
32+
dejitter = postproc_dejitterer();
33+
34+
if(changed & proc_monotonize)
35+
last_value_ = std::numeric_limits<double>::lowest();
36+
37+
options_ = options;
38+
}
1739

1840
double time_postprocessor::process_timestamp(double value) {
1941
if (options_ & proc_threadsafe) {
@@ -23,19 +45,27 @@ double time_postprocessor::process_timestamp(double value) {
2345
return process_internal(value);
2446
}
2547

48+
void time_postprocessor::skip_samples(uint32_t skipped_samples) {
49+
if (options_ & proc_dejitter && dejitter.smoothing_applicable())
50+
dejitter.samples_since_t0_ += skipped_samples;
51+
}
52+
2653
double time_postprocessor::process_internal(double value) {
2754
// --- clock synchronization ---
2855
if (options_ & proc_clocksync) {
2956
// update last correction value if needed (we do this every 50 samples and at most twice per
3057
// second)
31-
if ((fmod(samples_seen_, 50.0) == 0.0) && lsl_clock() > next_query_time_) {
58+
if (++samples_since_last_clocksync > samples_between_clocksyncs &&
59+
lsl_clock() > next_query_time_) {
3260
last_offset_ = query_correction_();
61+
samples_since_last_clocksync = 0;
3362
if (query_reset_()) {
3463
// reset state to unitialized
3564
last_offset_ = query_correction_();
36-
last_value_ = -std::numeric_limits<double>::infinity();
37-
samples_seen_ = 0;
38-
smoothing_initialized_ = false;
65+
last_value_ = std::numeric_limits<double>::lowest();
66+
// reset the dejitterer to an uninitialized state so it's
67+
// initialized on the next use
68+
dejitter = postproc_dejitterer();
3969
}
4070
next_query_time_ = lsl_clock() + 0.5;
4171
}
@@ -48,51 +78,52 @@ double time_postprocessor::process_internal(double value) {
4878
// --- jitter removal ---
4979
if (options_ & proc_dejitter) {
5080
// initialize the smoothing state if not yet done so
51-
if (!smoothing_initialized_) {
81+
if (!dejitter.is_initialized()) {
5282
double srate = query_srate_();
53-
smoothing_applicable_ = (srate > 0.0);
54-
if (smoothing_applicable_) {
55-
// linear regression model coefficients (intercept, slope)
56-
w0_ = 0.0;
57-
w1_ = 1.0 / srate;
58-
// forget factor lambda in RLS calculation & its inverse
59-
lam_ = pow(2.0, -1.0 / (srate * halftime_));
60-
il_ = 1.0 / lam_;
61-
// inverse autocovariance matrix of predictors u
62-
P00_ = P11_ = 1e10;
63-
P01_ = P10_ = 0.0;
64-
// numeric baseline
65-
baseline_value_ = value;
66-
}
67-
smoothing_initialized_ = true;
68-
}
69-
if (smoothing_applicable_) {
70-
value -= baseline_value_;
71-
72-
// RLS update
73-
double u1 = samples_seen_; // u = np.matrix([[1.0], [samples_seen]])
74-
double pi0 = P00_ + u1 * P10_; // pi = u.T * P
75-
double pi1 = P01_ + u1 * P11_; // ... (ct'd)
76-
double al = value - w0_ - w1_ * u1; // al = value - w.T * u (prediction error)
77-
double gam = lam_ + pi0 + pi1 * u1; // gam = lam + pi * u
78-
P00_ = il_ * (P00_ - ((pi0 * pi0) / gam)); // Pp = k * pi; P = il * (P - Pp)
79-
P01_ = il_ * (P01_ - ((pi0 * pi1) / gam)); // ...
80-
P10_ = il_ * (P10_ - ((pi1 * pi0) / gam)); // ...
81-
P11_ = il_ * (P11_ - ((pi1 * pi1) / gam)); // ...
82-
w0_ += al * (P00_ + P10_ * u1); // w = w + k*al
83-
w1_ += al * (P01_ + P11_ * u1); // ...
84-
value = w0_ + w1_ * u1; // value = float(w.T * u)
85-
86-
value += baseline_value_;
83+
dejitter = postproc_dejitterer(value, srate, halftime_);
8784
}
85+
value = dejitter.dejitter(value);
8886
}
8987

9088
// --- force monotonic timestamps ---
9189
if (options_ & proc_monotonize) {
9290
if (value < last_value_) value = last_value_;
91+
else
92+
last_value_ = value;
9393
}
9494

95-
samples_seen_ += 1.0;
96-
last_value_ = value;
9795
return value;
9896
}
97+
98+
postproc_dejitterer::postproc_dejitterer(double t0, double srate, double halftime)
99+
: t0_(static_cast<uint_fast32_t>(t0)) {
100+
if (srate > 0) {
101+
w1_ = 1. / srate;
102+
lam_ = pow(2, -1 / (srate * halftime));
103+
}
104+
}
105+
106+
double postproc_dejitterer::dejitter(double t) noexcept {
107+
if (!smoothing_applicable()) return t;
108+
109+
// remove baseline for better numerical accuracy
110+
t -= t0_;
111+
112+
// RLS update
113+
const double u1 = samples_since_t0_++, // u = np.matrix([[1.0], [samples_seen]])
114+
pi0 = P00_ + u1 * P01_, // pi = u.T * P
115+
pi1 = P01_ + u1 * P11_, // ..
116+
al = t - (w0_ + u1 * w1_), // α = t - w.T * u # prediction error
117+
g_inv = 1 / (lam_ + pi0 + pi1 * u1), // g_inv = 1/(lam_ + pi * u)
118+
il_ = 1 / lam_; // ...
119+
P00_ = il_ * (P00_ - pi0 * pi0 * g_inv); // P = (P - k*pi) / lam_
120+
P01_ = il_ * (P01_ - pi0 * pi1 * g_inv); // ...
121+
P11_ = il_ * (P11_ - pi1 * pi1 * g_inv); // ...
122+
w0_ += al * (P00_ + P01_ * u1); // w += k*α
123+
w1_ += al * (P01_ + P11_ * u1); // ...
124+
return w0_ + u1 * w1_ + t0_; // t = float(w.T * u) + t0
125+
}
126+
127+
void postproc_dejitterer::skip_samples(uint_fast32_t skipped_samples) noexcept {
128+
samples_since_t0_ += skipped_samples;
129+
}

src/time_postprocessor.h

Lines changed: 31 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,36 @@
55
#include <functional>
66
#include <mutex>
77

8-
98
namespace lsl {
109

1110
/// A callback function that allows the post-processor to query state from other objects if needed
1211
using postproc_callback_t = std::function<double()>;
1312
using reset_callback_t = std::function<bool()>;
1413

14+
/// Dejitter / smooth timestamps with a first order recursive least squares filter (RLS).
15+
struct postproc_dejitterer {
16+
/// first observed time-stamp value, used as a baseline to improve numerics
17+
uint_fast32_t t0_;
18+
/// number of samples since t0, i.e. the x in y=ax+b
19+
uint_fast32_t samples_since_t0_{0};
20+
/// linear regression model coefficients (intercept, slope)
21+
double w0_{0}, w1_{0};
22+
/// inverse covariance matrix elements (P00, P11, off diagonal element P01=P10)
23+
double P00_{1e10}, P11_{1e10}, P01_{0};
24+
/// forget factor lambda in RLS calculation
25+
double lam_{0};
26+
27+
/// constructor
28+
postproc_dejitterer(double t0 = 0, double srate = 0, double halftime = 0);
29+
30+
/// dejitter a timestamp and update RLS parameters
31+
double dejitter(double t) noexcept;
32+
33+
/// adjust RLS parameters to account for samples not seen
34+
void skip_samples(uint_fast32_t skipped_samples) noexcept;
35+
bool is_initialized() const noexcept { return t0_ != 0; }
36+
bool smoothing_applicable() const noexcept { return lam_ > 0; }
37+
};
1538

1639
/// Internal class of an inlet that is responsible for post-processing time stamps.
1740
class time_postprocessor {
@@ -30,20 +53,23 @@ class time_postprocessor {
3053
* processing_options_t together (e.g., proc_clocksync|proc_dejitter); the default is to enable
3154
* all options.
3255
*/
33-
void set_options(uint32_t options = proc_ALL) { options_ = options; }
56+
void set_options(uint32_t options = proc_ALL);
3457

3558
/// Post-process the given time stamp and return the new time-stamp.
3659
double process_timestamp(double value);
3760

3861
/// Override the half-time (forget factor) of the time-stamp smoothing.
3962
void smoothing_halftime(float value) { halftime_ = value; }
4063

64+
/// Inform the post processor some samples were skipped
65+
void skip_samples(uint32_t skipped_samples);
66+
4167
private:
4268
/// Internal function to process a time stamp.
4369
double process_internal(double value);
4470

45-
/// number of samples seen so far
46-
double samples_seen_;
71+
/// number of samples seen since last clocksync
72+
uint8_t samples_since_last_clocksync;
4773

4874
// configuration parameters
4975
/// a callback function that returns the current nominal sampling rate
@@ -63,19 +89,7 @@ class time_postprocessor {
6389
/// last queried correction offset
6490
double last_offset_;
6591

66-
// runtime parameters for smoothing
67-
/// first observed time-stamp value, used as a baseline to improve numerics
68-
double baseline_value_;
69-
/// linear regression model coefficients
70-
double w0_, w1_;
71-
/// inverse covariance matrix
72-
double P00_, P01_, P10_, P11_;
73-
/// forget factor lambda in RLS calculation & its inverse
74-
double lam_, il_;
75-
/// whether smoothing can be applied to these data (false if irregular srate)
76-
bool smoothing_applicable_;
77-
/// whether the smoothing parameters have been initialized already
78-
bool smoothing_initialized_;
92+
postproc_dejitterer dejitter;
7993

8094
// runtime parameters for monotonize
8195
/// last observed time-stamp value, to force monotonically increasing stamps

testing/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ add_executable(lsl_test_internal
3535
test_int_stringfuncs.cpp
3636
test_int_streaminfo.cpp
3737
test_int_samples.cpp
38+
internal/postproc.cpp
3839
internal/serialization_v100.cpp
3940
)
4041
target_link_libraries(lsl_test_internal PRIVATE lslobj lslboost catch_main)

testing/internal/postproc.cpp

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
#include "time_postprocessor.h"
2+
#include <catch2/catch.hpp>
3+
#include <random>
4+
#include <thread>
5+
6+
template <std::size_t N>
7+
inline void test_postproc_array(
8+
lsl::time_postprocessor &pp, const double (&in)[N], const double (&expected)[N]) {
9+
for (std::size_t i = 0; i < N; ++i) CHECK(pp.process_timestamp(in[i]) == Approx(expected[i]));
10+
}
11+
12+
TEST_CASE("postprocessing", "[basic]") {
13+
double time_offset = -50.0, srate = 1.;
14+
bool was_reset = false;
15+
lsl::time_postprocessor pp(
16+
[&]() {
17+
UNSCOPED_INFO("time_offset " << time_offset);
18+
return time_offset;
19+
},
20+
[&]() { return srate; }, [&]() { return was_reset; });
21+
22+
double nopostproc[] = {2, 3.1, 3, 5, 5.9, 7.1};
23+
24+
{
25+
INFO("proc_clocksync")
26+
pp.set_options(proc_clocksync);
27+
for (double t : nopostproc) CHECK(pp.process_timestamp(t) == Approx(t + time_offset));
28+
}
29+
30+
{
31+
INFO("proc_clocksync + clock_reset")
32+
pp.set_options(proc_clocksync);
33+
// std::this_thread::sleep_for(std::chrono::milliseconds(600));
34+
was_reset = true;
35+
time_offset = -200;
36+
// for (double t : nopostproc) CHECK(pp.process_timestamp(t) == Approx(t + time_offset));
37+
}
38+
39+
{
40+
INFO("proc_monotonize")
41+
double monotonized[] = {2, 3.1, 3.1, 5, 5.9, 7.1};
42+
pp.set_options(proc_monotonize);
43+
test_postproc_array(pp, nopostproc, monotonized);
44+
}
45+
46+
{
47+
INFO("reset with proc_none")
48+
pp.set_options(proc_none);
49+
test_postproc_array(pp, nopostproc, nopostproc);
50+
}
51+
}
52+
53+
TEST_CASE("rls_smoothing", "[basic]") {
54+
const int n = 100000, warmup_samples = 1000;
55+
const double t0 = 5000, latency = 0.05, srate = 100., halftime = 90;
56+
lsl::postproc_dejitterer pp(t0, srate, halftime);
57+
58+
std::default_random_engine rng;
59+
std::normal_distribution<double> jitter(latency, .005);
60+
61+
pp.dejitter(t0);
62+
int n_outlier = 0;
63+
64+
for (int i = 0; i < n; ++i) {
65+
const double t = t0 + i / srate, e = jitter(rng), dejittered = pp.dejitter(t + e),
66+
est_error = dejittered - t - latency;
67+
if (i > warmup_samples && fabs(est_error) > std::max(e, .001)) n_outlier++;
68+
}
69+
LOG_F(INFO, "\nP:\t%f\t%f\n\t%f\t%f\nw:\t%f\t%f", pp.P00_, pp.P01_, pp.P01_, pp.P11_, pp.w0_,
70+
pp.w1_);
71+
CHECK(n_outlier < n / 100);
72+
73+
CHECK(pp.dejitter(t0 + latency + n / srate) == Approx(t0 + latency + n / srate));
74+
CHECK(fabs(pp.w0_ - latency) < .1);
75+
CHECK(fabs(pp.w1_ - 1 / srate) < 1e-6);
76+
}

testing/test_ext_DataType.cpp

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#include <catch2/catch.hpp>
44
#include <cstdint>
55
#include <lsl_cpp.h>
6+
#include <thread>
67

78
TEMPLATE_TEST_CASE(
89
"datatransfer", "[datatransfer][basic]", char, int16_t, int32_t, int64_t, float, double) {
@@ -69,3 +70,35 @@ TEST_CASE("TypeConversion", "[datatransfer][types][basic]") {
6970
CHECK(result == val);
7071
}
7172
}
73+
74+
TEST_CASE("Flush", "[datatransfer][basic]") {
75+
Streampair sp{create_streampair(
76+
lsl::stream_info("FlushTest", "flush", 1, 1, lsl::cf_double64, "FlushTest"))};
77+
sp.in_.set_postprocessing(lsl::post_dejitter);
78+
79+
const int n=20;
80+
81+
std::thread pusher([&](){
82+
double data = lsl::local_clock();
83+
for(int i=0; i<n; ++i) {
84+
sp.out_.push_sample(&data, data, true);
85+
std::this_thread::sleep_for(std::chrono::milliseconds(100));
86+
data+=.1;
87+
}
88+
});
89+
90+
double data_in, ts_in;
91+
ts_in = sp.in_.pull_sample(&data_in, 1.);
92+
REQUIRE(ts_in == Approx(data_in));
93+
std::this_thread::sleep_for(std::chrono::milliseconds(700));
94+
int pulled = sp.in_.flush() + 1;
95+
96+
for(; pulled < n; ++pulled) {
97+
INFO(pulled);
98+
ts_in = sp.in_.pull_sample(&data_in, 1.);
99+
REQUIRE(ts_in == Approx(data_in));
100+
}
101+
102+
pusher.join();
103+
//sp.in_.set_postprocessing(lsl::post_none);
104+
}

0 commit comments

Comments
 (0)