diff --git a/examples/demo/demo.ts b/examples/demo/demo.ts index 22f1a8d6ed..808e7af337 100644 --- a/examples/demo/demo.ts +++ b/examples/demo/demo.ts @@ -21,6 +21,7 @@ import { ParticipantEvent, RemoteParticipant, RemoteTrackPublication, + RemoteVideoTrack, Room, RoomEvent, ScreenSharePresets, @@ -34,10 +35,12 @@ import { isLocalTrack, isRemoteParticipant, isRemoteTrack, + isVideoTrack, setLogLevel, supportsAV1, supportsVP9, } from '../../src/index'; +import { TrackEvent } from '../../src/room/events'; import { isSVCCodec, sleep, supportsH265 } from '../../src/room/utils'; setLogLevel(LogLevel.debug); @@ -237,6 +240,43 @@ const appActions = { appendLog('subscribed to track', pub.trackSid, participant.identity); renderParticipant(participant); renderScreenShare(room); + + // Display the user timestamp that matches the frame currently on screen. + // Publish & Subscribe timestamps update every frame; latency updates at ~2 Hz + // so it's readable (matching the Rust subscriber's approach). + if (track instanceof RemoteVideoTrack) { + let lastLatencyUpdate = 0; + let cachedUserTimestampUs: number | undefined; + let cachedLatencyStr = ''; + + track.on(TrackEvent.TimeSyncUpdate, ({ rtpTimestamp }) => { + const timestampUs = track.lookupUserTimestamp(rtpTimestamp); + if (timestampUs !== undefined) { + cachedUserTimestampUs = timestampUs; + } + if (cachedUserTimestampUs === undefined) return; + + const now = Date.now(); + + if (now - lastLatencyUpdate >= 500) { + const nowUs = now * 1000; + const latencyMs = (nowUs - cachedUserTimestampUs) / 1000; + cachedLatencyStr = `${latencyMs.toFixed(1)}ms`; + lastLatencyUpdate = now; + } + + const container = getParticipantsAreaElement(); + const tsElm = container.querySelector(`#user-ts-${participant.identity}`); + if (tsElm) { + const pubStr = new Date(cachedUserTimestampUs / 1000).toISOString().substring(11, 23); + const subStr = new Date(now).toISOString().substring(11, 23); + tsElm.innerHTML = + `Publish:   ${pubStr}
` + + `Subscribe: ${subStr}
` + + `Latency:   ${cachedLatencyStr}`; + } + }); + } }) .on(RoomEvent.TrackUnsubscribed, (_, pub, participant) => { appendLog('unsubscribed from track', pub.trackSid); @@ -810,6 +850,8 @@ function renderParticipant(participant: Participant, remove: boolean = false) { +
+
${ !isLocalParticipant(participant) ? `
diff --git a/examples/demo/styles.css b/examples/demo/styles.css index 059a0815e5..79858c9b58 100644 --- a/examples/demo/styles.css +++ b/examples/demo/styles.css @@ -164,3 +164,17 @@ position: absolute; z-index: 4; } + +.participant .user-ts-overlay { + position: absolute; + bottom: 28px; + left: 0; + z-index: 5; + font-family: monospace; + font-size: 0.65em; + color: #eee; + background: rgba(0, 0, 0, 0.5); + padding: 3px 6px; + line-height: 1.4; + border-radius: 0 3px 0 0; +} diff --git a/rollup.config.worker.js b/rollup.config.worker.js index 14115d7108..86fbf66ee9 100644 --- a/rollup.config.worker.js +++ b/rollup.config.worker.js @@ -3,23 +3,38 @@ import typescript from 'rollup-plugin-typescript2'; import packageJson from './package.json'; import { commonPlugins, kebabCaseToPascalCase } from './rollup.config'; -export default { - input: 'src/e2ee/worker/e2ee.worker.ts', - output: [ - { - file: `dist/${packageJson.name}.e2ee.worker.mjs`, - format: 'es', - strict: true, - sourcemap: true, - }, - { - file: `dist/${packageJson.name}.e2ee.worker.js`, - format: 'umd', - strict: true, - sourcemap: true, - name: kebabCaseToPascalCase(packageJson.name) + '.e2ee.worker', - plugins: [terser()], - }, - ], - plugins: [typescript({ tsconfig: './src/e2ee/worker/tsconfig.json' }), ...commonPlugins], -}; +function workerConfig(input, suffix, umdName) { + return { + input, + output: [ + { + file: `dist/${packageJson.name}.${suffix}.mjs`, + format: 'es', + strict: true, + sourcemap: true, + }, + { + file: `dist/${packageJson.name}.${suffix}.js`, + format: 'umd', + strict: true, + sourcemap: true, + name: umdName, + plugins: [terser()], + }, + ], + plugins: [typescript({ tsconfig: './src/e2ee/worker/tsconfig.json' }), ...commonPlugins], + }; +} + +export default [ + workerConfig( + 'src/e2ee/worker/e2ee.worker.ts', + 'e2ee.worker', + kebabCaseToPascalCase(packageJson.name) + '.e2ee.worker', + ), + workerConfig( + 'src/user_timestamp/userTimestamp.worker.ts', + 'user-timestamp.worker', + kebabCaseToPascalCase(packageJson.name) + '.userTimestamp.worker', + ), +]; diff --git a/src/e2ee/E2eeManager.ts b/src/e2ee/E2eeManager.ts index b10676a891..2e60c48b37 100644 --- a/src/e2ee/E2eeManager.ts +++ b/src/e2ee/E2eeManager.ts @@ -8,6 +8,7 @@ import { ConnectionState } from '../room/Room'; import { DeviceUnsupportedError } from '../room/errors'; import { EngineEvent, ParticipantEvent, RoomEvent } from '../room/events'; import type RemoteTrack from '../room/track/RemoteTrack'; +import RemoteVideoTrack from '../room/track/RemoteVideoTrack'; import type { Track } from '../room/track/Track'; import type { VideoCodec } from '../room/track/options'; import { mimeTypeToVideoCodecString } from '../room/track/utils'; @@ -221,6 +222,14 @@ export class E2EEManager encryptFuture.resolve(data as EncryptDataResponseMessage['data']); } break; + case 'userTimestamp': + this.handleUserTimestamp( + data.trackId, + data.participantIdentity, + data.timestampUs, + data.rtpTimestamp, + ); + break; default: break; } @@ -231,6 +240,31 @@ export class E2EEManager this.emit(EncryptionEvent.EncryptionError, ev.error, undefined); }; + private handleUserTimestamp( + trackId: string, + participantIdentity: string, + timestampUs: number, + rtpTimestamp?: number, + ) { + if (!this.room) { + return; + } + const participant = this.room.getParticipantByIdentity(participantIdentity); + if (!participant) { + return; + } + for (const pub of participant.trackPublications.values()) { + if ( + pub.track && + pub.track.mediaStreamID === trackId && + pub.track instanceof RemoteVideoTrack + ) { + pub.track.setUserTimestamp(timestampUs, rtpTimestamp); + return; + } + } + } + public setupEngine(engine: RTCEngine) { engine.on(EngineEvent.RTPVideoMapUpdate, (rtpMap) => { this.postRTPMap(rtpMap); diff --git a/src/e2ee/types.ts b/src/e2ee/types.ts index c941700fec..b786fa6f8d 100644 --- a/src/e2ee/types.ts +++ b/src/e2ee/types.ts @@ -149,6 +149,16 @@ export interface EncryptDataResponseMessage extends BaseMessage { }; } +export interface UserTimestampMessage extends BaseMessage { + kind: 'userTimestamp'; + data: { + trackId: string; + participantIdentity: string; + timestampUs: number; + rtpTimestamp?: number; + }; +} + export type E2EEWorkerMessage = | InitMessage | SetKeyMessage @@ -165,7 +175,8 @@ export type E2EEWorkerMessage = | DecryptDataRequestMessage | DecryptDataResponseMessage | EncryptDataRequestMessage - | EncryptDataResponseMessage; + | EncryptDataResponseMessage + | UserTimestampMessage; export type KeySet = { material: CryptoKey; encryptionKey: CryptoKey }; diff --git a/src/e2ee/worker/FrameCryptor.ts b/src/e2ee/worker/FrameCryptor.ts index b7628d911a..97a373ff1c 100644 --- a/src/e2ee/worker/FrameCryptor.ts +++ b/src/e2ee/worker/FrameCryptor.ts @@ -4,10 +4,17 @@ import { EventEmitter } from 'events'; import type TypedEventEmitter from 'typed-emitter'; import { workerLogger } from '../../logger'; import type { VideoCodec } from '../../room/track/options'; +import { stripUserTimestampFromEncodedFrame } from '../../user_timestamp/UserTimestampTransformer'; import { ENCRYPTION_ALGORITHM, IV_LENGTH, UNENCRYPTED_BYTES } from '../constants'; import { CryptorError, CryptorErrorReason } from '../errors'; import { type CryptorCallbacks, CryptorEvent } from '../events'; -import type { DecodeRatchetOptions, KeyProviderOptions, KeySet, RatchetResult } from '../types'; +import type { + DecodeRatchetOptions, + KeyProviderOptions, + KeySet, + RatchetResult, + UserTimestampMessage, +} from '../types'; import { deriveKeys, isVideoFrame, needsRbspUnescaping, parseRbsp, writeRbsp } from '../utils'; import type { ParticipantKeyHandler } from './ParticipantKeyHandler'; import { processNALUsForEncryption } from './naluUtils'; @@ -454,6 +461,31 @@ export class FrameCryptor extends BaseFrameCryptor { encodedFrame: RTCEncodedVideoFrame | RTCEncodedAudioFrame, controller: TransformStreamDefaultController, ) { + // Always attempt to strip LKTS user timestamp trailer before any e2ee + // processing. On the send side, the trailer is appended *after* encryption, + // so it must be removed *before* decryption. + if (isVideoFrame(encodedFrame) && encodedFrame.data.byteLength > 0) { + try { + const userTsResult = stripUserTimestampFromEncodedFrame( + encodedFrame as RTCEncodedVideoFrame, + ); + if (userTsResult !== undefined && this.trackId && this.participantIdentity) { + const msg: UserTimestampMessage = { + kind: 'userTimestamp', + data: { + trackId: this.trackId, + participantIdentity: this.participantIdentity, + timestampUs: userTsResult.timestampUs, + rtpTimestamp: userTsResult.rtpTimestamp, + }, + }; + postMessage(msg); + } + } catch { + // Best-effort: never break media pipeline if timestamp parsing fails. + } + } + if ( !this.isEnabled() || // skip for decryption for empty dtx frames diff --git a/src/index.ts b/src/index.ts index 826f5c5e4e..6a1996dfc1 100644 --- a/src/index.ts +++ b/src/index.ts @@ -167,3 +167,12 @@ export { } from './room/data-track/packet/extensions'; export { LocalTrackRecorder } from './room/track/record'; + +export { + type UserTimestampInfo, + type UserTimestampWithRtp, + USER_TS_MAGIC, + USER_TS_TRAILER_SIZE, + extractUserTimestampTrailer, + stripUserTimestampFromEncodedFrame, +} from './user_timestamp'; diff --git a/src/room/RTCEngine.ts b/src/room/RTCEngine.ts index b49c4f0894..c596863eb0 100644 --- a/src/room/RTCEngine.ts +++ b/src/room/RTCEngine.ts @@ -80,6 +80,7 @@ import type { TrackPublishOptions, VideoCodec } from './track/options'; import { getTrackPublicationInfo } from './track/utils'; import type { LoggerOptions } from './types'; import { + isChromiumBased, isVideoCodec, isVideoTrack, isWeb, @@ -661,9 +662,8 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit private makeRTCConfiguration(serverResponse: JoinResponse | ReconnectResponse): RTCConfiguration { const rtcConfig = { ...this.rtcConfig }; - if (this.signalOpts?.e2eeEnabled) { - this.log.debug('E2EE - setting up transports with insertable streams', this.logContext); - // this makes sure that no data is sent before the transforms are ready + if (this.signalOpts?.e2eeEnabled || isChromiumBased()) { + this.log.debug('setting up transports with insertable streams', this.logContext); // @ts-ignore rtcConfig.encodedInsertableStreams = true; } diff --git a/src/room/Room.ts b/src/room/Room.ts index 7de0c84fad..b9895ea45f 100644 --- a/src/room/Room.ts +++ b/src/room/Room.ts @@ -36,6 +36,7 @@ import type TypedEmitter from 'typed-emitter'; import { ensureTrailingSlash } from '../api/utils'; import { EncryptionEvent } from '../e2ee'; import { type BaseE2EEManager, E2EEManager } from '../e2ee/E2eeManager'; +import { isScriptTransformSupported } from '../e2ee/utils'; import log, { LoggerNames, getLogger } from '../logger'; import type { InternalRoomConnectOptions, @@ -43,6 +44,7 @@ import type { RoomConnectOptions, RoomOptions, } from '../options'; +import { stripUserTimestampFromEncodedFrame } from '../user_timestamp/UserTimestampTransformer'; import TypedPromise from '../utils/TypedPromise'; import { getBrowser } from '../utils/browserParser'; import { BackOffStrategy } from './BackOffStrategy'; @@ -81,6 +83,7 @@ import LocalTrackPublication from './track/LocalTrackPublication'; import LocalVideoTrack from './track/LocalVideoTrack'; import type RemoteTrack from './track/RemoteTrack'; import RemoteTrackPublication from './track/RemoteTrackPublication'; +import RemoteVideoTrack from './track/RemoteVideoTrack'; import { Track } from './track/Track'; import type { TrackPublication } from './track/TrackPublication'; import type { TrackProcessor } from './track/processor/types'; @@ -100,6 +103,7 @@ import { getDisconnectReasonFromConnectionError, getEmptyAudioStreamTrack, isBrowserSupported, + isChromiumBased, isCloud, isLocalAudioTrack, isLocalParticipant, @@ -123,6 +127,10 @@ export enum ConnectionState { SignalReconnecting = 'signalReconnecting', } +function createUserTimestampWorker(): Worker { + return new Worker(new URL('./livekit-client.user-timestamp.worker.js', import.meta.url)); +} + const CONNECTION_RECONCILE_FREQUENCY_MS = 4 * 1000; /** @@ -2186,6 +2194,13 @@ class Room extends (EventEmitter as new () => TypedEmitter) track.on(TrackEvent.VideoPlaybackFailed, this.handleVideoPlaybackFailed); track.on(TrackEvent.VideoPlaybackStarted, this.handleVideoPlaybackStarted); } + // When e2ee is NOT enabled, set up a standalone insertable-streams + // transform to extract LKTS user timestamp trailers from inbound + // video frames. When e2ee IS enabled, the FrameCryptor worker + // handles this before decryption. + if (!this.hasE2EESetup && track instanceof RemoteVideoTrack && track.receiver) { + this.setupUserTimestampTransform(track); + } this.emit(RoomEvent.TrackSubscribed, track, publication, participant); }, ) @@ -2600,6 +2615,66 @@ class Room extends (EventEmitter as new () => TypedEmitter) } } + /** + * Sets up an insertable-streams transform on a remote video track's receiver + * to strip LKTS user timestamp trailers. Used only when e2ee is NOT enabled + * (when e2ee is enabled, the FrameCryptor worker handles this). + * + * Uses RTCRtpScriptTransform on browsers that support it (non-Chromium), + * and falls back to createEncodedStreams (legacy insertable streams) on + * Chromium where encodedInsertableStreams is enabled on the PeerConnection. + */ + private setupUserTimestampTransform(track: RemoteVideoTrack) { + const receiver = track.receiver; + if (!receiver) { + return; + } + + try { + if (isScriptTransformSupported() && !isChromiumBased()) { + const worker = createUserTimestampWorker(); + worker.onmessage = (ev: MessageEvent) => { + if (ev.data?.kind === 'userTimestamp' && typeof ev.data.timestampUs === 'number') { + track.setUserTimestamp(ev.data.timestampUs, ev.data.rtpTimestamp); + } + }; + // @ts-ignore + receiver.transform = new RTCRtpScriptTransform(worker, { kind: 'decode' }); + return; + } + + // @ts-ignore + if (typeof receiver.createEncodedStreams === 'function') { + // @ts-ignore + const { readable, writable } = receiver.createEncodedStreams(); + + const transformStream = new TransformStream({ + transform: (encodedFrame, controller) => { + try { + const result = stripUserTimestampFromEncodedFrame(encodedFrame); + if (result !== undefined) { + track.setUserTimestamp(result.timestampUs, result.rtpTimestamp); + } + } catch { + // Best-effort: never break the media pipeline if parsing fails. + } + controller.enqueue(encodedFrame); + }, + }); + + readable + .pipeThrough(transformStream) + .pipeTo(writable) + .catch(() => {}); + return; + } + + this.log.debug('user timestamp transform: no supported API available', this.logContext); + } catch { + // If anything goes wrong (e.g., unsupported environment), just skip. + } + } + // /** @internal */ emit( event: E, diff --git a/src/room/track/RemoteVideoTrack.ts b/src/room/track/RemoteVideoTrack.ts index 2681db8fdd..54b05f051d 100644 --- a/src/room/track/RemoteVideoTrack.ts +++ b/src/room/track/RemoteVideoTrack.ts @@ -12,6 +12,8 @@ import type { AdaptiveStreamSettings } from './types'; const REACTION_DELAY = 100; +const MAX_USER_TIMESTAMP_MAP_ENTRIES = 300; + export default class RemoteVideoTrack extends RemoteTrack { private prevStats?: VideoReceiverStats; @@ -23,6 +25,19 @@ export default class RemoteVideoTrack extends RemoteTrack { private lastDimensions?: Track.Dimensions; + /** + * RTP timestamp -> user timestamp (microseconds) map. + * Mirrors the Rust SDK's `recv_map_`: populated when an LKTS trailer is + * stripped from an encoded frame, keyed by the frame's RTP timestamp so + * decoded frames can look up their user timestamp regardless of frame + * drops or reordering. + * + * @experimental + */ + private userTimestampMap = new Map(); + + private userTimestampMapOrder: number[] = []; + constructor( mediaTrack: MediaStreamTrack, sid: string, @@ -55,6 +70,52 @@ export default class RemoteVideoTrack extends RemoteTrack { return this._mediaStreamTrack; } + /** + * Called when a user timestamp (LKTS trailer) has been extracted from an + * inbound encoded video frame — either by the e2ee worker or by the + * standalone insertable-streams transform. + * + * @param timestampUs - user timestamp in microseconds since Unix epoch + * @param rtpTimestamp - RTP timestamp from the encoded frame (90 kHz clock). + * When provided, the mapping is stored so decoded frames can look up their + * user timestamp via {@link lookupUserTimestamp}. + * + * @internal + */ + setUserTimestamp(timestampUs: number, rtpTimestamp?: number) { + if (rtpTimestamp !== undefined) { + while (this.userTimestampMap.size >= MAX_USER_TIMESTAMP_MAP_ENTRIES) { + const oldest = this.userTimestampMapOrder.shift(); + if (oldest !== undefined) { + this.userTimestampMap.delete(oldest); + } + } + this.userTimestampMap.set(rtpTimestamp, timestampUs); + this.userTimestampMapOrder.push(rtpTimestamp); + } + } + + /** + * Look up the user timestamp associated with a given RTP timestamp. + * The entry is consumed (removed) after lookup, matching the Rust SDK + * behaviour. + * + * @returns The user timestamp in microseconds, or `undefined` if not found. + * @experimental + */ + lookupUserTimestamp(rtpTimestamp: number): number | undefined { + const ts = this.userTimestampMap.get(rtpTimestamp); + if (ts === undefined) { + return undefined; + } + this.userTimestampMap.delete(rtpTimestamp); + const idx = this.userTimestampMapOrder.indexOf(rtpTimestamp); + if (idx !== -1) { + this.userTimestampMapOrder.splice(idx, 1); + } + return ts; + } + /** @internal */ setMuted(muted: boolean) { super.setMuted(muted); diff --git a/src/room/track/Track.ts b/src/room/track/Track.ts index 94e4d915b6..fb9de30afc 100644 --- a/src/room/track/Track.ts +++ b/src/room/track/Track.ts @@ -537,6 +537,7 @@ export type TrackEventCallbacks = { trackProcessorUpdate: (processor?: TrackProcessor) => void; audioTrackFeatureUpdate: (track: any, feature: AudioTrackFeature, enabled: boolean) => void; timeSyncUpdate: (update: { timestamp: number; rtpTimestamp: number }) => void; + userTimestamp: (timestampUs: number, rtpTimestamp?: number) => void; preConnectBufferFlushed: (buffer: Uint8Array[]) => void; cpuConstrained: () => void; }; diff --git a/src/room/track/utils.ts b/src/room/track/utils.ts index e4974e76eb..19e99354e4 100644 --- a/src/room/track/utils.ts +++ b/src/room/track/utils.ts @@ -280,7 +280,10 @@ export function getLogContextFromTrack(track: Track | TrackPublication): Record< } export function supportsSynchronizationSources(): boolean { - return typeof RTCRtpReceiver !== 'undefined' && 'getSynchronizationSources' in RTCRtpReceiver; + return ( + typeof RTCRtpReceiver !== 'undefined' && + typeof RTCRtpReceiver.prototype.getSynchronizationSources === 'function' + ); } export function diffAttributes( diff --git a/src/user_timestamp/UserTimestampTransformer.ts b/src/user_timestamp/UserTimestampTransformer.ts new file mode 100644 index 0000000000..f3323bdadb --- /dev/null +++ b/src/user_timestamp/UserTimestampTransformer.ts @@ -0,0 +1,81 @@ +/** + * Utilities for parsing LiveKit user timestamp (LKTS) trailers from encoded + * video frames on the receive side. + * + * Trailer format (matches rust-sdks user_timestamp.h): + * - 8-byte big-endian int64 timestamp in microseconds since Unix epoch + * - 4-byte ASCII magic "LKTS" + * + * Total trailer size: 12 bytes. + */ + +export interface UserTimestampInfo { + /** Frame payload with the LKTS trailer removed. */ + payload: ArrayBuffer; + /** Embedded user timestamp in microseconds since Unix epoch. */ + timestampUs: number; +} + +export interface UserTimestampWithRtp { + /** Embedded user timestamp in microseconds since Unix epoch. */ + timestampUs: number; + /** RTP timestamp from the encoded frame (90 kHz clock for video). */ + rtpTimestamp: number; +} + +/** ASCII bytes for 'L', 'K', 'T', 'S'. */ +export const USER_TS_MAGIC = Uint8Array.from([0x4c, 0x4b, 0x54, 0x53]); +export const USER_TS_TRAILER_SIZE = 8 + USER_TS_MAGIC.length; // 12 + +/** + * Extracts an LKTS trailer from the end of an encoded frame, if present. + * Returns the payload without the trailer and the parsed timestamp, + * or `undefined` if no valid trailer is found. + */ +export function extractUserTimestampTrailer(frameData: ArrayBuffer): UserTimestampInfo | undefined { + const bytes = new Uint8Array(frameData); + if (bytes.byteLength < USER_TS_TRAILER_SIZE) { + return undefined; + } + + // Check magic bytes at the very end of the frame. + const magicStart = bytes.byteLength - USER_TS_MAGIC.length; + for (let i = 0; i < USER_TS_MAGIC.length; i++) { + if (bytes[magicStart + i] !== USER_TS_MAGIC[i]) { + return undefined; + } + } + + // Read the 8-byte big-endian int64 timestamp preceding the magic. + const tsStart = bytes.byteLength - USER_TS_TRAILER_SIZE; + const view = new DataView(frameData); + const high = view.getUint32(tsStart); + const low = view.getUint32(tsStart + 4); + const timestampUs = high * 0x100000000 + low; + + const payload = frameData.slice(0, tsStart); + return { payload, timestampUs }; +} + +/** + * Strips an LKTS trailer from an RTCEncodedVideoFrame in-place. + * Replaces `encodedFrame.data` with the payload (trailer removed). + * + * @returns The extracted user timestamp and the frame's RTP timestamp, + * or `undefined` if no trailer was found. + */ +export function stripUserTimestampFromEncodedFrame( + encodedFrame: RTCEncodedVideoFrame, +): UserTimestampWithRtp | undefined { + const info = extractUserTimestampTrailer(encodedFrame.data); + if (!info) { + return undefined; + } + encodedFrame.data = info.payload; + // getMetadata().rtpTimestamp is the 32-bit RTP timestamp (90 kHz clock for + // video) that matches getSynchronizationSources().rtpTimestamp on the + // receiver side. encodedFrame.timestamp is a different value (microseconds). + const meta = encodedFrame.getMetadata(); + const rtpTimestamp = (meta as Record).rtpTimestamp as number | undefined; + return { timestampUs: info.timestampUs, rtpTimestamp: rtpTimestamp ?? 0 }; +} diff --git a/src/user_timestamp/index.ts b/src/user_timestamp/index.ts new file mode 100644 index 0000000000..145d79423f --- /dev/null +++ b/src/user_timestamp/index.ts @@ -0,0 +1,8 @@ +export { + type UserTimestampInfo, + type UserTimestampWithRtp, + USER_TS_MAGIC, + USER_TS_TRAILER_SIZE, + extractUserTimestampTrailer, + stripUserTimestampFromEncodedFrame, +} from './UserTimestampTransformer'; diff --git a/src/user_timestamp/userTimestamp.worker.ts b/src/user_timestamp/userTimestamp.worker.ts new file mode 100644 index 0000000000..c1f85c71f0 --- /dev/null +++ b/src/user_timestamp/userTimestamp.worker.ts @@ -0,0 +1,71 @@ +/** + * Lightweight worker for stripping LKTS user timestamp trailers from inbound + * encoded video frames via RTCRtpScriptTransform. + * + * When a valid trailer is found, the timestamp is posted back to the main + * thread so the SDK can store the RTP-to-user-timestamp mapping on the + * corresponding RemoteVideoTrack. + */ +import { USER_TS_MAGIC, USER_TS_TRAILER_SIZE } from './UserTimestampTransformer'; + +function stripAndForward( + readable: ReadableStream, + writable: WritableStream, +) { + const transformStream = new TransformStream({ + transform(encodedFrame, controller) { + try { + const bytes = new Uint8Array(encodedFrame.data); + if (bytes.byteLength >= USER_TS_TRAILER_SIZE) { + const magicStart = bytes.byteLength - USER_TS_MAGIC.length; + let match = true; + for (let i = 0; i < USER_TS_MAGIC.length; i++) { + if (bytes[magicStart + i] !== USER_TS_MAGIC[i]) { + match = false; + break; + } + } + if (match) { + const tsStart = bytes.byteLength - USER_TS_TRAILER_SIZE; + const view = new DataView(encodedFrame.data); + const high = view.getUint32(tsStart); + const low = view.getUint32(tsStart + 4); + const timestampUs = high * 0x100000000 + low; + const meta = encodedFrame.getMetadata() as Record; + const rtpTimestamp = meta.rtpTimestamp as number | undefined; + + encodedFrame.data = encodedFrame.data.slice(0, tsStart); + postMessage({ kind: 'userTimestamp', timestampUs, rtpTimestamp: rtpTimestamp ?? 0 }); + } + } + } catch { + // Best-effort: never break the media pipeline. + } + controller.enqueue(encodedFrame); + }, + }); + + readable + .pipeThrough(transformStream) + .pipeTo(writable) + .catch(() => {}); +} + +// RTCRtpScriptTransform path +// @ts-ignore +if (self.RTCTransformEvent) { + // @ts-ignore + self.onrtctransform = (event: RTCTransformEvent) => { + // @ts-ignore + const transformer = event.transformer; + stripAndForward(transformer.readable, transformer.writable); + }; +} + +// createEncodedStreams (legacy insertable streams) path +self.onmessage = (event: MessageEvent) => { + const { kind, readable, writable } = event.data; + if (kind === 'decode' && readable && writable) { + stripAndForward(readable, writable); + } +};