Skip to content

Commit 8eedcbd

Browse files
committed
fix(realtime): realtime explicit REST call
1 parent 00c4a11 commit 8eedcbd

File tree

2 files changed

+326
-0
lines changed

2 files changed

+326
-0
lines changed

packages/core/realtime-js/src/RealtimeChannel.ts

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -435,6 +435,68 @@ export default class RealtimeChannel {
435435
}
436436
return this._on(type, filter, callback)
437437
}
438+
/**
439+
* Sends a broadcast message explicitly via REST API.
440+
*
441+
* This method always uses the REST API endpoint regardless of WebSocket connection state.
442+
* Useful when you want to guarantee REST delivery or when gradually migrating from implicit REST fallback.
443+
*
444+
* @param event The name of the broadcast event
445+
* @param payload Payload to be sent (required)
446+
* @param opts Options including timeout
447+
* @returns Promise resolving to object with success status, and error details if failed
448+
*/
449+
async postSend(
450+
event: string,
451+
payload: any,
452+
opts: { timeout?: number } = {}
453+
): Promise<{ success: true } | { success: false; status: number; error: string }> {
454+
if (!this.socket.accessTokenValue) {
455+
return Promise.reject('Access token is required for postSend()')
456+
}
457+
458+
if (payload === undefined || payload === null) {
459+
return Promise.reject('Payload is required for postSend()')
460+
}
461+
462+
const options = {
463+
method: 'POST',
464+
headers: {
465+
Authorization: `Bearer ${this.socket.accessTokenValue}`,
466+
apikey: this.socket.apiKey ? this.socket.apiKey : '',
467+
'Content-Type': 'application/json',
468+
},
469+
body: JSON.stringify({
470+
messages: [
471+
{
472+
topic: this.subTopic,
473+
event,
474+
payload: payload,
475+
private: this.private,
476+
},
477+
],
478+
}),
479+
}
480+
481+
const response = await this._fetchWithTimeout(
482+
this.broadcastEndpointURL,
483+
options,
484+
opts.timeout ?? this.timeout
485+
)
486+
487+
if (response.status === 202) {
488+
return { success: true }
489+
}
490+
491+
let errorMessage = response.statusText
492+
try {
493+
const errorBody = await response.json()
494+
errorMessage = errorBody.error || errorBody.message || errorMessage
495+
} catch {}
496+
497+
return Promise.reject(new Error(errorMessage))
498+
}
499+
438500
/**
439501
* Sends a message into the channel.
440502
*
@@ -454,6 +516,12 @@ export default class RealtimeChannel {
454516
opts: { [key: string]: any } = {}
455517
): Promise<RealtimeChannelSendResponse> {
456518
if (!this._canPush() && args.type === 'broadcast') {
519+
console.warn(
520+
'Realtime send() is automatically falling back to REST API. ' +
521+
'This behavior will be deprecated in the future. ' +
522+
'Please use postSend() explicitly for REST delivery.'
523+
)
524+
457525
const { event, payload: endpoint_payload } = args
458526
const authorization = this.socket.accessTokenValue
459527
? `Bearer ${this.socket.accessTokenValue}`

packages/core/realtime-js/test/RealtimeChannel.messaging.test.ts

Lines changed: 258 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -473,3 +473,261 @@ describe('send', () => {
473473
})
474474
})
475475
})
476+
477+
describe('postSend', () => {
478+
test('rejects when access token is not set', async () => {
479+
const socket = new RealtimeClient(testSetup.url, {
480+
params: { apikey: 'abc123' },
481+
})
482+
const channel = socket.channel('topic')
483+
484+
await expect(channel.postSend('test', { data: 'test' })).rejects.toBe(
485+
'Access token is required for postSend()'
486+
)
487+
})
488+
489+
test('rejects when payload is not provided', async () => {
490+
const socket = new RealtimeClient(testSetup.url, {
491+
params: { apikey: 'abc123' },
492+
accessToken: () => Promise.resolve('token123'),
493+
})
494+
await socket.setAuth()
495+
const channel = socket.channel('topic')
496+
497+
await expect(channel.postSend('test', undefined as any)).rejects.toBe(
498+
'Payload is required for postSend()'
499+
)
500+
})
501+
502+
test('returns success true on 202 status', async () => {
503+
const mockResponse = {
504+
status: 202,
505+
statusText: 'Accepted',
506+
headers: new Headers(),
507+
body: null,
508+
}
509+
const fetchStub = vi.fn().mockResolvedValue(mockResponse)
510+
511+
const socket = new RealtimeClient(testSetup.url, {
512+
fetch: fetchStub as unknown as typeof fetch,
513+
timeout: defaultTimeout,
514+
params: { apikey: 'def456' },
515+
accessToken: () => Promise.resolve('access_token_456'),
516+
})
517+
518+
await socket.setAuth()
519+
520+
const channel = socket.channel('topic', {
521+
config: { private: true },
522+
})
523+
524+
const result = await channel.postSend('test-explicit', {
525+
data: 'explicit',
526+
})
527+
528+
expect(result).toEqual({ success: true })
529+
530+
const expectedUrl = testSetup.url
531+
.replace('/socket', '')
532+
.replace('wss', 'https')
533+
.concat('/api/broadcast')
534+
535+
expect(fetchStub).toHaveBeenCalledTimes(1)
536+
const [url, options] = fetchStub.mock.calls[0]
537+
expect(url).toBe(expectedUrl)
538+
expect(options.method).toBe('POST')
539+
expect(options.headers.Authorization).toBe('Bearer access_token_456')
540+
expect(options.headers.apikey).toBe('def456')
541+
expect(options.body).toBe(
542+
'{"messages":[{"topic":"topic","event":"test-explicit","payload":{"data":"explicit"},"private":true}]}'
543+
)
544+
})
545+
546+
test('throws on timeout error', async () => {
547+
const timeoutError = new Error('Request timeout')
548+
timeoutError.name = 'AbortError'
549+
550+
const fetchStub = vi.fn().mockRejectedValue(timeoutError)
551+
552+
const socket = new RealtimeClient(testSetup.url, {
553+
fetch: fetchStub as unknown as typeof fetch,
554+
params: { apikey: 'abc123' },
555+
accessToken: () => Promise.resolve('token123'),
556+
})
557+
await socket.setAuth()
558+
const channel = socket.channel('topic')
559+
560+
await expect(channel.postSend('test', { data: 'test' })).rejects.toThrow('Request timeout')
561+
})
562+
563+
test('returns error object on non-202 status', async () => {
564+
const mockResponse = {
565+
status: 500,
566+
statusText: 'Internal Server Error',
567+
headers: new Headers(),
568+
body: null,
569+
json: vi.fn().mockResolvedValue({ error: 'Server error' }),
570+
}
571+
const fetchStub = vi.fn().mockResolvedValue(mockResponse)
572+
573+
const socket = new RealtimeClient(testSetup.url, {
574+
fetch: fetchStub as unknown as typeof fetch,
575+
params: { apikey: 'abc123' },
576+
accessToken: () => Promise.resolve('token123'),
577+
})
578+
await socket.setAuth()
579+
const channel = socket.channel('topic')
580+
581+
const result = await channel.postSend('test', { data: 'test' })
582+
583+
expect(result).toEqual({
584+
success: false,
585+
status: 500,
586+
error: 'Server error',
587+
})
588+
})
589+
590+
test('respects custom timeout option', async () => {
591+
const mockResponse = {
592+
status: 202,
593+
headers: new Headers(),
594+
body: null,
595+
}
596+
const fetchStub = vi.fn().mockResolvedValue(mockResponse)
597+
598+
const socket = new RealtimeClient(testSetup.url, {
599+
fetch: fetchStub as unknown as typeof fetch,
600+
timeout: 5000,
601+
params: { apikey: 'abc123' },
602+
accessToken: () => Promise.resolve('token123'),
603+
})
604+
await socket.setAuth()
605+
const channel = socket.channel('topic')
606+
607+
const result = await channel.postSend('test', { data: 'test' }, { timeout: 3000 })
608+
609+
expect(result).toEqual({ success: true })
610+
expect(fetchStub).toHaveBeenCalledTimes(1)
611+
const [, options] = fetchStub.mock.calls[0]
612+
expect(options.signal).toBeDefined()
613+
expect(options.signal).toBeInstanceOf(AbortSignal)
614+
})
615+
616+
test('uses default timeout when not specified', async () => {
617+
const mockResponse = {
618+
status: 202,
619+
headers: new Headers(),
620+
body: null,
621+
}
622+
const fetchStub = vi.fn().mockResolvedValue(mockResponse)
623+
624+
const socket = new RealtimeClient(testSetup.url, {
625+
fetch: fetchStub as unknown as typeof fetch,
626+
timeout: 5000,
627+
params: { apikey: 'abc123' },
628+
accessToken: () => Promise.resolve('token123'),
629+
})
630+
await socket.setAuth()
631+
const channel = socket.channel('topic')
632+
633+
const result = await channel.postSend('test', { data: 'test' })
634+
635+
expect(result).toEqual({ success: true })
636+
expect(fetchStub).toHaveBeenCalledTimes(1)
637+
const [, options] = fetchStub.mock.calls[0]
638+
expect(options.signal).toBeDefined()
639+
})
640+
641+
test('allows non-empty payload', async () => {
642+
const mockResponse = {
643+
status: 202,
644+
headers: new Headers(),
645+
body: null,
646+
}
647+
const fetchStub = vi.fn().mockResolvedValue(mockResponse)
648+
649+
const socket = new RealtimeClient(testSetup.url, {
650+
fetch: fetchStub as unknown as typeof fetch,
651+
params: { apikey: 'abc123' },
652+
accessToken: () => Promise.resolve('token123'),
653+
})
654+
await socket.setAuth()
655+
const channel = socket.channel('topic')
656+
657+
const result = await channel.postSend('test-payload', { data: 'value' })
658+
659+
expect(result).toEqual({ success: true })
660+
expect(fetchStub).toHaveBeenCalledTimes(1)
661+
662+
const [, options] = fetchStub.mock.calls[0]
663+
const body = JSON.parse(options.body)
664+
expect(body.messages[0].payload).toEqual({ data: 'value' })
665+
})
666+
667+
test('rejects when payload is null', async () => {
668+
const socket = new RealtimeClient(testSetup.url, {
669+
params: { apikey: 'abc123' },
670+
accessToken: () => Promise.resolve('token123'),
671+
})
672+
await socket.setAuth()
673+
const channel = socket.channel('topic')
674+
675+
await expect(channel.postSend('test', null as any)).rejects.toBe(
676+
'Payload is required for postSend()'
677+
)
678+
})
679+
680+
test('uses statusText when error body has no error field', async () => {
681+
const mockResponse = {
682+
status: 400,
683+
statusText: 'Bad Request',
684+
headers: new Headers(),
685+
body: null,
686+
json: vi.fn().mockResolvedValue({ message: 'Invalid request' }),
687+
}
688+
const fetchStub = vi.fn().mockResolvedValue(mockResponse)
689+
690+
const socket = new RealtimeClient(testSetup.url, {
691+
fetch: fetchStub as unknown as typeof fetch,
692+
params: { apikey: 'abc123' },
693+
accessToken: () => Promise.resolve('token123'),
694+
})
695+
await socket.setAuth()
696+
const channel = socket.channel('topic')
697+
698+
const result = await channel.postSend('test', { data: 'test' })
699+
700+
expect(result).toEqual({
701+
success: false,
702+
status: 400,
703+
error: 'Invalid request',
704+
})
705+
})
706+
707+
test('falls back to statusText when json parsing fails', async () => {
708+
const mockResponse = {
709+
status: 503,
710+
statusText: 'Service Unavailable',
711+
headers: new Headers(),
712+
body: null,
713+
json: vi.fn().mockRejectedValue(new Error('Invalid JSON')),
714+
}
715+
const fetchStub = vi.fn().mockResolvedValue(mockResponse)
716+
717+
const socket = new RealtimeClient(testSetup.url, {
718+
fetch: fetchStub as unknown as typeof fetch,
719+
params: { apikey: 'abc123' },
720+
accessToken: () => Promise.resolve('token123'),
721+
})
722+
await socket.setAuth()
723+
const channel = socket.channel('topic')
724+
725+
const result = await channel.postSend('test', { data: 'test' })
726+
727+
expect(result).toEqual({
728+
success: false,
729+
status: 503,
730+
error: 'Service Unavailable',
731+
})
732+
})
733+
})

0 commit comments

Comments
 (0)