Skip to content
This repository was archived by the owner on Oct 23, 2024. It is now read-only.

Commit fc9039f

Browse files
committed
Fix publish method after rebasing.
Server can detect new streams from QUIC agent, but this change breaks rtcController and stream events sent from server don't have "data" property. These issues along with other subscribe issues will be fixed in following commits.
1 parent 7c44e90 commit fc9039f

File tree

5 files changed

+48
-25
lines changed

5 files changed

+48
-25
lines changed

source/agent/conference/conference.js

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -397,7 +397,8 @@ var Conference = function (rpcClient, selfRpcId) {
397397
const sessionInfo = {
398398
locality: transport.locality,
399399
media: media,
400-
info: { type: 'webrtc', owner: transport.owner }
400+
data: rtcInfo.data,
401+
info: { type: 'quic', owner: transport.owner }
401402
};
402403
sendMsgTo(transport.owner, 'progress', {id: transport.id, sessionId, status: 'ready'});
403404
onSessionEstablished(transport.owner, sessionId, direction, sessionInfo);
@@ -949,7 +950,8 @@ var Conference = function (rpcClient, selfRpcId) {
949950
type: pubInfo.type,
950951
transportId: pubInfo.transportId,
951952
tracks: pubInfo.media.tracks,
952-
legacy: pubInfo.legacy
953+
legacy: pubInfo.legacy,
954+
data: pubInfo.data
953955
};
954956
return rtcPubInfo;
955957
};
@@ -1013,7 +1015,7 @@ var Conference = function (rpcClient, selfRpcId) {
10131015
}
10141016

