@@ -10,11 +10,13 @@ import {
10
10
CallEventWithIntegrationEntities ,
11
11
Contact ,
12
12
ContactCache ,
13
- ContactChangeEvent ,
14
13
ContactDelta ,
15
14
ContactTemplate ,
16
15
ContactUpdate ,
17
16
IntegrationEntityType ,
17
+ IntegrationsEvent ,
18
+ PubSubClient ,
19
+ PubSubIntegrationsEventMessage ,
18
20
ServerError ,
19
21
} from '.' ;
20
22
import { calendarEventsSchema , contactsSchema } from '../schemas' ;
@@ -32,12 +34,11 @@ import {
32
34
import { CacheItemStateType } from './cache-item-state.model' ;
33
35
import { CalendarFilterOptions } from './calendar-filter-options.model' ;
34
36
import { IntegrationErrorType } from './integration-error.model' ;
35
- import { PubSubClient } from './pubsub-client.model' ;
36
- import { PubSubContactChangeEventMessage } from './pubsub-contact-change-event-message.model' ;
37
+
37
38
import {
38
39
PubSubContactsMessage ,
39
40
PubSubContactsState ,
40
- } from './pubsub-contacts-message.model' ;
41
+ } from './pubsub/pubsub -contacts-message.model' ;
41
42
42
43
const CONTACT_FETCH_TIMEOUT = 5000 ;
43
44
@@ -57,7 +58,7 @@ export class Controller {
57
58
private ajv : Ajv ;
58
59
private pubSubContactStreamingClient : PubSubClient < PubSubContactsMessage > | null =
59
60
null ;
60
- private pubSubContactChangesClient : PubSubClient < PubSubContactChangeEventMessage > | null =
61
+ private pubSubIntegrationEventsClient : PubSubClient < PubSubIntegrationsEventMessage > | null =
61
62
null ;
62
63
private integrationName : string = 'UNKNOWN' ;
63
64
private streamingPromises = new Map < string , Promise < void > > ( ) ;
@@ -107,13 +108,13 @@ export class Controller {
107
108
}
108
109
109
110
private initContactChanges ( ) {
110
- const { PUBSUB_TOPIC_NAME_CONTACT_CHANGES : topicName } = process . env ;
111
+ const { PUBSUB_TOPIC_NAME_INTEGRATION_EVENTS : topicName } = process . env ;
111
112
112
113
if ( ! topicName ) {
113
- throw new Error ( 'No PUBSUB_TOPIC_NAME_CONTACT_CHANGES provided.' ) ;
114
+ throw new Error ( 'No PUBSUB_TOPIC_NAME_INTEGRATION_EVENTS provided.' ) ;
114
115
}
115
116
116
- this . pubSubContactChangesClient = new PubSubClient ( topicName ) ;
117
+ this . pubSubIntegrationEventsClient = new PubSubClient ( topicName ) ;
117
118
118
119
infoLogger (
119
120
'Controller' ,
@@ -1411,36 +1412,42 @@ export class Controller {
1411
1412
const verified = await this . adapter . verifyWebhookRequest ( req ) ;
1412
1413
1413
1414
if ( ! verified ) {
1414
- errorLogger ( 'handleWebhook' , 'Webhook verification failed' , '' ) ;
1415
+ errorLogger ( 'handleWebhook' , 'Webhook verification failed' ) ;
1415
1416
throw new ServerError ( 403 , 'Webhook verification failed' ) ;
1416
1417
}
1417
1418
1418
- infoLogger ( 'handleWebhook' , 'START' , '' ) ;
1419
-
1420
- const changeEvents : ContactChangeEvent [ ] =
1421
- await this . adapter . handleWebhook ( req ) ;
1419
+ infoLogger ( 'handleWebhook' , 'START' ) ;
1422
1420
1423
- infoLogger (
1424
- 'handleWebhook' ,
1425
- `Got ${ changeEvents . length } changed contacts` ,
1426
- '' ,
1427
- ) ;
1421
+ const events : IntegrationsEvent [ ] = await this . adapter . handleWebhook ( req ) ;
1428
1422
1429
- const deduplicatedChangeEvents = uniqWith ( changeEvents , isEqual ) ;
1423
+ infoLogger ( 'handleWebhook' , `Got ${ events . length } events` ) ;
1430
1424
1431
- deduplicatedChangeEvents . map ( ( changeEvent : ContactChangeEvent ) => {
1432
- infoLogger (
1433
- 'handleWebhook' ,
1434
- `Publishing contact change event with accountId ${ changeEvent . accountId } and contactId ${ changeEvent . contactId } ` ,
1435
- '' ,
1436
- ) ;
1425
+ const deduplicatedEvents = uniqWith ( events , isEqual ) ;
1437
1426
1438
- const message : PubSubContactChangeEventMessage = {
1439
- integrationName : this . integrationName ,
1440
- ...changeEvent ,
1441
- } ;
1427
+ const publishResults = await Promise . allSettled (
1428
+ deduplicatedEvents
1429
+ . map < PubSubIntegrationsEventMessage > ( ( event : IntegrationsEvent ) => ( {
1430
+ integrationName : this . integrationName ,
1431
+ ...event ,
1432
+ } ) )
1433
+ . map ( ( message ) => {
1434
+ infoLogger (
1435
+ 'handleWebhook' ,
1436
+ `Publishing event ${ message . type } with accountId ${ message . accountId } ` ,
1437
+ ) ;
1438
+ return this . pubSubIntegrationEventsClient ?. publishMessage ( message ) ;
1439
+ } ) ,
1440
+ ) ;
1442
1441
1443
- this . pubSubContactChangesClient ?. publishMessage ( message ) ;
1442
+ publishResults . forEach ( ( result ) => {
1443
+ if ( result . status === 'rejected' ) {
1444
+ errorLogger (
1445
+ 'handleWebhook' ,
1446
+ `Could not publish event ${ result . reason . type } with accountId ${ result . reason . accountId } ` ,
1447
+ '' ,
1448
+ result . reason ,
1449
+ ) ;
1450
+ }
1444
1451
} ) ;
1445
1452
1446
1453
infoLogger ( 'handleWebhook' , 'END' , '' ) ;
0 commit comments