Skip to content

Commit 4e9b9cd

Browse files
committed
TimingSteam rewrite
* Emit exact messages recieved rather than splicing final results. * Also resolves issues where speaker_labels could be emitted before matching final result
1 parent a4c6263 commit 4e9b9cd

File tree

3 files changed

+1524
-188
lines changed

3 files changed

+1524
-188
lines changed

speech-to-text/timing-stream.js

Lines changed: 77 additions & 172 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
var Duplex = require('stream').Duplex;
44
var util = require('util');
5-
var clone = require('clone');
65
var defaults = require('defaults');
76
var noTimestamps = require('./no-timestamps');
87

@@ -15,7 +14,7 @@ var noTimestamps = require('./no-timestamps');
1514
* @todo: fix TimingStream to work with the output of the SpeakerStream
1615
*
1716
* @param {Object} [opts]
18-
* @param {*} [opts.emitAtt=TimingStream.START] - set to TimingStream.END to only emit text that has been completely spoken.
17+
* @param {*} [opts.emitAt=TimingStream.START] - set to TimingStream.END to only emit text that has been completely spoken.
1918
* @param {Number} [opts.delay=0] - Additional delay (in seconds) to apply before emitting words, useful for precise syncing to audio tracks. May be negative
2019
* @constructor
2120
*/
@@ -29,12 +28,14 @@ function TimingStream(opts) {
2928
Duplex.call(this, opts);
3029

3130
this.startTime = Date.now();
32-
// buffer to store future results
33-
this.final = [];
34-
this.interim = [];
35-
this.speakerLabels = [];
31+
32+
// queue to store future messages
33+
this.messages = [];
34+
35+
// setTimeout handle. if null, next tick will occur whenever new data arrives
3636
this.nextTick = null;
37-
this.nextSpeakerLabelsTick = null;
37+
38+
// this stream cannot end until both the messages queue is empty and the source stream has ended
3839
this.sourceEnded = false;
3940

4041
var self = this;
@@ -48,15 +49,21 @@ util.inherits(TimingStream, Duplex);
4849
TimingStream.START = 1;
4950
TimingStream.END = 2;
5051

51-
TimingStream.prototype._write = function(data, encoding, next) {
52-
if (data instanceof Buffer) {
52+
TimingStream.prototype._write = function(msg, encoding, next) {
53+
if (msg instanceof Buffer) {
5354
return this.emit('error', new Error('TimingStream requires the source to be in objectMode'));
5455
}
55-
if (Array.isArray(data.results) && data.results.length) {
56-
this.handleResult(data);
56+
if (Array.isArray(msg.results) && msg.results.length && noTimestamps(msg)) {
57+
var err = new Error('TimingStream requires timestamps');
58+
err.name = noTimestamps.ERROR_NO_TIMESTAMPS;
59+
this.emit('error', err);
60+
return;
5761
}
58-
if (Array.isArray(data.speaker_labels) && data.speaker_labels.length) {
59-
this.handleSpeakerLabels(data);
62+
63+
this.messages.push(msg);
64+
65+
if (!this.nextTick) {
66+
this.scheduleNextTick();
6067
}
6168
next();
6269
};
@@ -69,108 +76,73 @@ TimingStream.prototype.cutoff = function cutoff() {
6976
return (Date.now() - this.startTime) / 1000 - this.options.delay;
7077
};
7178

72-
TimingStream.prototype.withinRange = function(result, cutoff) {
73-
return result.results[0].alternatives.some(function(alt) {
74-
// timestamp structure is ["word", startTime, endTime]
75-
// if the first timestamp ends before the cutoff, then it's at least partially within range
76-
var timestamp = alt.timestamps[0];
77-
return !!timestamp && timestamp[this.options.emitAt] <= cutoff;
78-
}, this);
79-
};
80-
81-
TimingStream.prototype.completelyWithinRange = function(result, cutoff) {
82-
return result.results[0].alternatives.every(function(alt) {
83-
// timestamp structure is ["word", startTime, endTime]
84-
// if the last timestamp ends before the cutoff, then it's completely within range
85-
var timestamp = alt.timestamps[alt.timestamps.length - 1];
86-
return timestamp[this.options.emitAt] <= cutoff;
87-
}, this);
88-
};
89-
9079
/**
91-
* Clones the given result and then crops out any words that occur later than the current cutoff
92-
* @param {Object} data
93-
* @param {Number} cutoff timestamp (in seconds)
94-
* @returns {Object}
80+
* Grabs the appropriate timestamp from the given message, depending on options.emitAt and the type of message
81+
*
82+
* @private
83+
* @param {Object} msg
84+
* @returns {Number} timestamp
9585
*/
96-
Duplex.prototype.crop = function crop(data, cutoff) {
97-
data = clone(data);
98-
var result = data.results[0];
99-
result.alternatives = result.alternatives.map(function(alt) {
100-
var timestamps = [];
101-
for (var i = 0, timestamp; i < alt.timestamps.length; i++) {
102-
timestamp = alt.timestamps[i];
103-
if (timestamp[this.options.emitAt] <= cutoff) {
104-
timestamps.push(timestamp);
105-
} else {
106-
break;
107-
}
86+
TimingStream.prototype.getMessageTime = function(msg) {
87+
if (this.options.emitAt === TimingStream.START) {
88+
if (Array.isArray(msg.results) && msg.results.length) {
89+
return msg.results[0].alternatives[0].timestamps[0][TimingStream.START];
90+
} else if (Array.isArray(msg.speaker_labels) && msg.speaker_labels.length) {
91+
return msg.speaker_labels[0].from;
10892
}
109-
alt.timestamps = timestamps;
110-
alt.transcript = timestamps.map(function(ts) {
111-
return ts[0];
112-
}).join(' ');
113-
return alt;
114-
}, this);
115-
// "final" signifies both that the text won't change, and that we're at the end of a sentence. Only one of those is true here.
116-
result.final = false;
117-
return data;
93+
} else {
94+
if (Array.isArray(msg.results) && msg.results.length) {
95+
var timestamps = msg.results[msg.results.length - 1].alternatives[0].timestamps;
96+
return timestamps[timestamps.length - 1][TimingStream.END];
97+
} else if (Array.isArray(msg.speaker_labels) && msg.speaker_labels.length) {
98+
return msg.speaker_labels[msg.speaker_labels.length - 1].to;
99+
}
100+
}
101+
return 0; // failsafe for unknown message types
118102
};
119103

120104
/**
121105
* Returns one of:
122-
* - undefined if the next result is completely later than the current cutoff
123-
* - a cropped clone of the next result if it's later than the current cutoff && in objectMode
124-
* - the original next result object (removing it from the array) if it's completely earlier than the current cutoff (or we're in string mode with emitAt set to start)
106+
* - null if the next result is completely later than the current cutoff
107+
* - the original next result object (removing it from the array) if it's completely earlier than the current cutoff
108+
* (or it's partially within range and emitAt is set to start)
125109
*
126-
* @param {Object} results
127-
* @param {Number} cutoff
128-
* @returns {Object|undefined}
110+
* @private
111+
* @returns {Object|null}
129112
*/
130-
TimingStream.prototype.getCurrentResult = function getCurrentResult(results, cutoff) {
131-
if (results.length && this.withinRange(results[0], cutoff)) {
132-
var completeResult = this.completelyWithinRange(results[0], cutoff);
133-
if (this.options.objectMode || this.options.readableObjectMode) {
134-
// object mode: emit either a complete result or a cropped result
135-
return completeResult ? results.shift() : this.crop(results[0], cutoff);
136-
} else if (completeResult || this.options.emitAt === TimingStream.START) {
137-
// string mode: emit either a complete result or nothing
138-
return results.shift();
139-
}
113+
TimingStream.prototype.getCurrentResult = function getCurrentResult() {
114+
if (!this.messages.length) {
115+
return null;
116+
}
117+
if (this.getMessageTime(this.messages[0]) <= this.cutoff()) {
118+
return this.messages.shift();
140119
}
141120
};
142121

143122

144123
/**
145124
* Tick emits any buffered words that have a timestamp before the current time, then calls scheduleNextTick()
125+
*
126+
* @private
146127
*/
147128
TimingStream.prototype.tick = function tick() {
148-
var cutoff = this.cutoff();
149-
150-
clearTimeout(this.nextTick);
151-
var result = this.getCurrentResult(this.final, cutoff);
152-
153-
if (!result) {
154-
result = this.getCurrentResult(this.interim, cutoff);
155-
}
156-
157-
if (result) {
129+
var msg;
130+
// eslint-disable-next-line no-cond-assign
131+
while (msg = this.getCurrentResult()) {
158132
if (this.options.objectMode || this.options.readableObjectMode) {
159-
this.push(result);
160-
} else {
161-
this.push(result.results[0].alternatives[0].transcript);
162-
}
163-
if (result.results[0].final) {
164-
this.nextTick = setTimeout(this.tick.bind(this), 0); // in case we are multiple results behind - don't schedule until we are out of final results that are due now
165-
return;
133+
this.push(msg);
134+
} else if (Array.isArray(msg.results && msg.results.length)) {
135+
this.push(msg.results[0].alternatives[0].transcript);
166136
}
167137
}
168138

169-
this.scheduleNextTick(cutoff);
139+
this.scheduleNextTick();
170140
};
171141

172142
/**
173143
* Given a speaker labels message, returns the final to time
144+
*
145+
* @private
174146
* @param {Object} msg
175147
* @returns {Number}
176148
*/
@@ -180,7 +152,7 @@ function getEnd(msg) {
180152

181153
TimingStream.prototype.tickSpeakerLables = function tickSpeakerLabels() {
182154
clearTimeout(this.nextSpeakerLabelsTick);
183-
while (this.speakerLabels.length && getEnd(this.speakerLabels[0]) <= this.cutoff()) {
155+
if (this.speakerLabels.length && getEnd(this.speakerLabels[0]) <= this.cutoff()) {
184156
this.push(this.speakerLabels.shift());
185157
}
186158
if (this.speakerLabels.length) {
@@ -194,31 +166,18 @@ TimingStream.prototype.tickSpeakerLables = function tickSpeakerLabels() {
194166
};
195167

196168
/**
197-
* Schedules next tick if possible. Requires previous stream to emit recognize objects (objectMode or readableObjectMode)
198-
*
199-
* triggers the 'close' and 'end' events if the buffer is empty and no further results are expected
200-
*
201-
* @param {Number} cutoff
169+
* Schedules next tick or checks for the end of the results
202170
*
171+
* @private
203172
*/
204-
TimingStream.prototype.scheduleNextTick = function scheduleNextTick(cutoff) {
205-
206-
// prefer final results over interim - when final results are added, any older interim ones are automatically deleted.
207-
var nextResult = this.final[0] || this.interim[0];
208-
if (nextResult) {
209-
// loop through the timestamps until we find one that comes after the current cutoff (there should always be one)
210-
var timestamps = nextResult.results[0].alternatives[0].timestamps;
211-
for (var i = 0; i < timestamps.length; i++) {
212-
var wordOffset = timestamps[i][this.options.emitAt];
213-
if (wordOffset > cutoff) {
214-
var nextTime = this.startTime + (wordOffset * 1000);
215-
this.nextTick = setTimeout(this.tick.bind(this), nextTime - Date.now());
216-
return;
217-
}
218-
}
219-
throw new Error('No future words found'); // this shouldn't happen ever - getCurrentResult should automatically delete the result from the buffer if all of it's words are consumed
173+
TimingStream.prototype.scheduleNextTick = function scheduleNextTick() {
174+
clearTimeout(this.nextTick); // just in case
175+
if (this.messages.length) {
176+
var messageTime = this.getMessageTime(this.messages[0]);
177+
var nextTickTime = this.startTime + (messageTime * 1000); // ms since epoch
178+
var nextTickOffset = Math.min(0, nextTickTime - Date.now()); // ms from right now
179+
this.nextTick = setTimeout(this.tick.bind(this), nextTickOffset);
220180
} else {
221-
// clear the next tick
222181
this.nextTick = null;
223182
this.checkForEnd();
224183
}
@@ -228,79 +187,25 @@ TimingStream.prototype.scheduleNextTick = function scheduleNextTick(cutoff) {
228187
* Triggers the 'close' and 'end' events if both pre-conditions are true:
229188
* - the previous stream must have already emitted it's 'end' event
230189
* - there must be no next tick scheduled, indicating that there are no results buffered for later delivery
190+
*
191+
* @private
231192
*/
232193
TimingStream.prototype.checkForEnd = function() {
233-
if (this.sourceEnded && !this.nextTick && !this.nextSpeakerLabelsTick) {
194+
if (this.sourceEnded && !this.nextTick) {
234195
this.emit('close');
235196
this.push(null);
236197
}
237198
};
238199

239200

240-
/**
241-
* Creates a new result with all transcriptions formatted
242-
*
243-
* @param {Object} data
244-
*/
245-
TimingStream.prototype.handleResult = function handleResult(data) {
246-
if (noTimestamps(data)) {
247-
var err = new Error('TimingStream requires timestamps');
248-
err.name = noTimestamps.ERROR_NO_TIMESTAMPS;
249-
this.emit('error', err);
250-
return;
251-
}
252-
253-
// http://www.ibm.com/watson/developercloud/speech-to-text/api/v1/#SpeechRecognitionEvent
254-
var index = data.result_index;
255-
256-
// process each result individually
257-
data.results.forEach(function(result) {
258-
// additional alternatives do not include timestamps, so we can't process and emit them correctly
259-
if (result.alternatives.length > 1) {
260-
result.alternatives.length = 1;
261-
}
262-
263-
// loop through the buffer and delete any interim results with the same or lower index
264-
while (this.interim.length && this.interim[0].result_index <= index) {
265-
this.interim.shift();
266-
}
267-
268-
// in case this data object had multiple results in it
269-
var newData = {
270-
results: [result],
271-
result_index: index
272-
};
273-
274-
index++;
275-
276-
if (result.final) {
277-
// then add it to the final results array
278-
this.final.push(newData);
279-
// and reset the interim results array because anything there has now been superseded and should not be emitted.
280-
this.interim = [];
281-
} else {
282-
this.interim.push(newData);
283-
}
284-
285-
}, this);
286-
287-
this.tick();
288-
};
289-
290-
TimingStream.prototype.handleSpeakerLabels = function handleSpeakerLabels(data) {
291-
this.speakerLabels.push(data);
292-
this.tickSpeakerLables();
293-
};
294-
295201
TimingStream.prototype.promise = require('./to-promise');
296202

297203
// when stop is called, immediately stop emitting results
298204
TimingStream.prototype.stop = function stop() {
299205
this.emit('stop');
300-
clearTimeout(this.nextTick);
301-
clearTimeout(this.nextSpeakerLabelsTick);
302-
this.handleResult = this.handleSpeakerLabels = function noop(){}; // RecognizeStream.stop() closes the connection gracefully, so we will usually see one more result
303206
this.checkForEnd(); // in case the RecognizeStream already ended
207+
clearTimeout(this.nextTick);
208+
this.nextTick = -1; // fake timer to prevent _write from scheduling new ticks
304209
};
305210

306211
module.exports = TimingStream;

0 commit comments

Comments
 (0)