10151017
if (pubInfo.type === 'sip') {
1016-
return addStream(streamId, pubInfo.locality, pubInfo.transport, pubInfo.media, pubInfo.data, {owner: participantId, type: 'sip'})
1018+
return addStream(streamId, pubInfo.locality, pubInfo.transport, pubInfo.media, pubInfo.data, {owner: participantId, type: pubInfo.type})
10171019
.then((result) => {
10181020
callback('callback', result);
10191021
})
@@ -1033,12 +1035,14 @@ var Conference = function (rpcClient, selfRpcId) {
10331035
} else {
10341036
var origin = participants[participantId].getOrigin();
10351037
var format_preference;
1036-
if (pubInfo.type === 'webrtc') {
1038+
if (pubInfo.type === 'webrtc' || pubInfo.type === 'quic') {
10371039
const rtcPubInfo = translateRtcPubIfNeeded(pubInfo);
1038-
// Set formatPreference
1039-
rtcPubInfo.tracks.forEach(track => {
1040-
track.formatPreference = {optional: room_config.mediaIn[track.type]};
1041-
});
1040+
if (rtcPubInfo.tracks) {
1041+
// Set formatPreference
1042+
rtcPubInfo.tracks.forEach(track => {
1043+
track.formatPreference = { optional : room_config.mediaIn[track.type] };
1044+
});
1045+
}
10421046
initiateStream(streamId, {owner: participantId, type: pubInfo.type, origin});
10431047
return rtcController.initiate(participantId, streamId, 'in', participants[participantId].getOrigin(), rtcPubInfo)
10441048
.then((result) => {
@@ -1755,9 +1759,13 @@ var Conference = function (rpcClient, selfRpcId) {
17551759
callback('callback', 'ok');
17561760
};
17571761

1758-
that.onSessionProgress = function (sessionId, direction, sessionStatus) {
1759-
log.debug('onSessionProgress, sessionId:', sessionId, 'direction:', direction, 'sessionStatus:', sessionStatus);
1760-
accessController && accessController.onSessionStatus(sessionId, sessionStatus);
1762+
that.onSessionProgress = function(sessionId, direction, sessionStatus) {
1763+
log.debug('onSessionProgress, sessionId:', sessionId, 'direction:', direction, 'sessionStatus:', sessionStatus);
1764+
if (sessionStatus.data) {
1765+
rtcController.onSessionProgress(sessionId, sessionStatus);
1766+
} else {
1767+
accessController && accessController.onSessionStatus(sessionId, sessionStatus);
1768+
}
17611769
};
17621770

17631771
that.onTransportProgress = function (transportId, status) {

source/agent/conference/stream.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,7 @@ class ForwardStream extends Stream {
256256
log.error(`Unexpected track type: ${track.type}`);
257257
}
258258
}
259-
if (this.media.tracks.length === 0) {
259+
if (!this.data && this.media.tracks.length === 0) {
260260
err = 'No valid tracks in stream';
261261
}
262262
return err;

source/agent/quic/index.js

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -45,12 +45,15 @@ module.exports = function (rpcClient, selfRpcId, parentRpcId, clusterWorkerIP) {
4545
global.config.cluster.name :
4646
undefined;
4747

48+
const getConferenceController = async (roomId) => {
49+
return await rpcChannel.makeRPC(clusterName, 'schedule', [
50+
'conference', roomId, 'preference' /*TODO: specify preference*/,
51+
30 * 1000
52+
]);
53+
};
54+
4855
const getRpcPortal = async (roomId, participantId) => {
49-
const controllerAgent =
50-
await rpcChannel.makeRPC(clusterName, 'schedule', [
51-
'conference', roomId, 'preference' /*TODO: specify preference*/,
52-
30 * 1000
53-
]);
56+
const controllerAgent = await getConferenceController(roomId);
5457
let controller = null;
5558
let portal = null;
5659
const retry = 5;
@@ -97,17 +100,24 @@ module.exports = function (rpcClient, selfRpcId, parentRpcId, clusterWorkerIP) {
97100
password, validateToken);
98101
quicTransportServer.start();
99102
quicTransportServer.on('streamadded', (stream) => {
103+
log.debug('A stream with session ID '+stream.contentSessionId+' is added.');
100104
const conn = connections.getConnection(stream.contentSessionId);
101105
if (conn) {
102106
// TODO: verify transport ID.
103107
conn.connection.quicStream(stream);
108+
// TODO: Make RPC call to conference node for session-established.
104109
} else {
105110
log.warn(
106111
'Cannot find a pipeline for QUIC stream. Content session ID: ' +
107112
stream.contentSessionId);
108113
stream.close();
109114
}
110115
});
116+
quicTransportServer.on('connectionadded', (connection) => {
117+
log.debug(
118+
'A connection for transport ID ' + connection.transportId +
119+
' is created.');
120+
});
111121
});
112122

113123
const createStreamPipeline =
@@ -176,7 +186,6 @@ module.exports = function (rpcClient, selfRpcId, parentRpcId, clusterWorkerIP) {
176186
conn.connect(options);
177187
break;
178188
case 'quic':
179-
quicTransportServer.addTransportId(options.transport.id);
180189
conn = createStreamPipeline(connectionId, 'in', options, callback);
181190
if (!conn) {
182191
return;

source/agent/quic/webtransport/quicTransportServer.js

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ module.exports = class QuicTransportServer extends EventEmitter {
3030
this._streams = new Map(); // Key is content session ID.
3131
this._unAuthenticatedConnections = []; // When it's authenticated, it will be moved to this.connections.
3232
this._unAssociatedStreams = []; // No content session ID assgined to them.
33-
this._assignedTransportIds = []; // Transport channels assigned to this server.
3433
this._validateTokenCallback = validateTokenCallback;
3534
this._server.onconnection = (connection) => {
3635
this._unAuthenticatedConnections.push(connection);
@@ -58,6 +57,9 @@ module.exports = class QuicTransportServer extends EventEmitter {
5857
'A new stream ' + streamId + ' is created on transport ' +
5958
connection.transportId);
6059
this.emit('streamadded', stream);
60+
const uuidBytes =
61+
this._uuidStringToUint8Array(connection.transportId);
62+
stream.write(uuidBytes, uuidBytes.length);
6163
}
6264
};
6365
stream.ondata = (data) => {
@@ -113,9 +115,17 @@ module.exports = class QuicTransportServer extends EventEmitter {
113115
connection.close();
114116
return;
115117
}
118+
log.debug('Created connection for transport ID '+connection.transportId);
116119
connection.transportId = token.transportId;
120+
connection.participantId = token.participantId;
121+
// A user can join multiple rooms. And it would be better if a
122+
// connection can be shared by all these rooms. But it looks
123+
// like existing architecture doesn't support to do so. So we
124+
// bind a connection to a single room here.
125+
connection.roomId = token.roomId;
117126
stream.transportId = token.transportId;
118127
this._connections.set(connection.transportId, connection);
128+
this.emit('connectionadded', connection);
119129
});
120130
return;
121131
} else { // Store the data.
@@ -135,12 +145,6 @@ module.exports = class QuicTransportServer extends EventEmitter {
135145
// TODO: Return error if server is not started, e.g.: port is not available.
136146
}
137147

138-
addTransportId(id) {
139-
if (!(id in this._assignedTransportIds)) {
140-
this._assignedTransportIds.push(id);
141-
}
142-
}
143-
144148
// Create a send stream for specfied QuicTransport. If specified QuicTransport
145149
// doesn't exist, no stream will be created.
146150
createSendStream(transportId, contentSessionId) {

source/agent/quic/webtransport/quicTransportStreamPipeline.js

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ module.exports = class QuicTransportStreamPipeline {
3232
type: 'ready',
3333
audio: false,
3434
video: false,
35+
data: true,
3536
simulcast: false
3637
});
3738
}
@@ -50,6 +51,7 @@ module.exports = class QuicTransportStreamPipeline {
5051
};
5152

5253
this.close = function(){
54+
return;
5355
this._quicStream.close();
5456
}
5557
}

0 commit comments

Comments
 (0)