Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 24 additions & 2 deletions src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import {
SERVER_REQUEST,
WS_CLOSE_CODE
} from "#src/shared/enums.ts";
import type { JSONSerializable, StreamType, BusMessage } from "#src/shared/types";
import type { JSONSerializable, StreamType, BusMessage, AvailableFeatures } from "#src/shared/types";
import type { TransportConfig, SessionId, SessionInfo } from "#src/models/session";

interface Consumers {
Expand Down Expand Up @@ -141,6 +141,10 @@ const ACTIVE_STATES = new Set<SfuClientState>([
export class SfuClient extends EventTarget {
/** Connection errors encountered */
public errors: Error[] = [];
public availableFeatures: AvailableFeatures = {
"rtc": false,
"recording": false,
};
/** Current client state */
private _state: SfuClientState = SfuClientState.DISCONNECTED;
/** Communication bus */
Expand Down Expand Up @@ -256,6 +260,23 @@ export class SfuClient extends EventTarget {
await Promise.all(proms);
return stats;
}
async startRecording() {
return this._bus?.request(
{
name: CLIENT_REQUEST.START_RECORDING,
},
{ batch: true }
);
}

async stopRecording() {
return this._bus?.request(
{
name: CLIENT_REQUEST.STOP_RECORDING,
},
{ batch: true }
);
}

/**
* Updates the server with the info of the session (isTalking, isCameraOn,...) so that it can broadcast it to the
Expand Down Expand Up @@ -445,7 +466,8 @@ export class SfuClient extends EventTarget {
*/
webSocket.addEventListener(
"message",
() => {
(message) => {
this.availableFeatures = JSON.parse(message.data) as AvailableFeatures;
resolve(new Bus(webSocket));
},
{ once: true }
Expand Down
24 changes: 24 additions & 0 deletions src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ export const HTTP_INTERFACE: string = process.env.HTTP_INTERFACE || "0.0.0.0";
*/
export const PORT: number = Number(process.env.PORT) || 8070;

/**
* Whether the recording feature is enabled, true by default.
*/
export const RECORDING: boolean = !FALSY_INPUT.has(process.env.RECORDING!);

/**
* The number of workers to spawn (up to core limits) to manage RTC servers.
* 0 < NUM_WORKERS <= os.availableParallelism()
Expand Down Expand Up @@ -197,6 +202,25 @@ export const timeouts: TimeoutConfig = Object.freeze({
busBatch: process.env.JEST_WORKER_ID ? 10 : 300
});

export const recording = Object.freeze({
directory: os.tmpdir() + "/recordings",
enabled: RECORDING,
maxDuration: 1000 * 60 * 60, // 1 hour, could be a env-var.
fileTTL: 1000 * 60 * 60 * 24, // 24 hours
fileType: "mp4",
videoCodec: "libx264",
audioCodec: "aac",
audioLimit: 20,
cameraLimit: 4, // how many camera can be merged into one recording
screenLimit: 1,
});

export const dynamicPorts = Object.freeze({
min: 50000,
max: 59999,
});


// how many errors can occur before the session is closed, recovery attempts will be made until this limit is reached
export const maxSessionErrors: number = 6;

Expand Down
12 changes: 9 additions & 3 deletions src/models/channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ import {
type SessionId,
type SessionInfo
} from "#src/models/session.ts";
import { getWorker, type RtcWorker } from "#src/services/rtc.ts";
import { Recorder } from "#src/models/recorder.ts";
import { getWorker, type RtcWorker } from "#src/services/resources.ts";

const logger = new Logger("CHANNEL");

Expand Down Expand Up @@ -53,6 +54,7 @@ interface ChannelCreateOptions {
key?: string;
/** Whether to enable WebRTC functionality */
useWebRtc?: boolean;
recordingAddress?: string | null;
}
interface JoinResult {
/** The channel instance */
Expand Down Expand Up @@ -83,6 +85,8 @@ export class Channel extends EventEmitter {
public readonly key?: Buffer;
/** mediasoup Router for media routing */
public readonly router?: Router;
/** Manages the recording of this channel, undefined if the feature is disabled */
public readonly recorder?: Recorder;
/** Active sessions in this channel */
public readonly sessions = new Map<SessionId, Session>();
/** mediasoup Worker handling this channel */
Expand All @@ -102,7 +106,7 @@ export class Channel extends EventEmitter {
issuer: string,
options: ChannelCreateOptions = {}
): Promise<Channel> {
const { key, useWebRtc = true } = options;
const { key, useWebRtc = true, recordingAddress } = options;
const safeIssuer = `${remoteAddress}::${issuer}`;
const oldChannel = Channel.recordsByIssuer.get(safeIssuer);
if (oldChannel) {
Expand All @@ -112,7 +116,7 @@ export class Channel extends EventEmitter {
const channelOptions: ChannelCreateOptions & {
worker?: Worker;
router?: Router;
} = { key };
} = { key, recordingAddress: useWebRtc ? recordingAddress : null };
if (useWebRtc) {
channelOptions.worker = await getWorker();
channelOptions.router = await channelOptions.worker.createRouter({
Expand Down Expand Up @@ -183,6 +187,7 @@ export class Channel extends EventEmitter {
const now = new Date();
this.createDate = now.toISOString();
this.remoteAddress = remoteAddress;
this.recorder = config.recording.enabled && options.recordingAddress ? new Recorder(this, options.recordingAddress) : undefined;
this.key = key ? Buffer.from(key, "base64") : undefined;
this.uuid = crypto.randomUUID();
this.name = `${remoteAddress}*${this.uuid.slice(-5)}`;
Expand Down Expand Up @@ -295,6 +300,7 @@ export class Channel extends EventEmitter {
* @fires Channel#close
*/
close(): void {
this.recorder?.stop();
for (const session of this.sessions.values()) {
session.off("close", this._onSessionClose);
session.close({ code: SESSION_CLOSE_CODE.CHANNEL_CLOSED });
Expand Down
8 changes: 8 additions & 0 deletions src/models/ffmpeg.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import { EventEmitter } from "node:events";

export class FFMPEG extends EventEmitter {

constructor() {
super();
}
}
51 changes: 51 additions & 0 deletions src/models/recorder.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import { EventEmitter } from "node:events";
import type { Channel } from "./channel";
import { getFolder } from "#src/services/resources";
import { Logger } from "#src/utils/utils.ts";

export enum RECORDER_STATE {
STARTED = "started",
STOPPED = "stopped",
}
const logger = new Logger("RECORDER");

export class Recorder extends EventEmitter {
channel: Channel;
state: RECORDER_STATE = RECORDER_STATE.STOPPED;
ffmpeg = null;
destPath: string | undefined;
/** Path to which the final recording will be uploaded to */
recordingAddress: string;

constructor(channel: Channel, recordingAddress: string) {
super();
this.channel = channel;
this.recordingAddress = recordingAddress;
}

async start() {
if (this.state === RECORDER_STATE.STOPPED) {
const { path, sealFolder } = getFolder();
this.destPath = path;
this.once("stopped", sealFolder);
this.state = RECORDER_STATE.STARTED;
logger.trace("TO IMPLEMENT");
// TODO ffmpeg instance creation for recording to destPath with proper name, start, build timestamps object
}

return { state: this.state };
}

async stop() {
if (this.state === RECORDER_STATE.STARTED) {

logger.trace("TO IMPLEMENT");
this.emit("stopped");
// TODO ffmpeg instance stop, cleanup,
// only resolve promise and switch state when completely ready to start a new recording.
this.state = RECORDER_STATE.STOPPED;
}

return { state: this.state };
}
}
50 changes: 49 additions & 1 deletion src/models/session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import {
SERVER_REQUEST,
STREAM_TYPE
} from "#src/shared/enums.ts";
import type { JSONSerializable, StreamType, BusMessage } from "#src/shared/types";
import type {JSONSerializable, StreamType, BusMessage, AvailableFeatures } from "#src/shared/types";
import type { Bus } from "#src/shared/bus.ts";
import type { Channel } from "#src/models/channel.ts";

Expand Down Expand Up @@ -56,6 +56,9 @@ export enum SESSION_CLOSE_CODE {
KICKED = "kicked",
ERROR = "error"
}
export interface SessionPermissions {
recording?: boolean;
}
export interface TransportConfig {
/** Transport identifier */
id: string;
Expand Down Expand Up @@ -107,6 +110,7 @@ const logger = new Logger("SESSION");
*
* @fires Session#stateChange - Emitted when session state changes
* @fires Session#close - Emitted when session is closed
* @fires Session#producer - Emitted when a new producer is created
*/
export class Session extends EventEmitter {
/** Communication bus for WebSocket messaging */
Expand Down Expand Up @@ -135,6 +139,9 @@ export class Session extends EventEmitter {
camera: null,
screen: null
};
public readonly permissions: SessionPermissions = Object.seal({
recording: false
});
/** Parent channel containing this session */
private readonly _channel: Channel;
/** Recovery timeouts for failed consumers */
Expand All @@ -161,6 +168,13 @@ export class Session extends EventEmitter {
this.setMaxListeners(config.CHANNEL_SIZE * 2);
}

get availableFeatures(): AvailableFeatures {
return {
"rtc": Boolean(this._channel.router),
"recording": Boolean(this._channel.router && this._channel.recorder && this.permissions.recording)
}
}

get name(): string {
return `${this._channel.name}:${this.id}@${this.remote}`;
}
Expand All @@ -171,9 +185,26 @@ export class Session extends EventEmitter {

set state(state: SESSION_STATE) {
this._state = state;
/**
* @event Session#stateChange
* @type {{ state: SESSION_STATE }}
*/
this.emit("stateChange", state);
}

updatePermissions(permissions: SessionPermissions | undefined): void {
if (!permissions) {
return;
}
for (const key of Object.keys(this.permissions) as (keyof SessionPermissions)[]) {
const newVal = permissions[key];
if (newVal === undefined) {
continue;
}
this.permissions[key] = Boolean(permissions[key]);
}
}

async getProducerBitRates(): Promise<ProducerBitRates> {
const bitRates: ProducerBitRates = {};
const proms: Promise<void>[] = [];
Expand Down Expand Up @@ -630,8 +661,25 @@ export class Session extends EventEmitter {
logger.debug(`[${this.name}] producing ${type}: ${codec?.mimeType}`);
this._updateRemoteConsumers();
this._broadcastInfo();
/**
* @event Session#producer
* @type {{ type: StreamType, producer: Producer }}
*/
this.emit("producer", { type, producer });
return { id: producer.id };
}
case CLIENT_REQUEST.START_RECORDING: {
if (this.permissions.recording && this._channel.recorder) {
return this._channel.recorder.start();
}
return;
}
case CLIENT_REQUEST.STOP_RECORDING: {
if (this.permissions.recording && this._channel.recorder) {
return this._channel.recorder.stop();
}
return;
}
default:
logger.warn(`[${this.name}] Unknown request type: ${name}`);
throw new Error(`Unknown request type: ${name}`);
Expand Down
6 changes: 3 additions & 3 deletions src/server.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import * as rtc from "#src/services/rtc.ts";
import * as resources from "#src/services/resources.ts";
import * as http from "#src/services/http.ts";
import * as auth from "#src/services/auth.ts";
import { Logger } from "#src/utils/utils.ts";
Expand All @@ -8,15 +8,15 @@ const logger = new Logger("SERVER", { logLevel: "all" });

async function run(): Promise<void> {
auth.start();
await rtc.start();
await resources.start();
await http.start();
logger.info(`ready - PID: ${process.pid}`);
}

function cleanup(): void {
Channel.closeAll();
http.close();
rtc.close();
resources.close();
logger.info("cleanup complete");
}

Expand Down
3 changes: 2 additions & 1 deletion src/services/auth.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import crypto from "node:crypto";
import * as config from "#src/config.ts";
import { Logger } from "#src/utils/utils.ts";
import { AuthenticationError } from "#src/utils/errors.ts";
import type { SessionId } from "#src/models/session.ts";
import type { SessionId, SessionPermissions } from "#src/models/session.ts";
import type { StringLike } from "#src/shared/types.ts";

/**
Expand Down Expand Up @@ -43,6 +43,7 @@ interface PrivateJWTClaims {
sfu_channel_uuid?: string;
session_id?: SessionId;
ice_servers?: object[];
permissions?: SessionPermissions,
sessionIdsByChannel?: Record<string, SessionId[]>;
/** If provided when requesting a channel, this key will be used instead of the global key to verify JWTs related to this channel */
key?: string;
Expand Down
26 changes: 25 additions & 1 deletion src/services/http.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,29 @@ function setupRoutes(routeListener: RouteListener): void {
return res.end(JSON.stringify(channelStats));
}
});
/**
* GET /v1/channel
*
* Creates (or reuses) a media channel for the authenticated client.
*
* ### Headers
* - `Authorization: Bearer <JWT>` — required.
* The JWT must include the `iss` (issuer) claim identifying the caller.
*
* ### Query Parameters
* - `webRTC` — optional, defaults to `"true"`.
* When set to `"false"`, disables WebRTC setup and creates a non-media channel.
* - `recordingAddress` — optional.
* If provided, enables recording and specifies the destination address
* for recorded media streams. This address should most likely include a secret token,
* so that it can be used publicly. For example http://example.com/recording/123?token=asdasdasdasd
*
* ### Responses
* - `200 OK` — returns `{ uuid: string, url: string }`
* - `401 Unauthorized` — missing or invalid Authorization header
* - `403 Forbidden` — missing `iss` claim
* - `500 Internal Server Error` — failed to create the channel
*/
routeListener.get(`/v${API_VERSION}/channel`, {
callback: async (req, res, { host, protocol, remoteAddress, searchParams }) => {
try {
Expand All @@ -98,7 +121,8 @@ function setupRoutes(routeListener: RouteListener): void {
}
const channel = await Channel.create(remoteAddress, claims.iss, {
key: claims.key,
useWebRtc: searchParams.get("webRTC") !== "false"
useWebRtc: searchParams.get("webRTC") !== "false",
recordingAddress: searchParams.get("recordingAddress")
});
res.setHeader("Content-Type", "application/json");
res.statusCode = 200;
Expand Down
Loading