Skip to content

Commit 2965db1

Browse files
committed
✨ add message ordering to contact streaming messages
1 parent fddb96d commit 2965db1

File tree

2 files changed

+45
-34
lines changed

2 files changed

+45
-34
lines changed

src/models/controller.model.ts

Lines changed: 39 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@ export class Controller {
5050
private pubSubIntegrationEventsClient: PubSubClient<PubSubIntegrationsEventMessage> | null =
5151
null;
5252
private integrationName: string = 'UNKNOWN';
53+
54+
// used for garbage collection reasons, to prevent long running promises from getting canceled
5355
private streamingPromises = new Map<string, Promise<void>>();
5456

5557
constructor(adapter: Adapter, contactCache: ContactCache | null) {
@@ -288,10 +290,11 @@ export class Controller {
288290
}
289291

290292
const timestamp = Date.now();
293+
const orderingKey = `${userId}:${timestamp}`;
291294

292295
infoLogger(
293296
'streamContacts',
294-
`Starting contact streaming ${timestamp}`,
297+
`Starting contact streaming ${timestamp} - orderingKey ${orderingKey}`,
295298
providerConfig.apiKey,
296299
);
297300

@@ -303,7 +306,6 @@ export class Controller {
303306
const iterator = this.adapter.streamContacts(providerConfig);
304307

305308
let result = await iterator.next();
306-
307309
while (!result.done) {
308310
const { value: contacts } = result;
309311

@@ -312,18 +314,19 @@ export class Controller {
312314
throw new Error('Invalid contacts received');
313315
}
314316

315-
const message: PubSubContactsMessage = {
316-
userId,
317-
timestamp,
318-
contacts: contacts.map((contact) =>
319-
sanitizeContact(contact, providerConfig.locale),
320-
),
321-
state: PubSubContactsState.IN_PROGRESS,
322-
integrationName: this.integrationName,
323-
// traceparent: tracer.getTraceParent(),
324-
};
325-
326-
await this.pubSubContactStreamingClient?.publishMessage(message);
317+
await this.pubSubContactStreamingClient?.publishMessage(
318+
{
319+
userId,
320+
timestamp,
321+
contacts: contacts.map((contact) =>
322+
sanitizeContact(contact, providerConfig.locale),
323+
),
324+
state: PubSubContactsState.IN_PROGRESS,
325+
integrationName: this.integrationName,
326+
// traceparent: tracer.getTraceParent(),
327+
},
328+
orderingKey,
329+
);
327330
} catch (error) {
328331
errorLogger(
329332
'streamContacts',
@@ -339,14 +342,17 @@ export class Controller {
339342

340343
const streamingPromise = streamContacts()
341344
.then(() => {
342-
return this.pubSubContactStreamingClient?.publishMessage({
343-
userId: providerConfig.userId,
344-
timestamp,
345-
contacts: [],
346-
state: PubSubContactsState.COMPLETE,
347-
integrationName: this.integrationName,
348-
// traceparent: tracer.getTraceParent(),
349-
});
345+
return this.pubSubContactStreamingClient?.publishMessage(
346+
{
347+
userId: providerConfig.userId,
348+
timestamp,
349+
contacts: [],
350+
state: PubSubContactsState.COMPLETE,
351+
integrationName: this.integrationName,
352+
// traceparent: tracer.getTraceParent(),
353+
},
354+
orderingKey,
355+
);
350356
})
351357
.catch(async (error) => {
352358
errorLogger(
@@ -355,14 +361,17 @@ export class Controller {
355361
providerConfig.apiKey,
356362
error,
357363
);
358-
return this.pubSubContactStreamingClient?.publishMessage({
359-
userId: providerConfig.userId,
360-
timestamp,
361-
contacts: [],
362-
state: PubSubContactsState.FAILED,
363-
integrationName: this.integrationName,
364-
// traceparent: tracer.getTraceParent(),
365-
});
364+
return this.pubSubContactStreamingClient?.publishMessage(
365+
{
366+
userId: providerConfig.userId,
367+
timestamp,
368+
contacts: [],
369+
state: PubSubContactsState.FAILED,
370+
integrationName: this.integrationName,
371+
// traceparent: tracer.getTraceParent(),
372+
},
373+
orderingKey,
374+
);
366375
})
367376
.catch((error) => {
368377
errorLogger(

src/models/pubsub/pubsub-client.model.ts

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { PubSub } from '@google-cloud/pubsub';
2-
import { timeout } from '../../util/timeout';
32
import { context, propagation } from '@opentelemetry/api';
3+
import { timeout } from '../../util/timeout';
44

55
const PUBLISH_TIMEOUT = 10_000;
66

@@ -13,7 +13,7 @@ export class PubSubClient<T> {
1313
this.topicName = topicName;
1414
}
1515

16-
async publishMessage(message: T) {
16+
async publishMessage(message: T, orderingKey?: string) {
1717
if (!this.topicName) {
1818
throw new Error('No pubsub topic name provided.');
1919
}
@@ -22,10 +22,12 @@ export class PubSubClient<T> {
2222

2323
const json = JSON.stringify(message);
2424
const dataBuffer = Buffer.from(json);
25-
const topic = this.client.topic(this.topicName);
25+
const topic = this.client.topic(this.topicName, {
26+
messageOrdering: orderingKey !== undefined,
27+
});
2628

2729
await Promise.race([
28-
topic.publishMessage({ data: dataBuffer }),
30+
topic.publishMessage({ data: dataBuffer, orderingKey }),
2931
timeout(
3032
PUBLISH_TIMEOUT,
3133
'Could not publish message in time. Did you forget to authenticate with GCP? (gcloud auth application-default login)',

0 commit comments

Comments
 (0)