Skip to content

Commit 163607f

Browse files
authored
Merge pull request #14 from UCL-VR/audio-sender
Add AudioSender, fix binary stdout corruption, and fix audio-to-audio pipeline bugs
2 parents 768c90b + c3f4c1c commit 163607f

File tree

7 files changed

+366
-168
lines changed

7 files changed

+366
-168
lines changed

Node/apps/conversational_agent/app.ts

Lines changed: 45 additions & 100 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
1-
import { NetworkId } from 'ubiq-server/ubiq';
21
import { ApplicationController } from '../../components/application';
32
import { TextToSpeechService } from '../../services/text_to_speech/service';
43
import { SpeechToTextService } from '../../services/speech_to_text/service';
54
import { TextGenerationService } from '../../services/text_generation/service';
65
import { AudioToAudioService } from '../../services/audio_to_audio/service';
76
import { VoipReceiver } from '../../components/voip_receiver';
7+
import { AudioSender } from '../../components/audio_sender';
88
import {
99
encodePacket,
1010
LengthPrefixedParser,
@@ -20,17 +20,6 @@ import { RTCAudioData } from '@roamhq/wrtc/types/nonstandard';
2020
import { fileURLToPath } from 'url';
2121
import nconf from 'nconf';
2222

23-
/**
24-
* How many milliseconds of model audio to accumulate before flushing to Unity.
25-
* Batching reduces AudioInfo message overhead and gives Unity's audio system
26-
* a larger buffer to work with, preventing playback stutter / latency.
27-
*
28-
* The model produces one 80 ms frame per step (~12.5 fps).
29-
* 240 ms ≈ 3 frames — a good trade-off between latency and smoothness.
30-
* Override with UBIQ_AUDIO_BATCH_MS.
31-
*/
32-
const AUDIO_BATCH_MS = Number(process.env.UBIQ_AUDIO_BATCH_MS) || 240;
33-
3423
export class ConversationalAgent extends ApplicationController {
3524
components: {
3625
voipReceiver?: VoipReceiver;
@@ -39,25 +28,17 @@ export class ConversationalAgent extends ApplicationController {
3928
textToSpeechService?: TextToSpeechService;
4029
audioToAudioService?: AudioToAudioService;
4130
} = {};
31+
32+
/** Shared sender that handles AudioInfo headers + chunked PCM protocol. */
33+
private audioSender!: AudioSender;
4234
targetPeerQueue: string[] = [];
4335

4436
/** Tracks the UUID of the peer that most recently sent audio. */
4537
private lastAudioSenderUuid: string = '';
4638

47-
/** Whether the PersonaPlex handshake has been received. */
48-
private personaplexReady: boolean = false;
49-
5039
/** Parser instance for decoding PersonaPlex stdout framing. */
5140
private stdoutParser: LengthPrefixedParser = new LengthPrefixedParser();
5241

53-
// --- Audio output batching ---
54-
/** Accumulated audio buffers (48 kHz PCM16LE) waiting to be sent to Unity. */
55-
private audioOutputQueue: Buffer[] = [];
56-
/** Total byte length of buffers currently in audioOutputQueue. */
57-
private audioOutputQueueBytes: number = 0;
58-
/** Timer handle for the periodic audio flush. */
59-
private audioFlushTimer: ReturnType<typeof setTimeout> | null = null;
60-
6142
constructor(configFile: string = 'config.json') {
6243
super(configFile);
6344
}
@@ -81,6 +62,9 @@ export class ConversationalAgent extends ApplicationController {
8162
}
8263

8364
registerComponents() {
65+
// Centralised audio sender — includes sampleRate in every AudioInfo header
66+
this.audioSender = new AudioSender(this.scene, 95, 48000);
67+
8468
// A VoipReceiver to receive audio data from peers via WebRTC VOIP
8569
this.components.voipReceiver = new VoipReceiver(this.scene);
8670

@@ -108,12 +92,20 @@ export class ConversationalAgent extends ApplicationController {
10892
/**
10993
* Audio-to-audio pipeline: audio from peers is downsampled to 24 kHz,
11094
* framed with the PersonaPlex binary protocol, and sent to the model.
111-
* Model output (audio + text) is parsed, upsampled to 48 kHz, batched,
112-
* and sent to Unity periodically (every AUDIO_BATCH_MS milliseconds).
95+
* Model output (audio + text) is parsed, upsampled to 48 kHz, and sent
96+
* to Unity immediately as raw PCM chunks.
97+
*
98+
* A single AudioInfo header is sent when the first audio frame arrives
99+
* from the model. Subsequent frames are streamed as raw PCM chunks
100+
* without new headers — this avoids Unity's `dropOnNewSequence`
101+
* clearing the playback queue on every header.
113102
*/
114103
private defineAudioToAudioPipeline() {
115104
const service = this.components.audioToAudioService!;
116105

106+
/** Whether the initial AudioInfo header has been sent for this stream. */
107+
let streamStarted = false;
108+
117109
// ---- Input: receive 48 kHz PCM16 from WebRTC ----
118110
this.components.voipReceiver?.on('audio', (uuid: string, data: RTCAudioData) => {
119111
if (this.roomClient.peers.get(uuid) === undefined) {
@@ -122,13 +114,17 @@ export class ConversationalAgent extends ApplicationController {
122114

123115
// Don't send audio to the model before it's ready — it would pile
124116
// up in the OS pipe buffer and create a stale backlog.
125-
if (!this.personaplexReady) {
117+
if (service.state !== 'ready' && service.state !== 'idle') {
126118
return;
127119
}
128120

129121
this.lastAudioSenderUuid = uuid;
130122

131-
const sampleBuffer = Buffer.from(data.samples.buffer);
123+
const sampleBuffer = Buffer.from(
124+
data.samples.buffer,
125+
data.samples.byteOffset,
126+
data.samples.byteLength,
127+
);
132128
const downsampled = downsample48kTo24k(sampleBuffer);
133129
const packet = encodePacket(KIND_AUDIO, downsampled);
134130

@@ -148,13 +144,24 @@ export class ConversationalAgent extends ApplicationController {
148144
for (const packet of packets) {
149145
switch (packet.kind) {
150146
case KIND_HANDSHAKE:
151-
this.personaplexReady = true;
147+
service.setReady();
152148
this.log('PersonaPlex handshake received — model is ready');
153149
break;
154150

155151
case KIND_AUDIO: {
156152
const upsampled = upsample24kTo48k(packet.payload);
157-
this.enqueueAudioForUnity(upsampled);
153+
154+
// Send one AudioInfo at stream start so Unity knows the
155+
// sample rate and target peer. After that, send only raw
156+
// PCM chunks — no new headers that would clear the queue.
157+
if (!streamStarted) {
158+
const targetPeerObj = this.roomClient.peers.get(this.lastAudioSenderUuid);
159+
const targetPeer = targetPeerObj?.properties.get('ubiq.displayname') ?? '';
160+
this.audioSender.sendHeader({ targetPeer });
161+
streamStarted = true;
162+
}
163+
164+
this.audioSender.sendChunks(upsampled);
158165
break;
159166
}
160167

@@ -178,78 +185,27 @@ export class ConversationalAgent extends ApplicationController {
178185
});
179186

180187
service.on('close', (code: number | null, signal: string | null, identifier: string) => {
181-
this.flushAudioToUnity();
188+
streamStarted = false;
182189
this.flushStream();
183190
this.log(`PersonaPlex process ${identifier} exited (code=${code}, signal=${signal})`, 'warning');
184-
this.personaplexReady = false;
185191
this.stdoutParser.reset();
186192
});
187193
}
188194

189-
// ---- Audio output batching helpers ----
190-
191-
/**
192-
* Queue upsampled audio and schedule a batched send to Unity.
193-
*
194-
* Instead of sending each 80 ms model frame individually (which causes
195-
* Unity to receive a new AudioInfo message 12.5×/sec and potentially
196-
* introduces playback startup overhead per message), we accumulate
197-
* frames and flush them as one larger AudioInfo batch every AUDIO_BATCH_MS.
198-
*/
199-
private enqueueAudioForUnity(upsampled: Buffer): void {
200-
this.audioOutputQueue.push(upsampled);
201-
this.audioOutputQueueBytes += upsampled.length;
202-
203-
// Start the flush timer on the first enqueued frame
204-
if (this.audioFlushTimer === null) {
205-
this.audioFlushTimer = setTimeout(() => this.flushAudioToUnity(), AUDIO_BATCH_MS);
206-
}
207-
}
208-
209-
/**
210-
* Flush all queued audio to Unity as a single AudioInfo + data batch.
211-
*/
212-
private flushAudioToUnity(): void {
213-
if (this.audioFlushTimer !== null) {
214-
clearTimeout(this.audioFlushTimer);
215-
this.audioFlushTimer = null;
216-
}
217-
218-
if (this.audioOutputQueue.length === 0) {
219-
return;
220-
}
221-
222-
const combined = Buffer.concat(this.audioOutputQueue);
223-
this.audioOutputQueue = [];
224-
this.audioOutputQueueBytes = 0;
225-
226-
// Resolve target peer
227-
const targetPeerObj = this.roomClient.peers.get(this.lastAudioSenderUuid);
228-
const targetPeer = targetPeerObj?.properties.get('ubiq.displayname') ?? '';
229-
230-
// Send one AudioInfo for the entire batch
231-
this.scene.send(new NetworkId(95), {
232-
type: 'AudioInfo',
233-
targetPeer: targetPeer,
234-
audioLength: combined.length,
235-
});
236-
237-
let remaining = combined;
238-
while (remaining.length > 0) {
239-
this.scene.send(new NetworkId(95), remaining.subarray(0, 16000));
240-
remaining = remaining.subarray(16000);
241-
}
242-
}
243-
244195
/**
245196
* Traditional 3-stage pipeline: STT → text generation → TTS.
246197
* This is the original pipeline, preserved for backwards compatibility.
247198
*/
248199
private defineTraditionalPipeline() {
249200
// Step 1: When we receive audio data from a peer we send it to the transcription service
250201
this.components.voipReceiver?.on('audio', (uuid: string, data: RTCAudioData) => {
251-
// Convert the Int16Array to a Buffer
252-
const sampleBuffer = Buffer.from(data.samples.buffer);
202+
// Convert the Int16Array to a Buffer (use byteOffset/byteLength
203+
// in case the TypedArray is a view into a larger ArrayBuffer)
204+
const sampleBuffer = Buffer.from(
205+
data.samples.buffer,
206+
data.samples.byteOffset,
207+
data.samples.byteLength,
208+
);
253209

254210
// Send the audio data to the transcription service
255211
if (this.roomClient.peers.get(uuid) !== undefined) {
@@ -296,19 +252,8 @@ export class ConversationalAgent extends ApplicationController {
296252
});
297253

298254
this.components.textToSpeechService?.on('data', (data: Buffer, identifier: string) => {
299-
let response = data;
300255
const targetPeer = this.targetPeerQueue.shift() ?? '';
301-
302-
this.scene.send(new NetworkId(95), {
303-
type: 'AudioInfo',
304-
targetPeer: targetPeer,
305-
audioLength: data.length,
306-
});
307-
308-
while (response.length > 0) {
309-
this.scene.send(new NetworkId(95), response.slice(0, 16000));
310-
response = response.slice(16000);
311-
}
256+
this.audioSender.send(data, { targetPeer });
312257
});
313258
}
314259
}

Node/apps/stream_describer/app.ts

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { MediaReceiver } from '../../components/media_receiver';
44
import { MessageReader } from '../../components/message_reader';
55
import { VisualQuestionAnsweringService } from '../../services/visual_question_answering/service';
66
import { TextToSpeechService } from '../../services/text_to_speech/service';
7+
import { AudioSender } from '../../components/audio_sender';
78
import path from 'path';
89
import { fileURLToPath } from 'url';
910
import nconf from 'nconf';
@@ -37,6 +38,9 @@ class StreamDescriber extends ApplicationController {
3738
tts?: TextToSpeechService;
3839
} = {};
3940

41+
/** Shared sender for TTS audio — includes sampleRate in AudioInfo headers. */
42+
private audioSender!: AudioSender;
43+
4044
/** Timestamp of the last frame sent per peer, to throttle. */
4145
private lastFrameTime = new Map<string, number>();
4246

@@ -83,6 +87,9 @@ class StreamDescriber extends ApplicationController {
8387
}
8488

8589
registerComponents(): void {
90+
// Centralised audio sender — includes sampleRate in every AudioInfo header
91+
this.audioSender = new AudioSender(this.scene, AUDIO_NETWORK_ID, 48000);
92+
8693
// MediaReceiver to receive video tracks sent by MediaTrackManager
8794
this.components.mediaReceiver = new MediaReceiver(this.scene);
8895

@@ -287,19 +294,7 @@ class StreamDescriber extends ApplicationController {
287294

288295
this.log(`Sending ${combined.length} bytes of TTS audio to Unity`);
289296

290-
// One AudioInfo for the entire speech sequence
291-
this.scene.send(new NetworkId(AUDIO_NETWORK_ID), {
292-
type: 'AudioInfo',
293-
audioLength: combined.length.toString(),
294-
});
295-
296-
// Stream the raw PCM16 data
297-
let offset = 0;
298-
while (offset < combined.length) {
299-
const end = Math.min(offset + 16000, combined.length);
300-
this.scene.send(new NetworkId(AUDIO_NETWORK_ID), combined.subarray(offset, end));
301-
offset = end;
302-
}
297+
this.audioSender.send(combined);
303298
}
304299
}
305300

Node/components/audio_sender.ts

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
import { NetworkId, NetworkScene } from 'ubiq-server/ubiq';
2+
3+
/** Maximum bytes of raw PCM16 data per network message. */
4+
const MAX_CHUNK_BYTES = 16000;
5+
6+
/** Default sample rate when none is specified. */
7+
const DEFAULT_SAMPLE_RATE = 48000;
8+
9+
/**
10+
* Encapsulates the AudioInfo + chunked-PCM16 protocol used by Ubiq-Genie
11+
* to stream audio from the Node server to Unity's InjectableAudioSource.
12+
*
13+
* Using this component instead of inline `scene.send` calls ensures that:
14+
* - The `sampleRate` field is always included in AudioInfo headers so that
15+
* Unity can resample when the device output rate differs.
16+
* - The chunked-send loop and header format are consistent across all apps.
17+
*/
18+
export class AudioSender {
19+
private readonly scene: NetworkScene;
20+
private readonly networkId: NetworkId;
21+
private readonly sampleRate: number;
22+
23+
/**
24+
* @param scene The Ubiq NetworkScene to send messages on.
25+
* @param networkId The network ID that InjectableAudioSource listens on.
26+
* @param sampleRate The sample rate of the PCM16 audio that will be sent
27+
* (Hz). This is included in every AudioInfo header so
28+
* Unity can resample if its output rate differs.
29+
* Defaults to 48 000.
30+
*/
31+
constructor(scene: NetworkScene, networkId: number | NetworkId, sampleRate: number = DEFAULT_SAMPLE_RATE) {
32+
this.scene = scene;
33+
this.networkId = typeof networkId === 'number' ? new NetworkId(networkId) : networkId;
34+
this.sampleRate = sampleRate;
35+
}
36+
37+
/**
38+
* Send a complete audio buffer to Unity using the AudioInfo + chunked-PCM
39+
* protocol. Sends an AudioInfo header followed by the raw data chunks.
40+
*
41+
* Best for **one-shot** audio (e.g. TTS responses) where each call
42+
* represents a discrete utterance. For continuous streaming (e.g.
43+
* audio-to-audio), use `sendHeader()` once and then `sendChunks()` for
44+
* each frame to avoid Unity clearing its playback queue on every header.
45+
*
46+
* @param audio Raw PCM16-LE mono audio bytes at `this.sampleRate`.
47+
* @param options Optional metadata included in the AudioInfo header.
48+
*/
49+
send(audio: Buffer, options?: { targetPeer?: string }): void {
50+
if (audio.length === 0) return;
51+
this.sendHeader({ ...options, audioLength: audio.length });
52+
this.sendChunks(audio);
53+
}
54+
55+
/**
56+
* Send only the AudioInfo header (no audio data).
57+
*
58+
* Use this once at the start of a continuous stream, then call
59+
* `sendChunks()` for each audio frame. This avoids the queue-clearing
60+
* side-effect of `dropOnNewSequence` on the Unity side.
61+
*/
62+
sendHeader(options?: { targetPeer?: string; audioLength?: number }): void {
63+
this.scene.send(this.networkId, {
64+
type: 'AudioInfo',
65+
targetPeer: options?.targetPeer ?? '',
66+
audioLength: (options?.audioLength ?? 0).toString(),
67+
sampleRate: this.sampleRate.toString(),
68+
});
69+
}
70+
71+
/**
72+
* Send raw PCM16 data chunks without an AudioInfo header.
73+
*
74+
* Each chunk is at most MAX_CHUNK_BYTES. Unity's InjectableAudioSource
75+
* treats messages ≥ 200 bytes that are not valid JSON as raw PCM audio,
76+
* so no preceding AudioInfo is needed for the data to be played.
77+
*/
78+
sendChunks(audio: Buffer): void {
79+
if (audio.length === 0) return;
80+
let offset = 0;
81+
while (offset < audio.length) {
82+
const end = Math.min(offset + MAX_CHUNK_BYTES, audio.length);
83+
this.scene.send(this.networkId, audio.subarray(offset, end));
84+
offset = end;
85+
}
86+
}
87+
}

0 commit comments

Comments (0)