Skip to content

Commit 04f66b4

Browse files
committed
feat: replace existing Realtime API with new Realtime API
1 parent f37643c commit 04f66b4

File tree

4 files changed

+58
-146
lines changed

4 files changed

+58
-146
lines changed

src/SupabaseClient.ts

Lines changed: 29 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,13 @@
11
import { FunctionsClient } from '@supabase/functions-js'
22
import { AuthChangeEvent } from '@supabase/gotrue-js'
3-
import { PostgrestClient } from '@supabase/postgrest-js'
4-
import {
5-
RealtimeChannel,
6-
RealtimeClient,
7-
RealtimeClientOptions,
8-
RealtimeSubscription,
9-
} from '@supabase/realtime-js'
3+
import { PostgrestClient, PostgrestQueryBuilder } from '@supabase/postgrest-js'
4+
import { RealtimeChannel, RealtimeClient, RealtimeClientOptions } from '@supabase/realtime-js'
105
import { SupabaseStorageClient } from '@supabase/storage-js'
116
import { DEFAULT_HEADERS, STORAGE_KEY } from './lib/constants'
127
import { fetchWithAuth } from './lib/fetch'
138
import { isBrowser, stripTrailingSlash } from './lib/helpers'
149
import { SupabaseAuthClient } from './lib/SupabaseAuthClient'
15-
import { SupabaseQueryBuilder } from './lib/SupabaseQueryBuilder'
10+
import { SupabaseRealtimeClient } from './lib/SupabaseRealtimeClient'
1611
import { Fetch, SupabaseClientOptions } from './lib/types'
1712

