Skip to content

Commit 74c595a

Browse files
committed
feat: Allow the new session to replace the old one
1 parent 662007e commit 74c595a

File tree

2 files changed

+56
-8
lines changed

2 files changed

+56
-8
lines changed

src/services/ws.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1261,6 +1261,7 @@ enum ProcessMessageResult {
12611261
fn process_message(msg: Message) -> ProcessMessageResult {
12621262
match msg {
12631263
Message::Text(t) => {
1264+
log::debug!("Received text message: {}", t);
12641265
if let Ok(cmd) = serde_json::from_str::<crate::protocol::ClientCommand>(&t) {
12651266
match cmd {
12661267
crate::protocol::ClientCommand::StartRecord => ProcessMessageResult::Skip,
@@ -1274,7 +1275,10 @@ fn process_message(msg: Message) -> ProcessMessageResult {
12741275
ProcessMessageResult::Skip
12751276
}
12761277
}
1277-
Message::Binary(d) => ProcessMessageResult::Audio(d),
1278+
Message::Binary(d) => {
1279+
log::debug!("Received binary message of size: {}", d.len());
1280+
ProcessMessageResult::Audio(d)
1281+
}
12781282
Message::Close(c) => {
12791283
if let Some(cf) = c {
12801284
log::info!(

src/services/ws/stable/mod.rs

Lines changed: 51 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,11 @@ async fn run_session(
9191
);
9292

9393
loop {
94+
log::info!(
95+
"{}:{:x} waiting for asr input",
96+
session.id,
97+
session.request_id
98+
);
9499
let text = asr_session
95100
.get_input(&session.id, &mut session.client_rx)
96101
.await?;
@@ -286,18 +291,57 @@ pub async fn run_session_manager(
286291
anyhow::anyhow!("error creating asr session for id `{}`: {}", id, e)
287292
})?;
288293

289-
while let Some(mut session) = rx.recv().await {
290-
if let Err(e) = run_session(
294+
let mut session = rx
295+
.recv()
296+
.await
297+
.ok_or_else(|| anyhow::anyhow!("no session received for id `{}`", id))?;
298+
299+
loop {
300+
log::info!("Running session for id `{}`", id);
301+
302+
let run_fut = run_session(
291303
&mut chat_session,
292304
&mut tts_req_tx,
293305
&mut asr_session,
294306
&mut session,
295-
)
296-
.await
297-
{
298-
log::error!("session error: {}", e);
299-
}
307+
);
308+
309+
let result = tokio::select! {
310+
res = run_fut => {
311+
Ok(res)
312+
},
313+
new_session = rx.recv() => {
314+
Err(new_session)
315+
}
316+
};
317+
300318
session.cmd_tx.send(super::WsCommand::EndResponse).ok();
319+
320+
match result {
321+
Ok(Ok(())) => {
322+
log::info!("session for id `{}` completed successfully", id);
323+
}
324+
Ok(Err(e)) => {
325+
log::error!("session for id `{}` error: {}", id, e);
326+
}
327+
Err(Some(new_session)) => {
328+
log::info!("received new session for id `{}`, restarting session", id);
329+
session = new_session;
330+
continue;
331+
}
332+
Err(None) => {
333+
log::info!("no more sessions for id `{}`, exiting", id);
334+
break;
335+
}
336+
}
337+
338+
session = match rx.recv().await {
339+
Some(s) => s,
340+
None => {
341+
log::info!("no more sessions for id `{}`, exiting", id);
342+
break;
343+
}
344+
};
301345
}
302346

303347
anyhow::Result::<()>::Ok(())

0 commit comments

Comments
 (0)