Skip to content

Commit f583fc8

Browse files
committed
fix(nats): Implemented new system pub/sub core channel and moved sendAnalyticsData to use it.
1 parent 45fdc05 commit f583fc8

File tree

3 files changed

+152
-46
lines changed

3 files changed

+152
-46
lines changed

src/components/whiteboard/helpers/handleRequests.ts

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import {
1010
AnalyticsEventType,
1111
DataMsgBodyType,
1212
} from 'plugnmeet-protocol-js';
13+
import { throttle } from 'es-toolkit';
1314

1415
import { store } from '../../../store';
1516
import { updateRequestedWhiteboardData } from '../../../store/slices/whiteboard';
@@ -23,7 +24,29 @@ const broadcastedElementVersions: Map<string, number> = new Map(),
2324
DELETED_ELEMENT_TIMEOUT = 3 * 60 * 60 * 1000; // 3 hours
2425
let preScrollX = 0,
2526
preScrollY = 0,
26-
conn: ConnectNats;
27+
conn: ConnectNats,
28+
annotatedAnalyticsBatchCounter = 0;
29+
30+
const flushAnnotatedAnalyticsData = () => {
31+
if (annotatedAnalyticsBatchCounter > 0) {
32+
if (!conn) {
33+
conn = getNatsConn();
34+
}
35+
conn.sendAnalyticsData(
36+
AnalyticsEvents.ANALYTICS_EVENT_USER_WHITEBOARD_ANNOTATED,
37+
AnalyticsEventType.USER,
38+
'',
39+
'',
40+
annotatedAnalyticsBatchCounter.toString(),
41+
);
42+
annotatedAnalyticsBatchCounter = 0; // Reset after sending
43+
}
44+
};
45+
46+
// Initialize the throttled function once at module load
47+
const throttledFlushAnalytics = throttle(flushAnnotatedAnalyticsData, 5000, {
48+
edges: ['trailing'],
49+
});
2750

