1
1
import Ajv from 'ajv' ;
2
2
import { NextFunction , Request , Response } from 'express' ;
3
+ import { isEqual , uniqWith } from 'lodash' ;
3
4
import { stringify } from 'querystring' ;
4
5
import {
5
6
Adapter ,
@@ -32,13 +33,11 @@ import { CacheItemStateType } from './cache-item-state.model';
32
33
import { CalendarFilterOptions } from './calendar-filter-options.model' ;
33
34
import { IntegrationErrorType } from './integration-error.model' ;
34
35
import { PubSubClient } from './pubsub-client.model' ;
36
+ import { PubSubContactChangeEventMessage } from './pubsub-contact-change-event-message.model' ;
35
37
import {
36
38
PubSubContactsMessage ,
37
39
PubSubContactsState ,
38
40
} from './pubsub-contacts-message.model' ;
39
- import { PubSubContactChangeEventClient } from './pubsub-contact-change-event-client.model' ;
40
- import { PubSubContactChangeEventMessage } from './pubsub-contact-change-event-message.model' ;
41
- import { isEqual , uniqWith } from 'lodash' ;
42
41
43
42
const CONTACT_FETCH_TIMEOUT = 5000 ;
44
43
@@ -56,8 +55,9 @@ export class Controller {
56
55
private adapter : Adapter ;
57
56
private contactCache : ContactCache | null ;
58
57
private ajv : Ajv ;
59
- private pubSubClient : PubSubClient | null = null ;
60
- private pubSubContactsChangedClient : PubSubContactChangeEventClient | null =
58
+ private pubSubContactStreamingClient : PubSubClient < PubSubContactsMessage > | null =
59
+ null ;
60
+ private pubSubContactChangesClient : PubSubClient < PubSubContactChangeEventMessage > | null =
61
61
null ;
62
62
private integrationName : string = 'UNKNOWN' ;
63
63
private streamingPromises = new Map < string , Promise < void > > ( ) ;
@@ -68,50 +68,57 @@ export class Controller {
68
68
this . ajv = new Ajv ( ) ;
69
69
70
70
if ( this . adapter . streamContacts ) {
71
- const {
72
- PUBSUB_TOPIC_NAME : topicName ,
73
- INTEGRATION_NAME : integrationName ,
74
- } = process . env ;
71
+ this . initContactStreaming ( ) ;
72
+ }
75
73
76
- if ( ! topicName ) {
77
- throw new Error ( 'No PUBSUB_TOPIC_NAME provided.' ) ;
78
- }
74
+ if ( this . adapter . handleWebhook ) {
75
+ this . initContactChanges ( ) ;
76
+ }
77
+
78
+ if ( this . adapter . streamContacts || this . adapter . handleWebhook ) {
79
+ const { INTEGRATION_NAME : integrationName } = process . env ;
79
80
80
81
if ( ! integrationName ) {
81
82
throw new Error ( 'No INTEGRATION_NAME provided.' ) ;
82
83
}
83
84
84
85
this . integrationName = integrationName ;
85
- this . pubSubClient = new PubSubClient ( topicName ) ;
86
- infoLogger (
87
- 'Controller' ,
88
- `Initialized PubSub client with topic ${ topicName } ` ,
89
- ) ;
90
86
}
87
+ }
91
88
92
- if ( this . adapter . handleWebhook ) {
93
- const {
94
- PUBSUB_CONTACTS_CHANGED_TOPIC_NAME : contactsChangedTopicName ,
95
- INTEGRATION_NAME : integrationName ,
96
- } = process . env ;
89
+ private initContactStreaming ( ) {
90
+ const {
91
+ PUBSUB_TOPIC_NAME : topicNameLegacy ,
92
+ PUBSUB_TOPIC_NAME_CONTACT_STREAMING : topicName ,
93
+ } = process . env ;
97
94
98
- if ( ! contactsChangedTopicName ) {
99
- throw new Error ( 'No PUBSUB_CONTACTS_CHANGED_TOPIC_NAME provided.' ) ;
100
- }
95
+ const topicNameProvided = topicName ?? topicNameLegacy ;
101
96
102
- if ( ! integrationName ) {
103
- throw new Error ( 'No INTEGRATION_NAME provided.' ) ;
104
- }
105
- this . integrationName = integrationName ;
97
+ if ( ! topicNameProvided ) {
98
+ throw new Error ( 'No PUBSUB_TOPIC_NAME_CONTACT_STREAMING provided.' ) ;
99
+ }
106
100
107
- this . pubSubContactsChangedClient = new PubSubContactChangeEventClient (
108
- contactsChangedTopicName ,
109
- ) ;
110
- infoLogger (
111
- 'Controller' ,
112
- `Initialized PubSub client with topic ${ contactsChangedTopicName } ` ,
113
- ) ;
101
+ this . pubSubContactStreamingClient = new PubSubClient ( topicNameProvided ) ;
102
+
103
+ infoLogger (
104
+ 'Controller' ,
105
+ `Initialized PubSub client for topic ${ topicName } ` ,
106
+ ) ;
107
+ }
108
+
109
+ private initContactChanges ( ) {
110
+ const { PUBSUB_TOPIC_NAME_CONTACT_CHANGES : topicName } = process . env ;
111
+
112
+ if ( ! topicName ) {
113
+ throw new Error ( 'No PUBSUB_TOPIC_NAME_CONTACT_CHANGES provided.' ) ;
114
114
}
115
+
116
+ this . pubSubContactChangesClient = new PubSubClient ( topicName ) ;
117
+
118
+ infoLogger (
119
+ 'Controller' ,
120
+ `Initialized PubSub client for topic ${ topicName } ` ,
121
+ ) ;
115
122
}
116
123
117
124
public async isValidToken (
@@ -314,7 +321,7 @@ export class Controller {
314
321
integrationName : this . integrationName ,
315
322
} ;
316
323
317
- await this . pubSubClient ?. publishMessage ( message ) ;
324
+ await this . pubSubContactStreamingClient ?. publishMessage ( message ) ;
318
325
} catch ( error ) {
319
326
errorLogger (
320
327
'streamContacts' ,
@@ -330,7 +337,7 @@ export class Controller {
330
337
331
338
const streamingPromise = streamContacts ( )
332
339
. then ( ( ) =>
333
- this . pubSubClient ?. publishMessage ( {
340
+ this . pubSubContactStreamingClient ?. publishMessage ( {
334
341
userId : providerConfig . userId ,
335
342
timestamp,
336
343
contacts : [ ] ,
@@ -345,7 +352,7 @@ export class Controller {
345
352
providerConfig . apiKey ,
346
353
error ,
347
354
) ;
348
- return this . pubSubClient ?. publishMessage ( {
355
+ return this . pubSubContactStreamingClient ?. publishMessage ( {
349
356
userId : providerConfig . userId ,
350
357
timestamp,
351
358
contacts : [ ] ,
@@ -1427,7 +1434,7 @@ export class Controller {
1427
1434
...changeEvent ,
1428
1435
} ;
1429
1436
1430
- this . pubSubContactsChangedClient ?. publishMessage ( message ) ;
1437
+ this . pubSubContactChangesClient ?. publishMessage ( message ) ;
1431
1438
} ) ;
1432
1439
1433
1440
infoLogger ( 'handleWebhook' , 'END' , '' ) ;
0 commit comments