@@ -81,8 +81,29 @@ module.exports = (arg) => {
8181
8282 // Drop the request once we are actually done
8383 if ( ps . listenerCount ( topic ) === 0 ) {
84- subscriptions [ topic ] . abort ( )
84+ if ( ! callback ) {
85+ return new Promise ( ( resolve , reject ) => {
86+ // When the response stream has ended, resolve the promise
87+ eos ( subscriptions [ topic ] . res , ( err ) => {
88+ // FIXME: Artificial timeout needed to ensure unsubscribed
89+ setTimeout ( ( ) => {
90+ if ( err ) return reject ( err )
91+ resolve ( )
92+ } )
93+ } )
94+ subscriptions [ topic ] . req . abort ( )
95+ subscriptions [ topic ] = null
96+ } )
97+ }
98+
99+ // When the response stream has ended, call the callback
100+ eos ( subscriptions [ topic ] . res , ( err ) => {
101+ // FIXME: Artificial timeout needed to ensure unsubscribed
102+ setTimeout ( ( ) => callback ( err ) )
103+ } )
104+ subscriptions [ topic ] . req . abort ( )
85105 subscriptions [ topic ] = null
106+ return
86107 }
87108
88109 if ( ! callback ) {
@@ -154,13 +175,16 @@ module.exports = (arg) => {
154175
155176 // Start the request and transform the response
156177 // stream to Pubsub messages stream
157- subscriptions [ topic ] = send . andTransform ( request , PubsubMessageStream . from , ( err , stream ) => {
178+ subscriptions [ topic ] = { }
179+ subscriptions [ topic ] . req = send . andTransform ( request , PubsubMessageStream . from , ( err , stream ) => {
158180 if ( err ) {
159181 subscriptions [ topic ] = null
160182 ps . removeListener ( topic , handler )
161183 return callback ( err )
162184 }
163185
186+ subscriptions [ topic ] . res = stream
187+
164188 stream . on ( 'data' , ( msg ) => {
165189 ps . emit ( topic , msg )
166190 } )
0 commit comments