Skip to content

Commit 653255d

Browse files
Merge pull request #416 from EpicGamesExt/backport/UE5.5/pr-409
[UE5.5] Merge pull request #409 from mcottontensor/heartbeat
2 parents 487bd5b + e1a5f80 commit 653255d

File tree

11 files changed

+142
-26
lines changed

11 files changed

+142
-26
lines changed
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
import * as Messages from '../Messages/signalling_messages';
2+
import * as MessageHelpers from '../Messages/message_helpers';
3+
import { SignallingProtocol } from './SignallingProtocol';
4+
5+
/**
6+
* Used to regularly ping a protocol connection to make sure the connection is still good and open.
7+
* When the pong doesn't come in response to a ping in time a callback is fired that can be handed
8+
* by the owner.
9+
*/
10+
export class KeepaliveMonitor {
11+
private protocol: SignallingProtocol;
12+
private timeout: number;
13+
private keepalive?: ReturnType<typeof setInterval>;
14+
private alive: boolean = false;
15+
private rtt: number = 0;
16+
17+
// naming a bound function so we can add and then remove it with on and off.
18+
private onResponse: (pongMsg: Messages.pong) => void;
19+
20+
/**
21+
* Called when a pong does not come back from a ping.
22+
*/
23+
onTimeout?: () => void;
24+
25+
/**
26+
* Gets the Round Trip Time of the current connection in milliseconds.
27+
*/
28+
get RTT(): number {
29+
return this.rtt;
30+
}
31+
32+
/**
33+
* Creates a new monitor and starts the ping timer. If a pong does not come back by the time we want
34+
* to send a second ping then the connection is considered dead and the onTimeout callback is fired.
35+
* @param protocol The connection that we want to monitor.
36+
* @param timeout The time in milliseconds between ping messages.
37+
*/
38+
constructor(protocol: SignallingProtocol, timeout: number) {
39+
this.protocol = protocol;
40+
this.timeout = timeout;
41+
this.onResponse = this.onHeartbeatResponse.bind(this);
42+
this.protocol.transport.on('close', this.stop.bind(this));
43+
this.start();
44+
}
45+
46+
private start(): void {
47+
this.alive = true;
48+
this.protocol.on('pong', this.onResponse);
49+
this.keepalive = setInterval(this.sendHeartbeat.bind(this), this.timeout);
50+
}
51+
52+
private stop(): void {
53+
clearInterval(this.keepalive);
54+
this.protocol.off('pong', this.onResponse);
55+
}
56+
57+
private sendHeartbeat(): void {
58+
// if we never got a response from the last heartbeat, assume the connection is dead and timeout
59+
if (this.alive === false) {
60+
this.onTimeout?.();
61+
return;
62+
}
63+
64+
// mark the connection as temporarily dead until we get a response from the ping
65+
this.alive = false;
66+
this.protocol.sendMessage(
67+
MessageHelpers.createMessage(Messages.ping, { time: new Date().getTime() })
68+
);
69+
}
70+
71+
private onHeartbeatResponse(pongMsg: Messages.pong): void {
72+
// we got a pong response from the other side, the connection is good.
73+
// we also store the round trip time if anyone is curious
74+
this.rtt = new Date().getTime() - pongMsg.time;
75+
this.alive = true;
76+
}
77+
}

Common/src/Protocol/SignallingProtocol.ts

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@
33
import { ITransport } from '../Transport/ITransport';
44
import { EventEmitter } from '../Event/EventEmitter';
55
import { BaseMessage } from '../Messages/base_message';
6-
import * as Messages from '../Messages/signalling_messages';
7-
import * as MessageHelpers from '../Messages/message_helpers';
86