2851
export const sendRequestedForWhiteboardData = async () => {
2952
if (!conn) {
@@ -171,13 +194,9 @@ export const broadcastScreenDataByNats = async (
171194
JSON.stringify(elements),
172195
sendTo,
173196
);
174-
conn.sendAnalyticsData(
175-
AnalyticsEvents.ANALYTICS_EVENT_USER_WHITEBOARD_ANNOTATED,
176-
AnalyticsEventType.USER,
177-
'',
178-
'',
179-
'1',
180-
);
197+
annotatedAnalyticsBatchCounter++;
198+
// Trigger the throttled flush whenever there's activity
199+
throttledFlushAnalytics();
181200
};
182201

183202
export const broadcastCurrentPageNumber = async (

src/helpers/nats/ConnectNats.ts

Lines changed: 50 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ const RENEW_TOKEN_FREQUENT = 3 * 60 * 1000,
5858
PING_INTERVAL = 10 * 1000,
5959
STATUS_CHECKER_INTERVAL = 500,
6060
USERS_SYNC_INTERVAL = 30 * 1000,
61-
MAX_MISSED_PONGS = 6;
61+
MAX_MISSED_PONGS = 12;
6262
export type PrivateDataDeliveryType = 'CHAT' | 'DATA_MSG';
6363

6464
export default class ConnectNats {
@@ -263,6 +263,7 @@ export default class ConnectNats {
263263
pingInterval: PING_INTERVAL,
264264
noEcho: true,
265265
});
266+
this.messageQueue.setNc(this._nc);
266267

267268
console.info(`connected ${this._nc.getServer()}`);
268269
} catch (e) {
@@ -431,12 +432,19 @@ export default class ConnectNats {
431432
}
432433
}
433434

435+
/**
436+
* Sends a message to the system worker with guaranteed delivery.
437+
* This method uses JetStream to ensure the message is received by the server.
438+
* It is used for critical messages like PINGs, token renewals, and private data delivery.
439+
* @param data The message to send.
440+
*/
434441
public sendMessageToSystemWorker = (data: NatsMsgClientToServer) => {
435442
const subject =
436443
this._subjects.systemJsWorker + '.' + this._roomId + '.' + this._userId;
437444
this.messageQueue.addToQueue({
438445
subject,
439446
payload: toBinary(NatsMsgClientToServerSchema, data),
447+
useJetStream: true,
440448
});
441449
};
442450

@@ -495,6 +503,12 @@ export default class ConnectNats {
495503
return undefined;
496504
}
497505

506+
/**
507+
* Sends a chat message.
508+
* Private messages are sent via the system worker with JetStream's guaranteed delivery.
509+
* Public messages are sent as fire-and-forget core NATS messages to the public chat subject.
510+
* Both are managed by the MessageQueue.
511+
*/
498512
public sendChatMsg = async (to: string, msg: string) => {
499513
if (!this._nc || this._nc.isClosed()) {
500514
return;
@@ -551,7 +565,10 @@ export default class ConnectNats {
551565
this.sendPrivateData(payload, 'CHAT', to, false);
552566
} else {
553567
const subject = `${this._subjects.chat}.${this._roomId}`;
554-
this._nc.publish(subject, payload);
568+
this.messageQueue.addToQueue({
569+
subject,
570+
payload,
571+
});
555572
}
556573

557574
if (isPrivate) {
@@ -576,6 +593,11 @@ export default class ConnectNats {
576593
await this.subscriptionHandler.handleChat.handleMsg(chatMessage);
577594
};
578595

596+
/**
597+
* Sends whiteboard data as a fire-and-forget message.
598+
* This method uses the core NATS `publish` method via the `MessageQueue`
599+
* to avoid blocking critical messages.
600+
*/
579601
public sendWhiteboardData = async (
580602
type: DataMsgBodyType,
581603
msg: string,
@@ -602,11 +624,17 @@ export default class ConnectNats {
602624
}
603625

604626
const subject = `${this._subjects.whiteboard}.${this._roomId}`;
605-
this._nc.publish(subject, payload);
627+
this.messageQueue.addToQueue({
628+
subject,
629+
payload,
630+
});
606631
};
607632

608633
/**
609-
* sendDataMessage method mostly use to communicate between clients
634+
* Sends a generic data message.
635+
* Private messages are sent via the system worker with JetStream's guaranteed delivery.
636+
* Public messages are sent as fire-and-forget core NATS messages to the public data channel subject.
637+
* Both are managed by the MessageQueue.
610638
*/
611639
public sendDataMessage = async (
612640
type: DataMsgBodyType,
@@ -637,10 +665,19 @@ export default class ConnectNats {
637665
this.sendPrivateData(payload, 'DATA_MSG', to, false);
638666
} else {
639667
const subject = `${this._subjects.dataChannel}.${this._roomId}`;
640-
this._nc.publish(subject, payload);
668+
this.messageQueue.addToQueue({
669+
subject,
670+
payload,
671+
});
641672
}
642673
};
643674

675+
/**
676+
* Sends analytics data to the server as a lightweight, fire-and-forget message.
677+
* This method uses the core NATS `publish` method via the `MessageQueue`
678+
* by publishing to the `systemJsWorker` subject without the `useJetStream` flag.
679+
* This avoids blocking critical JetStream messages.
680+
*/
644681
public sendAnalyticsData = (
645682
event_name: AnalyticsEvents,
646683
event_type: AnalyticsEventType = AnalyticsEventType.USER,
@@ -662,7 +699,13 @@ export default class ConnectNats {
662699
event: NatsMsgClientToServerEvents.PUSH_ANALYTICS_DATA,
663700
msg: toJsonString(AnalyticsDataMsgSchema, analyticsMsg),
664701
});
665-
this.sendMessageToSystemWorker(data);
702+
703+
const subject =
704+
this._subjects.systemJsWorker + '.' + this._roomId + '.' + this._userId;
705+
this.messageQueue.addToQueue({
706+
subject,
707+
payload: toBinary(NatsMsgClientToServerSchema, data),
708+
});
666709
};
667710

668711
private startTokenRenewInterval() {
@@ -686,7 +729,7 @@ export default class ConnectNats {
686729

687730
private startPingToServer() {
688731
const ping = async () => {
689-
if (this.missedPongs === 3) {
732+
if (this.missedPongs === 6) {
690733
this.pongMissedToastId = toast.loading(
691734
i18n.t('notifications.server-not-responding'),
692735
{

src/helpers/nats/MessageQueue.ts

Lines changed: 75 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { JetStreamClient, JetStreamError } from '@nats-io/jetstream';
2-
import { errors } from '@nats-io/nats-core';
2+
import { errors, NatsConnection } from '@nats-io/nats-core';
33

44
import { formatNatsError } from '../utils';
55
import { store } from '../../store';
@@ -9,10 +9,13 @@ import i18n from '../i18n';
99
interface Message {
1010
subject: string;
1111
payload: any;
12+
useJetStream?: boolean;
13+
retries?: number;
1214
}
1315

1416
// A Map where the key is the full, unique NATS subject.
1517
type SubjectQueues = Map<string, Array<Message>>;
18+
const MAX_RETRIES = 2; // Max number of retries for transient errors
1619

1720
/**
1821
* A message queue that processes messages in independent, parallel queues
@@ -23,10 +26,10 @@ type SubjectQueues = Map<string, Array<Message>>;
2326
export default class MessageQueue {
2427
private _isConnected: boolean = false;
2528
private _js: JetStreamClient | undefined;
29+
private _nc: NatsConnection | undefined;
2630

2731
private readonly queues: SubjectQueues = new Map();
2832
private readonly processingSubjects: Set<string> = new Set();
29-
3033
private _isHoldingNotificationShown = false;
3134

3235
public setIsConnected = (value: boolean) => {
@@ -42,6 +45,10 @@ export default class MessageQueue {
4245
this._js = value;
4346
};
4447

48+
public setNc = (value: NatsConnection) => {
49+
this._nc = value;
50+
};
51+
4552
public addToQueue = (message: Message) => {
4653
const { subject } = message;
4754

@@ -65,62 +72,99 @@ export default class MessageQueue {
6572
}
6673

6774
this.processingSubjects.add(subject);
68-
6975
const request = subjectQueue[0];
7076

7177
try {
72-
if (!this._js) {
73-
return;
78+
if (request.useJetStream) {
79+
// Assuming this._js is always available if _isConnected is true
80+
await this._js!.publish(request.subject, request.payload);
81+
} else {
82+
// Assuming this._nc is always available if _isConnected is true
83+
this._nc!.publish(request.subject, request.payload);
7484
}
7585

76-
await this._js.publish(request.subject, request.payload);
86+
// Message sent successfully.
7787
subjectQueue.shift();
88+
this.processingSubjects.delete(subject);
7889
this._isHoldingNotificationShown = false;
90+
if (subjectQueue.length > 0) {
91+
this.processSubjectQueue(subject).then();
92+
}
7993
} catch (e: any) {
94+
const formattedError = formatNatsError(e); // Format the error here
95+
96+
// Check for transient errors that are eligible for retry.
8097
if (
81-
e instanceof errors.TimeoutError ||
82-
e instanceof errors.NoRespondersError ||
83-
e instanceof JetStreamError
98+
request.useJetStream && // Only retry JetStream messages
99+
(e instanceof errors.TimeoutError ||
100+
e instanceof errors.NoRespondersError ||
101+
e instanceof JetStreamError)
84102
) {
85-
console.error(
86-
`NATS transient error for subject '${subject}': ${e.message}. Holding queue.`,
87-
);
88-
if (e.message.includes('connection draining')) {
89-
return;
90-
}
91-
if (!this._isHoldingNotificationShown) {
92-
const msg = formatNatsError(e);
103+
request.retries = (request.retries || 0) + 1;
104+
105+
if (request.retries > MAX_RETRIES) {
106+
// Exceeded max retries, discard the message.
107+
console.error(
108+
`NATS message for subject '${subject}' failed after ${MAX_RETRIES} retries. Discarding.`,
109+
{ error: formattedError, message: request },
110+
);
93111
store.dispatch(
94112
addUserNotification({
95-
message: i18n.t('notifications.queue-holding-messages', {
96-
error: msg,
113+
message: i18n.t('notifications.queue-discarded-message', {
114+
error: formattedError,
97115
}),
98-
typeOption: 'warning',
116+
typeOption: 'error',
99117
}),
100118
);
101-
this._isHoldingNotificationShown = true;
119+
120+
subjectQueue.shift(); // Discard
121+
this.processingSubjects.delete(subject);
122+
if (subjectQueue.length > 0) {
123+
this.processSubjectQueue(subject).then();
124+
}
125+
} else {
126+
// Retry with a delay.
127+
console.warn(
128+
`NATS transient error for subject '${subject}'. Retrying (${request.retries}/${MAX_RETRIES})...`,
129+
{ error: formattedError },
130+
);
131+
if (!this._isHoldingNotificationShown) {
132+
store.dispatch(
133+
addUserNotification({
134+
message: i18n.t('notifications.queue-holding-messages', {
135+
error: formattedError,
136+
}),
137+
typeOption: 'warning',
138+
}),
139+
);
140+
this._isHoldingNotificationShown = true;
141+
}
142+
143+
this.processingSubjects.delete(subject);
144+
setTimeout(() => {
145+
this.processSubjectQueue(subject).then();
146+
}, 500 * request.retries); // Linear backoff: the delay grows by 500 ms with each retry
102147
}
103-
return;
104148
} else {
149+
// This is a non-recoverable or non-JetStream error. Discard immediately.
105150
console.error(
106-
`Found poison message for subject '${subject}'. Discarding.`,
107-
{ error: e.message, message: request },
151+
`Found poison message or non-retryable error for subject '${subject}'. Discarding.`,
152+
{ error: formattedError, message: request }, // Use formatted error
108153
);
109154
store.dispatch(
110155
addUserNotification({
111156
message: i18n.t('notifications.queue-discarded-message', {
112-
error: e.message,
157+
error: formattedError, // Use formatted error
113158
}),
114159
typeOption: 'error',
115160
}),
116161
);
117-
subjectQueue.shift();
118-
}
119-
} finally {
120-
this.processingSubjects.delete(subject);
121162

122-
if (subjectQueue.length > 0) {
123-
this.processSubjectQueue(subject).then();
163+
subjectQueue.shift(); // Discard
164+
this.processingSubjects.delete(subject);
165+
if (subjectQueue.length > 0) {
166+
this.processSubjectQueue(subject).then();
167+
}
124168
}
125169
}
126170
}

0 commit comments

Comments
 (0)