Skip to content

Commit a96b779

Browse files
committed
Merge branch 'contacts-streaming'
2 parents 1a752a0 + 4e4efb3 commit a96b779

13 files changed

+2330
-152
lines changed

package-lock.json

Lines changed: 2137 additions & 144 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
"license": "UNLICENSED",
2020
"scripts": {
2121
"dev": "npm run build && npm link && tsc --watch",
22+
"watch": "tsc --watch",
2223
"prepare": "husky install",
2324
"test": "jest",
2425
"compile": "tsc",
@@ -63,6 +64,7 @@
6364
"typescript": "4.9.5"
6465
},
6566
"dependencies": {
67+
"@google-cloud/pubsub": "^3.7.1",
6668
"ajv": "^8.11.0",
6769
"awesome-phonenumber": "^3.2.0",
6870
"axios": "^0.27.2",

src/index.ts

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@ import {
1212
} from "./models";
1313
import { CustomRouter } from "./models/custom-router.model";
1414
import { CustomRoute } from "./models/custom-routes.model";
15-
import { getContactCache } from "./util/get-contact-cache";
1615
import { errorLogger, infoLogger } from "./util";
16+
import { getContactCache } from "./util/get-contact-cache";
1717

1818
const PORT: number = Number(process.env.PORT) || 8080;
1919

@@ -43,51 +43,71 @@ export function start(
4343
app.get("/contacts", (req, res, next) =>
4444
controller.getContacts(req, res, next)
4545
);
46+
4647
app.post("/contacts", (req, res, next) =>
4748
controller.createContact(req, res, next)
4849
);
50+
4951
app.put("/contacts/:id", (req, res, next) =>
5052
controller.updateContact(req, res, next)
5153
);
54+
5255
app.delete("/contacts/:id", (req, res, next) =>
5356
controller.deleteContact(req, res, next)
5457
);
58+
59+
app.post("/contacts/stream", (req, res, next) =>
60+
controller.streamContacts(req, res, next)
61+
);
62+
5563
app.get(
5664
"/entity/:type/:id",
5765
(req: IntegrationEntityBridgeRequest, res, next) =>
5866
controller.getEntity(req, res, next)
5967
);
68+
6069
app.get("/calendar", (req, res, next) =>
6170
controller.getCalendarEvents(req, res, next)
6271
);
72+
6373
app.post("/calendar", (req, res, next) =>
6474
controller.createCalendarEvent(req, res, next)
6575
);
76+
6677
app.put("/calendar/:id", (req, res, next) =>
6778
controller.updateCalendarEvent(req, res, next)
6879
);
80+
6981
app.delete("/calendar/:id", (req, res, next) =>
7082
controller.deleteCalendarEvent(req, res, next)
7183
);
84+
7285
app.post("/events/calls", (req, res, next) =>
7386
controller.handleCallEvent(req, res, next)
7487
);
88+
7589
app.put("/events/calls/:id", (req, res, next) =>
7690
controller.updateCallEvent(req, res, next)
7791
);
92+
7893
app.post("/events/connected", (req, res, next) =>
7994
controller.handleConnectedEvent(req, res, next)
8095
);
96+
8197
app.put("/call-log", (req, res, next) =>
8298
controller.createOrUpdateCallLogsForEntities(req, res, next)
8399
);
100+
84101
app.put("/call-log/phoneNumber", (req, res, next) =>
85102
controller.createCallLogForPhoneNumber(req, res, next)
86103
);
104+
87105
app.get("/health", (req, res, next) => controller.getHealth(req, res, next));
106+
88107
app.get("/oauth2/redirect", (req, res, next) =>
89108
controller.oAuth2Redirect(req, res, next)
90109
);
110+
91111
app.get("/oauth2/callback", (req, res) =>
92112
controller.oAuth2Callback(req, res)
93113
);

src/middlewares/extract-header.middleware.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,13 @@ export function extractHeaderMiddleware(
88
res: Response,
99
next: NextFunction
1010
): void {
11+
const userId = req.get("x-user-id") || "";
1112
const apiKey = req.get("x-provider-key") || "";
1213
const apiUrl = req.get("x-provider-url") || "";
1314
const locale = req.get("x-provider-locale") || DEFAULT_LOCALE;
1415

1516
req.providerConfig = {
17+
userId,
1618
apiKey,
1719
apiUrl,
1820
locale,

src/models/adapter.model.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import { IntegrationEntityType } from "./integration-entity.model";
1717
export interface Adapter {
1818
getToken?: (config: Config) => Promise<{ apiKey: string }>;
1919
getContacts?: (config: Config) => Promise<Contact[]>;
20+
streamContacts?: (config: Config) => AsyncGenerator<Contact[], void, unknown>;
2021
createContact?: (
2122
config: Config,
2223
contact: ContactTemplate

src/models/config.model.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
export interface Config {
2+
userId: string;
23
apiKey: string;
34
apiUrl: string;
45
locale: string;

src/models/controller.model.test.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
import { Request, Response } from "express";
22
import {
3-
createRequest,
4-
createResponse,
53
MockRequest,
64
MockResponse,
5+
createRequest,
6+
createResponse,
77
} from "node-mocks-http";
88
import {
99
CalendarEvent,

src/models/controller.model.ts

Lines changed: 114 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import {
1414
ServerError,
1515
} from ".";
1616
import { calendarEventsSchema, contactsSchema } from "../schemas";
17+
import { isProduction } from "../util";
1718
import { shouldSkipCallEvent } from "../util/call-event.util";
1819
import { errorLogger, infoLogger } from "../util/logger.util";
1920
import { parsePhoneNumber } from "../util/phone-number-utils";
@@ -27,8 +28,10 @@ import {
2728
import { CacheItemStateType } from "./cache-item-state.model";
2829
import { CalendarFilterOptions } from "./calendar-filter-options.model";
2930
import { IntegrationErrorType } from "./integration-error.model";
31+
import { PubSubClient } from "./pubsub-client.model";
32+
import { PubSubContactsMessage } from "./pubsub-contacts-message.model";
3033

31-
const CONTACT_FETCH_TIMEOUT: number = 3000;
34+
const CONTACT_FETCH_TIMEOUT = 3000;
3235

3336
function sanitizeContact(contact: Contact, locale: string): Contact {
3437
const result: APIContact = {
@@ -44,11 +47,25 @@ export class Controller {
4447
private adapter: Adapter;
4548
private contactCache: ContactCache | null;
4649
private ajv: Ajv;
50+
private pubSubClient: PubSubClient | null = null;
4751

4852
constructor(adapter: Adapter, contactCache: ContactCache | null) {
4953
this.adapter = adapter;
5054
this.contactCache = contactCache;
5155
this.ajv = new Ajv();
56+
57+
const { PUBSUB_TOPIC_NAME: topicName } = process.env;
58+
59+
if (isProduction() && typeof this.adapter.streamContacts === "function") {
60+
if (!topicName) {
61+
throw new Error("No pubsub topic name provided.");
62+
}
63+
this.pubSubClient = new PubSubClient(topicName);
64+
infoLogger(
65+
"Controller",
66+
`Initialized PubSub client with topic ${topicName}`
67+
);
68+
}
5269
}
5370

5471
public async getContacts(
@@ -149,6 +166,102 @@ export class Controller {
149166
}
150167
}
151168

169+
public async streamContacts(
170+
req: BridgeRequest<unknown>,
171+
res: Response,
172+
next: NextFunction
173+
): Promise<void> {
174+
const { providerConfig } = req;
175+
176+
if (!providerConfig) {
177+
throw new ServerError(400, "Missing parameters");
178+
}
179+
180+
const timestamp = Date.now();
181+
182+
try {
183+
infoLogger(
184+
"streamContacts",
185+
`Starting contact streaming ${timestamp}`,
186+
providerConfig.apiKey
187+
);
188+
189+
const streamContacts = async () => {
190+
if (!this.adapter.streamContacts) {
191+
throw new ServerError(501, "Streaming contacts is not implemented");
192+
}
193+
194+
const iterator = this.adapter.streamContacts(providerConfig);
195+
196+
let result = await iterator.next();
197+
198+
while (!result.done) {
199+
const { value: contacts } = result;
200+
201+
try {
202+
if (!validate(this.ajv, contactsSchema, contacts)) {
203+
throw new Error("Invalid contacts received");
204+
}
205+
206+
const message: PubSubContactsMessage = {
207+
userId: providerConfig.userId,
208+
timestamp,
209+
contacts: contacts.map((contact) =>
210+
sanitizeContact(contact, providerConfig.locale)
211+
),
212+
};
213+
214+
await this.pubSubClient?.publishMessage(message);
215+
} catch (error) {
216+
errorLogger(
217+
"streamContacts",
218+
"Could not publish contacts",
219+
providerConfig.apiKey,
220+
error
221+
);
222+
} finally {
223+
result = await iterator.next();
224+
}
225+
}
226+
};
227+
228+
streamContacts().catch((error) =>
229+
errorLogger(
230+
"streamContacts",
231+
"Could not stream contacts",
232+
providerConfig.apiKey,
233+
error
234+
)
235+
);
236+
237+
if (this.adapter.getToken && req.providerConfig) {
238+
const { apiKey } = await this.adapter.getToken(req.providerConfig);
239+
res.header("X-Provider-Key", apiKey);
240+
}
241+
242+
infoLogger("streamContacts", "END", providerConfig.apiKey);
243+
244+
res.status(200).send({ timestamp });
245+
} catch (error: any) {
246+
// prevent logging of refresh errors
247+
if (
248+
error instanceof ServerError &&
249+
error.message === IntegrationErrorType.INTEGRATION_REFRESH_ERROR
250+
) {
251+
next(error);
252+
return;
253+
}
254+
255+
errorLogger(
256+
"streamContacts",
257+
"Could not stream contacts",
258+
providerConfig.apiKey,
259+
error
260+
);
261+
next(error);
262+
}
263+
}
264+
152265
public async createContact(
153266
req: BridgeRequest<ContactTemplate>,
154267
res: Response,

src/models/pubsub-client.model.ts

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
import { PubSub } from "@google-cloud/pubsub";
2+
import { errorLogger, infoLogger } from "../util";
3+
import { PubSubContactsMessage } from "./pubsub-contacts-message.model";
4+
5+
export class PubSubClient {
6+
private client: PubSub;
7+
private topicName: string;
8+
9+
constructor(topicName: string) {
10+
this.client = new PubSub();
11+
this.topicName = topicName;
12+
}
13+
14+
async publishMessage(message: PubSubContactsMessage) {
15+
try {
16+
if (!this.topicName) {
17+
throw new Error("No pubsub topic name provided.");
18+
}
19+
20+
const json = JSON.stringify(message);
21+
const dataBuffer = Buffer.from(json);
22+
const topic = this.client.topic(this.topicName);
23+
await topic.publishMessage({ data: dataBuffer });
24+
25+
infoLogger(
26+
PubSubClient.name,
27+
`Published ${message.contacts.length} contacts for user ${message.userId} to topic ${this.topicName}`
28+
);
29+
} catch (error) {
30+
console.error(error);
31+
const message = (error as Error).message;
32+
errorLogger(PubSubClient.name, `Could not publish to pubsub: ${message}`);
33+
}
34+
}
35+
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
import { Contact } from "./contact.model";
2+
3+
export type PubSubContactsMessage = {
4+
userId: string;
5+
timestamp: number;
6+
contacts: Contact[];
7+
};

0 commit comments

Comments
 (0)