@@ -51,6 +51,7 @@ export class Controller {
51
51
private ajv : Ajv ;
52
52
private pubSubClient : PubSubClient | null = null ;
53
53
private integrationName : string = "UNKNOWN" ;
54
+ private streamingPromises = new Map < string , Promise < void > > ( ) ;
54
55
55
56
constructor ( adapter : Adapter , contactCache : ContactCache | null ) {
56
57
this . adapter = adapter ;
@@ -189,6 +190,12 @@ export class Controller {
189
190
throw new ServerError ( 400 , "Missing parameters" ) ;
190
191
}
191
192
193
+ const { userId } = providerConfig ;
194
+
195
+ if ( ! userId ) {
196
+ throw new ServerError ( 400 , "Missing user ID" ) ;
197
+ }
198
+
192
199
const timestamp = Date . now ( ) ;
193
200
194
201
try {
@@ -216,7 +223,7 @@ export class Controller {
216
223
}
217
224
218
225
const message : PubSubContactsMessage = {
219
- userId : providerConfig . userId ,
226
+ userId,
220
227
timestamp,
221
228
contacts : contacts . map ( ( contact ) =>
222
229
sanitizeContact ( contact , providerConfig . locale )
@@ -239,7 +246,7 @@ export class Controller {
239
246
}
240
247
} ;
241
248
242
- streamContacts ( )
249
+ const streamingPromise = streamContacts ( )
243
250
. catch ( ( error ) =>
244
251
errorLogger (
245
252
"streamContacts" ,
@@ -248,15 +255,18 @@ export class Controller {
248
255
error
249
256
)
250
257
)
251
- . finally ( ( ) =>
258
+ . finally ( ( ) => {
252
259
this . pubSubClient ?. publishMessage ( {
253
260
userId : providerConfig . userId ,
254
261
timestamp,
255
262
contacts : [ ] ,
256
263
state : PubSubContactsState . COMPLETE ,
257
264
integrationName : this . integrationName ,
258
- } )
259
- ) ;
265
+ } ) ;
266
+ this . streamingPromises . delete ( `${ userId } :${ timestamp } ` ) ;
267
+ } ) ;
268
+
269
+ this . streamingPromises . set ( `${ userId } :${ timestamp } ` , streamingPromise ) ;
260
270
261
271
if ( this . adapter . getToken && req . providerConfig ) {
262
272
const { apiKey } = await this . adapter . getToken ( req . providerConfig ) ;
0 commit comments