Skip to content

Commit 80f622a

Browse files
authored
refactor(standard-server-peer): dedupe + improve (#733)
<!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit * **New Features** * Added a utility function to efficiently convert Blob, File, Response, or Request objects into buffers. * Introduced a helper to standardize server peer message handling across adapters. * Added a utility to detect event stream headers. * **Improvements** * Broadened supported message types to include Uint8Array for encoded messages and message ports. * Unified and simplified message handling logic in server adapters, delegating processing to a shared utility. * Enhanced body and header serialization/deserialization for messages, improving consistency. * Updated buffer handling in client and server WebSocket adapters for better compatibility. * **Bug Fixes** * Improved compatibility for buffer extraction from Blobs and similar objects. * **Documentation** * Expanded test coverage for new utilities and updated tests to reflect changes in buffer types and message handling. * **Chores** * Updated ESLint rules to discourage unsupported `.bytes` property usage and recommend the new utility. <!-- end of auto-generated comment: release notes by coderabbit.ai -->
1 parent 8e119a1 commit 80f622a

File tree

29 files changed

+395
-578
lines changed

29 files changed

+395
-578
lines changed

eslint.config.js

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,10 @@ export default antfu({
1515
name: ['JSON', 'stringify'],
1616
message: 'JSON.stringify can return undefined, use stringifyJSON instead',
1717
},
18+
{
19+
name: ['*', 'bytes'],
20+
message: 'Request/Blob/Response/... .bytes is not widely supported, use readAsBuffer instead',
21+
},
1822
],
1923
'no-restricted-imports': ['error', {
2024
patterns: [{

packages/client/src/adapters/message-port/message-port.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ export type SupportedMessagePort = Pick<MessagePort, 'addEventListener' | 'postM
2424
/**
2525
* Message port can support [The structured clone algorithm](https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API/Structured_clone_algorithm)
2626
*/
27-
export type SupportedMessagePortData = string | ArrayBufferLike
27+
export type SupportedMessagePortData = string | ArrayBufferLike | Uint8Array
2828

2929
export function postMessagePortMessage(port: SupportedMessagePort, data: SupportedMessagePortData): void {
3030
port.postMessage(data)
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
1-
export * from './link-websocket-client'
1+
export * from './link-client'
22
export * from './rpc-link'

packages/client/src/adapters/websocket/link-websocket-client.ts renamed to packages/client/src/adapters/websocket/link-client.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import type { StandardLazyResponse, StandardRequest } from '@orpc/standard-server'
22
import type { ClientContext, ClientOptions } from '../../types'
33
import type { StandardLinkClient } from '../standard'
4+
import { readAsBuffer } from '@orpc/shared'
45
import { ClientPeer } from '@orpc/standard-server-peer'
56

67
export interface experimental_LinkWebsocketClientOptions {
@@ -29,7 +30,7 @@ export class experimental_LinkWebsocketClient<T extends ClientContext> implement
2930

3031
options.websocket.addEventListener('message', async (event) => {
3132
const message = event.data instanceof Blob
32-
? await event.data.arrayBuffer()
33+
? await readAsBuffer(event.data)
3334
: event.data
3435

3536
this.peer.message(message)

packages/client/src/adapters/websocket/rpc-link.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
import type { ClientContext } from '../../types'
22
import type { StandardRPCLinkOptions } from '../standard'
3-
import type { experimental_LinkWebsocketClientOptions as LinkWebsocketClientOptions } from './link-websocket-client'
3+
import type { experimental_LinkWebsocketClientOptions as LinkWebsocketClientOptions } from './link-client'
44
import { StandardRPCLink } from '../standard'
5-
import { experimental_LinkWebsocketClient as LinkWebsocketClient } from './link-websocket-client'
5+
import { experimental_LinkWebsocketClient as LinkWebsocketClient } from './link-client'
66

77
export interface experimental_RPCLinkOptions<T extends ClientContext>
88
extends Omit<StandardRPCLinkOptions<T>, 'url' | 'headers' | 'method' | 'fallbackMethod' | 'maxUrlLength'>, LinkWebsocketClientOptions {}

packages/server/package.json

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,11 @@
3535
"import": "./dist/adapters/standard/index.mjs",
3636
"default": "./dist/adapters/standard/index.mjs"
3737
},
38+
"./standard-peer": {
39+
"types": "./dist/adapters/standard-peer/index.d.mts",
40+
"import": "./dist/adapters/standard-peer/index.mjs",
41+
"default": "./dist/adapters/standard-peer/index.mjs"
42+
},
3843
"./fetch": {
3944
"types": "./dist/adapters/fetch/index.d.mts",
4045
"import": "./dist/adapters/fetch/index.mjs",
@@ -82,6 +87,7 @@
8287
"./plugins": "./src/plugins/index.ts",
8388
"./hibernation": "./src/hibernation/index.ts",
8489
"./standard": "./src/adapters/standard/index.ts",
90+
"./standard-peer": "./src/adapters/standard-peer/index.ts",
8591
"./fetch": "./src/adapters/fetch/index.ts",
8692
"./node": "./src/adapters/node/index.ts",
8793
"./aws-lambda": "./src/adapters/aws-lambda/index.ts",

packages/server/src/adapters/bun-ws/handler.ts

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,17 @@
11
import type { MaybeOptionalOptions } from '@orpc/shared'
22
import type { Context } from '../../context'
33
import type { StandardHandler } from '../standard'
4-
import type { FriendlyStandardHandleOptions } from '../standard/utils'
4+
import type {
5+
experimental_HandleStandardServerPeerMessageOptions as HandleStandardServerPeerMessageOptions,
6+
} from '../standard-peer'
57
import { resolveMaybeOptionalOptions } from '@orpc/shared'
68
import { ServerPeer } from '@orpc/standard-server-peer'
7-
import { resolveFriendlyStandardHandleOptions } from '../standard/utils'
9+
import {
10+
experimental_handleStandardServerPeerMessage as handleStandardServerPeerMessage,
11+
} from '../standard-peer'
812

913
export interface ServerWebSocket {
10-
send(message: string | ArrayBufferLike): number
14+
send(message: string | ArrayBufferLike | Uint8Array): number
1115
}
1216

1317
export class experimental_BunWsHandler<T extends Context> {
@@ -20,8 +24,8 @@ export class experimental_BunWsHandler<T extends Context> {
2024

2125
async message(
2226
ws: ServerWebSocket,
23-
message: string | { buffer: ArrayBufferLike },
24-
...rest: MaybeOptionalOptions<Omit<FriendlyStandardHandleOptions<T>, 'prefix'>>
27+
message: string | ArrayBufferView,
28+
...rest: MaybeOptionalOptions<HandleStandardServerPeerMessageOptions<T>>
2529
): Promise<void> {
2630
let peer = this.peers.get(ws)
2731

@@ -31,17 +35,16 @@ export class experimental_BunWsHandler<T extends Context> {
3135
}))
3236
}
3337

34-
const [id, request] = await peer.message(typeof message === 'string' ? message : message.buffer)
38+
const encodedMessage = typeof message === 'string'
39+
? message
40+
: new Uint8Array(message.buffer, message.byteOffset, message.byteLength)
3541

36-
if (!request) {
37-
return
38-
}
39-
40-
const options = resolveFriendlyStandardHandleOptions(resolveMaybeOptionalOptions(rest))
41-
42-
const { response } = await this.standardHandler.handle({ ...request, body: () => Promise.resolve(request.body) }, options)
43-
44-
await peer.response(id, response ?? { status: 404, headers: {}, body: 'No procedure matched' })
42+
await handleStandardServerPeerMessage(
43+
this.standardHandler,
44+
peer,
45+
encodedMessage,
46+
resolveMaybeOptionalOptions(rest),
47+
)
4548
}
4649

4750
close(ws: ServerWebSocket): void {
Lines changed: 11 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { decodeResponseMessage, encodeRequestMessage, MessageType } from '@orpc/standard-server-peer'
1+
import { encodeRequestMessage, MessageType } from '@orpc/standard-server-peer'
22
import { os } from '../../builder'
33
import { experimental_RPCHandler as RPCHandler } from './rpc-handler'
44

@@ -16,92 +16,37 @@ describe('rpcHandler', async () => {
1616
await new Promise(resolve => setTimeout(resolve, 10))
1717
return 'pong'
1818
}),
19-
20-
file: os.handler(async ({ signal: _signal }) => {
21-
signal = _signal!
22-
await new Promise(resolve => setTimeout(resolve, 10))
23-
return new Blob(['pong'])
24-
}),
2519
})
2620

2721
const wss = {
2822
send: vi.fn(),
2923
}
3024

31-
const ping_request_message = await encodeRequestMessage('19', MessageType.REQUEST, {
25+
const string_request_message = await encodeRequestMessage('19', MessageType.REQUEST, {
3226
url: new URL('orpc:/ping'),
3327
body: { json: 'input' },
3428
headers: {},
3529
method: 'POST',
3630
}) as string
3731

38-
const file_request_message = {
39-
buffer: new TextEncoder().encode(await encodeRequestMessage('19', MessageType.REQUEST, {
40-
url: new URL('orpc:/file'),
41-
body: { json: 'input' },
42-
headers: {},
43-
method: 'POST',
44-
}) as string),
45-
}
46-
47-
const not_found_request_message = await encodeRequestMessage('19', MessageType.REQUEST, {
48-
url: new URL('orpc:/not-found'),
49-
body: { json: 'input' },
50-
headers: {},
51-
method: 'POST',
52-
}) as string
53-
54-
const abort_message = await encodeRequestMessage('19', MessageType.ABORT_SIGNAL, undefined) as string
32+
const buffer_request_message = {
33+
buffer: new TextEncoder().encode(string_request_message),
34+
} as any
5535

56-
it('on success', async () => {
57-
handler.message(wss, ping_request_message)
36+
it('work with string event', async () => {
37+
handler.message(wss, string_request_message)
5838

5939
await vi.waitFor(() => expect(wss.send).toHaveBeenCalledTimes(1))
60-
61-
const [id,, payload] = (await decodeResponseMessage(wss.send.mock.calls[0]![0]))
62-
63-
expect(id).toBeTypeOf('string')
64-
expect(payload).toEqual({
65-
status: 200,
66-
headers: {},
67-
body: { json: 'pong' },
68-
})
6940
})
7041

71-
it('on success with buffer data', async () => {
72-
handler.message(wss, file_request_message)
42+
it('work with buffer event', async () => {
43+
handler.message(wss, buffer_request_message)
7344

7445
await vi.waitFor(() => expect(wss.send).toHaveBeenCalledTimes(1))
75-
const [id, , payload] = (await decodeResponseMessage(wss.send.mock.calls[0]![0]))
76-
77-
expect(id).toBeTypeOf('string')
78-
expect(payload).toEqual({
79-
status: 200,
80-
headers: {
81-
'content-type': expect.any(String),
82-
},
83-
body: expect.any(FormData),
84-
})
85-
86-
expect(await (payload as any).body.get('0').text()).toBe('pong')
87-
})
88-
89-
it('on abort signal', async () => {
90-
handler.message(wss, ping_request_message)
91-
92-
await new Promise(resolve => setTimeout(resolve, 0))
93-
94-
expect(signal.aborted).toBe(false)
95-
expect(wss.send).not.toHaveBeenCalled()
96-
97-
handler.message(wss, abort_message)
98-
99-
await vi.waitFor(() => expect(signal.aborted).toBe(true))
100-
expect(wss.send).not.toHaveBeenCalled()
10146
})
10247

103-
it('on close', async () => {
104-
handler.message(wss, ping_request_message)
48+
it('abort on close', async () => {
49+
handler.message(wss, string_request_message)
10550

10651
await new Promise(resolve => setTimeout(resolve, 0))
10752

@@ -112,19 +57,4 @@ describe('rpcHandler', async () => {
11257
await vi.waitFor(() => expect(signal.aborted).toBe(true))
11358
expect(wss.send).not.toHaveBeenCalled()
11459
})
115-
116-
it('on no procedure matched', async () => {
117-
handler.message(wss, not_found_request_message)
118-
119-
await new Promise(resolve => setTimeout(resolve, 0))
120-
121-
const [id,, payload] = (await decodeResponseMessage(wss.send.mock.calls[0]![0]))
122-
123-
expect(id).toBeTypeOf('string')
124-
expect(payload).toEqual({
125-
status: 404,
126-
headers: {},
127-
body: 'No procedure matched',
128-
})
129-
})
13060
})

packages/server/src/adapters/crossws/handler.ts

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,14 @@ import type { MaybeOptionalOptions } from '@orpc/shared'
22
import type { Message, Peer } from 'crossws'
33
import type { Context } from '../../context'
44
import type { StandardHandler } from '../standard'
5-
import type { FriendlyStandardHandleOptions } from '../standard/utils'
5+
import type {
6+
experimental_HandleStandardServerPeerMessageOptions as HandleStandardServerPeerMessageOptions,
7+
} from '../standard-peer'
68
import { resolveMaybeOptionalOptions } from '@orpc/shared'
79
import { ServerPeer } from '@orpc/standard-server-peer'
8-
import { resolveFriendlyStandardHandleOptions } from '../standard/utils'
10+
import {
11+
experimental_handleStandardServerPeerMessage as handleStandardServerPeerMessage,
12+
} from '../standard-peer'
913

1014
export class experimental_CrosswsHandler<T extends Context> {
1115
private readonly peers: WeakMap<Pick<Peer, 'send'>, ServerPeer> = new WeakMap()
@@ -18,7 +22,7 @@ export class experimental_CrosswsHandler<T extends Context> {
1822
async message(
1923
ws: Pick<Peer, 'send'>,
2024
message: Pick<Message, 'rawData' | 'uint8Array'>,
21-
...rest: MaybeOptionalOptions<Omit<FriendlyStandardHandleOptions<T>, 'prefix'>>
25+
...rest: MaybeOptionalOptions<HandleStandardServerPeerMessageOptions<T>>
2226
): Promise<void> {
2327
let peer = this.peers.get(ws)
2428

@@ -28,17 +32,14 @@ export class experimental_CrosswsHandler<T extends Context> {
2832
}))
2933
}
3034

31-
const [id, request] = await peer.message(typeof message.rawData === 'string' ? message.rawData : message.uint8Array())
35+
const encodedMessage = typeof message.rawData === 'string' ? message.rawData : message.uint8Array()
3236

33-
if (!request) {
34-
return
35-
}
36-
37-
const options = resolveFriendlyStandardHandleOptions(resolveMaybeOptionalOptions(rest))
38-
39-
const { response } = await this.standardHandler.handle({ ...request, body: () => Promise.resolve(request.body) }, options)
40-
41-
await peer.response(id, response ?? { status: 404, headers: {}, body: 'No procedure matched' })
37+
await handleStandardServerPeerMessage(
38+
this.standardHandler,
39+
peer,
40+
encodedMessage,
41+
resolveMaybeOptionalOptions(rest),
42+
)
4243
}
4344

4445
close(ws: Pick<Peer, 'send'>): void {

0 commit comments

Comments
 (0)