Skip to content

Commit 704a5db

Browse files
authored
fix: onHeartbeat; Expo comparability; general improvements (#470)
1 parent 3c5ea35 commit 704a5db

File tree

6 files changed

+120
-90
lines changed

6 files changed

+120
-90
lines changed

package.json

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,5 +62,15 @@
6262
"typescript": "^5.7.3",
6363
"vitest": "^2.1.9",
6464
"web-worker": "1.2.0"
65+
},
66+
"exports": {
67+
".": {
68+
"import": "./dist/module/index.js",
69+
"require": "./dist/main/index.js"
70+
},
71+
"./websocket": {
72+
"node": "./src/node.js",
73+
"default": "./src/native.js"
74+
}
6575
}
6676
}

src/RealtimeChannel.ts

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -279,6 +279,8 @@ export default class RealtimeChannel {
279279
})
280280
} else {
281281
this.unsubscribe()
282+
this.state = CHANNEL_STATES.errored
283+
282284
callback?.(
283285
REALTIME_SUBSCRIBE_STATES.CHANNEL_ERROR,
284286
new Error(
@@ -296,6 +298,7 @@ export default class RealtimeChannel {
296298
}
297299
})
298300
.receive('error', (error: { [key: string]: any }) => {
301+
this.state = CHANNEL_STATES.errored
299302
callback?.(
300303
REALTIME_SUBSCRIBE_STATES.CHANNEL_ERROR,
301304
new Error(
@@ -511,8 +514,6 @@ export default class RealtimeChannel {
511514
this._trigger(CHANNEL_EVENTS.close, 'leave', this._joinRef())
512515
}
513516

514-
this.rejoinTimer.reset()
515-
// Destroy joinPush to avoid connection timeouts during unscription phase
516517
this.joinPush.destroy()
517518

518519
return new Promise((resolve) => {
@@ -536,6 +537,16 @@ export default class RealtimeChannel {
536537
}
537538
})
538539
}
540+
/**
541+
* Teardown the channel.
542+
*
543+
* Destroys and stops related timers.
544+
*/
545+
teardown() {
546+
this.pushBuffer.forEach((push: Push) => push.destroy())
547+
this.rejoinTimer && clearTimeout(this.rejoinTimer.timer)
548+
this.joinPush.destroy()
549+
}
539550

540551
/** @internal */
541552

src/RealtimeClient.ts

Lines changed: 24 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import type { WebSocket as WSWebSocket } from 'ws'
1+
import WebSocket from './WebSocket'
22

33
import {
44
CHANNEL_EVENTS,
@@ -17,6 +17,7 @@ import Timer from './lib/timer'
1717
import { httpEndpointURL } from './lib/transformers'
1818
import RealtimeChannel from './RealtimeChannel'
1919
import type { RealtimeChannelOptions } from './RealtimeChannel'
20+
import Push from './lib/push'
2021

2122
type Fetch = typeof fetch
2223

@@ -54,7 +55,7 @@ export interface WebSocketLikeConstructor {
5455
): WebSocketLike
5556
}
5657

57-
export type WebSocketLike = WebSocket | WSWebSocket | WSWebSocketDummy
58+
export type WebSocketLike = WebSocket | WSWebSocketDummy
5859

5960
export interface WebSocketLikeError {
6061
error: any
@@ -81,17 +82,17 @@ export type RealtimeClientOptions = {
8182
accessToken?: () => Promise<string | null>
8283
}
8384

84-
const NATIVE_WEBSOCKET_AVAILABLE = typeof WebSocket !== 'undefined'
8585
const WORKER_SCRIPT = `
8686
addEventListener("message", (e) => {
8787
if (e.data.event === "start") {
8888
setInterval(() => postMessage({ event: "keepAlive" }), e.data.interval);
8989
}
9090
});`
91+
9192
export default class RealtimeClient {
9293
accessTokenValue: string | null = null
9394
apiKey: string | null = null
94-
channels: Set<RealtimeChannel> = new Set()
95+
channels: RealtimeChannel[] = new Array()
9596
endPoint: string = ''
9697
httpEndpoint: string = ''
9798
headers?: { [key: string]: string } = DEFAULT_HEADERS
@@ -209,33 +210,21 @@ export default class RealtimeClient {
209210
if (this.conn) {
210211
return
211212
}
212-
213+
if (!this.transport) {
214+
this.transport = WebSocket
215+
}
213216
if (this.transport) {
214217
this.conn = new this.transport(this.endpointURL(), undefined, {
215218
headers: this.headers,
216219
})
217220
this.setupConnection()
218221
return
219222
}
220-
221-
if (NATIVE_WEBSOCKET_AVAILABLE) {
222-
this.conn = new WebSocket(this.endpointURL())
223-
this.setupConnection()
224-
return
225-
}
226-
227223
this.conn = new WSWebSocketDummy(this.endpointURL(), undefined, {
228224
close: () => {
229225
this.conn = null
230226
},
231227
})
232-
233-
import('ws').then(({ default: WS }) => {
234-
this.conn = new WS(this.endpointURL(), undefined, {
235-
headers: this.headers,
236-
})
237-
this.setupConnection()
238-
})
239228
}
240229

241230
/**
@@ -264,17 +253,19 @@ export default class RealtimeClient {
264253
this.conn.close()
265254
}
266255
this.conn = null
256+
267257
// remove open handles
268258
this.heartbeatTimer && clearInterval(this.heartbeatTimer)
269259
this.reconnectTimer.reset()
260+
this.channels.forEach((channel) => channel.teardown())
270261
}
271262
}
272263

273264
/**
274265
* Returns all created channels
275266
*/
276267
getChannels(): RealtimeChannel[] {
277-
return Array.from(this.channels)
268+
return this.channels
278269
}
279270

280271
/**
@@ -285,9 +276,12 @@ export default class RealtimeClient {
285276
channel: RealtimeChannel
286277
): Promise<RealtimeRemoveChannelResponse> {
287278
const status = await channel.unsubscribe()
288-
if (this.channels.size === 0) {
279+
this.channels = this.channels.filter((c) => c._joinRef !== channel._joinRef)
280+
281+
if (this.channels.length === 0) {
289282
this.disconnect()
290283
}
284+
291285
return status
292286
}
293287

@@ -296,13 +290,10 @@ export default class RealtimeClient {
296290
*/
297291
async removeAllChannels(): Promise<RealtimeRemoveChannelResponse[]> {
298292
const values_1 = await Promise.all(
299-
Array.from(this.channels).map((channel) => {
300-
this.channels.delete(channel)
301-
return channel.unsubscribe()
302-
})
293+
this.channels.map((channel) => channel.unsubscribe())
303294
)
295+
this.channels = []
304296
this.disconnect()
305-
306297
return values_1
307298
}
308299

@@ -349,7 +340,8 @@ export default class RealtimeClient {
349340

350341
if (!exists) {
351342
const chan = new RealtimeChannel(`realtime:${topic}`, params, this)
352-
this.channels.add(chan)
343+
this.channels.push(chan)
344+
353345
return chan
354346
} else {
355347
return exists
@@ -492,7 +484,7 @@ export default class RealtimeClient {
492484
* @internal
493485
*/
494486
_leaveOpenTopic(topic: string): void {
495-
let dupChannel = Array.from(this.channels).find(
487+
let dupChannel = this.channels.find(
496488
(c) => c.topic === topic && (c._isJoined() || c._isJoining())
497489
)
498490
if (dupChannel) {
@@ -509,7 +501,7 @@ export default class RealtimeClient {
509501
* @internal
510502
*/
511503
_remove(channel: RealtimeChannel) {
512-
this.channels.delete(channel)
504+
this.channels = this.channels.filter((c) => c.topic !== channel.topic)
513505
}
514506

515507
/**
@@ -560,7 +552,7 @@ export default class RealtimeClient {
560552
}
561553

562554
/** @internal */
563-
private async _onConnOpen() {
555+
private _onConnOpen() {
564556
this.log('transport', `connected to ${this.endpointURL()}`)
565557
this.flushSendBuffer()
566558
this.reconnectTimer.reset()
@@ -576,11 +568,10 @@ export default class RealtimeClient {
576568
} else {
577569
this.log('worker', `starting default worker`)
578570
}
579-
580571
const objectUrl = this._workerObjectUrl(this.workerUrl!)
581572
this.workerRef = new Worker(objectUrl)
582573
this.workerRef.onerror = (error) => {
583-
this.log('worker', 'worker error', error.message)
574+
this.log('worker', 'worker error', (error as ErrorEvent).message)
584575
this.workerRef!.terminate()
585576
}
586577
this.workerRef.onmessage = (event) => {
@@ -593,12 +584,10 @@ export default class RealtimeClient {
593584
interval: this.heartbeatIntervalMs,
594585
})
595586
}
596-
597-
this.stateChangeCallbacks.open.forEach((callback) => callback())!
587+
this.stateChangeCallbacks.open.forEach((callback) => callback())
598588
}
599589

600590
/** @internal */
601-
602591
private _onConnClose(event: any) {
603592
this.log('transport', 'close', event)
604593
this._triggerChanError()
@@ -631,7 +620,6 @@ export default class RealtimeClient {
631620
}
632621
const prefix = url.match(/\?/) ? '&' : '?'
633622
const query = new URLSearchParams(params)
634-
635623
return `${url}${prefix}${query}`
636624
}
637625

src/WebSocket.native.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
// Native/browser WebSocket entry point
2+
const NativeWebSocket = typeof WebSocket !== 'undefined' ? WebSocket : undefined
3+
4+
export default NativeWebSocket

src/WebSocket.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
// Node.js WebSocket entry point
2+
import WebSocket from 'ws'
3+
4+
export default WebSocket

0 commit comments

Comments
 (0)