Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/@types/event.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ export enum EventType {
RoomMessage = "m.room.message",
RoomMessageEncrypted = "m.room.encrypted",
Sticker = "m.sticker",

CallInvite = "m.call.invite",
CallCandidates = "m.call.candidates",
CallAnswer = "m.call.answer",
Expand All @@ -54,6 +55,10 @@ export enum EventType {
CallReplaces = "m.call.replaces",
CallAssertedIdentity = "m.call.asserted_identity",
CallAssertedIdentityPrefix = "org.matrix.call.asserted_identity",
CallTrackSubscription = "m.call.track_subscription",
CallPing = "m.call.ping",
CallPong = "m.call.pong",

KeyVerificationRequest = "m.key.verification.request",
KeyVerificationStart = "m.key.verification.start",
KeyVerificationCancel = "m.key.verification.cancel",
Expand Down
185 changes: 89 additions & 96 deletions src/webrtc/call.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,14 @@ import {
MCallCandidates,
MCallBase,
MCallHangupReject,
ISfuTrackDesc,
ISfuBaseDataChannelMessage,
ISfuSelectDataChannelMessage,
ISfuAnswerDataChannelMessage,
ISfuPublishDataChannelMessage,
ISfuUnpublishDataChannelMessage,
ISfuOfferDataChannelMessage,
SFUDataChannelMessageOp,
ISfuMetadataDataChannelMessage,
FocusTrackDescription,
FocusTrackSubscriptionEvent,
FocusNegotiateEvent,
FocusSDPStreamMetadataChangedEvent,
SDPStreamMetadataTracks,
FocusEvent,
SDPStreamMetadataKeyStable,
FocusEventBaseContent,
} from './callEventTypes';
import { CallFeed } from './callFeed';
import { MatrixClient } from "../client";
Expand Down Expand Up @@ -269,8 +267,6 @@ const CALL_TIMEOUT_MS = 60 * 1000; // ms
const CALL_LENGTH_INTERVAL = 1000; // ms
/** The time after which we end the call, if ICE got disconnected */
const ICE_DISCONNECTED_TIMEOUT = 30 * 1000; // ms
/** The time after which the SFU will consider the connection as stale */
const SFU_KEEP_ALIVE_INTERVAL = 30 * 1000; // 30 seconds

export class CallError extends Error {
public readonly code: string;
Expand Down Expand Up @@ -356,7 +352,7 @@ export class MatrixCall extends TypedEventEmitter<CallEvent, CallEventHandlerMap

// our transceivers for each purpose and type of media
private transceivers = new Map<TransceiverKey, RTCRtpTransceiver>();
private subscribedTracks: ISfuTrackDesc[] = [];
private subscribedTracks: FocusTrackDescription[] = [];

private inviteOrAnswerSent = false;
private waitForLocalAVStream = false;
Expand Down Expand Up @@ -849,20 +845,7 @@ export class MatrixCall extends TypedEventEmitter<CallEvent, CallEventHandlerMap
`purpose="${callFeed.purpose}")`,
);

// If we're calling with an SFU and we aren't setting up the call, we
// send the offer manually
if (this.isFocus && this.state !== CallState.Fledgling && addToPeerConnection) {
this.peerConn!.createOffer().then(async (offer) => {
await this.peerConn!.setLocalDescription(offer);

this.sendSFUDataChannelMessage(SFUDataChannelMessageOp.Publish, {
sdp: offer.sdp,
} as ISfuPublishDataChannelMessage);
this.emit(CallEvent.FeedsChanged, this.feeds);
});
} else {
this.emit(CallEvent.FeedsChanged, this.feeds);
}
this.emit(CallEvent.FeedsChanged, this.feeds);
}

/**
Expand All @@ -874,7 +857,7 @@ export class MatrixCall extends TypedEventEmitter<CallEvent, CallEventHandlerMap
const audioTransceiverKey = getTransceiverKey(callFeed.purpose, "audio");
const videoTransceiverKey = getTransceiverKey(callFeed.purpose, "video");

const tracksToUnpublish: ISfuTrackDesc[] = [];
const tracksToUnpublish: FocusTrackDescription[] = [];
for (const transceiverKey of [audioTransceiverKey, videoTransceiverKey]) {
// this is slightly mixing the track and transceiver API but is basically just shorthand.
// There is no way to actually remove a transceiver, so this just sets it to inactive
Expand All @@ -898,17 +881,6 @@ export class MatrixCall extends TypedEventEmitter<CallEvent, CallEventHandlerMap
}

this.deleteFeed(callFeed);

if (this.isFocus) {
this.peerConn!.createOffer().then(async (offer) => {
await this.peerConn!.setLocalDescription(offer);

this.sendSFUDataChannelMessage(SFUDataChannelMessageOp.Unpublish, {
sdp: offer.sdp,
stop: tracksToUnpublish,
} as ISfuUnpublishDataChannelMessage);
});
}
}

private deleteAllFeeds(): void {
Expand Down Expand Up @@ -1554,7 +1526,7 @@ export class MatrixCall extends TypedEventEmitter<CallEvent, CallEventHandlerMap

public async sendMetadataUpdate(): Promise<void> {
if (this.isFocus) {
this.sendSFUDataChannelMessage(SFUDataChannelMessageOp.Metadata);
this.sendFocusEvent(EventType.CallSDPStreamMetadataChanged);
} else {
await this.sendVoipEvent(EventType.CallSDPStreamMetadataChangedPrefix, {
[SDPStreamMetadataKey]: this.getLocalSDPStreamMetadata(),
Expand Down Expand Up @@ -1975,68 +1947,75 @@ export class MatrixCall extends TypedEventEmitter<CallEvent, CallEventHandlerMap
private onDataChannelMessage = async (event: MessageEvent): Promise<void> => {
// TODO: play nice with application layer DC listeners

let json: ISfuBaseDataChannelMessage;
let json: FocusEvent;
try {
json = JSON.parse(event.data);
} catch (e) {
logger.warn("Ignoring non-JSON DC event:", event.data);
return;
}

if (!json.op) {
logger.warn("Ignoring unrecognized DC event:", json.op);
if (!json.type) {
logger.warn("Ignoring unrecognized DC event:", json);
return;
}

logger.warn(`Received DC ${json.op} event`, json);
logger.info(`Received DC ${json.type} event`, json);

switch (json.op) {
case SFUDataChannelMessageOp.Offer: {
try {
const offer = json as ISfuOfferDataChannelMessage;
this.updateRemoteSDPStreamMetadata(offer.metadata!);
await this.peerConn!.setRemoteDescription({
"type": "offer",
"sdp": offer.sdp,
});
switch (json.type) {
case EventType.CallNegotiate: {
// TODO: Use MatrixCall::onNegotiateReceived()

const answer = await this.peerConn!.createAnswer();
await this.peerConn!.setLocalDescription(answer);
const negotiate = json.content as FocusNegotiateEvent;
this.updateRemoteSDPStreamMetadata(negotiate[SDPStreamMetadataKeyStable]!);

this.sendSFUDataChannelMessage(SFUDataChannelMessageOp.Answer, {
sdp: answer.sdp,
} as ISfuAnswerDataChannelMessage);
} catch (e) {
logger.debug(`Call ${this.callId} Failed to set remote description`, e);
this.terminate(CallParty.Local, CallErrorCode.SetRemoteDescription, false);
return;
if (negotiate.description.type === "offer") {
try {
await this.peerConn!.setRemoteDescription({
"type": "offer",
"sdp": negotiate.description.sdp,
});

const answer = await this.peerConn!.createAnswer();
await this.peerConn!.setLocalDescription(answer);

this.sendFocusEvent(EventType.CallNegotiate, {
description: answer,
} as FocusNegotiateEvent);
} catch (e) {
logger.debug(`Call ${this.callId} Failed to set remote description`, e);
this.terminate(CallParty.Local, CallErrorCode.SetRemoteDescription, false);
return;
}
} else if (negotiate.description.type === "answer") {
await this.peerConn!.setRemoteDescription({
type: "answer",
sdp: negotiate.description.sdp,
});
} else {
throw new Error("Unknown description type - ignoring");
}
}
break;
case SFUDataChannelMessageOp.Answer: {
const answer = json as ISfuAnswerDataChannelMessage;
this.updateRemoteSDPStreamMetadata(answer.metadata!);
await this.peerConn!.setRemoteDescription({
type: "answer",
sdp: answer.sdp,
});
case EventType.CallSDPStreamMetadataChanged: {
const metadata = json.content as FocusSDPStreamMetadataChangedEvent;
this.updateRemoteSDPStreamMetadata(metadata[SDPStreamMetadataKeyStable]!);
}
break;
case SFUDataChannelMessageOp.Metadata: {
const metadata = json as ISfuMetadataDataChannelMessage;
this.updateRemoteSDPStreamMetadata(metadata.metadata!);
case EventType.CallPing: {
this.sendFocusEvent(EventType.CallPong);
}
break;
default:
logger.warn("Ignoring unrecognized DC event op ", json.op);
logger.warn("Ignoring unrecognized DC event op ", json.type);

break;
}
};

private async waitForDatachannelToBeOpen(): Promise<void> {
if (this.dataChannel?.readyState === 'connecting') {
const p = new Promise<void>(resolve => {
const p = new Promise<void>((resolve) => {
this.dataChannel!.onopen = (): void => resolve();
this.dataChannel!.onclose = (): void => resolve();
});
Expand All @@ -2050,24 +2029,24 @@ export class MatrixCall extends TypedEventEmitter<CallEvent, CallEventHandlerMap
if (!this.remoteSDPStreamMetadata) return;
if (this.dataChannel?.readyState !== "open") return;

const tracks: ISfuTrackDesc[] = Object.entries(this.remoteSDPStreamMetadata)
const tracks: FocusTrackDescription[] = Object.entries(this.remoteSDPStreamMetadata)
.filter(([, info]) => Boolean(info.tracks)) // Skip trackless feeds
.reduce((a: ISfuTrackDesc[], [s, i]) => (
.reduce((a: FocusTrackDescription[], [s, i]) => (
[...a, ...Object.keys(i.tracks).map((t) => ({ stream_id: s, track_id: t }))]
), []) // Get array of tracks from feeds
.filter((track) => !this.subscribedTracks.find((subscribed) => utils.deepCompare(track, subscribed))); // Filter out already subscribed tracks

if (tracks.length === 0) {
logger.warn("Failed to find any new streams to subscribe to");
logger.info("Failed to find any new streams to subscribe to");
return;
} else {
this.subscribedTracks.push(...tracks);
logger.warn("Subscribing to:", tracks);
}

this.sendSFUDataChannelMessage(SFUDataChannelMessageOp.Select, {
start: tracks,
} as ISfuSelectDataChannelMessage);
this.sendFocusEvent(EventType.CallTrackSubscription, {
subscribe: tracks,
unsubscribe: [],
} as FocusTrackSubscriptionEvent);
}

public updateRemoteSDPStreamMetadata(metadata: SDPStreamMetadata): void {
Expand Down Expand Up @@ -2167,6 +2146,18 @@ export class MatrixCall extends TypedEventEmitter<CallEvent, CallEventHandlerMap

if (this.callHasEnded()) return;

// TODO: Handle this better
// Other than the initial offer, we handle negotiation manually when calling with an SFU
if (this.isFocus && ![CallState.Fledgling, CallState.CreateOffer].includes(this.state)) {
this.sendFocusEvent(EventType.CallNegotiate, {
description: this.peerConn!.localDescription?.toJSON(),
} as FocusNegotiateEvent);

// TODO: Handle errors

return;
}

const eventType = this.state === CallState.CreateOffer ? EventType.CallInvite : EventType.CallNegotiate;

const content = {
Expand Down Expand Up @@ -2281,11 +2272,6 @@ export class MatrixCall extends TypedEventEmitter<CallEvent, CallEventHandlerMap
this.emit(CallEvent.LengthChanged, Math.round((Date.now() - this.callStartTime!) / 1000));
}, CALL_LENGTH_INTERVAL);
}
if (!this.sfuKeepAliveInterval) {
this.sfuKeepAliveInterval = setInterval(() => {
this.sendSFUDataChannelMessage(SFUDataChannelMessageOp.Alive);
}, SFU_KEEP_ALIVE_INTERVAL * 3 / 4);
}
} else if (this.peerConn?.iceConnectionState == 'failed') {
// Firefox for Android does not yet have support for restartIce()
// (the types say it's always defined though, so we have to cast
Expand Down Expand Up @@ -2332,6 +2318,15 @@ export class MatrixCall extends TypedEventEmitter<CallEvent, CallEventHandlerMap
if (!this.removeTrackListeners.has(stream)) {
const onRemoveTrack = (): void => {
if (stream.getTracks().length === 0) {
// FIXME: We should really be doing this per-track. Šimon
// leaves this for when we switch to mids for signalling
const getIndex = (): number => this.subscribedTracks.findIndex((t) => t.stream_id === stream.id);
let indexOfTrackToRemove = getIndex();
while (indexOfTrackToRemove !== -1) {
this.subscribedTracks.splice(indexOfTrackToRemove, 1);
indexOfTrackToRemove = getIndex();
}

logger.info(`Call ${this.callId} removing track streamId: ${stream.id}`);
this.deleteFeedByStream(stream);
stream.removeEventListener("removetrack", onRemoveTrack);
Expand Down Expand Up @@ -2388,9 +2383,6 @@ export class MatrixCall extends TypedEventEmitter<CallEvent, CallEventHandlerMap
}

private onNegotiationNeeded = async (): Promise<void> => {
// Other than the initial offer, we handle negotiation manually when calling with an SFU
if (this.isFocus && ![CallState.Fledgling, CallState.CreateOffer].includes(this.state)) return;

logger.info(`Call ${this.callId} Negotiation is needed!`);

if (this.state !== CallState.CreateOffer && this.opponentVersion === 0) {
Expand Down Expand Up @@ -2508,18 +2500,19 @@ export class MatrixCall extends TypedEventEmitter<CallEvent, CallEventHandlerMap
}
}

private sendSFUDataChannelMessage(op: SFUDataChannelMessageOp, content: object = {}): void {
const realContent: ISfuBaseDataChannelMessage = Object.assign(content, {
op: op,
});
private sendFocusEvent(type: EventType, content: FocusEventBaseContent = {}): void {
const event: FocusEvent = {
type: type,
content: content,
};

if (![SFUDataChannelMessageOp.Select, SFUDataChannelMessageOp.Alive].includes(op)) {
realContent.metadata = this.getLocalSDPStreamMetadata();
if ([EventType.CallNegotiate, EventType.CallSDPStreamMetadataChanged].includes(type)) {
event.content[SDPStreamMetadataKeyStable] = this.getLocalSDPStreamMetadata();
}

// FIXME: RPC reliability over DC
this.dataChannel!.send(JSON.stringify(realContent));
logger.warn(`Sent ${realContent.op} over DC:`, realContent);
this.dataChannel!.send(JSON.stringify(event));
logger.warn(`Sent ${event.type} over DC:`, event);
}

/**
Expand Down
Loading