Skip to content

Commit a35cba0

Browse files
committed
feat: allow audio resampling for twilio transport layer
1 parent 885f6cf commit a35cba0

File tree

2 files changed

+230
-4
lines changed

2 files changed

+230
-4
lines changed

packages/agents-extensions/src/TwilioRealtimeTransport.ts

Lines changed: 82 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,18 @@ export type TwilioRealtimeTransportLayerOptions =
2525
* connection gets passed into your request handler when running your WebSocket server.
2626
*/
2727
twilioWebSocket: WebSocket | NodeWebSocket;
28+
// Optional resampler hooks. They can be async or sync.
29+
// data: ArrayBuffer audio payload, from: input audio format label, to: target audio format label
30+
resampleIncoming?: (
31+
data: ArrayBuffer,
32+
from?: string,
33+
to?: string,
34+
) => Promise<ArrayBuffer> | ArrayBuffer;
35+
resampleOutgoing?: (
36+
data: ArrayBuffer,
37+
from?: string,
38+
to?: string,
39+
) => Promise<ArrayBuffer> | ArrayBuffer;
2840
};
2941

3042
/**
@@ -60,10 +72,24 @@ export class TwilioRealtimeTransportLayer extends OpenAIRealtimeWebSocket {
6072
#lastPlayedChunkCount: number = 0;
6173
#previousItemId: string | null = null;
6274
#logger = getLogger('openai-agents:extensions:twilio');
75+
#resampleIncoming?: (
76+
data: ArrayBuffer,
77+
from?: string,
78+
to?: string,
79+
) => Promise<ArrayBuffer> | ArrayBuffer;
80+
#resampleOutgoing?: (
81+
data: ArrayBuffer,
82+
from?: string,
83+
to?: string,
84+
) => Promise<ArrayBuffer> | ArrayBuffer;
85+
// audio format expected by Twilio (default g711_ulaw)
86+
#twilioAudioFormat: string = 'g711_ulaw';
6387

6488
constructor(options: TwilioRealtimeTransportLayerOptions) {
6589
super(options);
6690
this.#twilioWebSocket = options.twilioWebSocket;
91+
this.#resampleIncoming = options.resampleIncoming;
92+
this.#resampleOutgoing = options.resampleOutgoing;
6793
}
6894

6995
_setInputAndOutputAudioFormat(
@@ -91,10 +117,17 @@ export class TwilioRealtimeTransportLayer extends OpenAIRealtimeWebSocket {
91117
options.initialSessionConfig = this._setInputAndOutputAudioFormat(
92118
options.initialSessionConfig,
93119
);
120+
121+
// Keep the transport's twilioAudioFormat in sync with initial session config
122+
// (outputAudioFormat is what we will send to Twilio)
123+
this.#twilioAudioFormat =
124+
// @ts-expect-error - this is a valid config
125+
options.initialSessionConfig?.outputAudioFormat ?? 'g711_ulaw';
126+
94127
// listen to Twilio messages as quickly as possible
95128
this.#twilioWebSocket.addEventListener(
96129
'message',
97-
(message: MessageEvent | NodeMessageEvent) => {
130+
async (message: MessageEvent | NodeMessageEvent) => {
98131
try {
99132
const data = JSON.parse(message.data.toString());
100133
if (this.#logger.dontLogModelData) {
@@ -109,7 +142,30 @@ export class TwilioRealtimeTransportLayer extends OpenAIRealtimeWebSocket {
109142
switch (data.event) {
110143
case 'media':
111144
if (this.status === 'connected') {
112-
this.sendAudio(utils.base64ToArrayBuffer(data.media.payload));
145+
// Twilio sends base64 payloads
146+
let buffer = utils.base64ToArrayBuffer(data.media.payload);
147+
148+
// If user supplied a resampler, call it to convert to the internal Realtime expected format
149+
if (this.#resampleIncoming) {
150+
try {
151+
const maybePromise = this.#resampleIncoming(
152+
buffer,
153+
// Twilio payload format (we assume Twilio->transport input)
154+
data.media?.format ?? undefined,
155+
// target format we used for inputAudioFormat
156+
// (we infer from initialSessionConfig or default to g711_ulaw)
157+
// @ts-expect-error - this is a valid config
158+
options.initialSessionConfig?.inputAudioFormat ??
159+
'g711_ulaw',
160+
);
161+
buffer = (await maybePromise) ?? buffer;
162+
} catch (err) {
163+
this.#logger.error('Incoming resampling failed:', err);
164+
// fall back to original buffer
165+
}
166+
}
167+
168+
this.sendAudio(buffer);
113169
}
114170
break;
115171
case 'mark':
@@ -199,15 +255,35 @@ export class TwilioRealtimeTransportLayer extends OpenAIRealtimeWebSocket {
199255
super._interrupt(elapsedTime, cancelOngoingResponse);
200256
}
201257

202-
protected _onAudio(audioEvent: TransportLayerAudio) {
258+
protected async _onAudio(audioEvent: TransportLayerAudio) {
203259
this.#logger.debug(
204260
`Sending audio to Twilio ${audioEvent.responseId}: (${audioEvent.data.byteLength} bytes)`,
205261
);
262+
// Allow user-provided resampler to convert outgoing Realtime audio to Twilio format.
263+
let twilioPayloadBuffer: ArrayBuffer = audioEvent.data;
264+
265+
if (this.#resampleOutgoing) {
266+
try {
267+
const maybePromise = this.#resampleOutgoing(
268+
audioEvent.data,
269+
// from: Realtime internal audio format (unknown here), leave undefined
270+
undefined,
271+
// to: format Twilio expects for outgoing audio
272+
this.#twilioAudioFormat,
273+
);
274+
twilioPayloadBuffer = (await maybePromise) ?? audioEvent.data;
275+
} catch (err) {
276+
this.#logger.error('Outgoing resampling failed:', err);
277+
// fall back to original audioEvent.data
278+
twilioPayloadBuffer = audioEvent.data;
279+
}
280+
}
281+
206282
const audioDelta = {
207283
event: 'media',
208284
streamSid: this.#streamSid,
209285
media: {
210-
payload: utils.arrayBufferToBase64(audioEvent.data),
286+
payload: utils.arrayBufferToBase64(twilioPayloadBuffer),
211287
},
212288
};
213289
if (this.#previousItemId !== this.currentItemId && this.currentItemId) {
@@ -228,3 +304,5 @@ export class TwilioRealtimeTransportLayer extends OpenAIRealtimeWebSocket {
228304
this.emit('audio', audioEvent);
229305
}
230306
}
307+
308+
// vim:ts=2 sw=2 et:
Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
import { describe, test, expect, vi, beforeEach } from 'vitest';
2+
import { EventEmitter } from 'events';
3+
import { TwilioRealtimeTransportLayer } from '../src/TwilioRealtimeTransport';
4+
5+
import type { MessageEvent as NodeMessageEvent } from 'ws';
6+
import type { MessageEvent } from 'undici-types';
7+
8+
// Mock the realtime package like other tests do so we can observe sendAudio, etc.
9+
vi.mock('@openai/agents/realtime', () => {
10+
// eslint-disable-next-line @typescript-eslint/no-require-imports
11+
const { EventEmitter } = require('events');
12+
const utils = {
13+
base64ToArrayBuffer: (b64: string) =>
14+
Uint8Array.from(Buffer.from(b64, 'base64')).buffer,
15+
arrayBufferToBase64: (buf: ArrayBuffer) =>
16+
Buffer.from(new Uint8Array(buf)).toString('base64'),
17+
};
18+
class FakeOpenAIRealtimeWebSocket extends EventEmitter {
19+
status: 'connected' | 'disconnected' = 'disconnected';
20+
currentItemId: string | null = null;
21+
}
22+
FakeOpenAIRealtimeWebSocket.prototype.connect = vi.fn(async function (
23+
this: any,
24+
) {
25+
this.status = 'connected';
26+
});
27+
FakeOpenAIRealtimeWebSocket.prototype.sendAudio = vi.fn();
28+
FakeOpenAIRealtimeWebSocket.prototype.close = vi.fn();
29+
FakeOpenAIRealtimeWebSocket.prototype._interrupt = vi.fn();
30+
FakeOpenAIRealtimeWebSocket.prototype.updateSessionConfig = vi.fn();
31+
return { OpenAIRealtimeWebSocket: FakeOpenAIRealtimeWebSocket, utils };
32+
});
33+
34+
class FakeTwilioWebSocket extends EventEmitter {
35+
send = vi.fn();
36+
close = vi.fn();
37+
}
38+
39+
// @ts-expect-error - make the node EventEmitter compatible with the browser style used in the transport
40+
FakeTwilioWebSocket.prototype.addEventListener = function (
41+
type: string,
42+
listener: (evt: MessageEvent | NodeMessageEvent) => void,
43+
) {
44+
// When the transport registers addEventListener('message', ...) it expects the listener
45+
// to receive an object with a `.data` that responds to toString(). Tests below emit the
46+
// raw payload as the event argument and this wrapper synthesizes { data: evt }.
47+
this.on(type, (evt) => listener(type === 'message' ? { data: evt } : evt));
48+
};
49+
50+
describe('TwilioRealtimeTransportLayer resampling hooks', () => {
51+
beforeEach(() => {
52+
vi.clearAllMocks();
53+
});
54+
55+
test('resampleIncoming is called and its result is passed to sendAudio', async () => {
56+
const resampleIncoming = vi.fn(
57+
async (data: ArrayBuffer, from?: string, to?: string) => {
58+
// ensure we receive the original data (we won't assert exact bytes here, just that the hook was called)
59+
_ = from;
60+
_ = to;
61+
return data;
62+
},
63+
);
64+
65+
const twilio = new FakeTwilioWebSocket();
66+
const transport = new TwilioRealtimeTransportLayer({
67+
twilioWebSocket: twilio as any,
68+
resampleIncoming,
69+
});
70+
71+
// connect the transport (mocks will set the OpenAI websocket to connected)
72+
await transport.connect({ apiKey: 'ek_test' } as any);
73+
74+
// Grab the mocked OpenAIRealtimeWebSocket prototype to assert sendAudio was called with our resampled buffer
75+
const { OpenAIRealtimeWebSocket } = await import('@openai/agents/realtime');
76+
const sendAudioSpy = vi.mocked(OpenAIRealtimeWebSocket.prototype.sendAudio);
77+
78+
// Prepare a Twilio 'media' message (base64-encoded payload). Use small bytes.
79+
const originalBytes = Buffer.from([1, 2, 3]);
80+
const payloadB64 = originalBytes.toString('base64');
81+
const twilioMessage = {
82+
event: 'media',
83+
streamSid: 'FAKE',
84+
media: { payload: payloadB64 },
85+
};
86+
87+
// Emit the message (the FakeTwilioWebSocket addEventListener wrapper will provide { data: evt })
88+
twilio.emit('message', { toString: () => JSON.stringify(twilioMessage) });
89+
90+
// wait a tick for async handler to run
91+
await Promise.resolve();
92+
93+
// resampleIncoming should have been called
94+
expect(resampleIncoming).toHaveBeenCalled();
95+
// sendAudio should have been called with the resampled buffer
96+
expect(sendAudioSpy).toHaveBeenCalled();
97+
const calledArg = sendAudioSpy.mock.calls[0][0] as ArrayBuffer;
98+
expect(Array.from(new Uint8Array(calledArg))).toEqual(
99+
Array.from(new Uint8Array(originalBytes)),
100+
);
101+
});
102+
103+
test('resampleOutgoing is called and Twilio receives its result', async () => {
104+
const resampleOutgoing = vi.fn(
105+
async (data: ArrayBuffer, from?: string, to?: string) => {
106+
_ = from;
107+
_ = to;
108+
return data;
109+
},
110+
);
111+
112+
const twilio = new FakeTwilioWebSocket();
113+
const transport = new TwilioRealtimeTransportLayer({
114+
twilioWebSocket: twilio as any,
115+
resampleOutgoing,
116+
});
117+
118+
await transport.connect({ apiKey: 'ek_test' } as any);
119+
120+
// set a currentItemId so the transport resets chunk count and emits marks like real usage
121+
// @ts-expect-error - we're setting a protected field for test
122+
transport.currentItemId = 'test-item';
123+
124+
// Call the protected _onAudio to simulate outgoing audio from OpenAI -> Twilio
125+
const outgoingBuffer = new Uint8Array([10, 11, 12]).buffer;
126+
await transport['_onAudio']({
127+
responseId: 'FAKE_ID',
128+
type: 'audio',
129+
data: outgoingBuffer,
130+
});
131+
132+
// twilio.send should have been called at least twice (media and mark). Inspect the first call (media)
133+
const sendCalls = vi.mocked(twilio.send).mock.calls;
134+
expect(sendCalls.length).toBeGreaterThanOrEqual(1);
135+
136+
const firstArg = sendCalls[0][0] as string;
137+
const parsed = JSON.parse(firstArg);
138+
expect(parsed.event).toBe('media');
139+
// verify media.payload decodes to the resampled bytes
140+
const decoded = Buffer.from(parsed.media.payload, 'base64');
141+
expect(Array.from(decoded)).toEqual(
142+
Array.from(new Uint8Array(outgoingBuffer)),
143+
);
144+
145+
// ensure the outgoing resampler was called
146+
expect(resampleOutgoing).toHaveBeenCalled();
147+
});
148+
});

0 commit comments

Comments
 (0)