Skip to content

Commit 62a11c0

Browse files
committed
feat: support opus
1 parent 31a9e9f commit 62a11c0

File tree

5 files changed

+143
-4
lines changed

5 files changed

+143
-4
lines changed

Cargo.lock

Lines changed: 31 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,11 @@ axum-extra = { version = "0.10.0", features = ["typed-header"] }
2323
tokio = { version = "1", features = ["full"] }
2424

2525
reqwest = { version = "0.12", features = ["multipart", "json", "stream"] }
26+
2627
hound = "3.5.1"
2728
wav_io = "0.1.15"
29+
opus = "0.3.0"
30+
2831
rand = "0.9.0"
2932
uuid = { version = "1.14", features = [
3033
"v4", # Lets you generate random UUIDs

src/services/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ pub struct ConnectQueryParams {
1717
reconnect: bool,
1818
#[serde(default)]
1919
record: bool,
20+
#[serde(default)]
21+
opus: bool,
2022
}
2123

2224
pub async fn mixed_handler(
@@ -37,6 +39,7 @@ pub async fn mixed_handler(
3739
Path(id),
3840
Query(ws::ConnectQueryParams {
3941
reconnect: params.reconnect,
42+
opus: params.opus,
4043
}),
4144
)
4245
.await
@@ -62,6 +65,7 @@ pub async fn v2_mixed_handler(
6265
Path(id),
6366
Query(ws::ConnectQueryParams {
6467
reconnect: params.reconnect,
68+
opus: params.opus,
6569
}),
6670
)
6771
.await

src/services/ws.rs

Lines changed: 104 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,8 @@ impl WsSetting {
8080
pub struct ConnectQueryParams {
8181
#[serde(default)]
8282
pub reconnect: bool,
83+
#[serde(default)]
84+
pub opus: bool,
8385
}
8486

8587
pub async fn ws_handler(
@@ -929,22 +931,35 @@ async fn process_socket_io(
929931
rx: &mut WsRx,
930932
audio_tx: ClientTx,
931933
socket: &mut WebSocket,
934+
enable_opus: bool,
932935
) -> anyhow::Result<()> {
936+
let mut opus_encoder =
937+
opus::Encoder::new(SAMPLE_RATE, opus::Channels::Mono, opus::Application::Voip)
938+
.map_err(|e| anyhow::anyhow!("opus encoder error: {e}"))?;
939+
let mut ret_audio = Vec::new();
940+
933941
loop {
934942
let r = tokio::select! {
935943
cmd = rx.recv() => {
936944
cmd.map(|cmd| WsEvent::Command(cmd))
937945
}
938946
message = socket.recv() => {
939-
message.map(|message| match message{
947+
message.map(|message| match message {
940948
Ok(message) => WsEvent::Message(Ok(message)),
941949
Err(e) => WsEvent::Message(Err(anyhow::anyhow!("recv ws error: {e}"))),
942950
})
943951
}
944952
};
945953

946954
match r {
947-
Some(WsEvent::Command(cmd)) => process_command(socket, cmd).await?,
955+
Some(WsEvent::Command(cmd)) => {
956+
if enable_opus {
957+
process_command_with_opus(socket, cmd, &mut opus_encoder, &mut ret_audio)
958+
.await?
959+
} else {
960+
process_command(socket, cmd).await?
961+
}
962+
}
948963
Some(WsEvent::Message(Ok(msg))) => match process_message(msg) {
949964
ProcessMessageResult::Audio(d) => audio_tx
950965
.send(ClientMsg::AudioChunk(d))
@@ -1213,14 +1228,20 @@ async fn handle_socket(
12131228
}
12141229

12151230
log::info!("`{}` starting socket io processing", id);
1216-
process_socket_io(&mut cmd_rx, audio_tx, &mut socket).await?;
1231+
process_socket_io(&mut cmd_rx, audio_tx, &mut socket, connect_params.opus).await?;
12171232

12181233
Ok(())
12191234
}
12201235

12211236
pub const SAMPLE_RATE: u32 = 16000;
12221237
pub const SAMPLE_RATE_BUFFER_SIZE: usize = 2 * (SAMPLE_RATE as usize) / 10;
12231238

1239+
/// Returns the number of samples in 120 ms of audio at `sample_rate` samples per second.
///
/// Computed with integer arithmetic as `sample_rate * 12 / 100`, so any fractional
/// sample is truncated. Being a `const fn`, it can be used to initialise constants.
pub const fn sample_120ms(sample_rate: u32) -> usize {
1240+
(sample_rate as usize) * 12 / 100
1241+
}
1242+
1243+
pub const SAMPLE_RATE_120MS: usize = sample_120ms(SAMPLE_RATE);
1244+
12241245
async fn process_command(ws: &mut WebSocket, cmd: WsCommand) -> anyhow::Result<()> {
12251246
match cmd {
12261247
WsCommand::AsrResult(texts) => {
@@ -1272,6 +1293,86 @@ async fn process_command(ws: &mut WebSocket, cmd: WsCommand) -> anyhow::Result<(
12721293
Ok(())
12731294
}
12741295

1296+
async fn process_command_with_opus(
1297+
ws: &mut WebSocket,
1298+
cmd: WsCommand,
1299+
opus_encode: &mut opus::Encoder,
1300+
ret_audio: &mut Vec<i16>,
1301+
) -> anyhow::Result<()> {
1302+
match cmd {
1303+
WsCommand::AsrResult(texts) => {
1304+
let asr = rmp_serde::to_vec(&crate::protocol::ServerEvent::ASR {
1305+
text: texts.join("\n"),
1306+
})
1307+
.expect("Failed to serialize ASR ServerEvent");
1308+
ws.send(Message::binary(asr)).await?;
1309+
}
1310+
1311+
WsCommand::Action { action } => {
1312+
let action = rmp_serde::to_vec(&crate::protocol::ServerEvent::Action { action })
1313+
.expect("Failed to serialize Action ServerEvent");
1314+
ws.send(Message::binary(action)).await?;
1315+
}
1316+
WsCommand::StartAudio(text) => {
1317+
log::trace!("StartAudio: {text:?}");
1318+
opus_encode
1319+
.reset_state()
1320+
.map_err(|e| anyhow::anyhow!("opus reset state error: {e}"))?;
1321+
let start_audio = rmp_serde::to_vec(&crate::protocol::ServerEvent::StartAudio { text })
1322+
.expect("Failed to serialize StartAudio ServerEvent");
1323+
ws.send(Message::binary(start_audio)).await?;
1324+
}
1325+
WsCommand::Audio(data) => {
1326+
log::trace!("Audio chunk size: {}", data.len());
1327+
for chunk in data.chunks_exact(2) {
1328+
let sample = i16::from_le_bytes([chunk[0], chunk[1]]);
1329+
ret_audio.push(sample);
1330+
}
1331+
1332+
// 120ms per chunk
1333+
for chunk in ret_audio.chunks(sample_120ms(SAMPLE_RATE)) {
1334+
if chunk.len() < sample_120ms(SAMPLE_RATE) {
1335+
*ret_audio = chunk.to_vec();
1336+
break;
1337+
}
1338+
let data = opus_encode.encode_vec(chunk, 2 * sample_120ms(SAMPLE_RATE) / 3)?;
1339+
1340+
let audio_chunk =
1341+
rmp_serde::to_vec(&crate::protocol::ServerEvent::AudioChunk { data })
1342+
.expect("Failed to serialize AudioChunk ServerEvent");
1343+
ws.send(Message::binary(audio_chunk)).await?;
1344+
}
1345+
}
1346+
WsCommand::EndAudio => {
1347+
log::trace!("EndAudio");
1348+
if !ret_audio.is_empty() {
1349+
let padded_audio_len = sample_120ms(SAMPLE_RATE) - ret_audio.len();
1350+
ret_audio.extend(vec![0i16; padded_audio_len]);
1351+
let data = opus_encode.encode_vec(&ret_audio, 2 * sample_120ms(SAMPLE_RATE) / 3)?;
1352+
let audio_chunk =
1353+
rmp_serde::to_vec(&crate::protocol::ServerEvent::AudioChunk { data })
1354+
.expect("Failed to serialize AudioChunk ServerEvent");
1355+
log::info!("Sending final audio chunk of size: {}", audio_chunk.len());
1356+
ws.send(Message::binary(audio_chunk)).await?;
1357+
ret_audio.clear();
1358+
}
1359+
let end_audio = rmp_serde::to_vec(&crate::protocol::ServerEvent::EndAudio)
1360+
.expect("Failed to serialize EndAudio ServerEvent");
1361+
ws.send(Message::binary(end_audio)).await?;
1362+
}
1363+
WsCommand::Video(_) => {
1364+
log::warn!("video command is not implemented yet");
1365+
}
1366+
WsCommand::EndResponse => {
1367+
log::debug!("EndResponse");
1368+
let end_response = rmp_serde::to_vec(&crate::protocol::ServerEvent::EndResponse)
1369+
.expect("Failed to serialize JsonCommand");
1370+
ws.send(Message::binary(end_response)).await?;
1371+
}
1372+
}
1373+
Ok(())
1374+
}
1375+
12751376
enum ProcessMessageResult {
12761377
Audio(Bytes),
12771378
Submit,

src/services/ws/stable/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ async fn handle_socket(
6868
})
6969
.map_err(|e| anyhow::anyhow!("send session error: {}", e))?;
7070

71-
super::process_socket_io(&mut cmd_rx, client_tx, &mut socket).await?;
71+
super::process_socket_io(&mut cmd_rx, client_tx, &mut socket, params.opus).await?;
7272
Ok(())
7373
}
7474

0 commit comments

Comments
 (0)