88import com .alibaba .dashscope .exception .ApiException ;
99import com .alibaba .dashscope .exception .InputRequiredException ;
1010import com .alibaba .dashscope .exception .NoApiKeyException ;
11- import com .alibaba .dashscope .protocol .ApiServiceOption ;
12- import com .alibaba .dashscope .protocol .ConnectionOptions ;
13- import com .alibaba .dashscope .protocol .Protocol ;
14- import com .alibaba .dashscope .protocol .StreamingMode ;
11+ import com .alibaba .dashscope .protocol .*;
12+ import com .alibaba .dashscope .threads .runs .Run ;
1513import com .google .gson .JsonObject ;
1614import io .reactivex .BackpressureStrategy ;
1715import io .reactivex .Emitter ;
1816import io .reactivex .Flowable ;
19- import lombok .Builder ;
20- import lombok .Getter ;
21- import lombok .NonNull ;
22- import lombok .experimental .SuperBuilder ;
23- import lombok .extern .slf4j .Slf4j ;
24-
2517import java .io .ByteArrayOutputStream ;
2618import java .io .IOException ;
2719import java .nio .ByteBuffer ;
3527import java .util .concurrent .TimeUnit ;
3628import java .util .concurrent .atomic .AtomicBoolean ;
3729import java .util .concurrent .atomic .AtomicReference ;
30+ import lombok .Builder ;
31+ import lombok .Getter ;
32+ import lombok .NonNull ;
33+ import lombok .Setter ;
34+ import lombok .experimental .SuperBuilder ;
35+ import lombok .extern .slf4j .Slf4j ;
3836
3937/** @author lengjiayi */
4038@ Slf4j
@@ -64,6 +62,10 @@ public final class SpeechSynthesizer {
6462 private long startStreamTimeStamp = -1 ;
6563 private long firstPackageTimeStamp = -1 ;
6664 private double recvAudioLength = 0 ;
// Maximum time in milliseconds to wait for the task-started event once the stream is opened.
// The wait is only performed when this value is greater than zero; updateParamAndCallback
// resets it to -1.
65+ @ Getter @ Setter private long startedTimeout = -1 ;
// Maximum time in milliseconds allowed for the first audio frame. The wait is only performed
// when this value is greater than zero; updateParamAndCallback resets it to -1.
66+ @ Getter @ Setter private long firstAudioTimeout = -1 ;
// Holds the latch that is counted down when a TASK_STARTED event is received, and also when
// the stream reports an error. The reference holds null until startStream installs a latch.
67+ private AtomicReference <CountDownLatch > startLatch = new AtomicReference <>(null );
// Holds the latch that is counted down when the first audio frame is received, and also when
// the stream reports an error. The reference holds null until startStream installs a latch.
68+ private AtomicReference <CountDownLatch > firstAudioLatch = new AtomicReference <>(null );
6769
6870 /**
6971 * CosyVoice Speech Synthesis SDK
@@ -93,6 +95,7 @@ public SpeechSynthesizer(
9395 .task (Task .TEXT_TO_SPEECH .getValue ())
9496 .function (Function .SPEECH_SYNTHESIZER .getValue ())
9597 .baseWebSocketUrl (baseUrl )
98+ .passTaskStarted (true )
9699 .build ();
97100 duplexApi = new SynchronizeFullDuplexApi <>(connectionOptions , serviceOption );
98101 this .callback = callback ;
@@ -116,6 +119,7 @@ public SpeechSynthesizer(String baseUrl, ConnectionOptions connectionOptions) {
116119 .task (Task .TEXT_TO_SPEECH .getValue ())
117120 .function (Function .SPEECH_SYNTHESIZER .getValue ())
118121 .baseWebSocketUrl (baseUrl )
122+ .passTaskStarted (true )
119123 .build ();
120124 duplexApi = new SynchronizeFullDuplexApi <>(connectionOptions , serviceOption );
121125 this .callback = null ;
@@ -132,6 +136,7 @@ public SpeechSynthesizer() {
132136 .taskGroup (TaskGroup .AUDIO .getValue ())
133137 .task (Task .TEXT_TO_SPEECH .getValue ())
134138 .function (Function .SPEECH_SYNTHESIZER .getValue ())
139+ .passTaskStarted (true )
135140 .build ();
136141 duplexApi = new SynchronizeFullDuplexApi <>(serviceOption );
137142 this .callback = null ;
@@ -141,9 +146,14 @@ public void updateParamAndCallback(
141146 SpeechSynthesisParam param , ResultCallback <SpeechSynthesisResult > callback ) {
142147 this .parameters = param ;
143148 this .callback = callback ;
149+ this .canceled .set (false );
144150
145151 // reset inner params
146152 this .stopLatch = new AtomicReference <>(null );
153+ this .startLatch = new AtomicReference <>(null );
154+ this .startedTimeout = -1 ;
155+ this .firstAudioLatch = new AtomicReference <>(null );
156+ this .firstAudioTimeout = -1 ;
147157 this .cmdBuffer .clear ();
148158 this .textEmitter = null ;
149159 this .isFirst = true ;
@@ -175,6 +185,7 @@ public SpeechSynthesizer(
175185 .task (Task .TEXT_TO_SPEECH .getValue ())
176186 .function (Function .SPEECH_SYNTHESIZER .getValue ())
177187 .baseWebSocketUrl (baseUrl )
188+ .passTaskStarted (true )
178189 .build ();
179190 duplexApi = new SynchronizeFullDuplexApi <>(serviceOption );
180191 this .callback = callback ;
@@ -203,6 +214,7 @@ public SpeechSynthesizer(
203214 .taskGroup (TaskGroup .AUDIO .getValue ())
204215 .task (Task .TEXT_TO_SPEECH .getValue ())
205216 .function (Function .SPEECH_SYNTHESIZER .getValue ())
217+ .passTaskStarted (true )
206218 .build ();
207219 duplexApi = new SynchronizeFullDuplexApi <>(serviceOption );
208220 this .callback = callback ;
@@ -228,6 +240,7 @@ public Flowable<SpeechSynthesisResult> streamingCallAsFlowable(Flowable<String>
228240 .duplexCall (
229241 StreamInputTtsParamWithStream .fromStreamInputTtsParam (
230242 this .parameters , textStream , preRequestId , false ))
243+ .filter (item -> item .getEvent () != WebSocketEventType .TASK_STARTED .getValue ())
231244 .map (SpeechSynthesisResult ::fromDashScopeResult )
232245 .filter (item -> !canceled .get ())
233246 .doOnNext (
@@ -279,7 +292,8 @@ public Flowable<SpeechSynthesisResult> callAsFlowable(String text)
279292 },
280293 BackpressureStrategy .BUFFER ),
281294 preRequestId ,
282- true ))
295+ true ))
296+ .filter (item -> item .getEvent () != WebSocketEventType .TASK_STARTED .getValue ())
283297 .map (SpeechSynthesisResult ::fromDashScopeResult )
284298 .doOnNext (
285299 result -> {
@@ -348,6 +362,8 @@ private void startStream(boolean enableSsml) {
348362 cmdBuffer .clear ();
349363 }
350364 stopLatch = new AtomicReference <>(new CountDownLatch (1 ));
365+ startLatch = new AtomicReference <>(new CountDownLatch (1 ));
366+ firstAudioLatch = new AtomicReference <>(new CountDownLatch (1 ));
351367 preRequestId = UUID .randomUUID ().toString ();
352368 try {
353369 duplexApi .duplexCall (
@@ -358,6 +374,10 @@ private void startStream(boolean enableSsml) {
358374
359375 @ Override
360376 public void onEvent (DashScopeResult message ) {
377+ if (message .getEvent () == WebSocketEventType .TASK_STARTED .getValue ()) {
378+ startLatch .get ().countDown ();
379+ return ;
380+ }
361381 if (canceled .get ()) {
362382 return ;
363383 }
@@ -383,6 +403,7 @@ public void onEvent(DashScopeResult message) {
383403 */
384404 if (speechSynthesisResult .getAudioFrame () != null ) {
385405 if (recvAudioLength == 0 ) {
406+ firstAudioLatch .get ().countDown ();
386407 firstPackageTimeStamp = System .currentTimeMillis ();
387408 log .debug ("[TtsV2] first package delay: " + getFirstPackageDelay () + " ms" );
388409 }
@@ -461,6 +482,12 @@ public void onError(Exception e) {
461482 if (stopLatch .get () != null ) {
462483 stopLatch .get ().countDown ();
463484 }
485+ if (startLatch .get () != null ) {
486+ startLatch .get ().countDown ();
487+ }
488+ if (firstAudioLatch .get () != null ) {
489+ firstAudioLatch .get ().countDown ();
490+ }
464491 }
465492 }
466493
@@ -497,7 +524,7 @@ private void submitText(String text) {
497524 * greater than zero, it will wait for the corresponding number of milliseconds; otherwise, it
498525 * will wait indefinitely. Throws TimeoutError exception if it times out.
499526 */
500- public void streamingComplete (long completeTimeoutMillis ) throws RuntimeException {
527+ public void streamingComplete (long completeTimeoutMillis ) {
501528 log .debug ("streamingComplete with timeout: " + completeTimeoutMillis );
502529 synchronized (this ) {
503530 if (state != SpeechSynthesisState .TTS_STARTED ) {
@@ -537,7 +564,7 @@ public void streamingComplete(long completeTimeoutMillis) throws RuntimeExceptio
537564 * synthesized audio before returning. If it does not complete within 600 seconds, a timeout
538565 * occurs and a TimeoutError exception is thrown.
539566 */
540- public void streamingComplete () throws RuntimeException {
567+ public void streamingComplete () {
541568 streamingComplete (600000 );
542569 }
543570
@@ -588,9 +615,50 @@ public void streamingCancel() {
588615 public void streamingCall (String text ) {
589616 if (isFirst ) {
590617 isFirst = false ;
591- this .startStream (false );
618+ try {
619+ this .startStream (false );
620+ long startTime = System .currentTimeMillis ();
621+ if (this .startedTimeout > 0 ) {
622+ if (!this .startLatch .get ().await (this .startedTimeout , TimeUnit .MILLISECONDS )) {
623+ synchronized (SpeechSynthesizer .this ) {
624+ state = SpeechSynthesisState .IDLE ;
625+ }
626+ throw new RuntimeException (
627+ "TimeoutError: waiting for task started more than " + this .startedTimeout + " ms." );
628+ }
629+ log .debug (
630+ "get started within "
631+ + this .startedTimeout
632+ + " ms, cost: "
633+ + (System .currentTimeMillis () - startTime )
634+ + " ms" );
635+ }
636+ this .submitText (text );
637+ if (this .firstAudioTimeout > 0 && this .firstAudioTimeout - this .startedTimeout > 0 ) {
638+ if (!this .firstAudioLatch
639+ .get ()
640+ .await (this .firstAudioTimeout - this .startedTimeout , TimeUnit .MILLISECONDS )) {
641+ synchronized (SpeechSynthesizer .this ) {
642+ state = SpeechSynthesisState .IDLE ;
643+ }
644+ throw new RuntimeException (
645+ "TimeoutError: waiting for first audio more than "
646+ + this .firstAudioTimeout
647+ + " ms." );
648+ }
649+ log .debug (
650+ "get first audio within "
651+ + this .firstAudioTimeout
652+ + " ms, cost: "
653+ + (System .currentTimeMillis () - startTime )
654+ + " ms" );
655+ }
656+ } catch (InterruptedException ignored ) {
657+ log .error ("Interrupted while waiting for streaming complete" );
658+ }
659+ } else {
660+ this .submitText (text );
592661 }
593- this .submitText (text );
594662 }
595663
596664 /**
@@ -605,7 +673,8 @@ public void streamingCall(String text) {
605673 * @return If a callback is not set during initialization, the complete audio is returned as the
606674 * function's return value. Otherwise, the return value is null.
607675 */
608- public ByteBuffer call (String text , long timeoutMillis ) throws RuntimeException {
676+ public ByteBuffer call (String text , long timeoutMillis )
677+ throws RuntimeException {
609678 if (this .callback == null ) {
610679 this .callback =
611680 new ResultCallback <SpeechSynthesisResult >() {
@@ -619,8 +688,45 @@ public void onComplete() {}
619688 public void onError (Exception e ) {}
620689 };
621690 }
622- this .startStream (true );
623- this .submitText (text );
691+ try {
692+ this .startStream (true );
693+ long startTime = System .currentTimeMillis ();
694+ if (this .startedTimeout > 0 ) {
695+ if (!this .startLatch .get ().await (this .startedTimeout , TimeUnit .MILLISECONDS )) {
696+ synchronized (SpeechSynthesizer .this ) {
697+ state = SpeechSynthesisState .IDLE ;
698+ }
699+ throw new RuntimeException (
700+ "TimeoutError: waiting for task started more than " + this .startedTimeout + " ms." );
701+ }
702+ log .debug (
703+ "get started within "
704+ + this .startedTimeout
705+ + " ms, cost: "
706+ + (System .currentTimeMillis () - startTime )
707+ + " ms" );
708+ }
709+ this .submitText (text );
710+ if (this .firstAudioTimeout > 0 && this .firstAudioTimeout - this .startedTimeout > 0 ) {
711+ if (!this .firstAudioLatch
712+ .get ()
713+ .await (this .firstAudioTimeout - this .startedTimeout , TimeUnit .MILLISECONDS )) {
714+ synchronized (SpeechSynthesizer .this ) {
715+ state = SpeechSynthesisState .IDLE ;
716+ }
717+ throw new RuntimeException (
718+ "TimeoutError: waiting for first audio more than " + this .firstAudioTimeout + " ms." );
719+ }
720+ log .debug (
721+ "get first audio within "
722+ + this .firstAudioTimeout
723+ + " ms, cost: "
724+ + (System .currentTimeMillis () - startTime )
725+ + " ms" );
726+ }
727+ } catch (InterruptedException ignored ) {
728+ log .error ("Interrupted while waiting for streaming complete" );
729+ }
624730 if (this .asyncCall ) {
625731 this .asyncStreamingComplete ();
626732 return null ;
@@ -639,7 +745,7 @@ public void onError(Exception e) {}
639745 * @return If a callback is not set during initialization, the complete audio is returned as the
640746 * function's return value. Otherwise, the return value is null.
641747 */
642- public ByteBuffer call (String text ) throws RuntimeException {
748+ public ByteBuffer call (String text ) {
643749 return call (text , 0 );
644750 }
645751
@@ -655,7 +761,10 @@ private static class StreamInputTtsParamWithStream extends SpeechSynthesisParam
655761 @ NonNull private Flowable <String > textStream ;
656762
657763 public static StreamInputTtsParamWithStream fromStreamInputTtsParam (
658- SpeechSynthesisParam param , Flowable <String > textStream , String preRequestId , boolean enableSsml ) {
764+ SpeechSynthesisParam param ,
765+ Flowable <String > textStream ,
766+ String preRequestId ,
767+ boolean enableSsml ) {
659768 return StreamInputTtsParamWithStream .builder ()
660769 .headers (param .getHeaders ())
661770 .parameters (param .getParameters ())
0 commit comments