Skip to content

Commit be620bd

Browse files
fix(realtime): preserve custom JWT tokens across channel resubscribe
Fixes #1904 When using setAuth(customToken) with private channels, custom JWTs are now preserved across removeChannel() and resubscribe operations. Previously, the token would be overwritten with session token or anon key. Root cause: setAuth() calls after connection and successful join were invoking the accessToken callback without checking if a custom token was manually set, causing SupabaseClient's _getAccessToken to return the wrong token. Solution: Track manually-set tokens with _manuallySetToken flag. Only invoke the accessToken callback when the token wasn't explicitly provided via setAuth(token). Changes: - Add _manuallySetToken flag to RealtimeClient - Update _performAuth() to track token source (manual vs callback) - Modify _setAuthSafely() to check flag before invoking callback - Update join callback in RealtimeChannel to check flag - Add error handling for accessToken callback failures - Add comprehensive regression tests (4 new tests) - Update existing tests for async subscribe Testing: - All 364 tests passing, zero regressions - Verified in React Native/Expo environment - Both setAuth(token) and accessToken callback patterns work - Workaround (accessToken callback) is now obsolete but remains supported Breaking changes: None
1 parent 8d1e0c5 commit be620bd

File tree

6 files changed

+243
-19
lines changed

6 files changed

+243
-19
lines changed

