Skip to content
This repository was archived by the owner on Oct 23, 2024. It is now read-only.

Commit a3a8b6b

Browse files
committed
Enable authentication for QUIC transport.
QUIC transport is authenticated by transport ID which is issued by the signaling module of portal. In the future, when QUIC transport is allowed for signaling, other authentication method should be used. A possible one is token, which is similar to the socket.io channel we are using today.
1 parent d1bb4ff commit a3a8b6b

File tree

4 files changed

+76
-23
lines changed

4 files changed

+76
-23
lines changed

source/agent/quic/index.js

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,18 @@
22
//
33
// SPDX-License-Identifier: Apache-2.0
44

5+
// QUIC agent starts a QUIC transport server which listens incoming QUIC
6+
// connections. After a new QUIC connection is established, it waits for a
7+
// transport ID transmitted through signaling stream(content session ID is 0).
8+
// Transport ID is issued by portal. QUIC agent gets authorized transport ID
9+
// from |options| of publication and subscription.
10+
// QuicStreamPipeline is 1:1 mapped to a publication or a subscription. But the
11+
// QuicStream associated with QuicStreamPipeline could be a QuicStream instance
12+
// or |undefined|, which means QuicStream is not ready at this moment. It also
13+
// allows updating its associated QuicStream to a new one.
14+
// publish, unpublish, subscribe, unsubscribe, linkup, cutoff are required by
15+
// all agents. AFAIK, there is no documentation about these interfaces.
16+
517
'use strict';
618
const Connections = require('./connections');
719
const InternalConnectionFactory = require('./InternalConnectionFactory');
@@ -34,7 +46,13 @@ module.exports = function (rpcClient, selfRpcId, parentRpcId, clusterWorkerIP) {
3446
quicTransportServer.on('streamadded', (stream) => {
3547
const conn = connections.getConnection(stream.contentSessionId);
3648
if (conn) {
49+
// TODO: verify transport ID.
3750
conn.connection.quicStream(stream);
51+
} else {
52+
log.warn(
53+
'Cannot find a pipeline for QUIC stream. Content session ID: ' +
54+
stream.contentSessionId);
55+
stream.close();
3856
}
3957
});
4058

@@ -104,6 +122,7 @@ module.exports = function (rpcClient, selfRpcId, parentRpcId, clusterWorkerIP) {
104122
conn.connect(options);
105123
break;
106124
case 'quic':
125+
quicTransportServer.addTransportId(options.transport.id);
107126
conn = createStreamPipeline(connectionId, 'in', options, callback);
108127
if (!conn) {
109128
return;
@@ -155,8 +174,9 @@ module.exports = function (rpcClient, selfRpcId, parentRpcId, clusterWorkerIP) {
155174
conn.connect(options);//FIXME: May FAIL here!!!!!
156175
break;
157176
case 'quic':
177+
quicTransportServer.addTransportId(options.transport.id);
158178
conn = createStreamPipeline(connectionId, 'out', options, callback);
159-
const stream = quicTransportServer.createSendStream('id', connectionId);
179+
const stream = quicTransportServer.createSendStream(options.transport.id, connectionId);
160180
conn.quicStream(stream);
161181
if (!conn) {
162182
return;

source/agent/quic/webtransport/quicTransportServer.js

Lines changed: 34 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -22,27 +22,36 @@ module.exports = class QuicTransportServer extends EventEmitter {
2222
constructor(port) {
2323
super();
2424
this._server = new addon.QuicTransportServer(port);
25-
this._connections = new Map(); // Key is user ID.
25+
this._connections = new Map(); // Key is transport ID.
2626
this._streams = new Map(); // Key is content session ID.
2727
this._unAuthenticatedConnections = []; // When it's authenticated, it will be moved to this.connections.
2828
this._unAssociatedStreams = []; // No content session ID assgined to them.
29+
this._assignedTransportIds = []; // Transport channels assigned to this server.
2930
this._server.onconnection = (connection) => {
3031
this._unAuthenticatedConnections.push(connection);
3132
connection.onincomingstream = (stream) => {
3233
stream.oncontentsessionid = (id) => {
3334
const streamId = this._uuidBytesToString(new Uint8Array(id))
3435
stream.contentSessionId = streamId;
36+
stream.transportId = connection.transportId;
3537
if (streamId === zeroUuid) {
36-
// Signaling stream. Connect to portal in the future. For now, we use it for authentication.
38+
// Signaling stream. Waiting for transport ID.
39+
} else if (!connection.transportId) {
40+
log.error(
41+
'Stream ' + streamId +
42+
' added on unauthroized transport. Close connection.');
3743
} else {
44+
log.debug(
45+
'A new stream ' + streamId + ' is created on transport ' +
46+
connection.transportId);
3847
this.emit('streamadded', stream);
3948
}
40-
}
49+
};
4150
stream.ondata=(data)=>{
4251
if (stream.contentSessionId === zeroUuid) {
43-
connection.userId = data.toString('utf8');
44-
this._connections.set('id', connection);
45-
log.info('User ID: ' + connection.userId);
52+
connection.transportId = data.toString('utf8');
53+
this._connections.set(connection.transportId, connection);
54+
log.info('Transport ID: ' + connection.transportId);
4655
}
4756
}
4857
}
@@ -51,27 +60,35 @@ module.exports = class QuicTransportServer extends EventEmitter {
5160

5261
start() {
5362
this._server.start();
63+
// TODO: Return error if server is not started, e.g.: port is not available.
64+
}
65+
66+
addTransportId(id) {
67+
if (!(id in this._assignedTransportIds)) {
68+
this._assignedTransportIds.push(id);
69+
}
5470
}
5571

56-
createSendStream(userId, contentSessionId) {
57-
userId = 'id';
58-
if (!this._connections.has(userId)) {
59-
log.error('No QUIC transport for '+userId);
72+
// Create a send stream for specfied QuicTransport. If specified QuicTransport
73+
// doesn't exist, no stream will be created.
74+
createSendStream(transportId, contentSessionId) {
75+
log.debug(
76+
'Create send stream ' + contentSessionId + ' on transport ' +
77+
transportId);
78+
if (!this._connections.has(transportId)) {
79+
// TODO: Waiting for transport to be created, and create stream for it.
80+
// It's a common case that subscribe request is received by QUIC agent
81+
// before client creates QuicTransport.
82+
log.error('No QUIC transport for ' + transportId);
6083
return;
6184
}
62-
const stream = this._connections.get(userId).createBidirectionalStream();
63-
log.info('Stream: ' + JSON.stringify(stream));
85+
const stream = this._connections.get(transportId).createBidirectionalStream();
6486
const uuidBytes=this._uuidStringToUint8Array(contentSessionId);
65-
log.info('Data: '+uuidBytes);
6687
stream.write(uuidBytes, uuidBytes.length);
6788
this._streams.set(contentSessionId, stream);
6889
return stream;
6990
}
7091

71-
_authenticate(){
72-
// TODO: Send token to nuve for authentication.
73-
}
74-
7592
_uuidBytesToString(uuidBytes) {
7693
if (uuidBytes.length != 16) {
7794
log.error('Invalid UUID.');

source/portal/requestDataValidator.js

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,8 @@ const PublicationRequest = {
7171
'transport': {
7272
type: 'object',
7373
properties: {
74-
'type': {type: 'string'}
74+
'type': {type: 'string'},
75+
'id': {type: 'string'},
7576
},
7677
additionalProperties: false,
7778
}
@@ -180,7 +181,8 @@ const SubscriptionRequest = {
180181
'transport': {
181182
type: 'object',
182183
properties: {
183-
'type': {type: 'string'}
184+
'type': {type: 'string'},
185+
'id': {type: 'string'},
184186
},
185187
},
186188
},

source/portal/v11Client.js

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,10 @@ var V11Client = function(clientId, sigConnection, portal) {
107107
});
108108
};
109109

110+
const uuidWithoutDash = function() {
111+
return uuid().replace(/-/g, '');
112+
};
113+
110114
const listenAt = (socket) => {
111115
socket.on('text', function(textReq, callback) {
112116
if(!that.inRoom){
@@ -126,17 +130,22 @@ var V11Client = function(clientId, sigConnection, portal) {
126130
return safeCall(callback, 'error', 'Illegal request');
127131
}
128132

129-
var stream_id = uuid().replace(/-/g,'');
133+
var stream_id = uuidWithoutDash();
134+
let transport_id;
130135
return validatePubReq(pubReq)
131136
.then((req) => {
132137
if (pubReq.transport && pubReq.transport.type == 'quic') {
133138
req.type = 'quic';
139+
if (!req.transport.id) {
140+
req.transport.id = uuidWithoutDash();
141+
}
142+
transport_id = req.transport.id;
134143
} else {
135144
req.type = 'webrtc'; //FIXME: For backend compatibility with v3.4 clients.
136145
}
137146
return portal.publish(clientId, stream_id, req);
138147
}).then((result) => {
139-
safeCall(callback, 'ok', {id: stream_id});
148+
safeCall(callback, 'ok', {id: stream_id, transportId: transport_id});
140149
}).catch(onError('publish', callback));
141150
});
142151

@@ -172,16 +181,21 @@ var V11Client = function(clientId, sigConnection, portal) {
172181
}
173182

174183
var subscription_id = uuid().replace(/-/g,'');
184+
let transport_id;
175185
return validateSubReq(subReq)
176186
.then((req) => {
177187
if (req.transport && req.transport.type == 'quic') {
178188
req.type = 'quic';
189+
if (!req.transport.id) {
190+
req.transport.id = uuidWithoutDash();
191+
}
192+
transport_id = req.transport.id;
179193
} else {
180194
req.type = 'webrtc';//FIXME: For backend compatibility with v3.4 clients.
181195
}
182196
return portal.subscribe(clientId, subscription_id, req);
183197
}).then((result) => {
184-
safeCall(callback, 'ok', {id: subscription_id});
198+
safeCall(callback, 'ok', {id: subscription_id, transportId: transport_id});
185199
}).catch(onError('subscribe', callback));
186200
});
187201

0 commit comments

Comments
 (0)