15
15
// all agents. They are defined in base-agent.js.
16
16
17
17
'use strict' ;
18
- const Connections = require ( './connections' ) ;
19
18
const InternalConnectionFactory = require ( './InternalConnectionFactory' ) ;
20
19
const logger = require ( '../logger' ) . logger ;
21
20
const QuicTransportServer = require ( './webtransport/quicTransportServer' ) ;
@@ -25,6 +24,7 @@ const log = logger.getLogger('QuicNode');
25
24
const addon = require ( './build/Release/quic' ) ;
26
25
const cipher = require ( '../cipher' ) ;
27
26
const path = require ( 'path' ) ;
27
+ const { InternalConnectionRouter} = require ( './internalConnectionRouter' ) ;
28
28
29
29
log . info ( 'QUIC transport node.' )
30
30
@@ -34,7 +34,7 @@ module.exports = function (rpcClient, selfRpcId, parentRpcId, clusterWorkerIP) {
34
34
agentID : parentRpcId ,
35
35
clusterIP : clusterWorkerIP
36
36
} ;
37
- const connections = new Connections ;
37
+ const router = new InternalConnectionRouter ( global . config . internal ) ;
38
38
const internalConnFactory = new InternalConnectionFactory ;
39
39
const incomingStreamPipelines =
40
40
new Map ( ) ; // Key is publication ID, value is stream pipeline.
@@ -100,18 +100,22 @@ module.exports = function (rpcClient, selfRpcId, parentRpcId, clusterWorkerIP) {
100
100
password , validateToken ) ;
101
101
quicTransportServer . start ( ) ;
102
102
quicTransportServer . on ( 'streamadded' , ( stream ) => {
103
- log . debug ( 'A stream with session ID ' + stream . contentSessionId + ' is added.' ) ;
104
- const conn = connections . getConnection ( stream . contentSessionId ) ;
105
- if ( conn ) {
106
- // TODO: verify transport ID.
107
- conn . connection . quicStream ( stream ) ;
108
- // TODO: Make RPC call to conference node for session-established.
103
+ log . debug (
104
+ 'A stream with session ID ' + stream . contentSessionId +
105
+ ' is added.' ) ;
106
+ let pipeline = null ;
107
+ if ( outgoingStreamPipelines . has ( stream . contentSessionId ) ) {
108
+ pipeline = outgoingStreamPipelines . get ( stream . contentSessionId ) ;
109
+ } else if ( incomingStreamPipelines . has ( stream . contentSessionId ) ) {
110
+ pipeline = incomingStreamPipelines . get ( stream . contentSessionId ) ;
109
111
} else {
110
112
log . warn (
111
113
'Cannot find a pipeline for QUIC stream. Content session ID: ' +
112
114
stream . contentSessionId ) ;
113
115
stream . close ( ) ;
116
+ return ;
114
117
}
118
+ pipeline . quicStream ( stream ) ;
115
119
} ) ;
116
120
quicTransportServer . on ( 'connectionadded' , ( connection ) => {
117
121
log . debug (
@@ -159,32 +163,21 @@ module.exports = function (rpcClient, selfRpcId, parentRpcId, clusterWorkerIP) {
159
163
} ;
160
164
} ;
161
165
162
- that . createInternalConnection = function ( connectionId , direction , internalOpt , callback ) {
163
- internalOpt . minport = global . config . internal . minport ;
164
- internalOpt . maxport = global . config . internal . maxport ;
165
- var portInfo = internalConnFactory . create ( connectionId , direction , internalOpt ) ;
166
- callback ( 'callback' , { ip : that . clusterIP , port : portInfo } ) ;
167
- } ;
168
-
169
- that . destroyInternalConnection = function ( connectionId , direction , callback ) {
170
- internalConnFactory . destroy ( connectionId , direction ) ;
171
- callback ( 'callback' , 'ok' ) ;
166
+ that . getInternalAddress = function ( callback ) {
167
+ const ip = global . config . internal . ip_address ;
168
+ const port = router . internalPort ;
169
+ callback ( 'callback' , { ip, port} ) ;
172
170
} ;
173
171
174
172
// functions: publish, unpublish, subscribe, unsubscribe, linkup, cutoff
175
173
that . publish = function ( connectionId , connectionType , options , callback ) {
176
174
log . debug ( 'publish, connectionId:' , connectionId , 'connectionType:' , connectionType , 'options:' , options ) ;
177
- if ( connections . getConnection ( connectionId ) ) {
175
+ if ( router . getConnection ( connectionId ) ) {
178
176
return callback ( 'callback' , { type : 'failed' , reason : 'Connection already exists:' + connectionId } ) ;
179
177
}
180
178
181
179
var conn = null ;
182
180
switch ( connectionType ) {
183
- case 'internal' :
184
- conn = internalConnFactory . fetch ( connectionId , 'in' ) ;
185
- if ( conn )
186
- conn . connect ( options ) ;
187
- break ;
188
181
case 'quic' :
189
182
conn = createStreamPipeline ( connectionId , 'in' , options , callback ) ;
190
183
if ( ! conn ) {
@@ -198,14 +191,16 @@ module.exports = function (rpcClient, selfRpcId, parentRpcId, clusterWorkerIP) {
198
191
log . error ( 'Create connection failed' , connectionId , connectionType ) ;
199
192
return callback ( 'callback' , { type : 'failed' , reason : 'Create Connection failed' } ) ;
200
193
}
201
- connections . addConnection ( connectionId , connectionType , options . controller , conn , 'in' )
202
- . then ( onSuccess ( callback ) , onError ( callback ) ) ;
194
+ conn . bindRouterAsSourceCallback = function ( stream ) {
195
+ router . addLocalSource ( connectionId , connectionType , stream ) ;
196
+ }
197
+ onSuccess ( callback ) ( ) ;
203
198
} ;
204
199
205
200
that . unpublish = function ( connectionId , callback ) {
206
201
log . debug ( 'unpublish, connectionId:' , connectionId ) ;
207
- var conn = connections . getConnection ( connectionId ) ;
208
- connections . removeConnection ( connectionId ) . then ( function ( ok ) {
202
+ var conn = router . getConnection ( connectionId ) ;
203
+ router . removeConnection ( connectionId ) . then ( function ( ok ) {
209
204
if ( conn && conn . type === 'internal' ) {
210
205
internalConnFactory . destroy ( connectionId , 'in' ) ;
211
206
} else if ( conn ) {
@@ -225,17 +220,12 @@ module.exports = function (rpcClient, selfRpcId, parentRpcId, clusterWorkerIP) {
225
220
if ( ! options . data ) {
226
221
log . error ( 'Subscription request does not include data field.' ) ;
227
222
}
228
- if ( connections . getConnection ( connectionId ) ) {
223
+ if ( router . getConnection ( connectionId ) ) {
229
224
return callback ( 'callback' , { type : 'failed' , reason : 'Connection already exists:' + connectionId } ) ;
230
225
}
231
226
232
227
var conn = null ;
233
228
switch ( connectionType ) {
234
- case 'internal' :
235
- conn = internalConnFactory . fetch ( connectionId , 'out' ) ;
236
- if ( conn )
237
- conn . connect ( options ) ; //FIXME: May FAIL here!!!!!
238
- break ;
239
229
case 'quic' :
240
230
conn = createStreamPipeline ( connectionId , 'out' , options , callback ) ;
241
231
const stream = quicTransportServer . createSendStream ( options . transport . id , connectionId ) ;
@@ -252,14 +242,14 @@ module.exports = function (rpcClient, selfRpcId, parentRpcId, clusterWorkerIP) {
252
242
return callback ( 'callback' , { type : 'failed' , reason : 'Create Connection failed' } ) ;
253
243
}
254
244
255
- connections . addConnection ( connectionId , connectionType , options . controller , conn , 'out' )
245
+ router . addLocalDestination ( connectionId , connectionType , conn )
256
246
. then ( onSuccess ( callback ) , onError ( callback ) ) ;
257
247
} ;
258
248
259
249
that . unsubscribe = function ( connectionId , callback ) {
260
250
log . debug ( 'unsubscribe, connectionId:' , connectionId ) ;
261
- var conn = connections . getConnection ( connectionId ) ;
262
- connections . removeConnection ( connectionId ) . then ( function ( ok ) {
251
+ var conn = router . getConnection ( connectionId ) ;
252
+ router . removeConnection ( connectionId ) . then ( function ( ok ) {
263
253
if ( conn && conn . type === 'internal' ) {
264
254
internalConnFactory . destroy ( connectionId , 'out' ) ;
265
255
} else if ( conn ) {
@@ -269,19 +259,19 @@ module.exports = function (rpcClient, selfRpcId, parentRpcId, clusterWorkerIP) {
269
259
} , onError ( callback ) ) ;
270
260
} ;
271
261
272
- that . linkup = function ( connectionId , audioFrom , videoFrom , dataFrom , callback ) {
273
- log . debug ( 'linkup.' ) ;
274
- connections . linkupConnection ( connectionId , audioFrom , videoFrom , dataFrom ) . then ( onSuccess ( callback ) , onError ( callback ) ) ;
262
+ that . linkup = function ( connectionId , from , callback ) {
263
+ log . debug ( 'linkup, connectionId:' , connectionId , 'from:' , from ) ;
264
+ router . linkup ( connectionId , from ) . then ( onSuccess ( callback ) , onError ( callback ) ) ;
275
265
} ;
276
266
277
267
that . cutoff = function ( connectionId , callback ) {
278
268
log . debug ( 'cutoff, connectionId:' , connectionId ) ;
279
- connections . cutoffConnection ( connectionId ) . then ( onSuccess ( callback ) , onError ( callback ) ) ;
269
+ router . cutoff ( connectionId ) . then ( onSuccess ( callback ) , onError ( callback ) ) ;
280
270
} ;
281
271
282
272
that . mediaOnOff = function ( connectionId , track , direction , action , callback ) {
283
273
log . debug ( 'mediaOnOff, connection id:' , connectionId , 'track:' , track , 'direction:' , direction , 'action:' , action ) ;
284
- var conn = connections . getConnection ( connectionId ) ;
274
+ var conn = router . getConnection ( connectionId ) ;
285
275
if ( conn ) {
286
276
if ( conn . type === 'quic' ) { //NOTE: Only webrtc connection supports media-on-off
287
277
conn . connection . onTrackControl ( track ,
@@ -304,21 +294,11 @@ module.exports = function (rpcClient, selfRpcId, parentRpcId, clusterWorkerIP) {
304
294
} ;
305
295
306
296
that . close = function ( ) {
307
- log . debug ( 'close called' ) ;
308
- var connIds = connections . getIds ( ) ;
309
- for ( let connectionId of connIds ) {
310
- var conn = connections . getConnection ( connectionId ) ;
311
- connections . removeConnection ( connectionId ) ;
312
- if ( conn && conn . type === 'internal' ) {
313
- internalConnFactory . destroy ( connectionId , conn . direction ) ;
314
- } else if ( conn && conn . connection ) {
315
- conn . connection . close ( ) ;
316
- }
317
- }
297
+ router . clear ( ) ;
318
298
} ;
319
299
320
300
that . onFaultDetected = function ( message ) {
321
- connections . onFaultDetected ( message ) ;
301
+ router . onFaultDetected ( message ) ;
322
302
} ;
323
303
324
304
return that ;
0 commit comments