Skip to content
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/soft-paws-sink.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@livekit/rtc-node': patch
---

Add frame processor support for audio streams
7 changes: 7 additions & 0 deletions packages/livekit-rtc/src/audio_frame.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ export class AudioFrame {
channels: number;
samplesPerChannel: number;

private _userdata: Record<string, any> = {};

// note: if converting from Uint8Array to Int16Array, *do not* use buffer.slice!
// it is marked unstable by Node and can cause undefined behaviour, such as massive chunks of
// noise being added to the end.
Expand Down Expand Up @@ -51,6 +53,11 @@ export class AudioFrame {
samplesPerChannel: this.samplesPerChannel,
});
}

/** Returns the user data associated with the audio frame. */
get userdata() {
return this._userdata;
}
}

/**
Expand Down
28 changes: 21 additions & 7 deletions packages/livekit-rtc/src/audio_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@ import type { UnderlyingSource } from 'node:stream/web';
import { AudioFrame } from './audio_frame.js';
import type { FfiEvent } from './ffi_client.js';
import { FfiClient, FfiClientEvent, FfiHandle } from './ffi_client.js';
import { FrameProcessor } from './frame_processor.js';
import { log } from './log.js';
import type { NewAudioStreamResponse } from './proto/audio_frame_pb.js';
import { AudioStreamType, NewAudioStreamRequest } from './proto/audio_frame_pb.js';
import type { Track } from './track.js';

export interface AudioStreamOptions {
noiseCancellation?: NoiseCancellationOptions;
noiseCancellation?: NoiseCancellationOptions | FrameProcessor<AudioFrame>;
sampleRate?: number;
numChannels?: number;
frameSizeMs?: number;
Expand All @@ -26,7 +28,8 @@ class AudioStreamSource implements UnderlyingSource<AudioFrame> {
private ffiHandle: FfiHandle;
private sampleRate: number;
private numChannels: number;
private ncOptions?: NoiseCancellationOptions;
private legacyNcOptions?: NoiseCancellationOptions;
private frameProcessor?: FrameProcessor<AudioFrame>;
private frameSizeMs?: number;

constructor(
Expand All @@ -37,7 +40,11 @@ class AudioStreamSource implements UnderlyingSource<AudioFrame> {
if (sampleRateOrOptions !== undefined && typeof sampleRateOrOptions !== 'number') {
this.sampleRate = sampleRateOrOptions.sampleRate ?? 48000;
this.numChannels = sampleRateOrOptions.numChannels ?? 1;
this.ncOptions = sampleRateOrOptions.noiseCancellation;
if (sampleRateOrOptions.noiseCancellation instanceof FrameProcessor) {
this.frameProcessor = sampleRateOrOptions.noiseCancellation;
} else {
this.legacyNcOptions = sampleRateOrOptions.noiseCancellation;
}
this.frameSizeMs = sampleRateOrOptions.frameSizeMs;
} else {
this.sampleRate = (sampleRateOrOptions as number) ?? 48000;
Expand All @@ -50,10 +57,10 @@ class AudioStreamSource implements UnderlyingSource<AudioFrame> {
sampleRate: this.sampleRate,
numChannels: this.numChannels,
frameSizeMs: this.frameSizeMs,
...(this.ncOptions
...(this.legacyNcOptions
? {
audioFilterModuleId: this.ncOptions.moduleId,
audioFilterOptions: JSON.stringify(this.ncOptions.options),
audioFilterModuleId: this.legacyNcOptions.moduleId,
audioFilterOptions: JSON.stringify(this.legacyNcOptions.options),
}
: {}),
});
Expand Down Expand Up @@ -85,7 +92,14 @@ class AudioStreamSource implements UnderlyingSource<AudioFrame> {
const streamEvent = ev.message.value.message;
switch (streamEvent.case) {
case 'frameReceived':
const frame = AudioFrame.fromOwnedInfo(streamEvent.value.frame!);
let frame = AudioFrame.fromOwnedInfo(streamEvent.value.frame!);
if (this.frameProcessor && this.frameProcessor.isEnabled()) {
try {
frame = this.frameProcessor.process(frame);
} catch (err: unknown) {
log.warn(`Frame processing failed, passing through original frame: ${err}`);
}
}
this.controller.enqueue(frame);
break;
case 'eos':
Expand Down
27 changes: 27 additions & 0 deletions packages/livekit-rtc/src/frame_processor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// SPDX-FileCopyrightText: 2025 LiveKit, Inc.
//
// SPDX-License-Identifier: Apache-2.0
import { AudioFrame } from './audio_frame.js';
import { VideoFrame } from './video_frame.js';

export type FrameProcessorStreamInfo = {
roomName: string;
participantIdentity: string;
publicationSid: string;
};

export type FrameProcessorCredentials = {
token: string;
url: string;
};

export abstract class FrameProcessor<Frame extends VideoFrame | AudioFrame> {
abstract isEnabled(): boolean;
abstract setEnabled(enabled: boolean): void;

abstract onStreamInfoUpdated(info: FrameProcessorStreamInfo): void;
abstract onCredentialsUpdated(credentials: FrameProcessorCredentials): void;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue: these two have a default implementation in python and aren't treated as abstract. We should make them non-required also for node.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I see, I think I missed that. Fixed!


abstract process(frame: Frame): Frame;
abstract close(): void;
}
5 changes: 5 additions & 0 deletions packages/livekit-rtc/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,8 @@ export type { ChatMessage } from './types.js';
export { VideoFrame } from './video_frame.js';
export { VideoSource } from './video_source.js';
export { VideoStream, type VideoFrameEvent } from './video_stream.js';
export {
FrameProcessor,
type FrameProcessorStreamInfo,
type FrameProcessorCredentials,
} from './frame_processor.js';
Loading