Skip to content

Commit 3e44624

Browse files
committed
added close and push events, delay option. Seems to have a bug where words appear slightly too early now.
1 parent 81d7976 commit 3e44624

File tree

1 file changed

+61
-46
lines changed

1 file changed

+61
-46
lines changed

speech-to-text/timing-stream.js

Lines changed: 61 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1,53 +1,61 @@
11
'use strict';
22

3-
var Transform = require('stream').Transform;
3+
var Duplex = require('stream').Duplex;
44
var util = require('util');
55
var clone = require('clone');
66

77
/**
8-
* Applies some basic formating to transcriptions:
9-
* - Capitalize the first word of each sentence
10-
* - Add a period to the end
11-
* - Fix any "cruft" in the transcription
12-
* - etc.
8+
* Slows results down to no faster than real time.
139
*
14-
* @param opts
15-
* @param opts.model - some models / languages need special handling
16-
* @param [opts.hesitation='\u2026'] - what to put down for a "hesitation" event, defaults to an ellipsis (...)
10+
* Useful when running recognizeBlob because the text can otherwise appear before the words are spoken
11+
*
12+
* @param {Object} opts
13+
* @param {*} [opts.emitAtt=TimingStream.START] - set to TimingStream.END to only emit text that has been completely spoken.
14+
* @param {Number} [opts.delay=0] - Additional delay (in seconds) to apply before emitting words, useful for precise syncing to audio tracks. May be negative
1715
* @constructor
1816
*/
1917
function TimingStream(opts) {
2018
this.opts = util._extend({
21-
emitAt: TimingStream.WORD_START // WORD_START = emit the word as it's beginning to be spoken, WORD_END = once it's completely spoken
19+
emitAt: TimingStream.START,
20+
delay: 0,
21+
allowHalfOpen: true // keep the readable side open after the source closes
2222
}, opts);
23-
Transform.call(this, opts);
23+
Duplex.call(this, opts);
2424

2525
this.startTime = Date.now();
2626
// buffer to store future results
2727
this.final = [];
2828
this.interim = [];
2929
this.nextTick = null;
30+
this.sourceEnded = false;
3031

3132
var self = this;
3233
this.on('pipe', function(source) {
3334
source.on('result', self.handleResult.bind(self));
3435
if(source.stop) {
3536
self.stop = source.stop.bind(source);
3637
}
38+
source.on('end', function() {
39+
self.sourceEnded = true;
40+
});
3741
});
3842
}
39-
util.inherits(TimingStream, Transform);
43+
util.inherits(TimingStream, Duplex);
4044

41-
TimingStream.WORD_START = 1;
42-
TimingStream.WORD_END = 2;
45+
TimingStream.START = 1;
46+
TimingStream.END = 2;
4347