97
/**
108
* Signalling protocol for handling messages from the signalling server.
@@ -38,14 +36,6 @@ export class SignallingProtocol extends EventEmitter {
3836
this.transport = transport;
3937

4038
transport.onMessage = (msg: BaseMessage) => {
41-
// auto handle ping messages
42-
if (msg.type == Messages.ping.typeName) {
43-
const pongMessage = MessageHelpers.createMessage(Messages.pong, {
44-
time: new Date().getTime()
45-
});
46-
transport.sendMessage(pongMessage);
47-
}
48-
4939
// call the handlers
5040
transport.emit('message', msg); // emit this for listeners listening to any message
5141
if (!this.emit(msg.type, msg)) {

Common/src/Transport/WebSocketTransport.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -114,11 +114,12 @@ export class WebSocketTransport extends EventEmitter implements ITransport {
114114
return;
115115
}
116116

117-
Logger.Info('received => \n' + JSON.stringify(JSON.parse(event.data as string), undefined, 4));
118-
119117
let parsedMessage: BaseMessage;
120118
try {
121-
parsedMessage = JSON.parse(event.data as string) as BaseMessage;
119+
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
120+
const parsedData = JSON.parse(event.data as string);
121+
Logger.Info('received => \n' + JSON.stringify(parsedData, undefined, 4));
122+
parsedMessage = parsedData as BaseMessage;
122123
} catch (e: unknown) {
123124
if (e instanceof Error) {
124125
Logger.Error(`Error parsing message string ${event.data}.\n${e.message}`);

Common/src/pixelstreamingcommon.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,4 @@ export { EventEmitter } from './Event/EventEmitter';
99
export { MessageRegistry } from './Messages/message_registry';
1010
export * as Messages from './Messages/signalling_messages';
1111
export * as MessageHelpers from './Messages/message_helpers';
12+
export { KeepaliveMonitor } from './Protocol/KeepaliveMonitor';

Extras/JSStreamer/src/streamer.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,8 @@ export class Streamer extends EventEmitter {
121121
this.protocol.addListener(Messages.iceCandidate.typeName, (msg: BaseMessage) =>
122122
this.handleIceMessage(msg as Messages.iceCandidate)
123123
);
124+
125+
this.transport.on('timeout', () => console.log('Streamer connection timeout'));
124126
}
125127

126128
startStreaming(signallingURL: string, stream: MediaStream) {

Frontend/library/src/Config/Config.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ export class NumericParameters {
5959
static WebRTCMaxBitrate = 'WebRTCMaxBitrate' as const;
6060
static MaxReconnectAttempts = 'MaxReconnectAttempts' as const;
6161
static StreamerAutoJoinInterval = 'StreamerAutoJoinInterval' as const;
62+
static KeepaliveDelay = 'KeepaliveDelay' as const;
6263
}
6364

6465
export type NumericParametersKeys = Exclude<keyof typeof NumericParameters, 'prototype'>;
@@ -761,6 +762,21 @@ export class Config {
761762
useUrlParams
762763
)
763764
);
765+
766+
this.numericParameters.set(
767+
NumericParameters.KeepaliveDelay,
768+
new SettingNumber(
769+
NumericParameters.KeepaliveDelay,
770+
'Connection Keepalive delay',
771+
'Delay between keepalive pings to the signalling server.',
772+
0 /*min*/,
773+
900000 /*max*/,
774+
settings && Object.prototype.hasOwnProperty.call(settings, NumericParameters.KeepaliveDelay)
775+
? settings[NumericParameters.KeepaliveDelay]
776+
: 30000 /*value*/,
777+
useUrlParams
778+
)
779+
);
764780
}
765781

766782
/**

Frontend/library/src/WebRtcPlayer/WebRtcPlayerController.ts

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@ import {
77
ITransport,
88
Messages,
99
MessageHelpers,
10-
BaseMessage
10+
BaseMessage,
11+
KeepaliveMonitor
1112
} from '@epicgames-ps/lib-pixelstreamingcommon-ue5.5';
1213
import { StreamController } from '../VideoPlayer/StreamController';
1314
import { FreezeFrameController } from '../FreezeFrame/FreezeFrameController';
@@ -104,6 +105,7 @@ export class WebRtcPlayerController {
104105
subscribedStream: string;
105106
signallingUrlBuilder: () => string;
106107
autoJoinTimer: ReturnType<typeof setTimeout> = undefined;
108+
keepalive: KeepaliveMonitor;
107109

108110
/**
109111
*
@@ -171,6 +173,9 @@ export class WebRtcPlayerController {
171173
this.protocol.addListener(Messages.config.typeName, (msg: BaseMessage) =>
172174
this.handleOnConfigMessage(msg as Messages.config)
173175
);
176+
this.protocol.addListener(Messages.ping.typeName, (msg: BaseMessage) =>
177+
this.handlePingMessage(msg as Messages.ping)
178+
);
174179
this.protocol.addListener(Messages.streamerList.typeName, (msg: BaseMessage) =>
175180
this.handleStreamerListMessage(msg as Messages.streamerList)
176181
);
@@ -846,7 +851,7 @@ export class WebRtcPlayerController {
846851
// if the connection is open, first close it and force a reconnect.
847852
if (this.protocol.isConnected()) {
848853
if (!this.forceReconnect) {
849-
message = `${message} Reconnecting.`;
854+
this.disconnectMessage = `${message} Reconnecting.`;
850855
}
851856
this.closeSignalingServer(message, true);
852857
} else {
@@ -1023,6 +1028,15 @@ export class WebRtcPlayerController {
10231028
this.disconnectMessage = null;
10241029
const signallingUrl = this.signallingUrlBuilder();
10251030
this.protocol.connect(signallingUrl);
1031+
const keepaliveDelay = this.config.getNumericSettingValue(NumericParameters.KeepaliveDelay);
1032+
if (keepaliveDelay > 0) {
1033+
this.keepalive = new KeepaliveMonitor(this.protocol, keepaliveDelay);
1034+
this.keepalive.onTimeout = () => {
1035+
// if the ping fails just disconnect
1036+
Logger.Error(`Protocol timeout`);
1037+
this.protocol.disconnect();
1038+
};
1039+
}
10261040
}
10271041

10281042
/**
@@ -1138,6 +1152,10 @@ export class WebRtcPlayerController {
11381152
this.startSession(messageConfig.peerConnectionOptions);
11391153
}
11401154

1155+
handlePingMessage(pingMessage: Messages.ping) {
1156+
this.protocol.sendMessage(MessageHelpers.createMessage(Messages.pong, { time: pingMessage.time }));
1157+
}
1158+
11411159
/**
11421160
* Handles when the signalling server gives us the list of streamer ids.
11431161
*/

