Skip to content

Commit 8194ab9

Browse files
committed
feat(Speech to Text): Handle WebSocket overload with multiple threads
1 parent 0fea93c commit 8194ab9

File tree

1 file changed

+13
-2
lines changed

1 file changed

+13
-2
lines changed

speech-to-text/src/main/java/com/ibm/watson/developer_cloud/speech_to_text/v1/websocket/SpeechToTextWebSocketListener.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,13 @@ public final class SpeechToTextWebSocketListener extends WebSocketListener {
5858
private static final String ACOUSTIC_CUSTOMIZATION_ID = "acoustic_customization_id";
5959
private static final String CUSTOMIZATION_WEIGHT = "customization_weight";
6060
private static final String VERSION = "base_model_version";
61-
6261
private static final String TIMEOUT_PREFIX = "No speech detected for";
6362

63+
// 8 MB, half of the maximum OkHttp WebSocket queue size
64+
// (https://github.com/square/okhttp/blob/master/okhttp/src/main/java/okhttp3/internal/ws/RealWebSocket.java#L63)
65+
private static final long QUEUE_SIZE_LIMIT = 16 * 1024 * 512;
66+
private static final long QUEUE_WAIT_MILLIS = 500;
67+
6468
private final InputStream stream;
6569
private final RecognizeOptions options;
6670
private final RecognizeCallback callback;
@@ -188,13 +192,20 @@ private void sendInputStream(InputStream inputStream) {
188192
// AudioInputStreams, typically used for streaming microphone inputs return 0 only when the stream has been
189193
// closed. Elsewise AudioInputStream.read() blocks until enough audio frames are read.
190194
while (((read = inputStream.read(buffer)) > 0) && socketOpen) {
195+
196+
// If OkHttp's WebSocket queue gets overwhelmed, it'll abruptly close the connection
197+
// (see: https://github.com/square/okhttp/issues/3317). This will ensure we wait until the coast is clear.
198+
while (socket.queueSize() > QUEUE_SIZE_LIMIT) {
199+
Thread.sleep(QUEUE_WAIT_MILLIS);
200+
}
201+
191202
if (read == ONE_KB) {
192203
socket.send(ByteString.of(buffer));
193204
} else {
194205
socket.send(ByteString.of(Arrays.copyOfRange(buffer, 0, read)));
195206
}
196207
}
197-
} catch (IOException e) {
208+
} catch (IOException | InterruptedException e) {
198209
LOG.log(Level.SEVERE, e.getMessage(), e);
199210
} finally {
200211
try {

0 commit comments

Comments
 (0)