Skip to content

Commit b23c644

Browse files
committed
various cactus server improvements
Signed-off-by: Yujong Lee <yujonglee.dev@gmail.com>
1 parent 72ecd7d commit b23c644

File tree

15 files changed

+783
-187
lines changed

15 files changed

+783
-187
lines changed

crates/listener2-core/src/batch.rs

Lines changed: 109 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -393,7 +393,7 @@ async fn spawn_batch_task(
393393
AdapterKind::Hyprnote => {
394394
spawn_batch_task_with_adapter::<HyprnoteAdapter>(args, myself).await
395395
}
396-
AdapterKind::Cactus => spawn_batch_task_with_adapter::<CactusAdapter>(args, myself).await,
396+
AdapterKind::Cactus => spawn_cactus_batch_task(args, myself).await,
397397
}
398398
}
399399

@@ -507,6 +507,114 @@ async fn spawn_argmax_streaming_batch_task(
507507
Ok((rx_task, shutdown_tx))
508508
}
509509

510+
async fn spawn_cactus_batch_task(
511+
args: BatchArgs,
512+
myself: ActorRef<BatchMsg>,
513+
) -> Result<
514+
(
515+
tokio::task::JoinHandle<()>,
516+
tokio::sync::oneshot::Sender<()>,
517+
),
518+
ActorProcessingErr,
519+
> {
520+
let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel::<()>();
521+
522+
let rx_task = tokio::spawn(async move {
523+
tracing::info!("cactus streaming batch task: starting");
524+
let start_notifier = args.start_notifier.clone();
525+
526+
let stream_result = CactusAdapter::transcribe_file_streaming(
527+
&args.base_url,
528+
&args.listen_params,
529+
&args.file_path,
530+
)
531+
.await;
532+
533+
let mut stream = match stream_result {
534+
Ok(s) => {
535+
notify_start_result(&start_notifier, Ok(()));
536+
s
537+
}
538+
Err(e) => {
539+
let raw_error = format!("{:?}", e);
540+
let error = format_user_friendly_error(&raw_error);
541+
tracing::error!("cactus streaming batch task: failed to start: {:?}", e);
542+
notify_start_result(&start_notifier, Err(error.clone()));
543+
let _ = myself.send_message(BatchMsg::StreamStartFailed(error));
544+
return;
545+
}
546+
};
547+
548+
let response_timeout = Duration::from_secs(BATCH_STREAM_TIMEOUT_SECS);
549+
let mut response_count = 0;
550+
let mut ended_cleanly = false;
551+
552+
loop {
553+
tokio::select! {
554+
_ = &mut shutdown_rx => {
555+
tracing::info!("cactus streaming batch task: shutdown");
556+
ended_cleanly = true;
557+
break;
558+
}
559+
result = tokio::time::timeout(response_timeout, StreamExt::next(&mut stream)) => {
560+
match result {
561+
Ok(Some(Ok(event))) => {
562+
response_count += 1;
563+
564+
let is_from_finalize = matches!(
565+
&event.response,
566+
StreamResponse::TranscriptResponse { from_finalize, .. } if *from_finalize
567+
);
568+
569+
tracing::info!(
570+
"cactus streaming batch: response #{}{}",
571+
response_count,
572+
if is_from_finalize { " (from_finalize)" } else { "" }
573+
);
574+
575+
if let Err(e) = myself.send_message(BatchMsg::StreamResponse {
576+
response: Box::new(event.response),
577+
percentage: event.percentage,
578+
}) {
579+
tracing::error!("failed to send stream response message: {:?}", e);
580+
}
581+
582+
if is_from_finalize {
583+
ended_cleanly = true;
584+
break;
585+
}
586+
}
587+
Ok(Some(Err(e))) => {
588+
let raw_error = format!("{:?}", e);
589+
let error = format_user_friendly_error(&raw_error);
590+
tracing::error!("cactus streaming batch error: {:?}", e);
591+
let _ = myself.send_message(BatchMsg::StreamError(error));
592+
break;
593+
}
594+
Ok(None) => {
595+
tracing::info!("cactus streaming batch completed (total: {})", response_count);
596+
ended_cleanly = true;
597+
break;
598+
}
599+
Err(elapsed) => {
600+
tracing::warn!(timeout = ?elapsed, responses = response_count, "cactus streaming batch timeout");
601+
let _ = myself.send_message(BatchMsg::StreamError(format_user_friendly_error("timeout waiting for response")));
602+
break;
603+
}
604+
}
605+
}
606+
}
607+
}
608+
609+
if ended_cleanly && let Err(e) = myself.send_message(BatchMsg::StreamEnded) {
610+
tracing::error!("failed to send stream ended message: {:?}", e);
611+
}
612+
tracing::info!("cactus streaming batch task exited");
613+
});
614+
615+
Ok((rx_task, shutdown_tx))
616+
}
617+
510618
async fn spawn_batch_task_with_adapter<A: RealtimeSttAdapter>(
511619
args: BatchArgs,
512620
myself: ActorRef<BatchMsg>,

crates/owhisper-client/src/adapter/argmax/batch.rs

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
use std::path::{Path, PathBuf};
2-
use std::pin::Pin;
32
use std::time::Duration;
43

5-
use futures_util::{Stream, StreamExt};
4+
use futures_util::StreamExt;
65
use hypr_audio_utils::{Source, f32_to_i16_bytes, resample_audio, source_from_path};
76
use owhisper_interface::batch::Response as BatchResponse;
87
use owhisper_interface::stream::StreamResponse;
@@ -151,14 +150,7 @@ impl StreamingBatchConfig {
151150
}
152151
}
153152

154-
#[derive(Debug, Clone)]
155-
pub struct StreamingBatchEvent {
156-
pub response: StreamResponse,
157-
pub percentage: f64,
158-
}
159-
160-
pub type StreamingBatchStream =
161-
Pin<Box<dyn Stream<Item = Result<StreamingBatchEvent, Error>> + Send>>;
153+
pub use crate::adapter::{StreamingBatchEvent, StreamingBatchStream};
162154

163155
impl ArgmaxAdapter {
164156
pub async fn transcribe_file_streaming<P: AsRef<Path>>(

crates/owhisper-client/src/adapter/argmax/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ pub(crate) mod language;
55
mod live;
66

77
#[cfg(feature = "argmax")]
8-
pub use batch::{StreamingBatchConfig, StreamingBatchEvent, StreamingBatchStream};
8+
pub use batch::StreamingBatchConfig;
99

1010
pub use language::PARAKEET_V3_LANGS;
1111

0 commit comments

Comments
 (0)