Frontend/ui-library/src/Config/ConfigUI.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,11 @@ export class ConfigUI {
188188
psSettingsSection,
189189
this.numericParametersUi.get(NumericParameters.StreamerAutoJoinInterval)
190190
);
191+
if (isSettingEnabled(settingsConfig, NumericParameters.KeepaliveDelay))
192+
this.addSettingNumeric(
193+
psSettingsSection,
194+
this.numericParametersUi.get(NumericParameters.KeepaliveDelay)
195+
);
191196
}
192197

193198
if (isSectionEnabled(settingsConfig, SettingsSections.UI)) {

Signalling/src/PlayerConnection.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ export class PlayerConnection implements IPlayer, LogUtils.IMessageLogger {
108108
Messages.listStreamers.typeName,
109109
LogUtils.createHandlerListener(this, this.onListStreamers)
110110
);
111+
this.protocol.on(Messages.ping.typeName, LogUtils.createHandlerListener(this, this.onPingMessage));
111112
/* eslint-enable @typescript-eslint/unbound-method */
112113

113114
this.protocol.on(Messages.offer.typeName, this.sendToStreamer.bind(this));
@@ -118,7 +119,7 @@ export class PlayerConnection implements IPlayer, LogUtils.IMessageLogger {
118119
this.protocol.on(Messages.layerPreference.typeName, this.sendToStreamer.bind(this));
119120

120121
this.protocol.on('unhandled', (message: BaseMessage) => {
121-
Logger.warn(`Unhandled protocol message: ${JSON.stringify(message)}`);
122+
Logger.warn(`Unhandled player protocol message: ${JSON.stringify(message)}`);
122123
});
123124
}
124125

@@ -226,7 +227,7 @@ export class PlayerConnection implements IPlayer, LogUtils.IMessageLogger {
226227
this.sendMessage(renameMessage);
227228
}
228229

229-
private onStreamerRemoved() {
230-
this.disconnect();
230+
private onPingMessage(message: Messages.ping): void {
231+
this.sendMessage(MessageHelpers.createMessage(Messages.pong, { time: message.time }));
231232
}
232233
}

Signalling/src/SFUConnection.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,7 @@ export class SFUConnection extends EventEmitter implements IPlayer, IStreamer, L
155155
Messages.stopStreaming.typeName,
156156
LogUtils.createHandlerListener(this, this.onStopStreaming)
157157
);
158+
this.protocol.on(Messages.ping.typeName, LogUtils.createHandlerListener(this, this.onPingMessage));
158159
/* eslint-enable @typescript-eslint/unbound-method */
159160

160161
this.protocol.on(Messages.offer.typeName, this.sendToPlayer.bind(this));
@@ -297,4 +298,8 @@ export class SFUConnection extends EventEmitter implements IPlayer, IStreamer, L
297298
this.streaming = false;
298299
this.emit('disconnect');
299300
}
301+
302+
private onPingMessage(message: Messages.ping): void {
303+
this.sendMessage(MessageHelpers.createMessage(Messages.pong, { time: message.time }));
304+
}
300305
}

0 commit comments

Comments
 (0)