Skip to content

Commit 8cebf6f

Browse files
authored
fix(client): wait until WebSocket is open before sending (#615)
<!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit - **Bug Fixes** - Ensured messages are only sent after the WebSocket connection is fully open, preventing premature message sending. - **Tests** - Improved test coverage to verify correct message handling when the WebSocket is not yet open and after it becomes open. <!-- end of auto-generated comment: release notes by coderabbit.ai -->
1 parent 5f23895 commit 8cebf6f

File tree

2 files changed

+53
-3
lines changed

2 files changed

+53
-3
lines changed

packages/client/src/adapters/websocket/link-websocket-client.ts

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,28 @@ import type { StandardLinkClient } from '../standard'
44
import { ClientPeer } from '@orpc/standard-server-peer'
55

66
export interface experimental_LinkWebsocketClientOptions {
7-
websocket: Pick<WebSocket, 'addEventListener' | 'send'>
7+
websocket: Pick<WebSocket, 'addEventListener' | 'send' | 'readyState'>
88
}
99

1010
export class experimental_LinkWebsocketClient<T extends ClientContext> implements StandardLinkClient<T> {
1111
private readonly peer: ClientPeer
1212

1313
constructor(options: experimental_LinkWebsocketClientOptions) {
14-
this.peer = new ClientPeer(options.websocket.send.bind(options.websocket))
14+
const untilOpen = new Promise<void>((resolve) => {
15+
if (options.websocket.readyState === 0) { // CONNECTING
16+
options.websocket.addEventListener('open', () => {
17+
resolve()
18+
}, { once: true })
19+
}
20+
else {
21+
resolve()
22+
}
23+
})
24+
25+
this.peer = new ClientPeer(async (message) => {
26+
await untilOpen
27+
return options.websocket.send(message)
28+
})
1529

1630
options.websocket.addEventListener('message', (event) => {
1731
this.peer.message(event.data)

packages/client/src/adapters/websocket/rpc-link.test.ts

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ describe('rpcLink', () => {
77
let onClose: any
88

99
const websocket = {
10+
readyState: 1,
1011
addEventListener: vi.fn((event, callback) => {
1112
if (event === 'message')
1213
onMessage = callback
@@ -23,7 +24,7 @@ describe('rpcLink', () => {
2324
const orpc = createORPCClient(link) as any
2425

2526
it('on success', async () => {
26-
expect(orpc.ping('input')).resolves.toEqual('pong')
27+
const promise = expect(orpc.ping('input')).resolves.toEqual('pong')
2728

2829
await vi.waitFor(() => expect(websocket.send).toHaveBeenCalledTimes(1))
2930

@@ -38,6 +39,8 @@ describe('rpcLink', () => {
3839
})
3940

4041
onMessage({ data: await encodeResponseMessage(id, MessageType.RESPONSE, { body: { json: 'pong' }, status: 200, headers: {} }) })
42+
43+
await promise
4144
})
4245

4346
it('on close', async () => {
@@ -47,4 +50,37 @@ describe('rpcLink', () => {
4750

4851
onClose()
4952
})
53+
54+
it('waits until open before sending', async () => {
55+
let onOpen: any
56+
57+
const websocket = {
58+
readyState: 0,
59+
addEventListener: vi.fn((event, callback) => {
60+
if (event === 'message')
61+
onMessage = callback
62+
if (event === 'close')
63+
onClose = callback
64+
if (event === 'open')
65+
onOpen = callback
66+
}),
67+
send: vi.fn(),
68+
}
69+
const orpc = createORPCClient(new RPCLink({
70+
websocket,
71+
})) as any
72+
73+
const promise = expect(orpc.ping('input')).resolves.toEqual('pong')
74+
75+
await new Promise(resolve => setTimeout(resolve, 10))
76+
expect(websocket.send).toHaveBeenCalledTimes(0)
77+
78+
onOpen()
79+
await vi.waitFor(() => expect(websocket.send).toHaveBeenCalledTimes(1))
80+
81+
const [id] = (await decodeRequestMessage(websocket.send.mock.calls[0]![0]))
82+
onMessage({ data: await encodeResponseMessage(id, MessageType.RESPONSE, { body: { json: 'pong' }, status: 200, headers: {} }) })
83+
84+
await promise
85+
})
5086
})

0 commit comments

Comments
 (0)