Skip to content

Commit fe0cce6

Browse files
authored
Add support for handling OOD messages in various Flow nodes (#141)
1 parent e7d56d5 commit fe0cce6

File tree

12 files changed

+123
-45
lines changed

12 files changed

+123
-45
lines changed

src/Flow/AbstractNode.hh

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -105,18 +105,21 @@ protected:
105105
*/
106106
template<class T>
107107
bool getData(Link* l, DataPtr<T>& d) {
108-
if (l != 0) {
109-
if (l->isDataAvailable())
110-
return l->getData(d);
111-
else {
112-
if (l->getFromNode()->work(l->getFromPort()))
113-
return l->getData(d);
114-
else {
115-
error("Node '%s' could not generate any output.",
116-
l->getFromNode()->name().c_str());
117-
}
118-
}
108+
if (l == 0) {
109+
d.reset();
110+
return false;
119111
}
112+
if (l->isDataAvailable()) {
113+
return l->getData(d);
114+
}
115+
if (l->getFromNode()->work(l->getFromPort())) {
116+
return l->getData(d);
117+
}
118+
l->getData(d);
119+
if (d == Flow::Data::ood()) {
120+
return false;
121+
}
122+
error("Node '%s' could not generate any output.", l->getFromNode()->name().c_str());
120123
d.reset();
121124
return false;
122125
}

src/Flow/Cutter.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ void CutterNode::setId(const std::string& id) {
5151
void CutterNode::fillCache() {
5252
DataPtr<Data> d;
5353
getData(0, d);
54-
while (d && d.get() != Data::eos()) {
54+
while (d && Data::isNotSentinel(d.get())) {
5555
featureSequence_.push_back(d);
5656
getData(0, d);
5757
}

src/Flow/Synchronization.cc

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,11 @@ bool Synchronization::work(const Timestamp& time, DataPointer& dataPointer) {
2222
Time startTime = time.startTime();
2323
do {
2424
if (!nextData(in)) {
25+
if (in == Flow::Data::ood()) {
26+
dataPointer = in;
27+
return false;
28+
}
29+
2530
lastError_ = Core::form("Input stream ended before the start-time %f.", startTime);
2631
return false;
2732
}
@@ -38,7 +43,7 @@ bool Synchronization::work(const Timestamp& time, DataPointer& dataPointer) {
3843

3944
bool TimestampCopy::work(const Timestamp& time, DataPointer& dataPointer) {
4045
if (!nextData(dataPointer)) {
41-
lastError_ = "input stream endet before target stream";
46+
lastError_ = "input stream ended before target stream";
4247
return false;
4348
}
4449
dataPointer->setTimestamp(time);

src/Flow/Synchronization.hh

Lines changed: 43 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,10 @@ private:
116116
bool firstData_;
117117
Time previousStartTime_;
118118

119+
// Placeholder for latest sync time in case we were able to fetch a sync time
120+
// but the data port reported OOD. We re-use the same sync time but re-fetch the data.
121+
DataPtr<Timestamp> interpolationTime_;
122+
119123
private:
120124
void reset();
121125

@@ -168,13 +172,18 @@ public:
168172
return Algorithm::name();
169173
}
170174
WeakSynchronizationNode(const Core::Configuration& c)
171-
: Core::Component(c), Precursor(c) {}
175+
: Core::Component(c),
176+
Precursor(c) {}
172177
virtual ~WeakSynchronizationNode() {}
173178
};
174179

175180
template<class Algorithm>
176181
SynchronizationNode<Algorithm>::SynchronizationNode(const Core::Configuration& c)
177-
: Core::Component(c), Node(c), firstData_(true), previousStartTime_(Core::Type<Time>::min) {
182+
: Core::Component(c),
183+
Node(c),
184+
firstData_(true),
185+
previousStartTime_(Core::Type<Time>::min),
186+
interpolationTime_() {
178187
ignoreErrors_ = paramSynchronizationIgnoreErrors(c);
179188

180189
addInputs(2);
@@ -208,35 +217,51 @@ bool SynchronizationNode<Algorithm>::configure() {
208217

209218
template<class Algorithm>
210219
bool SynchronizationNode<Algorithm>::work(Flow::PortId p) {
211-
DataPtr<Timestamp> interpolationTime;
212-
if (!getData(1, interpolationTime)) {
213-
putData(0, interpolationTime.get());
214-
putData(1, interpolationTime.get());
215-
return true;
220+
if (!interpolationTime_) {
221+
if (!getData(1, interpolationTime_)) {
222+
verify(Flow::Data::isSentinel(interpolationTime_.get()));
223+
putData(0, interpolationTime_.get());
224+
putData(1, interpolationTime_.get());
225+
interpolationTime_.reset();
226+
return interpolationTime_.get() != Flow::Data::ood();
227+
}
216228
}
217229

218230
DataPointer out;
219-
if (Algorithm::work(*interpolationTime, out)) {
231+
if (Algorithm::work(*interpolationTime_, out)) {
220232
verify((bool)out);
221233
putData(0, out.get());
222-
putData(1, interpolationTime.get());
234+
putData(1, interpolationTime_.get());
235+
interpolationTime_.reset();
223236
return true;
224237
}
225238

226-
if (!ignoreErrors_)
239+
verify(Flow::Data::isSentinel(out.get()));
240+
if (out == Flow::Data::ood()) {
241+
// forward OOD
242+
putData(0, out.get());
243+
putData(1, out.get());
244+
return false;
245+
}
246+
247+
if (!ignoreErrors_) {
227248
this->criticalError("%s", this->lastError().c_str());
249+
}
250+
228251
// Synchronization failed, typically at the end of the segment, so put eos
229252
putData(0, Flow::Data::eos());
230-
putData(1, interpolationTime.get());
253+
putData(1, interpolationTime_.get());
254+
interpolationTime_.reset();
231255
return true;
232256
}
233257

234258
template<class Algorithm>
235259
bool SynchronizationNode<Algorithm>::nextData(DataPointer& dataPointer) {
236260
bool result = Node::getData(0, dataPointer);
237261
if (result) {
238-
if (!dataPointer)
262+
if (!dataPointer) {
239263
criticalError("Input is null.");
264+
}
240265

241266
if (!firstData_ &&
242267
!Core::isSignificantlyLess(previousStartTime_, dataPointer->startTime(), timeTolerance)) {
@@ -247,9 +272,11 @@ bool SynchronizationNode<Algorithm>::nextData(DataPointer& dataPointer) {
247272
firstData_ = false;
248273
previousStartTime_ = dataPointer->startTime();
249274
}
250-
else {
251-
if (firstData_)
252-
warning("Input stream is empty.");
275+
else if (dataPointer == Flow::Data::ood()) {
276+
return result;
277+
}
278+
else if (firstData_) {
279+
warning("Input stream is empty.");
253280
}
254281
return result;
255282
}
@@ -267,6 +294,7 @@ void SynchronizationNode<Algorithm>::reset() {
267294
Algorithm::reset();
268295
firstData_ = true;
269296
previousStartTime_ = Core::Type<Time>::min;
297+
interpolationTime_.reset();
270298
}
271299

272300
} // namespace Flow

src/Flow/VectorDemultiplex.hh

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,9 @@ public:
4949
}
5050

5151
VectorDemultiplexNode(const Core::Configuration& c)
52-
: Core::Component(c), SleeveNode(c), nTracks_(1) {
52+
: Core::Component(c),
53+
SleeveNode(c),
54+
nTracks_(1) {
5355
setTrack(parameterTrack(c));
5456
}
5557
virtual ~VectorDemultiplexNode() {}
@@ -76,8 +78,12 @@ public:
7678
Flow::DataPtr<Flow::Vector<T>> in;
7779

7880
if (!getData(0, in)) {
79-
if (in == Data::eos())
81+
if (in == Data::eos()) {
8082
offset_ = track_;
83+
}
84+
else if (in == Data::ood()) {
85+
return putOod(p);
86+
}
8187
return putData(0, in.get());
8288
}
8389

src/Flow/WarpTimeFilter.cc

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,10 @@ bool WarpTimeFilterNode::work(PortId p) {
6363
return putData(0, in.get());
6464
}
6565

66-
if (in.get() == Flow::Data::eos() && warping_.size()) {
66+
if (in.get() == Flow::Data::ood()) {
67+
return putOod(p);
68+
}
69+
else if (in.get() == Flow::Data::eos() && warping_.size()) {
6770
Core::Component::Message msg = log();
6871
msg << "warping map:";
6972
for (std::vector<std::pair<Time, Time>>::iterator it = warping_.begin(); it != warping_.end(); ++it)

src/Signal/Delay.cc

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -141,14 +141,19 @@ bool DelayNode::work(Flow::PortId port) {
141141
Flow::DataPtr<Flow::Data> i;
142142
while (getData(0, i)) {
143143
add(i);
144-
if (marginCondition_->isSatisfied())
144+
if (marginCondition_->isSatisfied()) {
145145
return putData();
146+
}
147+
}
148+
if (i == Flow::Data::ood()) {
149+
return putData(i);
146150
}
147151
if (i == Flow::Data::eos()) {
148152
while (!empty()) {
149153
flush();
150-
if (marginCondition_->isSatisfied())
154+
if (marginCondition_->isSatisfied()) {
151155
return putData();
156+
}
152157
}
153158
}
154159
return putData(i);

src/Signal/GammaTone.cc

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -300,8 +300,12 @@ void GammaToneNode::init() {
300300
bool GammaToneNode::work(Flow::PortId p) {
301301
Flow::DataPtr<Flow::Vector<f32>> in;
302302
if (!getData(0, in)) {
303-
if (in == Flow::Data::eos())
303+
if (in == Flow::Data::eos()) {
304304
reset();
305+
}
306+
else if (in == Flow::Data::ood()) {
307+
return putOod(p);
308+
}
305309
return putData(0, in.get());
306310
}
307311

src/Signal/Normalization.cc

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ void MeanNormalization::updateStatistics(const Frame& add, const Frame& remove)
131131
void MeanNormalization::finalize() {
132132
if (sumWeight_ > 0) {
133133
std::transform(sum_.begin(), sum_.end(), mean_.begin(),
134-
std::bind2nd(std::divides<Sum>(), sumWeight_));
134+
std::bind(std::divides<Sum>(), std::placeholders::_1, sumWeight_));
135135
}
136136
Precursor::finalize();
137137
}
@@ -263,7 +263,11 @@ void DivideByMean::apply(Frame& out) {
263263

264264
//===================================================================================================
265265
MeanNormNormalization::MeanNormNormalization(const Core::Configuration& c, f32 norm)
266-
: Core::Component(c), sumOfNorms_(0), averageNorm_(0), norm_(norm), normFunction_(c) {
266+
: Core::Component(c),
267+
sumOfNorms_(0),
268+
averageNorm_(0),
269+
norm_(norm),
270+
normFunction_(c) {
267271
}
268272

269273
void MeanNormNormalization::reset() {
@@ -288,7 +292,7 @@ void MeanNormNormalization::finalize() {
288292

289293
void MeanNormNormalization::apply(Frame& out) {
290294
std::transform(out->begin(), out->end(), out->begin(),
291-
std::bind2nd(std::divides<Value>(), averageNorm_));
295+
std::bind(std::divides<Value>(), std::placeholders::_1, averageNorm_));
292296
}
293297

294298
//===================================================================================================
@@ -413,15 +417,20 @@ bool NormalizationNode::work(Flow::PortId p) {
413417
Normalization::Frame in, out;
414418

415419
while (getData(0, in)) {
416-
if (update(in, out))
420+
if (update(in, out)) {
417421
return putData(0, out.get());
422+
}
418423
}
419424

420425
// in is invalid
421426
if (in == Flow::Data::eos()) {
422-
while (flush(out))
427+
while (flush(out)) {
423428
putData(0, out.get());
429+
}
424430
reset();
425431
}
432+
else if (in == Flow::Data::ood()) {
433+
return putOod(p);
434+
}
426435
return putData(0, in.get());
427436
}

src/Signal/Preemphasis.cc

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,8 @@ void Preemphasis::apply(Flow::Vector<f32>& v) {
7979
Core::ParameterFloat Signal::PreemphasisNode::paramAlpha("alpha", "preemphasis weight", 1);
8080

8181
PreemphasisNode::PreemphasisNode(const Core::Configuration& c)
82-
: Core::Component(c), SleeveNode(c) {
82+
: Core::Component(c),
83+
SleeveNode(c) {
8384
setAlpha(paramAlpha(c));
8485
}
8586

@@ -107,8 +108,12 @@ bool PreemphasisNode::configure() {
107108
bool PreemphasisNode::work(Flow::PortId p) {
108109
Flow::DataPtr<Flow::Vector<f32>> in;
109110
if (!getData(0, in)) {
110-
if (in == Flow::Data::eos())
111+
if (in == Flow::Data::eos()) {
111112
Preemphasis::reset();
113+
}
114+
else if (in == Flow::Data::ood()) {
115+
return putOod(p);
116+
}
112117
return putData(0, in.get());
113118
}
114119

0 commit comments

Comments
 (0)