1813
const DEFAULT_OPTIONS = {
@@ -129,13 +124,11 @@ export default class SupabaseClient {
129124
*
130125
* @param table The table name to operate on.
131126
*/
132-
from<T = any>(table: string): SupabaseQueryBuilder<T> {
127+
from<T = any>(table: string): PostgrestQueryBuilder<T> {
133128
const url = `${this.restUrl}/${table}`
134-
return new SupabaseQueryBuilder<T>(url, {
129+
return new PostgrestQueryBuilder<T>(url, {
135130
headers: this.headers,
136131
schema: this.schema,
137-
realtime: this.realtime,
138-
table,
139132
fetch: this.fetch,
140133
shouldThrowOnError: this.shouldThrowOnError,
141134
})
@@ -164,32 +157,31 @@ export default class SupabaseClient {
164157

165158
/**
166159
* Creates a channel with Broadcast and Presence.
167-
* Activated when vsndate query param is present in the WebSocket URL.
168160
*/
169-
channel(name: string, opts: { selfBroadcast: boolean; [key: string]: any }): RealtimeChannel {
170-
const userToken = this.auth.session()?.access_token ?? this.supabaseKey
161+
channel(name: string, opts?: { [key: string]: any }): SupabaseRealtimeClient {
162+
const token = this.realtime.accessToken ?? this.supabaseKey
171163

172164
if (!this.realtime.isConnected()) {
173165
this.realtime.connect()
174166
}
175167

176-
return this.realtime.channel(name, { ...opts, user_token: userToken }) as RealtimeChannel
168+
return new SupabaseRealtimeClient(this.realtime, name, token, opts)
177169
}
178170

179171
/**
180-
* Closes and removes all subscriptions and returns a list of removed
181-
* subscriptions and their errors.
172+
* Closes and removes all channels and returns a list of removed
173+
* channels and their errors.
182174
*/
183-
async removeAllSubscriptions(): Promise<
184-
{ data: { subscription: RealtimeSubscription }; error: Error | null }[]
175+
async removeAllChannels(): Promise<
176+
{ data: { channels: RealtimeChannel }; error: Error | null }[]
185177
> {
186-
const allSubs: RealtimeSubscription[] = this.getSubscriptions().slice()
187-
const allSubPromises = allSubs.map((sub) => this.removeSubscription(sub))
188-
const allRemovedSubs = await Promise.all(allSubPromises)
178+
const allChans: RealtimeChannel[] = this.getChannels().slice()
179+
const allChanPromises = allChans.map((chan) => this.removeChannel(chan))
180+
const allRemovedChans = await Promise.all(allChanPromises)
189181

190-
return allRemovedSubs.map(({ error }, i) => {
182+
return allRemovedChans.map(({ error }, i) => {
191183
return {
192-
data: { subscription: allSubs[i] },
184+
data: { channels: allChans[i] },
193185
error,
194186
}
195187
})
@@ -203,58 +195,37 @@ export default class SupabaseClient {
203195
async removeChannel(
204196
channel: RealtimeChannel
205197
): Promise<{ data: { openChannels: number }; error: Error | null }> {
206-
const { error } = await this._closeSubscription(channel)
207-
const allChans: RealtimeSubscription[] = this.getSubscriptions()
198+
const { error } = await this._closeChannel(channel)
199+
const allChans: RealtimeChannel[] = this.getChannels()
208200
const openChanCount = allChans.filter((chan) => chan.isJoined()).length
209201

210202
if (allChans.length === 0) await this.realtime.disconnect()
211203

212204
return { data: { openChannels: openChanCount }, error }
213205
}
214206

215-
/**
216-
* Closes and removes a subscription and returns the number of open subscriptions.
217-
*
218-
* @param subscription The subscription you want to close and remove.
219-
*/
220-
async removeSubscription(
221-
subscription: RealtimeSubscription
222-
): Promise<{ data: { openSubscriptions: number }; error: Error | null }> {
223-
const { error } = await this._closeSubscription(subscription)
224-
const allSubs: RealtimeSubscription[] = this.getSubscriptions()
225-
const openSubCount = allSubs.filter((chan) => chan.isJoined()).length
226-
227-
if (allSubs.length === 0) await this.realtime.disconnect()
228-
229-
return { data: { openSubscriptions: openSubCount }, error }
230-
}
231-
232207
private async _getAccessToken() {
233208
const { session } = await this.auth.getSession()
234209

235210
return session?.access_token ?? null
236211
}
237212

238-
private async _closeSubscription(
239-
subscription: RealtimeSubscription | RealtimeChannel
240-
): Promise<{ error: Error | null }> {
213+
private async _closeChannel(channel: RealtimeChannel): Promise<{ error: Error | null }> {
241214
let error = null
242215

243-
if (!subscription.isClosed()) {
244-
const { error: unsubError } = await this._unsubscribeSubscription(subscription)
216+
if (!channel.isClosed()) {
217+
const { error: unsubError } = await this._unsubscribeChannel(channel)
245218
error = unsubError
246219
}
247220

248-
this.realtime.remove(subscription)
221+
this.realtime.remove(channel)
249222

250223
return { error }
251224
}
252225

253-
private _unsubscribeSubscription(
254-
subscription: RealtimeSubscription | RealtimeChannel
255-
): Promise<{ error: Error | null }> {
226+
private _unsubscribeChannel(channel: RealtimeChannel): Promise<{ error: Error | null }> {
256227
return new Promise((resolve) => {
257-
subscription
228+
channel
258229
.unsubscribe()
259230
.receive('ok', () => resolve({ error: null }))
260231
.receive('error', (error: Error) => resolve({ error }))
@@ -263,10 +234,10 @@ export default class SupabaseClient {
263234
}
264235

265236
/**
266-
* Returns an array of all your subscriptions.
237+
* Returns an array of all your channels.
267238
*/
268-
getSubscriptions(): RealtimeSubscription[] {
269-
return this.realtime.channels as RealtimeSubscription[]
239+
getChannels(): RealtimeChannel[] {
240+
return this.realtime.channels as RealtimeChannel[]
270241
}
271242

272243
private _initSupabaseAuthClient({
@@ -299,7 +270,7 @@ export default class SupabaseClient {
299270
private _initRealtimeClient(options?: RealtimeClientOptions) {
300271
return new RealtimeClient(this.realtimeUrl, {
301272
...options,
302-
params: { ...options?.params, apikey: this.supabaseKey },
273+
params: { ...{ apikey: this.supabaseKey, vsndate: '2022' }, ...options?.params },
303274
})
304275
}
305276

src/lib/SupabaseQueryBuilder.ts

Lines changed: 0 additions & 61 deletions
This file was deleted.

src/lib/SupabaseRealtimeClient.ts

Lines changed: 29 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,17 @@
1-
import { RealtimeSubscription, RealtimeClient, Transformers } from '@supabase/realtime-js'
2-
import { GenericObject, SupabaseEventTypes, SupabaseRealtimePayload } from './types'
1+
import { RealtimeChannel, RealtimeClient, Transformers } from '@supabase/realtime-js'
2+
import { GenericObject, SupabaseRealtimePayload } from './types'
33

44
export class SupabaseRealtimeClient {
5-
subscription: RealtimeSubscription
5+
channel: RealtimeChannel
66

7-
constructor(socket: RealtimeClient, headers: GenericObject, schema: string, tableName: string) {
8-
const chanParams: GenericObject = {}
9-
const topic = tableName === '*' ? `realtime:${schema}` : `realtime:${schema}:${tableName}`
10-
const userToken = headers['Authorization'].split(' ')[1]
7+
constructor(socket: RealtimeClient, name: string, token: string, opts?: { [key: string]: any }) {
8+
let chanParams: GenericObject = { user_token: token }
119

12-
if (userToken) {
13-
chanParams['user_token'] = userToken
10+
if (opts) {
11+
chanParams = { ...chanParams, ...opts }
1412
}
1513

16-
this.subscription = socket.channel(topic, chanParams) as RealtimeSubscription
14+
this.channel = socket.channel(`realtime:${name}`, chanParams) as RealtimeChannel
1715
}
1816

1917
private getPayloadRecords(payload: any) {
@@ -37,38 +35,44 @@ export class SupabaseRealtimeClient {
3735
* The event you want to listen to.
3836
*
3937
* @param event The event
38+
* @param filter An object that specifies what you want to listen to from the event.
4039
* @param callback A callback function that is called whenever the event occurs.
4140
*/
42-
on(event: SupabaseEventTypes, callback: (payload: SupabaseRealtimePayload<any>) => void) {
43-
this.subscription.on(event, (payload: any) => {
41+
on(
42+
event: string,
43+
filter?: GenericObject,
44+
callback?: (payload: SupabaseRealtimePayload<any>) => void
45+
) {
46+
this.channel.on(event, filter ?? {}, (payload: any) => {
47+
const { schema, table, commit_timestamp, type, errors } = payload.payload
4448
let enrichedPayload: SupabaseRealtimePayload<any> = {
45-
schema: payload.schema,
46-
table: payload.table,
47-
commit_timestamp: payload.commit_timestamp,
48-
eventType: payload.type,
49+
schema: schema,
50+
table: table,
51+
commit_timestamp: commit_timestamp,
52+
eventType: type,
4953
new: {},
5054
old: {},
51-
errors: payload.errors,
55+
errors: errors,
5256
}
5357

54-
enrichedPayload = { ...enrichedPayload, ...this.getPayloadRecords(payload) }
58+
enrichedPayload = { ...enrichedPayload, ...this.getPayloadRecords(payload.payload) }
5559

56-
callback(enrichedPayload)
60+
callback && callback(enrichedPayload)
5761
})
5862
return this
5963
}
6064

6165
/**
62-
* Enables the subscription.
66+
* Enables the channel.
6367
*/
6468
subscribe(callback: Function = () => {}) {
65-
this.subscription.onError((e: Error) => callback('SUBSCRIPTION_ERROR', e))
66-
this.subscription.onClose(() => callback('CLOSED'))
67-
this.subscription
69+
this.channel.onError((e: Error) => callback('CHANNEL_ERROR', e))
70+
this.channel.onClose(() => callback('CLOSED'))
71+
this.channel
6872
.subscribe()
6973
.receive('ok', () => callback('SUBSCRIBED'))
70-
.receive('error', (e: Error) => callback('SUBSCRIPTION_ERROR', e))
74+
.receive('error', (e: Error) => callback('CHANNEL_ERROR', e))
7175
.receive('timeout', () => callback('RETRYING_AFTER_TIMEOUT'))
72-
return this.subscription
76+
return this.channel
7377
}
7478
}

src/lib/types.ts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,5 +71,3 @@ export type SupabaseRealtimePayload<T> = {
7171
old: T
7272
errors: string[] | null
7373
}
74-
75-
export type SupabaseEventTypes = 'INSERT' | 'UPDATE' | 'DELETE' | '*'

0 commit comments

Comments
 (0)