Skip to content

Commit 1db63a1

Browse files
committed
♻️ add integration name and state to PubSubContactMessage
1 parent 0fa85f5 commit 1db63a1

File tree

2 files changed

+41
-10
lines changed

2 files changed

+41
-10
lines changed

src/models/controller.model.ts

Lines changed: 34 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,10 @@ import { CacheItemStateType } from "./cache-item-state.model";
2828
import { CalendarFilterOptions } from "./calendar-filter-options.model";
2929
import { IntegrationErrorType } from "./integration-error.model";
3030
import { PubSubClient } from "./pubsub-client.model";
31-
import { PubSubContactsMessage } from "./pubsub-contacts-message.model";
31+
import {
32+
PubSubContactsMessage,
33+
PubSubContactsState,
34+
} from "./pubsub-contacts-message.model";
3235

3336
const CONTACT_FETCH_TIMEOUT = 3000;
3437

@@ -47,19 +50,28 @@ export class Controller {
4750
private contactCache: ContactCache | null;
4851
private ajv: Ajv;
4952
private pubSubClient: PubSubClient | null = null;
53+
private integrationName: string = "UNKNOWN";
5054

5155
constructor(adapter: Adapter, contactCache: ContactCache | null) {
5256
this.adapter = adapter;
5357
this.contactCache = contactCache;
5458
this.ajv = new Ajv();
5559

5660
if (this.adapter.streamContacts) {
57-
const { PUBSUB_TOPIC_NAME: topicName } = process.env;
61+
const {
62+
PUBSUB_TOPIC_NAME: topicName,
63+
INTEGRATION_NAME: integrationName,
64+
} = process.env;
5865

5966
if (!topicName) {
60-
throw new Error("No pubsub topic name provided.");
67+
throw new Error("No PUBSUB_TOPIC_NAME provided.");
68+
}
69+
70+
if (!integrationName) {
71+
throw new Error("No INTEGRATION_NAME provided.");
6172
}
6273

74+
this.integrationName = integrationName;
6375
this.pubSubClient = new PubSubClient(topicName);
6476
infoLogger(
6577
"Controller",
@@ -209,6 +221,8 @@ export class Controller {
209221
contacts: contacts.map((contact) =>
210222
sanitizeContact(contact, providerConfig.locale)
211223
),
224+
state: PubSubContactsState.IN_PROGRESS,
225+
integrationName: this.integrationName,
212226
};
213227

214228
await this.pubSubClient?.publishMessage(message);
@@ -225,14 +239,24 @@ export class Controller {
225239
}
226240
};
227241

228-
streamContacts().catch((error) =>
229-
errorLogger(
230-
"streamContacts",
231-
"Could not stream contacts",
232-
providerConfig.apiKey,
233-
error
242+
streamContacts()
243+
.catch((error) =>
244+
errorLogger(
245+
"streamContacts",
246+
"Could not stream contacts",
247+
providerConfig.apiKey,
248+
error
249+
)
234250
)
235-
);
251+
.finally(() =>
252+
this.pubSubClient?.publishMessage({
253+
userId: providerConfig.userId,
254+
timestamp,
255+
contacts: [],
256+
state: PubSubContactsState.COMPLETE,
257+
integrationName: this.integrationName,
258+
})
259+
);
236260

237261
if (this.adapter.getToken && req.providerConfig) {
238262
const { apiKey } = await this.adapter.getToken(req.providerConfig);
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,14 @@
11
import { Contact } from "./contact.model";
22

3+
export enum PubSubContactsState {
4+
"IN_PROGRESS",
5+
"COMPLETE",
6+
}
7+
38
export type PubSubContactsMessage = {
49
userId: string;
510
timestamp: number;
611
contacts: Contact[];
12+
state: PubSubContactsState;
13+
integrationName: string;
714
};

0 commit comments

Comments
 (0)