@@ -28,12 +28,21 @@ qx.Class.define("osparc.store.StreamTasks", {
2828 } ,
2929
3030 members : {
31- createStreamTask : function ( streamPromise , interval ) {
31+ getStreamTask : function ( action , params , streamPromise , interval ) {
32+ const internalId = action + "_" + JSON . stringify ( params ) ;
33+ const task = this . __getStreamTask ( internalId ) ;
34+ if ( task ) {
35+ return Promise . resolve ( task ) ;
36+ }
37+ return this . __createStreamTask ( internalId , streamPromise , interval ) ;
38+ } ,
39+
40+ __createStreamTask : function ( internalId , streamPromise , interval ) {
3241 return new Promise ( ( resolve , reject ) => {
3342 streamPromise
3443 . then ( streamData => {
3544 if ( "status_href" in streamData ) {
36- const task = this . __addTask ( streamData , interval ) ;
45+ const task = this . __addStreamTask ( internalId , streamData , interval ) ;
3746 resolve ( task ) ;
3847 } else {
3948 throw Error ( "Status missing" ) ;
@@ -43,32 +52,33 @@ qx.Class.define("osparc.store.StreamTasks", {
4352 } ) ;
4453 } ,
4554
46- __removeTask : function ( task ) {
55+ __getStreamTask : function ( internalId ) {
4756 const tasks = this . getTasks ( ) ;
48- const index = tasks . findIndex ( t => t . getTaskId ( ) === task . getTaskId ( ) ) ;
49- if ( index > - 1 ) {
50- tasks . splice ( index , 1 ) ;
57+ if ( internalId in tasks ) {
58+ return tasks [ internalId ] ;
5159 }
60+ return null ;
5261 } ,
5362
54- __addTask : function ( streamData , interval ) {
55- const tasks = this . getTasks ( ) ;
56- if ( streamData [ "task_id" ] in tasks ) {
57- return tasks [ streamData [ "task_id" ] ] ;
63+ __addStreamTask : function ( internalId , streamData , interval ) {
64+ const task = this . __getStreamTask ( internalId ) ;
65+ if ( task ) {
66+ return task ;
5867 }
5968
6069 const stream = new osparc . data . StreamTask ( streamData , interval ) ;
61- stream . addListener ( "resultReceived" , ( ) => this . __removeTask ( stream ) , this ) ;
62- stream . addListener ( "taskAborted" , ( ) => this . __removeTask ( stream ) , this ) ;
63- tasks [ stream . getTaskId ( ) ] = stream ;
70+ stream . addListener ( "resultReceived" , ( ) => this . __removeStreamTask ( stream ) , this ) ;
71+ stream . addListener ( "taskAborted" , ( ) => this . __removeStreamTask ( stream ) , this ) ;
72+ const tasks = this . getTasks ( ) ;
73+ tasks [ internalId ] = stream ;
6474 return stream ;
6575 } ,
6676
67- fetchStream : function ( taskId ) {
77+ __removeStreamTask : function ( stream ) {
6878 const tasks = this . getTasks ( ) ;
69- if ( taskId in tasks ) {
70- const task = tasks [ taskId ] ;
71- task . fetchStream ( ) ;
79+ const index = tasks . findIndex ( t => t . getTaskId ( ) === stream . getTaskId ( ) ) ;
80+ if ( index > - 1 ) {
81+ tasks . splice ( index , 1 ) ;
7282 }
7383 } ,
7484 }
0 commit comments