@@ -65,12 +65,13 @@ class Track {
65
65
66
66
class Operation {
67
67
68
- constructor ( id , transport , direction , tracks ) {
68
+ constructor ( id , transport , direction , tracks , legacy ) {
69
69
this . id = id ;
70
70
this . transport = transport ;
71
71
this . transportId = transport . id ;
72
72
this . direction = direction ;
73
73
this . tracks = tracks . map ( t => new Track ( this , t ) ) ;
74
+ this . legacy = legacy ;
74
75
this . promise = Promise . resolve ( ) ;
75
76
}
76
77
@@ -159,7 +160,15 @@ class RtcController extends EventEmitter {
159
160
// Destroy transport
160
161
if ( transport . locality ) {
161
162
log . debug ( `Destroy transport ${ transportId } ` ) ;
162
- this . rpcReq . destroyTransport ( transport . locality . node , transportId ) ;
163
+ this . rpcReq . destroyTransport ( transport . locality . node , transportId )
164
+ . catch ( ( e ) => {
165
+ log . debug ( `Faild to clean up transport ${ transportId } : ${ e } ` ) ;
166
+ } ) . then ( ( ) => {
167
+ const locality = transport . locality ;
168
+ log . debug ( `to recycleWorkerNode: ${ locality } task:, ${ transportId } ` ) ;
169
+ const taskConfig = { room : this . roomId , task : transportId } ;
170
+ return this . rpcReq . recycleWorkerNode ( locality . agent , locality . node , taskConfig )
171
+ } ) . catch ( ( e ) => log . debug ( `Failed to recycleWorkerNode ${ locality } ` ) ) ;
163
172
} else {
164
173
log . warn ( `No locality for failed transport ${ transportId } ` ) ;
165
174
}
@@ -231,36 +240,26 @@ class RtcController extends EventEmitter {
231
240
}
232
241
} else if ( info . type === 'tracks-complete' ) {
233
242
const operation = this . operations . get ( info . operationId ) ;
234
- operation . state = COMPLETED ;
235
- if ( operation . transport . state === COMPLETED ) {
236
- // Only emit when transport is completed
237
- this . emit ( 'session-established' , operation ) ;
243
+ if ( operation ) {
244
+ operation . state = COMPLETED ;
245
+ if ( operation . transport . state === COMPLETED ) {
246
+ // Only emit when transport is completed
247
+ this . emit ( 'session-established' , operation ) ;
248
+ }
238
249
}
239
250
}
240
251
}
241
252
242
- _createTransportIfNeeded ( ownerId , sessionId , origin , tId ) {
253
+ async _createTransportIfNeeded ( ownerId , sessionId , origin , tId ) {
243
254
if ( ! this . transports . has ( tId ) ) {
244
- this . transports . set ( tId , new Transport ( tId , ownerId , origin ) ) ;
245
- const taskConfig = { room : this . roomId , task : sessionId } ;
255
+ const taskConfig = { room : this . roomId , task : tId } ;
246
256
log . debug ( `getWorkerNode ${ this . clusterRpcId } , ${ taskConfig } , ${ origin } ` ) ;
247
- return this . rpcReq . getWorkerNode ( this . clusterRpcId , 'webrtc' , taskConfig , origin )
248
- . then ( ( locality ) => {
249
- if ( ! this . transports . has ( tId ) ) {
250
- log . debug ( `Transport destroyed after getWorkerNode ${ tId } ` ) ;
251
- this . rpcReq . recycleWorkerNode ( locality . agent , locality . node , taskConfig )
252
- . catch ( reason => {
253
- log . debug ( 'AccessNode not recycled' , locality ) ;
254
- } ) ;
255
- return Promise . reject ( 'Session has been aborted' ) ;
256
- }
257
- log . debug ( `getWorkerNode ok, sessionId: ${ sessionId } , locality: ${ locality } ` ) ;
258
- this . transports . get ( tId ) . setup ( locality ) ;
259
- return this . transports . get ( tId ) ;
260
- } ) ;
261
- } else {
262
- return Promise . resolve ( this . transports . get ( tId ) ) ;
257
+ const locality = await this . rpcReq . getWorkerNode (
258
+ this . clusterRpcId , 'webrtc' , taskConfig , origin ) ;
259
+ this . transports . set ( tId , new Transport ( tId , ownerId , origin ) ) ;
260
+ this . transports . get ( tId ) . setup ( locality ) ;
263
261
}
262
+ return this . transports . get ( tId ) ;
264
263
}
265
264
266
265
// tracks = [ {mid, type, formatPreference, from} ]
@@ -271,7 +270,7 @@ class RtcController extends EventEmitter {
271
270
272
271
return this . _createTransportIfNeeded ( ownerId , sessionId , origin , transportId )
273
272
. then ( transport => {
274
- if ( transport . state !== PENDING && transport . state !== COMPLETED ) {
273
+ if ( ! this . transports . has ( transportId ) ) {
275
274
return Promise . reject ( `Transport ${ transportId } is not ready` ) ;
276
275
}
277
276
const locality = transport . locality ;
@@ -281,12 +280,11 @@ class RtcController extends EventEmitter {
281
280
log . debug ( `operation exists:${ sessionId } ` ) ;
282
281
return Promise . reject ( `operation exists:${ sessionId } ` ) ;
283
282
}
284
- const op = new Operation ( sessionId , transport , direction , tracks ) ;
283
+ const op = new Operation ( sessionId , transport , direction , tracks , legacy ) ;
285
284
this . operations . set ( sessionId , op ) ;
286
- // Save promise for this operation
285
+ // Return promise for this operation
287
286
const options = { transportId, tracks, controller : this . roomRpcId } ;
288
- op . promise = this . rpcReq . initiate ( locality . node , sessionId , 'webrtc' , direction , options ) ;
289
- return op . promise ;
287
+ return this . rpcReq . initiate ( locality . node , sessionId , 'webrtc' , direction , options ) ;
290
288
} ) ;
291
289
}
292
290
@@ -301,102 +299,82 @@ class RtcController extends EventEmitter {
301
299
const operation = this . operations . get ( sessionId ) ;
302
300
const transport = this . transports . get ( operation . transportId ) ;
303
301
const locality = transport . locality ;
304
- operation . promise = operation . promise . then ( ( ) => {
305
- if ( ! this . operations . has ( sessionId ) ) {
306
- log . debug ( `operation does NOT exist:${ sessionId } ` ) ;
307
- return Promise . reject ( `operation does NOT exist:${ sessionId } ` ) ;
308
- }
309
- return this . rpcReq . terminate ( locality . node , sessionId , direction ) ;
310
- } ) . then ( ( ) => {
302
+ return this . rpcReq . terminate ( locality . node , sessionId , direction ) . then ( ( ) => {
311
303
if ( this . operations . has ( sessionId ) ) {
312
304
const owner = transport . owner ;
313
305
const abortData = { direction : operation . direction , owner, reason } ;
314
306
this . emit ( 'session-aborted' , sessionId , abortData ) ;
315
307
this . operations . delete ( sessionId ) ;
316
- log . debug ( `to recycleWorkerNode: ${ locality } task:, ${ sessionId } ` ) ;
317
- const taskConfig = { room : this . roomId , task : sessionId } ;
318
- return this . rpcReq . recycleWorkerNode ( locality . agent , locality . node , taskConfig )
319
- . catch ( reason => {
320
- log . debug ( `AccessNode not recycled ${ locality } , ${ reason } ` ) ;
321
- } ) ;
322
308
}
323
309
} )
324
310
. catch ( reason => {
325
311
log . debug ( `Operation terminate failed ${ operation } , ${ reason } ` ) ;
326
312
} ) ;
327
- return operation . promise ;
328
313
} ;
329
314
330
315
terminateByOwner ( ownerId ) {
331
316
log . debug ( `terminateByOwner ${ ownerId } ` ) ;
332
317
const terminations = [ ] ;
333
- // Or just destroy the transport
318
+ const transports = new Set ( ) ;
334
319
this . operations . forEach ( ( operation , operationId ) => {
335
320
const transport = this . transports . get ( operation . transportId ) ;
336
321
if ( transport . owner === ownerId ) {
337
322
const p = this . terminate ( operationId , operation . direction , 'Owner leave' ) ;
338
323
terminations . push ( p ) ;
324
+ transports . add ( transport . id ) ;
339
325
}
340
326
} ) ;
341
- return Promise . all ( terminations ) ;
327
+ return Promise . all ( terminations ) . then ( ( ) => {
328
+ transports . forEach ( ( transportId ) => {
329
+ const status = { type : 'failed' , reason : 'Owner leave' } ;
330
+ this . onTransportProgress ( transportId , status ) ;
331
+ } ) ;
332
+ } ) ;
342
333
} ;
343
334
344
335
terminateByLocality ( type , id ) {
345
336
log . debug ( `terminateByLocality ${ type } ${ id } ` ) ;
346
337
const terminations = [ ] ;
347
- // Or just destroy the transport
338
+ const transports = new Set ( ) ;
348
339
this . operations . forEach ( ( operation , operationId ) => {
349
340
const l = this . transports . get ( operation . transportId ) . locality ;
350
- if ( l ) {
351
- if ( ( type === 'worker' && l . agent === id ) ||
352
- ( type === 'node' && l . node === id ) ) {
353
- const p = this . terminate ( operationId , operation . direction , 'Node lost' ) ;
354
- terminations . push ( p ) ;
355
- }
341
+ if ( l && ( ( type === 'worker' && l . agent === id ) ||
342
+ ( type === 'node' && l . node === id ) ) ) {
343
+ const p = this . terminate ( operationId , operation . direction , 'Node lost' ) ;
344
+ terminations . push ( p ) ;
345
+ transports . add ( operation . transportId ) ;
356
346
}
357
347
} ) ;
358
- return Promise . all ( terminations ) ;
348
+ return Promise . all ( terminations ) . then ( ( ) => {
349
+ transports . forEach ( ( transportId ) => {
350
+ const status = { type : 'failed' , reason : 'Owner leave' } ;
351
+ this . onTransportProgress ( transportId , status ) ;
352
+ } ) ;
353
+ } ) ;
359
354
} ;
360
355
361
- onFaultDetected ( type , id ) {
362
-
363
- }
364
-
365
356
destroy ( ) {
366
357
log . debug ( `destroy` ) ;
367
- const terminations = [ ] ;
368
358
// Destroy all transports
369
359
this . transports . forEach ( ( transport , transportId ) => {
370
- const p = this . rpcReq . destroyTransport ( transportId ) ;
360
+ const status = { type : 'failed' , reason : 'Owner leave' } ;
361
+ this . onTransportProgress ( transportId , status ) ;
371
362
} ) ;
372
- return Promise . all ( terminations ) ;
373
363
} ;
374
364
375
365
setMute ( sessionId , tracks , muted ) {
376
366
log . debug ( `setMute, sessionId: ${ sessionId } tracks: ${ tracks } muted: ${ muted } ` ) ;
377
-
378
367
if ( ! this . operations . has ( sessionId ) ) {
379
368
log . debug ( `operation does NOT exist:${ sessionId } ` ) ;
380
369
return Promise . reject ( `operation does NOT exist:${ sessionId } ` ) ;
381
370
}
382
-
383
371
const operation = this . operations . get ( sessionId ) ;
384
372
const transport = this . transports . get ( operation . transportId ) ;
385
373
const locality = transport . locality ;
386
374
const onOff = muted ? 'off' : 'on' ;
387
-
388
- operation . promise = operation . promise . then ( ( ) => {
389
- if ( ! this . operations . has ( sessionId ) ) {
390
- log . debug ( `operation does NOT exist:${ sessionId } ` ) ;
391
- return Promise . reject ( `operation does NOT exist:${ sessionId } ` ) ;
392
- }
393
- return this . rpcReq . mediaOnOff (
394
- locality . node , sessionId , tracks , operation . direction , onOff ) ;
395
- } ) ;
396
- const ret = operation . promise ;
397
- operation . promise = operation . promise
398
- . catch ( reason => log . debug ( `setMute failed ${ sessionId } ` ) ) ;
399
- return ret ;
375
+ return this . rpcReq . mediaOnOff (
376
+ locality . node , sessionId , tracks , operation . direction , onOff )
377
+ . catch ( reason => log . debug ( `setMute failed ${ sessionId } ` ) ) ; ;
400
378
} ;
401
379
402
380
}
0 commit comments