44-
TimingStream.prototype._transform = function(chunk, encoding, next) {
48+
TimingStream.prototype._write = function(chunk, encoding, next) {
4549
// ignore - we'll emit our own final text based on the result events
4650
next();
4751
};
4852

53+
TimingStream.prototype._read = function(size) {
54+
// ignore - we'll emit results once the time has come
55+
};
56+
4957
TimingStream.prototype.cutoff = function cutoff() {
50-
return (Date.now() - this.startTime)/1000;
58+
return (Date.now() - this.startTime)/1000 - this.opts.delay;
5159
};
5260

5361
TimingStream.prototype.withinRange = function(result, cutoff) {
@@ -72,7 +80,7 @@ TimingStream.prototype.completelyWithinRange = function(result, cutoff) {
7280
* Clones the given result and then crops out any words that occur later than the current cutoff
7381
* @param result
7482
*/
75-
Transform.prototype.crop = function crop(result, cutoff) {
83+
Duplex.prototype.crop = function crop(result, cutoff) {
7684
result = clone(result);
7785
result.alternatives = result.alternatives.map(function(alt) {
7886
var timestamps = [];
@@ -102,6 +110,7 @@ Transform.prototype.crop = function crop(result, cutoff) {
102110
* - the original next result object (removing it from the array) if it's completely earlier than the current cutoff
103111
*
104112
* @param results
113+
* @param cutoff
105114
* @returns {*}
106115
*/
107116
TimingStream.prototype.getCurrentResult = function getCurrentResult(results, cutoff) {
@@ -110,33 +119,14 @@ TimingStream.prototype.getCurrentResult = function getCurrentResult(results, cut
110119
}
111120
};
112121

113-
/**
114-
* try to figure out when we'll emit the next word
115-
* @param lastResultWasFinal
116-
* @param numCurrentTimestamps
117-
* @returns {*}
118-
*/
119-
TimingStream.prototype.getNextWordOffset = function getNextWordOffset(lastResultWasFinal, numCurrentTimestamps) {
120-
if (lastResultWasFinal) {
121-
// if the current result is final, then grab the first timestamp of the next one
122-
var nextResult = this.final[0] || this.interim[0];
123-
return nextResult && nextResult.alternatives[0].timestamps[0][this.opts.emitAt];
124-
} else {
125-
// if the current result wasn't final, then we just want the next word from the current result (assuming there is one)
126-
var currentResultSource = this.final[0] || this.interim[0];
127-
var nextTimestamp = currentResultSource && currentResultSource.alternatives[0].timestamps[numCurrentTimestamps];
128-
return nextTimestamp && nextTimestamp[this.opts.emitAt];
129-
}
130-
};
131-
132122

133123
/**
134-
* Tick occurs every half second, or when results are received if we're behind schedule.
124+
* Tick emits any buffered words that have a timestamp before the current time, then calls scheduleNextTick()
135125
*/
136126
TimingStream.prototype.tick = function tick() {
137127
var cutoff = this.cutoff();
138128

139-
this.nextTick = null;
129+
clearTimeout(this.nextTick);
140130
var result = this.getCurrentResult(this.final, cutoff);
141131

142132
if (!result) {
@@ -147,14 +137,41 @@ TimingStream.prototype.tick = function tick() {
147137
this.emit('result', result);
148138
if (result.final) {
149139
this.push(result.alternatives[0].transcript);
150-
}
151-
var nextWordOffset = this.getNextWordOffset(result.final, result.alternatives[0].timestamps.length);
152-
// if we have a next word, set a timeout to emit it. Otherwise the next call to handleResult() will trigger a tick.
153-
if (nextWordOffset) {
154-
this.nextTick = setTimeout(this.tick.bind(this), this.startTime + (nextWordOffset*1000));
140+
return this.nextTick = setTimeout(this.tick.bind(this), 0); // in case we are multiple results behind - don't schedule until we are out of final results that are due now
155141
}
156142
}
157143

144+
this.scheduleNextTick(cutoff);
145+
};
146+
147+
/**
148+
* Schedules next tick if possible.
149+
*
150+
* triggers the 'close' and 'end' events if the buffer is empty and no further results are expected
151+
*
152+
* @param cutoff
153+
*/
154+
TimingStream.prototype.scheduleNextTick = function scheduleNextTick(cutoff) {
155+
156+
// prefer final results over interim - when final results are added, any older interim ones are automatically deleted.
157+
var nextResult = this.final[0] || this.interim[0];
158+
if (nextResult) {
159+
// loop through the timestamps until we find one that comes after the current cutoff (there should always be one)
160+
var timestamps = nextResult.alternatives[0].timestamps;
161+
for(var i=0; i<timestamps.length; i++) {
162+
var wordOffset = timestamps[i][this.opts.emitAt];
163+
if (wordOffset > cutoff) {
164+
return this.nextTick = setTimeout(this.tick.bind(this), this.startTime + (wordOffset*1000));
165+
}
166+
}
167+
throw new Error('No future words found'); // this shouldn't happen ever - getCurrentResult should automatically delete the result from the buffer if all of it's words are consumed
168+
} else {
169+
// if we have no next result in the buffer, and the source has ended, then we're done.
170+
if (this.sourceEnded) {
171+
this.emit('close');
172+
this.push(null);
173+
}
174+
}
158175
};
159176

160177
function noTimestamps(result) {
@@ -189,9 +206,7 @@ TimingStream.prototype.handleResult = function handleResult(result) {
189206
this.interim.push(result);
190207
}
191208

192-
if (!this.nextTick) {
193-
this.tick();
194-
}
209+
this.tick();
195210
};
196211

197212
TimingStream.prototype.stop = function(){}; // usually overwritten during the `pipe` event

0 commit comments

Comments
 (0)