Skip to content

Commit 740b006

Browse files
committed
feat: Realtime service for web sdk
1 parent a61c8be commit 740b006

File tree

2 files changed

+359
-0
lines changed

2 files changed

+359
-0
lines changed

src/SDK/Language/Web.php

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,11 @@ public function getFiles(): array
4040
'destination' => 'src/services/{{service.name | caseKebab}}.ts',
4141
'template' => 'web/src/services/template.ts.twig',
4242
],
43+
[
44+
'scope' => 'default',
45+
'destination' => 'src/services/realtime.ts',
46+
'template' => 'web/src/services/realtime.ts.twig',
47+
],
4348
[
4449
'scope' => 'default',
4550
'destination' => 'src/models.ts',
Lines changed: 354 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,354 @@
1+
import { {{ spec.title | caseUcfirst}}Exception, Client } from '../client';
2+
3+
type RealtimeCallback = {
4+
channels: Set<string>;
5+
callback: (event: RealtimeResponseEvent) => void;
6+
}
7+
8+
type RealtimeResponseEvent = {
9+
events: string[];
10+
channels: string[];
11+
timestamp: string;
12+
payload: Record<string, any>;
13+
}
14+
15+
export type RealtimeSubscription = {
16+
close: () => Promise<void>;
17+
}
18+
19+
export class Realtime {
20+
private readonly TYPE_ERROR = 'error';
21+
private readonly TYPE_EVENT = 'event';
22+
private readonly TYPE_PONG = 'pong';
23+
private readonly DEBOUNCE_MS = 1;
24+
private readonly HEARTBEAT_INTERVAL = 20000; // 20 seconds in milliseconds
25+
26+
private client: Client;
27+
private socket?: WebSocket;
28+
private activeChannels = new Set<string>();
29+
private activeSubscriptions = new Map<number, RealtimeCallback>();
30+
private heartbeatTimer?: number;
31+
32+
private subCallDepth = 0;
33+
private reconnectAttempts = 0;
34+
private subscriptionsCounter = 0;
35+
private reconnect = true;
36+
37+
private onErrorCallbacks: Array<(error?: Error, statusCode?: number) => void> = [];
38+
private onCloseCallbacks: Array<() => void> = [];
39+
private onOpenCallbacks: Array<() => void> = [];
40+
41+
constructor(client: Client) {
42+
this.client = client;
43+
}
44+
45+
/**
46+
* Register a callback function to be called when an error occurs
47+
*
48+
* @param {Function} callback - Callback function to handle errors
49+
* @returns {void}
50+
*/
51+
public onError(callback: (error?: Error, statusCode?: number) => void): void {
52+
this.onErrorCallbacks.push(callback);
53+
}
54+
55+
/**
56+
* Register a callback function to be called when the connection closes
57+
*
58+
* @param {Function} callback - Callback function to handle connection close
59+
* @returns {void}
60+
*/
61+
public onClose(callback: () => void): void {
62+
this.onCloseCallbacks.push(callback);
63+
}
64+
65+
/**
66+
* Register a callback function to be called when the connection opens
67+
*
68+
* @param {Function} callback - Callback function to handle connection open
69+
* @returns {void}
70+
*/
71+
public onOpen(callback: () => void): void {
72+
this.onOpenCallbacks.push(callback);
73+
}
74+
75+
private startHeartbeat(): void {
76+
this.stopHeartbeat();
77+
this.heartbeatTimer = window.setInterval(() => {
78+
if (this.socket && this.socket.readyState === WebSocket.OPEN) {
79+
this.socket.send(JSON.stringify({ type: 'ping' }));
80+
}
81+
}, this.HEARTBEAT_INTERVAL);
82+
}
83+
84+
private stopHeartbeat(): void {
85+
if (this.heartbeatTimer) {
86+
window.clearInterval(this.heartbeatTimer);
87+
this.heartbeatTimer = undefined;
88+
}
89+
}
90+
91+
private async createSocket(): Promise<void> {
92+
if (this.activeChannels.size === 0) {
93+
this.reconnect = false;
94+
await this.closeSocket();
95+
return;
96+
}
97+
98+
const projectId = this.client.config.project;
99+
if (!projectId) {
100+
throw new {{spec.title | caseUcfirst}}Exception('Missing project ID');
101+
}
102+
103+
let queryParams = `project=${projectId}`;
104+
for (const channel of this.activeChannels) {
105+
queryParams += `&channels[]=${encodeURIComponent(channel)}`;
106+
}
107+
108+
const endpoint = this.client.config.endpoint || '';
109+
const realtimeEndpoint = endpoint
110+
.replace('https://', 'wss://')
111+
.replace('http://', 'ws://');
112+
const url = `${realtimeEndpoint}/realtime?${queryParams}`;
113+
114+
if (this.socket) {
115+
this.reconnect = false;
116+
await this.closeSocket();
117+
}
118+
119+
return new Promise((resolve, reject) => {
120+
try {
121+
this.socket = new WebSocket(url);
122+
123+
this.socket.addEventListener('open', () => {
124+
this.reconnectAttempts = 0;
125+
this.onOpenCallbacks.forEach(callback => callback());
126+
this.startHeartbeat();
127+
resolve();
128+
});
129+
130+
this.socket.addEventListener('message', (event: MessageEvent) => {
131+
try {
132+
const message = JSON.parse(event.data);
133+
this.handleMessage(message);
134+
} catch (error) {
135+
console.error('Failed to parse message:', error);
136+
}
137+
});
138+
139+
this.socket.addEventListener('close', async () => {
140+
this.stopHeartbeat();
141+
this.onCloseCallbacks.forEach(callback => callback());
142+
143+
if (!this.reconnect) {
144+
this.reconnect = true;
145+
return;
146+
}
147+
148+
const timeout = this.getTimeout();
149+
console.log(`Realtime disconnected. Re-connecting in ${timeout / 1000} seconds.`);
150+
151+
await this.sleep(timeout);
152+
this.reconnectAttempts++;
153+
154+
try {
155+
await this.createSocket();
156+
} catch (error) {
157+
console.error('Failed to reconnect:', error);
158+
}
159+
});
160+
161+
this.socket.addEventListener('error', (event: Event) => {
162+
this.stopHeartbeat();
163+
const error = new Error('WebSocket error');
164+
console.error('WebSocket error:', error.message);
165+
this.onErrorCallbacks.forEach(callback => callback(error));
166+
reject(error);
167+
});
168+
} catch (error) {
169+
reject(error);
170+
}
171+
});
172+
}
173+
174+
private async closeSocket(): Promise<void> {
175+
this.stopHeartbeat();
176+
177+
if (this.socket) {
178+
return new Promise((resolve) => {
179+
if (!this.socket) {
180+
resolve();
181+
return;
182+
}
183+
184+
if (this.socket.readyState === WebSocket.OPEN ||
185+
this.socket.readyState === WebSocket.CONNECTING) {
186+
this.socket.addEventListener('close', () => {
187+
resolve();
188+
}, { once: true });
189+
this.socket.close();
190+
} else {
191+
resolve();
192+
}
193+
});
194+
}
195+
}
196+
197+
private getTimeout(): number {
198+
if (this.reconnectAttempts < 5) {
199+
return 1000;
200+
} else if (this.reconnectAttempts < 15) {
201+
return 5000;
202+
} else if (this.reconnectAttempts < 100) {
203+
return 10000;
204+
} else {
205+
return 60000;
206+
}
207+
}
208+
209+
private sleep(ms: number): Promise<void> {
210+
return new Promise(resolve => setTimeout(resolve, ms));
211+
}
212+
213+
/**
214+
* Subscribe to a single channel
215+
*
216+
* @param {string} channel - Channel name to subscribe to
217+
* @param {Function} callback - Callback function to handle events
218+
* @returns {Promise<RealtimeSubscription>} Subscription object with close method
219+
*/
220+
public async subscribe(
221+
channel: string,
222+
callback: (event: RealtimeResponseEvent) => void
223+
): Promise<RealtimeSubscription>;
224+
225+
/**
226+
* Subscribe to multiple channels
227+
*
228+
* @param {string[]} channels - Array of channel names to subscribe to
229+
* @param {Function} callback - Callback function to handle events
230+
* @returns {Promise<RealtimeSubscription>} Subscription object with close method
231+
*/
232+
public async subscribe(
233+
channels: string[],
234+
callback: (event: RealtimeResponseEvent) => void
235+
): Promise<RealtimeSubscription>;
236+
237+
public async subscribe(
238+
channelsOrChannel: string | string[],
239+
callback: (event: RealtimeResponseEvent) => void
240+
): Promise<RealtimeSubscription> {
241+
const channels = Array.isArray(channelsOrChannel)
242+
? new Set(channelsOrChannel)
243+
: new Set([channelsOrChannel]);
244+
245+
this.subscriptionsCounter++;
246+
const count = this.subscriptionsCounter;
247+
248+
for (const channel of channels) {
249+
this.activeChannels.add(channel);
250+
}
251+
252+
this.activeSubscriptions.set(count, {
253+
channels,
254+
callback
255+
});
256+
257+
this.subCallDepth++;
258+
259+
await this.sleep(this.DEBOUNCE_MS);
260+
261+
if (this.subCallDepth === 1) {
262+
await this.createSocket();
263+
}
264+
265+
this.subCallDepth--;
266+
267+
return {
268+
close: async () => {
269+
this.activeSubscriptions.delete(count);
270+
this.cleanUp(channels);
271+
await this.createSocket();
272+
}
273+
};
274+
}
275+
276+
private cleanUp(channels: Set<string>): void {
277+
this.activeChannels = new Set(
278+
Array.from(this.activeChannels).filter(channel => {
279+
if (!channels.has(channel)) {
280+
return true;
281+
}
282+
283+
const subsWithChannel = Array.from(this.activeSubscriptions.values())
284+
.filter(sub => sub.channels.has(channel));
285+
286+
return subsWithChannel.length > 0;
287+
})
288+
);
289+
}
290+
291+
private handleMessage(message: any): void {
292+
if (!message.type) {
293+
return;
294+
}
295+
296+
switch (message.type) {
297+
case this.TYPE_ERROR:
298+
this.handleResponseError(message);
299+
break;
300+
case this.TYPE_EVENT:
301+
this.handleResponseEvent(message);
302+
break;
303+
case this.TYPE_PONG:
304+
// Handle pong response if needed
305+
break;
306+
}
307+
}
308+
309+
private handleResponseError(message: any): void {
310+
throw new {{spec.title | caseUcfirst}}Exception(
311+
message.message || message.data?.message || 'Unknown error'
312+
);
313+
}
314+
315+
private handleResponseEvent(message: any): void {
316+
const data = message.data;
317+
if (!data) {
318+
return;
319+
}
320+
321+
const channels = data.channels as string[];
322+
const events = data.events as string[];
323+
const payload = data.payload as Record<string, any>;
324+
const timestamp = data.timestamp as string;
325+
326+
if (!channels || !events || !payload) {
327+
return;
328+
}
329+
330+
const hasActiveChannel = channels.some(channel =>
331+
this.activeChannels.has(channel)
332+
);
333+
334+
if (!hasActiveChannel) {
335+
return;
336+
}
337+
338+
for (const [_, subscription] of this.activeSubscriptions) {
339+
const hasSubscribedChannel = channels.some(channel =>
340+
subscription.channels.has(channel)
341+
);
342+
343+
if (hasSubscribedChannel) {
344+
const response: RealtimeResponseEvent = {
345+
events,
346+
channels,
347+
timestamp,
348+
payload
349+
};
350+
subscription.callback(response);
351+
}
352+
}
353+
}
354+
}

0 commit comments

Comments
 (0)