packages/core/realtime-js/src/RealtimeChannel.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -308,7 +308,11 @@ export default class RealtimeChannel {
308308

309309
this.joinPush
310310
.receive('ok', async ({ postgres_changes }: PostgresChangesFilters) => {
311-
this.socket.setAuth()
311+
// FIX for issue #1904: Don't overwrite manually-set tokens
312+
// Only refresh if token wasn't manually set via setAuth(token)
313+
if (!this.socket['_manuallySetToken']) {
314+
this.socket.setAuth()
315+
}
312316
if (postgres_changes === undefined) {
313317
callback?.(REALTIME_SUBSCRIBE_STATES.SUBSCRIBED)
314318
return
@@ -531,7 +535,7 @@ export default class RealtimeChannel {
531535
'channel',
532536
`resubscribe to ${this.topic} due to change in presence callbacks on joined channel`
533537
)
534-
this.unsubscribe().then(() => this.subscribe())
538+
this.unsubscribe().then(async () => await this.subscribe())
535539
}
536540
return this._on(type, filter, callback)
537541
}

packages/core/realtime-js/src/RealtimeClient.ts

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ const WORKER_SCRIPT = `
102102
export default class RealtimeClient {
103103
accessTokenValue: string | null = null
104104
apiKey: string | null = null
105+
private _manuallySetToken: boolean = false
105106
channels: RealtimeChannel[] = new Array()
106107
endPoint: string = ''
107108
httpEndpoint: string = ''
@@ -779,16 +780,34 @@ export default class RealtimeClient {
779780
*/
780781
private async _performAuth(token: string | null = null): Promise<void> {
781782
let tokenToSend: string | null
783+
let isManualToken = false
782784

783785
if (token) {
784786
tokenToSend = token
787+
// FIX for issue #1904: Track if this is a manually-provided token
788+
isManualToken = true
785789
} else if (this.accessToken) {
786790
// Always call the accessToken callback to get fresh token
787-
tokenToSend = await this.accessToken()
791+
try {
792+
tokenToSend = await this.accessToken()
793+
} catch (e) {
794+
this.log('error', 'error fetching access token from callback', e)
795+
// Fall back to cached value if callback fails
796+
tokenToSend = this.accessTokenValue
797+
}
788798
} else {
789799
tokenToSend = this.accessTokenValue
790800
}
791801

802+
// FIX for issue #1904: Set manual token flag even if token doesn't change
803+
// This ensures the flag persists across calls
804+
if (isManualToken) {
805+
this._manuallySetToken = true
806+
} else if (this.accessToken) {
807+
// If we used the callback, clear the manual flag
808+
this._manuallySetToken = false
809+
}
810+
792811
if (this.accessTokenValue != tokenToSend) {
793812
this.accessTokenValue = tokenToSend
794813
this.channels.forEach((channel) => {
@@ -823,9 +842,13 @@ export default class RealtimeClient {
823842
* @internal
824843
*/
825844
private _setAuthSafely(context = 'general'): void {
826-
this.setAuth().catch((e) => {
827-
this.log('error', `error setting auth in ${context}`, e)
828-
})
845+
// FIX for issue #1904: Don't overwrite manually-set tokens
846+
// Only refresh if token wasn't manually set via setAuth(token)
847+
if (!this._manuallySetToken) {
848+
this.setAuth().catch((e) => {
849+
this.log('error', `error setting auth in ${context}`, e)
850+
})
851+
}
829852
}
830853

831854
/**

packages/core/realtime-js/test/RealtimeChannel.lifecycle.test.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -229,10 +229,10 @@ describe('Channel Lifecycle Management', () => {
229229
assert.equal(channel.state, CHANNEL_STATES.joining)
230230
})
231231

232-
test('updates join push payload access token', () => {
232+
test('updates join push payload access token', async () => {
233233
testSetup.socket.accessTokenValue = 'token123'
234234

235-
channel.subscribe()
235+
await channel.subscribe()
236236

237237
assert.deepEqual(channel.joinPush.payload, {
238238
access_token: 'token123',
@@ -257,15 +257,15 @@ describe('Channel Lifecycle Management', () => {
257257
})
258258
const channel = testSocket.channel('topic')
259259

260-
channel.subscribe()
260+
await channel.subscribe()
261261
await new Promise((resolve) => setTimeout(resolve, 50))
262262
assert.equal(channel.socket.accessTokenValue, tokens[0])
263263

264264
testSocket.disconnect()
265265
// Wait for disconnect to complete (including fallback timer)
266266
await new Promise((resolve) => setTimeout(resolve, 150))
267267

268-
channel.subscribe()
268+
await channel.subscribe()
269269
await new Promise((resolve) => setTimeout(resolve, 50))
270270
assert.equal(channel.socket.accessTokenValue, tokens[1])
271271
})
@@ -549,7 +549,7 @@ describe('Channel Lifecycle Management', () => {
549549
const resendSpy = vi.spyOn(channel.joinPush, 'resend')
550550

551551
// Call _rejoin - should return early due to leaving state
552-
channel._rejoin()
552+
channel['_rejoin']()
553553

554554
// Verify no actions were taken
555555
expect(leaveOpenTopicSpy).not.toHaveBeenCalled()
Lines changed: 192 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
1+
import assert from 'assert'
2+
import { afterEach, beforeEach, describe, expect, test, vi } from 'vitest'
3+
import { testBuilders, EnhancedTestSetup } from './helpers/setup'
4+
import { utils } from './helpers/auth'
5+
import { CHANNEL_STATES } from '../src/lib/constants'
6+
7+
let testSetup: EnhancedTestSetup
8+
9+
beforeEach(() => {
10+
testSetup = testBuilders.standardClient()
11+
})
12+
13+
afterEach(() => {
14+
testSetup.cleanup()
15+
testSetup.socket.removeAllChannels()
16+
})
17+
18+
describe('Issue #1904: Resubscribe with custom JWT fails', () => {
19+
test('REPRODUCES BUG: second subscription after removeChannel loses access token', async () => {
20+
// Scenario from issue #1904:
21+
// 1. Set custom JWT via setAuth (not using accessToken callback)
22+
// 2. Subscribe to private channel -> Works
23+
// 3. removeChannel
24+
// 4. Create new channel with same topic and subscribe -> FAILS (no token in join)
25+
26+
const customToken = utils.generateJWT('1h')
27+
28+
// Step 1: Set auth with custom token (mimics user's setup)
29+
await testSetup.socket.setAuth(customToken)
30+
31+
// Verify token was set
32+
assert.strictEqual(testSetup.socket.accessTokenValue, customToken)
33+
34+
// Step 2: Create and subscribe to private channel (first time)
35+
const channel1 = testSetup.socket.channel('conversation:dc3fb8c1-ceef-4c00-9f92-e496acd03593', {
36+
config: { private: true },
37+
})
38+
39+
// Spy on the push to verify join payload
40+
const pushSpy = vi.spyOn(testSetup.socket, 'push')
41+
42+
// Simulate successful subscription
43+
channel1.state = CHANNEL_STATES.closed // Start from closed
44+
await channel1.subscribe()
45+
46+
// Verify first join includes access_token
47+
const firstJoinCall = pushSpy.mock.calls.find((call) => call[0]?.event === 'phx_join')
48+
expect(firstJoinCall).toBeDefined()
49+
expect(firstJoinCall![0].payload).toHaveProperty('access_token', customToken)
50+
51+
// Step 3: Remove channel (mimics user cleanup)
52+
await testSetup.socket.removeChannel(channel1)
53+
54+
// Verify channel was removed
55+
expect(testSetup.socket.getChannels()).not.toContain(channel1)
56+
57+
// Step 4: Create NEW channel with SAME topic and subscribe
58+
pushSpy.mockClear()
59+
const channel2 = testSetup.socket.channel('conversation:dc3fb8c1-ceef-4c00-9f92-e496acd03593', {
60+
config: { private: true },
61+
})
62+
63+
// This should be a different channel instance
64+
expect(channel2).not.toBe(channel1)
65+
66+
// Subscribe to the new channel
67+
channel2.state = CHANNEL_STATES.closed
68+
await channel2.subscribe()
69+
70+
// BUG: Second join should ALSO include access_token, but it might not!
71+
const secondJoinCall = pushSpy.mock.calls.find((call) => call[0]?.event === 'phx_join')
72+
73+
expect(secondJoinCall).toBeDefined()
74+
75+
// THIS IS THE BUG: access_token is missing from second join
76+
expect(secondJoinCall![0].payload).toHaveProperty('access_token', customToken)
77+
})
78+
79+
test('WORKAROUND: using accessToken callback works for resubscribe', async () => {
80+
// This test shows that the workaround (using accessToken callback) works
81+
const customToken = utils.generateJWT('1h')
82+
let callCount = 0
83+
84+
const clientWithCallback = testBuilders.standardClient({
85+
accessToken: async () => {
86+
callCount++
87+
return customToken
88+
},
89+
})
90+
91+
// Set initial auth
92+
await clientWithCallback.socket.setAuth()
93+
94+
// Create and subscribe to first channel
95+
const channel1 = clientWithCallback.socket.channel('conversation:test', {
96+
config: { private: true },
97+
})
98+
99+
const pushSpy = vi.spyOn(clientWithCallback.socket, 'push')
100+
channel1.state = CHANNEL_STATES.closed
101+
await channel1.subscribe()
102+
103+
const firstJoin = pushSpy.mock.calls.find((call) => call[0]?.event === 'phx_join')
104+
expect(firstJoin![0].payload).toHaveProperty('access_token', customToken)
105+
106+
// Remove and recreate
107+
await clientWithCallback.socket.removeChannel(channel1)
108+
pushSpy.mockClear()
109+
110+
const channel2 = clientWithCallback.socket.channel('conversation:test', {
111+
config: { private: true },
112+
})
113+
114+
channel2.state = CHANNEL_STATES.closed
115+
await channel2.subscribe()
116+
117+
const secondJoin = pushSpy.mock.calls.find((call) => call[0]?.event === 'phx_join')
118+
119+
// With accessToken callback, this WORKS
120+
expect(secondJoin![0].payload).toHaveProperty('access_token', customToken)
121+
122+
clientWithCallback.cleanup()
123+
})
124+
125+
test('EDGE CASE: resubscribe to different topic after removeChannel should work', async () => {
126+
const customToken = utils.generateJWT('1h')
127+
await testSetup.socket.setAuth(customToken)
128+
129+
// Subscribe to first topic
130+
const channel1 = testSetup.socket.channel('topic1', { config: { private: true } })
131+
channel1.state = CHANNEL_STATES.closed
132+
await channel1.subscribe()
133+
134+
await testSetup.socket.removeChannel(channel1)
135+
136+
// Subscribe to DIFFERENT topic
137+
const pushSpy = vi.spyOn(testSetup.socket, 'push')
138+
const channel2 = testSetup.socket.channel('topic2', { config: { private: true } })
139+
channel2.state = CHANNEL_STATES.closed
140+
await channel2.subscribe()
141+
142+
const joinCall = pushSpy.mock.calls.find((call) => call[0]?.event === 'phx_join')
143+
expect(joinCall![0].payload).toHaveProperty('access_token', customToken)
144+
})
145+
146+
test('handles accessToken callback errors gracefully during subscribe', async () => {
147+
const errorMessage = 'Token fetch failed during subscribe'
148+
let callCount = 0
149+
const tokens = ['initial-token', null] // Second call will throw
150+
151+
const accessToken = vi.fn(() => {
152+
if (callCount++ === 0) {
153+
return Promise.resolve(tokens[0])
154+
}
155+
return Promise.reject(new Error(errorMessage))
156+
})
157+
158+
const logSpy = vi.fn()
159+
160+
const client = testBuilders.standardClient({
161+
accessToken,
162+
logger: logSpy,
163+
})
164+
165+
// First subscribe should work
166+
await client.socket.setAuth()
167+
const channel1 = client.socket.channel('test', { config: { private: true } })
168+
channel1.state = CHANNEL_STATES.closed
169+
await channel1.subscribe()
170+
171+
expect(client.socket.accessTokenValue).toBe(tokens[0])
172+
173+
// Remove and resubscribe - callback will fail but should fall back
174+
await client.socket.removeChannel(channel1)
175+
176+
const channel2 = client.socket.channel('test', { config: { private: true } })
177+
channel2.state = CHANNEL_STATES.closed
178+
await channel2.subscribe()
179+
180+
// Verify error was logged
181+
expect(logSpy).toHaveBeenCalledWith(
182+
'error',
183+
'error fetching access token from callback',
184+
expect.any(Error)
185+
)
186+
187+
// Verify subscription still succeeded with cached token
188+
expect(client.socket.accessTokenValue).toBe(tokens[0])
189+
190+
client.cleanup()
191+
})
192+
})

packages/core/realtime-js/test/RealtimeClient.auth.test.ts

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -140,8 +140,12 @@ describe('auth during connection states', () => {
140140

141141
await new Promise((resolve) => setTimeout(() => resolve(undefined), 100))
142142

143-
// Verify that the error was logged
144-
expect(logSpy).toHaveBeenCalledWith('error', 'error setting auth in connect', expect.any(Error))
143+
// Verify that the error was logged with more specific message
144+
expect(logSpy).toHaveBeenCalledWith(
145+
'error',
146+
'error fetching access token from callback',
147+
expect.any(Error)
148+
)
145149

146150
// Verify that the connection was still established despite the error
147151
assert.ok(socketWithError.conn, 'connection should still exist')
@@ -199,7 +203,7 @@ describe('auth during connection states', () => {
199203
expect(socket.accessTokenValue).toBe(tokens[0])
200204

201205
// Call the callback and wait for async operations to complete
202-
await socket.reconnectTimer.callback()
206+
await socket.reconnectTimer?.callback()
203207
await new Promise((resolve) => setTimeout(resolve, 100))
204208
expect(socket.accessTokenValue).toBe(tokens[1])
205209
expect(accessToken).toHaveBeenCalledTimes(2)

packages/core/realtime-js/test/RealtimeClient.channels.test.ts

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,8 @@ describe('channel', () => {
104104
const connectStub = vi.spyOn(testSetup.socket, 'connect')
105105
const disconnectStub = vi.spyOn(testSetup.socket, 'disconnect')
106106

107-
channel = testSetup.socket.channel('topic').subscribe()
107+
channel = testSetup.socket.channel('topic')
108+
await channel.subscribe()
108109

109110
assert.equal(testSetup.socket.getChannels().length, 1)
110111
expect(connectStub).toHaveBeenCalled()
@@ -118,11 +119,11 @@ describe('channel', () => {
118119
test('does not remove other channels when removing one', async () => {
119120
const connectStub = vi.spyOn(testSetup.socket, 'connect')
120121
const disconnectStub = vi.spyOn(testSetup.socket, 'disconnect')
121-
const channel1 = testSetup.socket.channel('chan1').subscribe()
122-
const channel2 = testSetup.socket.channel('chan2').subscribe()
122+
const channel1 = testSetup.socket.channel('chan1')
123+
const channel2 = testSetup.socket.channel('chan2')
123124

124-
channel1.subscribe()
125-
channel2.subscribe()
125+
await channel1.subscribe()
126+
await channel2.subscribe()
126127
assert.equal(testSetup.socket.getChannels().length, 2)
127128
expect(connectStub).toHaveBeenCalled()
128129

0 commit comments

Comments
 (0)