Skip to content

Commit 244bd2d

Browse files
committed
SpeakerStream: handle early speaker_labels, include keywords, alternatives, etc
* Can now gracefully handle early speaker_labels and recover once the matching result arrives * Includes original result w/ keywords, alternatives, etc * may not be matched to the exact correct result currently
1 parent 4e9b9cd commit 244bd2d

File tree

5 files changed

+303
-48
lines changed

5 files changed

+303
-48
lines changed

CHANGELOG.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,12 @@
11
# Changelog
22

3+
### v0.27.0
4+
* TimingStream rewrite - now emits exact results received from the service, always in the exact order received
5+
* old version created extra interim results and could emit speaker_labels before their matching final result in certain circumstances
6+
* emitAt now defaults to END to allow for interim results even when final is cached
7+
* SpeakerStream now emits keywords, alternatives, etc, although sometimes on a slightly earlier result than where the word is mentioned
8+
* SpeakerStream now gracefully handles situations where labels arrive before the matching final result
9+
310
### v0.26.0
411
* Renamed RecognizeStream 'connect' event to 'open' to match 'close' event
512
* Removed deprecated connection-close event

speech-to-text/speaker-stream.js

Lines changed: 106 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -21,18 +21,19 @@ var Transform = require('stream').Transform;
2121
var util = require('util');
2222
var pullAllWith = require('lodash.pullallwith');
2323
var noTimestamps = require('./no-timestamps');
24+
var clone = require('clone');
2425

2526
/**
2627
* Object-Mode stream that splits up results by speaker.
2728
*
2829
* Output format is similar to existing results formats, but with an extra speaker field,
2930
*
3031
* Output results array will usually contain multiple results.
31-
* All results are interim until the final batch; the text will not change, but the speaker may, and so the text may move from one interim result to another.
32+
* All results are interim until the final batch; the text may change (if options.speakerlessInterim is enabled) or move from one interim result to another.
3233
*
33-
* Note: when combined with a TimingStream, data events may contain a combination of final and interim results (with the last one sometimes being interim)
34+
* Keywords, words_alternatives, and other features may appear on results that come slightly earlier than the timestamp due to the way things are split up.
3435
*
35-
* Ignores interim results from the service.
36+
* Ignores interim results from the service unless options.speakerlessInterim is enabled.
3637
*
3738
* @constructor
3839
* @param {Object} options
@@ -55,7 +56,7 @@ function SpeakerStream(options) {
5556
* @type {Array<Array>}
5657
* @private
5758
*/
58-
this.timestamps = [];
59+
this.results = [];
5960
/**
6061
* speaker_labels is an array of objects.
6162
* Example:
@@ -82,6 +83,12 @@ function SpeakerStream(options) {
8283
* @private
8384
*/
8485
this.speaker_labels = [];
86+
87+
this.mismatchErrorEmitted = false;
88+
89+
// flag to signal that labels were received before results, and therefore
90+
// the stream needs to emit on the next batch of final results
91+
this.extraLabels = false;
8592
}
8693
util.inherits(SpeakerStream, Transform);
8794

