diff --git a/src/client.ts b/src/client.ts index f1bb5eb..bb07ede 100644 --- a/src/client.ts +++ b/src/client.ts @@ -18,7 +18,7 @@ import { SERVER_REQUEST, WS_CLOSE_CODE } from "#src/shared/enums.ts"; -import type { JSONSerializable, StreamType, BusMessage } from "#src/shared/types"; +import type { JSONSerializable, StreamType, BusMessage, AvailableFeatures } from "#src/shared/types"; import type { TransportConfig, SessionId, SessionInfo } from "#src/models/session"; interface Consumers { @@ -141,6 +141,10 @@ const ACTIVE_STATES = new Set([ export class SfuClient extends EventTarget { /** Connection errors encountered */ public errors: Error[] = []; + public availableFeatures: AvailableFeatures = { + "rtc": false, + "recording": false, + }; /** Current client state */ private _state: SfuClientState = SfuClientState.DISCONNECTED; /** Communication bus */ @@ -256,6 +260,23 @@ export class SfuClient extends EventTarget { await Promise.all(proms); return stats; } + async startRecording() { + return this._bus?.request( + { + name: CLIENT_REQUEST.START_RECORDING, + }, + { batch: true } + ); + } + + async stopRecording() { + return this._bus?.request( + { + name: CLIENT_REQUEST.STOP_RECORDING, + }, + { batch: true } + ); + } /** * Updates the server with the info of the session (isTalking, isCameraOn,...) so that it can broadcast it to the @@ -445,7 +466,8 @@ export class SfuClient extends EventTarget { */ webSocket.addEventListener( "message", - () => { + (message) => { + this.availableFeatures = JSON.parse(message.data) as AvailableFeatures; resolve(new Bus(webSocket)); }, { once: true } diff --git a/src/config.ts b/src/config.ts index 23474ca..3a7f614 100644 --- a/src/config.ts +++ b/src/config.ts @@ -64,6 +64,11 @@ export const HTTP_INTERFACE: string = process.env.HTTP_INTERFACE || "0.0.0.0"; */ export const PORT: number = Number(process.env.PORT) || 8070; +/** + * Whether the recording feature is enabled, true by default. + */ +export const RECORDING: boolean = !FALSY_INPUT.has(process.env.RECORDING!); + /** * The number of workers to spawn (up to core limits) to manage RTC servers. * 0 < NUM_WORKERS <= os.availableParallelism() @@ -197,6 +202,25 @@ export const timeouts: TimeoutConfig = Object.freeze({ busBatch: process.env.JEST_WORKER_ID ? 10 : 300 }); +export const recording = Object.freeze({ + directory: os.tmpdir() + "/recordings", + enabled: RECORDING, + maxDuration: 1000 * 60 * 60, // 1 hour, could be a env-var. + fileTTL: 1000 * 60 * 60 * 24, // 24 hours + fileType: "mp4", + videoCodec: "libx264", + audioCodec: "aac", + audioLimit: 20, + cameraLimit: 4, // how many camera can be merged into one recording + screenLimit: 1, +}); + +export const dynamicPorts = Object.freeze({ + min: 50000, + max: 59999, +}); + + // how many errors can occur before the session is closed, recovery attempts will be made until this limit is reached export const maxSessionErrors: number = 6; diff --git a/src/models/channel.ts b/src/models/channel.ts index 91889cc..e9ecc3a 100644 --- a/src/models/channel.ts +++ b/src/models/channel.ts @@ -12,7 +12,8 @@ import { type SessionId, type SessionInfo } from "#src/models/session.ts"; -import { getWorker, type RtcWorker } from "#src/services/rtc.ts"; +import { Recorder } from "#src/models/recorder.ts"; +import { getWorker, type RtcWorker } from "#src/services/resources.ts"; const logger = new Logger("CHANNEL"); @@ -53,6 +54,7 @@ interface ChannelCreateOptions { key?: string; /** Whether to enable WebRTC functionality */ useWebRtc?: boolean; + recordingAddress?: string | null; } interface JoinResult { /** The channel instance */ @@ -83,6 +85,8 @@ export class Channel extends EventEmitter { public readonly key?: Buffer; /** mediasoup Router for media routing */ public readonly router?: Router; + /** Manages the recording of this channel, undefined if the feature is disabled */ + public readonly recorder?: Recorder; /** Active sessions in this channel */ public readonly sessions = new Map(); /** mediasoup Worker handling this channel */ @@ -102,7 +106,7 @@ export class Channel extends EventEmitter { issuer: string, options: ChannelCreateOptions = {} ): Promise { - const { key, useWebRtc = true } = options; + const { key, useWebRtc = true, recordingAddress } = options; const safeIssuer = `${remoteAddress}::${issuer}`; const oldChannel = Channel.recordsByIssuer.get(safeIssuer); if (oldChannel) { @@ -112,7 +116,7 @@ export class Channel extends EventEmitter { const channelOptions: ChannelCreateOptions & { worker?: Worker; router?: Router; - } = { key }; + } = { key, recordingAddress: useWebRtc ? recordingAddress : null }; if (useWebRtc) { channelOptions.worker = await getWorker(); channelOptions.router = await channelOptions.worker.createRouter({ @@ -183,6 +187,7 @@ export class Channel extends EventEmitter { const now = new Date(); this.createDate = now.toISOString(); this.remoteAddress = remoteAddress; + this.recorder = config.recording.enabled && options.recordingAddress ? new Recorder(this, options.recordingAddress) : undefined; this.key = key ? Buffer.from(key, "base64") : undefined; this.uuid = crypto.randomUUID(); this.name = `${remoteAddress}*${this.uuid.slice(-5)}`; @@ -295,6 +300,7 @@ export class Channel extends EventEmitter { * @fires Channel#close */ close(): void { + this.recorder?.stop(); for (const session of this.sessions.values()) { session.off("close", this._onSessionClose); session.close({ code: SESSION_CLOSE_CODE.CHANNEL_CLOSED }); diff --git a/src/models/ffmpeg.ts b/src/models/ffmpeg.ts new file mode 100644 index 0000000..48189b9 --- /dev/null +++ b/src/models/ffmpeg.ts @@ -0,0 +1,8 @@ +import { EventEmitter } from "node:events"; + +export class FFMPEG extends EventEmitter { + + constructor() { + super(); + } +} diff --git a/src/models/recorder.ts b/src/models/recorder.ts new file mode 100644 index 0000000..561c2e1 --- /dev/null +++ b/src/models/recorder.ts @@ -0,0 +1,51 @@ +import { EventEmitter } from "node:events"; +import type { Channel } from "./channel"; +import { getFolder } from "#src/services/resources"; +import { Logger } from "#src/utils/utils.ts"; + +export enum RECORDER_STATE { + STARTED = "started", + STOPPED = "stopped", +} +const logger = new Logger("RECORDER"); + +export class Recorder extends EventEmitter { + channel: Channel; + state: RECORDER_STATE = RECORDER_STATE.STOPPED; + ffmpeg = null; + destPath: string | undefined; + /** Path to which the final recording will be uploaded to */ + recordingAddress: string; + + constructor(channel: Channel, recordingAddress: string) { + super(); + this.channel = channel; + this.recordingAddress = recordingAddress; + } + + async start() { + if (this.state === RECORDER_STATE.STOPPED) { + const { path, sealFolder } = getFolder(); + this.destPath = path; + this.once("stopped", sealFolder); + this.state = RECORDER_STATE.STARTED; + logger.trace("TO IMPLEMENT"); + // TODO ffmpeg instance creation for recording to destPath with proper name, start, build timestamps object + } + + return { state: this.state }; + } + + async stop() { + if (this.state === RECORDER_STATE.STARTED) { + + logger.trace("TO IMPLEMENT"); + this.emit("stopped"); + // TODO ffmpeg instance stop, cleanup, + // only resolve promise and switch state when completely ready to start a new recording. + this.state = RECORDER_STATE.STOPPED; + } + + return { state: this.state }; + } +} diff --git a/src/models/session.ts b/src/models/session.ts index 40c0884..b50eb81 100644 --- a/src/models/session.ts +++ b/src/models/session.ts @@ -20,7 +20,7 @@ import { SERVER_REQUEST, STREAM_TYPE } from "#src/shared/enums.ts"; -import type { JSONSerializable, StreamType, BusMessage } from "#src/shared/types"; +import type {JSONSerializable, StreamType, BusMessage, AvailableFeatures } from "#src/shared/types"; import type { Bus } from "#src/shared/bus.ts"; import type { Channel } from "#src/models/channel.ts"; @@ -56,6 +56,9 @@ export enum SESSION_CLOSE_CODE { KICKED = "kicked", ERROR = "error" } +export interface SessionPermissions { + recording?: boolean; +} export interface TransportConfig { /** Transport identifier */ id: string; @@ -107,6 +110,7 @@ const logger = new Logger("SESSION"); * * @fires Session#stateChange - Emitted when session state changes * @fires Session#close - Emitted when session is closed + * @fires Session#producer - Emitted when a new producer is created */ export class Session extends EventEmitter { /** Communication bus for WebSocket messaging */ @@ -135,6 +139,9 @@ export class Session extends EventEmitter { camera: null, screen: null }; + public readonly permissions: SessionPermissions = Object.seal({ + recording: false + }); /** Parent channel containing this session */ private readonly _channel: Channel; /** Recovery timeouts for failed consumers */ @@ -161,6 +168,13 @@ export class Session extends EventEmitter { this.setMaxListeners(config.CHANNEL_SIZE * 2); } + get availableFeatures(): AvailableFeatures { + return { + "rtc": Boolean(this._channel.router), + "recording": Boolean(this._channel.router && this._channel.recorder && this.permissions.recording) + } + } + get name(): string { return `${this._channel.name}:${this.id}@${this.remote}`; } @@ -171,9 +185,26 @@ export class Session extends EventEmitter { set state(state: SESSION_STATE) { this._state = state; + /** + * @event Session#stateChange + * @type {{ state: SESSION_STATE }} + */ this.emit("stateChange", state); } + updatePermissions(permissions: SessionPermissions | undefined): void { + if (!permissions) { + return; + } + for (const key of Object.keys(this.permissions) as (keyof SessionPermissions)[]) { + const newVal = permissions[key]; + if (newVal === undefined) { + continue; + } + this.permissions[key] = Boolean(permissions[key]); + } + } + async getProducerBitRates(): Promise { const bitRates: ProducerBitRates = {}; const proms: Promise[] = []; @@ -630,8 +661,25 @@ export class Session extends EventEmitter { logger.debug(`[${this.name}] producing ${type}: ${codec?.mimeType}`); this._updateRemoteConsumers(); this._broadcastInfo(); + /** + * @event Session#producer + * @type {{ type: StreamType, producer: Producer }} + */ + this.emit("producer", { type, producer }); return { id: producer.id }; } + case CLIENT_REQUEST.START_RECORDING: { + if (this.permissions.recording && this._channel.recorder) { + return this._channel.recorder.start(); + } + return; + } + case CLIENT_REQUEST.STOP_RECORDING: { + if (this.permissions.recording && this._channel.recorder) { + return this._channel.recorder.stop(); + } + return; + } default: logger.warn(`[${this.name}] Unknown request type: ${name}`); throw new Error(`Unknown request type: ${name}`); diff --git a/src/server.ts b/src/server.ts index 80ab97d..2d36a96 100644 --- a/src/server.ts +++ b/src/server.ts @@ -1,4 +1,4 @@ -import * as rtc from "#src/services/rtc.ts"; +import * as resources from "#src/services/resources.ts"; import * as http from "#src/services/http.ts"; import * as auth from "#src/services/auth.ts"; import { Logger } from "#src/utils/utils.ts"; @@ -8,7 +8,7 @@ const logger = new Logger("SERVER", { logLevel: "all" }); async function run(): Promise { auth.start(); - await rtc.start(); + await resources.start(); await http.start(); logger.info(`ready - PID: ${process.pid}`); } @@ -16,7 +16,7 @@ async function run(): Promise { function cleanup(): void { Channel.closeAll(); http.close(); - rtc.close(); + resources.close(); logger.info("cleanup complete"); } diff --git a/src/services/auth.ts b/src/services/auth.ts index 4c4c5aa..7efef4a 100644 --- a/src/services/auth.ts +++ b/src/services/auth.ts @@ -3,7 +3,7 @@ import crypto from "node:crypto"; import * as config from "#src/config.ts"; import { Logger } from "#src/utils/utils.ts"; import { AuthenticationError } from "#src/utils/errors.ts"; -import type { SessionId } from "#src/models/session.ts"; +import type { SessionId, SessionPermissions } from "#src/models/session.ts"; import type { StringLike } from "#src/shared/types.ts"; /** @@ -43,6 +43,7 @@ interface PrivateJWTClaims { sfu_channel_uuid?: string; session_id?: SessionId; ice_servers?: object[]; + permissions?: SessionPermissions, sessionIdsByChannel?: Record; /** If provided when requesting a channel, this key will be used instead of the global key to verify JWTs related to this channel */ key?: string; diff --git a/src/services/http.ts b/src/services/http.ts index c00d9b5..bb20ef3 100644 --- a/src/services/http.ts +++ b/src/services/http.ts @@ -79,6 +79,29 @@ function setupRoutes(routeListener: RouteListener): void { return res.end(JSON.stringify(channelStats)); } }); + /** + * GET /v1/channel + * + * Creates (or reuses) a media channel for the authenticated client. + * + * ### Headers + * - `Authorization: Bearer ` — required. + * The JWT must include the `iss` (issuer) claim identifying the caller. + * + * ### Query Parameters + * - `webRTC` — optional, defaults to `"true"`. + * When set to `"false"`, disables WebRTC setup and creates a non-media channel. + * - `recordingAddress` — optional. + * If provided, enables recording and specifies the destination address + * for recorded media streams. This address should most likely include a secret token, + * so that it can be used publicly. For example http://example.com/recording/123?token=asdasdasdasd + * + * ### Responses + * - `200 OK` — returns `{ uuid: string, url: string }` + * - `401 Unauthorized` — missing or invalid Authorization header + * - `403 Forbidden` — missing `iss` claim + * - `500 Internal Server Error` — failed to create the channel + */ routeListener.get(`/v${API_VERSION}/channel`, { callback: async (req, res, { host, protocol, remoteAddress, searchParams }) => { try { @@ -98,7 +121,8 @@ function setupRoutes(routeListener: RouteListener): void { } const channel = await Channel.create(remoteAddress, claims.iss, { key: claims.key, - useWebRtc: searchParams.get("webRTC") !== "false" + useWebRtc: searchParams.get("webRTC") !== "false", + recordingAddress: searchParams.get("recordingAddress") }); res.setHeader("Content-Type", "application/json"); res.statusCode = 200; diff --git a/src/services/rtc.ts b/src/services/resources.ts similarity index 83% rename from src/services/rtc.ts rename to src/services/resources.ts index 2e546c7..9f7cff2 100644 --- a/src/services/rtc.ts +++ b/src/services/resources.ts @@ -10,7 +10,7 @@ export interface RtcWorker extends mediasoup.types.Worker { }; } -const logger = new Logger("RTC"); +const logger = new Logger("RESOURCES"); const workers = new Set(); export async function start(): Promise { @@ -76,3 +76,20 @@ export async function getWorker(): Promise { logger.debug(`worker ${leastUsedWorker!.pid} with ${lowestUsage} ru_maxrss was selected`); return leastUsedWorker; } + +export function getFolder() { + // create a temp folder at a path, returns the path and a function to seal the folder + return { + path: "", + sealFolder: () => { + // move the content into a permanent folder location so it can easily be retrieved for processing later + // or directly forward for transcription + }, + } +} + +export function getPort() { +} + +export function releasePort(port: number) { +} diff --git a/src/services/ws.ts b/src/services/ws.ts index 5ca1bda..d6f6bea 100644 --- a/src/services/ws.ts +++ b/src/services/ws.ts @@ -112,7 +112,7 @@ function connect(webSocket: WebSocket, credentials: Credentials): Session { const { channelUUID, jwt } = credentials; let channel = channelUUID ? Channel.records.get(channelUUID) : undefined; const authResult = verify(jwt, channel?.key); - const { sfu_channel_uuid, session_id } = authResult; + const { sfu_channel_uuid, session_id, permissions } = authResult; if (!channelUUID && sfu_channel_uuid) { // Cases where the channelUUID is not provided in the credentials for backwards compatibility with version 1.1 and earlier. channel = Channel.records.get(sfu_channel_uuid); @@ -128,9 +128,10 @@ function connect(webSocket: WebSocket, credentials: Credentials): Session { if (!session_id) { throw new AuthenticationError("Malformed JWT payload"); } - webSocket.send(""); // client can start using ws after this message. const bus = new Bus(webSocket, { batchDelay: config.timeouts.busBatch }); const { session } = Channel.join(channel.uuid, session_id); + session.updatePermissions(permissions); + webSocket.send(JSON.stringify(session.availableFeatures)); // client can start using ws after this message. session.once("close", ({ code }: { code: string }) => { let wsCloseCode = WS_CLOSE_CODE.CLEAN; switch (code) { diff --git a/src/shared/enums.ts b/src/shared/enums.ts index a8703d6..e5a3a92 100644 --- a/src/shared/enums.ts +++ b/src/shared/enums.ts @@ -33,7 +33,11 @@ export enum CLIENT_REQUEST { /** Requests the server to connect the server-to-client transport */ CONNECT_STC_TRANSPORT = "CONNECT_STC_TRANSPORT", /** Requests the creation of a consumer that is used to upload a track to the server */ - INIT_PRODUCER = "INIT_PRODUCER" + INIT_PRODUCER = "INIT_PRODUCER", + /** Requests to start recording of the call */ + START_RECORDING = "START_RECORDING", + /** Requests to stop recording of the call */ + STOP_RECORDING = "STOP_RECORDING" } export enum CLIENT_MESSAGE { diff --git a/src/shared/types.ts b/src/shared/types.ts index 4210662..f59c891 100644 --- a/src/shared/types.ts +++ b/src/shared/types.ts @@ -10,6 +10,11 @@ export type StreamType = "audio" | "camera" | "screen"; export type StringLike = Buffer | string; +export type AvailableFeatures = { + "rtc": boolean, + "recording": boolean, +} + import type { DownloadStates } from "#src/client.ts"; import type { SessionId, SessionInfo, TransportConfig } from "#src/models/session.ts"; @@ -49,6 +54,8 @@ export type BusMessage = name: typeof CLIENT_REQUEST.INIT_PRODUCER; payload: { type: StreamType; kind: MediaKind; rtpParameters: RtpParameters }; } + | { name: typeof CLIENT_REQUEST.START_RECORDING; payload?: never } + | { name: typeof CLIENT_REQUEST.STOP_RECORDING; payload?: never } | { name: typeof SERVER_MESSAGE.BROADCAST; payload: { senderId: SessionId; message: JSONSerializable }; diff --git a/src/utils/utils.ts b/src/utils/utils.ts index 39773cb..8c668af 100644 --- a/src/utils/utils.ts +++ b/src/utils/utils.ts @@ -12,6 +12,7 @@ const ASCII = { green: "\x1b[32m", yellow: "\x1b[33m", white: "\x1b[37m", + cyan: "\x1b[36m", default: "\x1b[0m" } } as const; @@ -48,6 +49,19 @@ export interface ParseBodyOptions { json?: boolean; } +function getCallChain(depth: number = 8): string { + const stack = new Error().stack?.split("\n").slice(2, depth + 2) ?? []; + return stack + .map(line => { + const match = line.trim().match(/^at\s+(.*?)\s+\(/); + return match ? match[1] : null; + }) + .slice(1, depth + 1) + .filter(Boolean) + .reverse() + .join(" > "); +} + export class Logger { private readonly _name: string; private readonly _colorize: (text: string, color?: string) => string; @@ -83,6 +97,9 @@ export class Logger { verbose(text: string): void { this._log(console.log, ":VERBOSE:", text, ASCII.color.white); } + trace(message: string, { depth = 8 }: { depth?: number } = {}): void { + this._log(console.log, ":TRACE:", `${getCallChain(depth)} ${message}`, ASCII.color.cyan); + } private _generateTimeStamp(): string { const now = new Date(); return now.toISOString() + " "; diff --git a/tests/models.test.ts b/tests/models.test.ts index d84f49b..0b00d9d 100644 --- a/tests/models.test.ts +++ b/tests/models.test.ts @@ -1,17 +1,17 @@ import { describe, beforeEach, afterEach, expect, jest } from "@jest/globals"; -import * as rtc from "#src/services/rtc"; +import * as resources from "#src/services/resources"; import { Channel } from "#src/models/channel"; import { timeouts, CHANNEL_SIZE } from "#src/config"; import { OvercrowdedError } from "#src/utils/errors"; describe("Models", () => { beforeEach(async () => { - await rtc.start(); + await resources.start(); }); afterEach(() => { Channel.closeAll(); - rtc.close(); + resources.close(); }); test("Create channel and session", async () => { const channel = await Channel.create("testRemote", "testIssuer"); diff --git a/tests/network.test.ts b/tests/network.test.ts index 6a90558..080787f 100644 --- a/tests/network.test.ts +++ b/tests/network.test.ts @@ -284,4 +284,16 @@ describe("Full network", () => { expect(event1.detail.payload.message).toBe(message); expect(event2.detail.payload.message).toBe(message); }); + test("POC RECORDING", async () => { + const channelUUID = await network.getChannelUUID(); + const user1 = await network.connect(channelUUID, 1); + await once(user1.session, "stateChange"); + const sender = await network.connect(channelUUID, 3); + await once(sender.session, "stateChange"); + sender.session.updatePermissions({ recording: true }); + const startResult = await sender.sfuClient.startRecording() as { state: string }; + expect(startResult.state).toBe("started"); + const stopResult = await sender.sfuClient.stopRecording() as { state: string }; + expect(stopResult.state).toBe("stopped"); + }); }); diff --git a/tests/rtc.test.ts b/tests/rtc.test.ts index 08c188d..37caa6f 100644 --- a/tests/rtc.test.ts +++ b/tests/rtc.test.ts @@ -1,19 +1,19 @@ import { afterEach, beforeEach, describe, expect } from "@jest/globals"; -import * as rtc from "#src/services/rtc"; +import * as resources from "#src/services/resources"; import * as config from "#src/config"; describe("rtc service", () => { beforeEach(async () => { - await rtc.start(); + await resources.start(); }); afterEach(() => { - rtc.close(); + resources.close(); }); test("worker load should be evenly distributed", async () => { const usedWorkers = new Set(); for (let i = 0; i < config.NUM_WORKERS; ++i) { - const worker = await rtc.getWorker(); + const worker = await resources.getWorker(); const router = await worker.createRouter({}); const webRtcServer = await worker.createWebRtcServer(config.rtc.rtcServerOptions); const promises = []; diff --git a/tests/utils/network.ts b/tests/utils/network.ts index dace247..9ac1d16 100644 --- a/tests/utils/network.ts +++ b/tests/utils/network.ts @@ -5,7 +5,7 @@ import * as fakeParameters from "mediasoup-client/lib/test/fakeParameters"; import * as auth from "#src/services/auth"; import * as http from "#src/services/http"; -import * as rtc from "#src/services/rtc"; +import * as resources from "#src/services/resources"; import { SfuClient, SfuClientState } from "#src/client"; import { Channel } from "#src/models/channel"; import type { Session } from "#src/models/session"; @@ -69,7 +69,7 @@ export class LocalNetwork { this.port = port; // Start all services in correct order - await rtc.start(); + await resources.start(); await http.start({ httpInterface: hostname, port }); await auth.start(HMAC_B64_KEY); } @@ -217,7 +217,7 @@ export class LocalNetwork { // Stop all services auth.close(); http.close(); - rtc.close(); + resources.close(); // Clear network info this.hostname = undefined;