@@ -103,60 +110,111 @@ SpeakerStream.ERROR_MISMATCH = 'MISMATCH';
103110
*/
104111
SpeakerStream.prototype.buildMessage = function() {
105112
var final = this.isFinal();
106-
var errored = false;
113+
this.extraLabels = false;
107114

115+
// first match all speaker_labels to the appropriate word and result
108116
// assumes that each speaker_label will have a matching word timestamp at the same index
109117
// stops processing and emits an error if this assumption is violated
110-
var pairs = this.speaker_labels.map(function(label, i) {
111-
var timestamp = this.timestamps[i];
112-
if (!timestamp || timestamp[FROM] !== label.from || timestamp[TO] !== label.to) {
113-
if (!errored) {
118+
var resultIndex = 0;
119+
var timestampIndex = -1;
120+
// eslint-disable-next-line camelcase
121+
var words = this.speaker_labels.map(function(speaker_label) {
122+
var result = this.results[resultIndex];
123+
timestampIndex++;
124+
var timestamp = result.alternatives[0].timestamps[timestampIndex];
125+
if (!timestamp) {
126+
timestampIndex = 0;
127+
resultIndex++;
128+
result = this.results[resultIndex];
129+
timestamp = result && result.alternatives[0].timestamps[timestampIndex];
130+
}
131+
if (!timestamp) {
132+
// this shouldn't happen normally, but the TimingStream could inadvertently cause a
133+
// speaker_labels to be emitted before a result
134+
this.extraLabels = true;
135+
return null;
136+
}
137+
if (timestamp[FROM] !== speaker_label.from || timestamp[TO] !== speaker_label.to) {
138+
if (!this.mismatchErrorEmitted) {
114139
var err = new Error('Mismatch between speaker_label and word timestamp');
115140
err.name = SpeakerStream.ERROR_MISMATCH;
116-
err.speaker_label = label;
141+
// eslint-disable-next-line camelcase
142+
err.speaker_label = speaker_label;
117143
err.timestamp = timestamp;
144+
// eslint-disable-next-line camelcase
118145
err.speaker_labels = this.speaker_labels;
119-
err.timestamps = this.timestamps;
146+
err.results = this.results;
120147
this.emit('error', err);
121-
errored = true;
148+
this.mismatchErrorEmitted = true; // If one is off, then a bunch probably are. Just emit one error.
122149
}
123150
return null;
124151
}
125-
return [timestamp, label];
152+
return {
153+
timestamp: timestamp,
154+
speaker: speaker_label.speaker,
155+
result: result
156+
};
126157
}, this);
127158

128-
if (errored) {
159+
// assume that there's nothing new to emit right now,
160+
// wait for new results to match our new labels
161+
if (this.extraLabels) {
129162
return;
130163
}
131164

132-
var results = pairs.reduce(function(arr, pair) {
133-
// this turns our pairs into something that looks like a regular results object, only with a speaker field
134-
// each result represents a single "line" from a particular speaker
135-
// todo: consider also splitting results up at pauses (where they are split when they arrive from the service) - FormatStream helps here
136-
var currentResult = arr[arr.length - 1];
137-
if (!currentResult || currentResult.speaker !== pair[1].speaker) {
138-
// new speaker - start a new result
139-
// todo: consider trying to include word alternatives and other features in these results
140-
currentResult = {
141-
speaker: pair[1].speaker,
142-
alternatives: [{
143-
transcript: pair[0][WORD] + ' ',
144-
timestamps: [
145-
pair[0]
146-
]
147-
}],
148-
final: final
165+
// filter out any nulls
166+
words = words.filter(function(w) {
167+
return w;
168+
});
169+
170+
// group the words together into utterances by speaker
171+
var utterances = words.reduce(function(arr, word) {
172+
var utterance = arr[arr.length - 1];
173+
// any time the speaker changes or the (original) result changes, create a new utterance
174+
if (!utterance || utterance.speaker !== word.speaker || utterance.result !== word.result) {
175+
utterance = {
176+
speaker: word.speaker,
177+
timestamps: [word.timestamp],
178+
result: word.result
149179
};
150180
// and add it to the list
151-
arr.push(currentResult);
181+
arr.push(utterance);
152182
} else {
153183
// otherwise just append the current word to the current result
154-
currentResult.alternatives[0].transcript += pair[0][WORD] + ' ';
155-
currentResult.alternatives[0].timestamps.push(pair[0]);
184+
utterance.timestamps.push(word.timestamp);
156185
}
157186
return arr;
158187
}, []);
159188

189+
// create new results
190+
var results = utterances.map(function(utterance, i) {
191+
192+
// if this is the first usage of this result, clone the original (to keep keywords and such)
193+
// otherwise create a new one
194+
var result;
195+
var lastUtterance = utterances[i - 1] || {};
196+
if (utterance.result === lastUtterance.result) {
197+
result = {alternatives: [{}]};
198+
} else {
199+
result = clone(utterance.result);
200+
}
201+
202+
// update the result object
203+
// set the speaker
204+
result.speaker = utterance.speaker;
205+
// overwrite the transcript and timestamps on the first alternative
206+
var alt = result.alternatives[0];
207+
alt.transcript = utterance.timestamps.map(function(ts) {
208+
return ts[WORD];
209+
}).join(' ') + ' ';
210+
alt.timestamps = utterance.timestamps;
211+
// overwrite the final value
212+
result.final = final;
213+
// todo: split up words_alternatives, keywords, etc and copy to appropriate result for time
214+
215+
return result;
216+
});
217+
160218
// result_index is always 0 because the results always includes the entire conversation so far.
161219
return {results: results, result_index: 0};
162220
};
@@ -175,7 +233,7 @@ SpeakerStream.prototype.handleResults = function(data) {
175233
data.results.filter(function(result) {
176234
return result.final;
177235
}).forEach(function(result) {
178-
this.timestamps = this.timestamps.concat(result.alternatives[0].timestamps);
236+
this.results.push(result);
179237
}, this);
180238
};
181239

@@ -220,6 +278,10 @@ SpeakerStream.prototype._transform = function(data, encoding, next) {
220278
message = this.buildMessage();
221279
message.results = message.results.concat(data.results);
222280
}
281+
// clean up if things got out of order
282+
if (this.extraLabels && data.results.length && data.results[0].final === true) {
283+
message = this.buildMessage();
284+
}
223285
}
224286
if (Array.isArray(data.speaker_labels)) {
225287
this.handleSpeakerLabels(data);
@@ -247,18 +309,23 @@ SpeakerStream.prototype._transform = function(data, encoding, next) {
247309
* @private
248310
*/
249311
SpeakerStream.prototype._flush = function(done) {
250-
if (this.timestamps.length !== this.speaker_labels.length) {
312+
var timestamps = this.results.map(function(r) {
313+
return r.alternatives[0].timestamps;
314+
}).reduce(function(a,b) {
315+
return a.concat(b);
316+
}, []);
317+
if (timestamps.length !== this.speaker_labels.length) {
251318
var msg;
252-
if (this.timestamps.length && !this.speaker_labels.length) {
319+
if (timestamps.length && !this.speaker_labels.length) {
253320
msg = 'No speaker_labels found. SpeakerStream requires speaker_labels to be enabled.';
254321
} else {
255-
msg = 'Mismatch between number of word timestamps (' + this.timestamps.length + ') and number of speaker_labels (' +
322+
msg = 'Mismatch between number of word timestamps (' + timestamps.length + ') and number of speaker_labels (' +
256323
this.speaker_labels.length + ') - some data may be lost.';
257324
}
258325
var err = new Error(msg);
259326
err.name = SpeakerStream.ERROR_MISMATCH;
260327
err.speaker_labels = this.speaker_labels;
261-
err.timestamps = this.timestamps;
328+
err.timestamps = this.results;
262329
this.emit('error', err);
263330
}
264331
done();

speech-to-text/timing-stream.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,13 @@ var noTimestamps = require('./no-timestamps');
1414
* @todo: fix TimingStream to work with the output of the SpeakerStream
1515
*
1616
* @param {Object} [opts]
17-
* @param {*} [opts.emitAt=TimingStream.START] - set to TimingStream.END to only emit text that has been completely spoken.
17+
* @param {*} [opts.emitAt=TimingStream.END] - set to TimingStream.START for a more subtitles-like output where results are returned as soon as the utterance begins
1818
* @param {Number} [opts.delay=0] - Additional delay (in seconds) to apply before emitting words, useful for precise syncing to audio tracks. May be negative
1919
* @constructor
2020
*/
2121
function TimingStream(opts) {
2222
this.options = defaults(opts, {
23-
emitAt: TimingStream.START,
23+
emitAt: TimingStream.END,
2424
delay: 0,
2525
allowHalfOpen: true, // keep the readable side open after the source closes
2626
writableObjectMode: true

test/speaker-stream-spec.js

Lines changed: 99 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,105 @@ describe('SpeakerStream', function() {
242242
stream.end();
243243
});
244244

245+
it('should handle early speaker_labels gracefully', function(done) {
246+
// there is/was a bug in the timing stream that could cause this in certain scenarios
247+
var stream = new SpeakerStream();
248+
stream.on('error', done);
249+
var actual = [];
250+
stream.on('data', function(data) {
251+
actual.push(data);
252+
});
253+
254+
var expected = [{
255+
results: [{
256+
speaker: 0,
257+
alternatives: [{
258+
timestamps: [
259+
['hi', 0.06, 0.28],
260+
],
261+
transcript: 'hi '
262+
}],
263+
final: false
264+
}],
265+
result_index: 0
266+
}, {
267+
results: [{
268+
speaker: 0,
269+
alternatives: [{
270+
timestamps: [
271+
['hi', 0.06, 0.28],
272+
],
273+
transcript: 'hi '
274+
}],
275+
final: true
276+
},
277+
{
278+
speaker: 1,
279+
alternatives: [{
280+
timestamps: [
281+
['hello', 0.28, 0.37],
282+
],
283+
transcript: 'hello '
284+
}],
285+
final: true
286+
}],
287+
result_index: 0
288+
}];
289+
290+
stream.on('end', function() {
291+
assert.deepEqual(actual, expected);
292+
done();
293+
});
294+
295+
stream.write({
296+
results: [{
297+
alternatives: [{
298+
timestamps: [
299+
['hi', 0.06, 0.28],
300+
],
301+
transcript: 'hi '
302+
}],
303+
final: true,
304+
}],
305+
result_index: 0
306+
});
307+
stream.write({
308+
speaker_labels: [{
309+
from: 0.06,
310+
to: 0.28,
311+
speaker: 0,
312+
confidence: 0.512,
313+
final: false
314+
}]
315+
});
316+
// this one is early
317+
stream.write({
318+
speaker_labels: [{
319+
from: 0.28,
320+
to: 0.37,
321+
speaker: 1,
322+
confidence: 0.512,
323+
final: true
324+
}]
325+
});
326+
// or, this is late
327+
stream.write({
328+
results: [{
329+
alternatives: [{
330+
timestamps: [
331+
['hello', 0.28, 0.37],
332+
],
333+
transcript: 'hello '
334+
}],
335+
final: true
336+
}],
337+
result_index: 0
338+
});
339+
stream.end();
340+
});
341+
245342
describe('with TimingStream', function() {
343+
var TimingStream = require('../speech-to-text/timing-stream.js');
246344
var clock;
247345
beforeEach(function() {
248346
clock = sinon.useFakeTimers();
@@ -252,9 +350,8 @@ describe('SpeakerStream', function() {
252350
clock.restore();
253351
});
254352

255-
it('should produce the same output with results from a TimingStream', function(done) {
353+
it('should produce the same output with and without a TimingStream', function(done) {
256354
var inputMessages = require('./resources/car_loan_stream.json');
257-
var TimingStream = require('../speech-to-text/timing-stream.js');
258355
var actualSpeakerStream = new SpeakerStream();
259356
var expectedSpeakerStream = new SpeakerStream();
260357
var timingStream = new TimingStream({objectMode: true});
@@ -290,7 +387,6 @@ describe('SpeakerStream', function() {
290387
});
291388
});
292389

293-
294390
it('should provide early results when options.speakerlessInterim=true', function(done) {
295391
var stream = new SpeakerStream({speakerlessInterim: true});
296392
stream.on('error', done);

0 commit comments

Comments
 (0)