Skip to content

Commit d0e429d

Browse files
authored
feat(client, server): Event-Source Ping Interval (#183)
* feat: support encode event comment * feat: event-source ping internal * sync * clear ping internal on error * docs * update tests * improve naming * improve
1 parent b36125c commit d0e429d

File tree

32 files changed

+416
-78
lines changed

32 files changed

+416
-78
lines changed

apps/content/docs/client/rpc-link.md

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ const link = new RPCLink({
8989

9090
## Event Source Configuration
9191

92-
Customize the retry logic for event sources using these options:
92+
Customize the retry logic for event sources (the mechanism behind [Event Iterator](/docs/event-iterator)) using these options:
9393

9494
- **eventSourceMaxNumberOfRetries:** Maximum retry attempts.
9595
- **eventSourceRetryDelay:** Delay between retries.
@@ -115,3 +115,23 @@ const link = new RPCLink<ClientContext>({
115115
:::tip
116116
You should disable event source retries when streaming results from a chatbot AI.
117117
:::
118+
119+
## Event-Source Ping Interval
120+
121+
To keep EventSource connections alive (the mechanism behind [Event Iterator](/docs/event-iterator)), `RPCLink` periodically sends a ping comment to the server. You can configure this behavior using the following options:
122+
123+
- `eventSourcePingEnabled` (default: `true`) – Enables or disables pings.
124+
- `eventSourcePingInterval` (default: `5000`) – Time between pings (in milliseconds).
125+
- `eventSourcePingContent` (default: `''`) – Custom content for ping messages.
126+
127+
```ts
128+
const link = new RPCLink({
129+
eventSourcePingEnabled: true,
130+
eventSourcePingInterval: 5000, // 5 seconds
131+
eventSourcePingContent: '',
132+
})
133+
```
134+
135+
:::warning
136+
These options for sending [Event Iterator](/docs/event-iterator) from client to the server, not from the server to client as used in [RPCHandler](/docs/rpc-handler#event-source-ping-interval) or [OpenAPIHandler](/docs/openapi/openapi-handler#event-source-ping-interval).
137+
:::

apps/content/docs/openapi/openapi-handler.md

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,3 +86,19 @@ export default async function fetch(request: Request) {
8686
return new Response('Not Found', { status: 404 })
8787
}
8888
```
89+
90+
## Event-Source Ping Interval
91+
92+
To keep EventSource connections alive (the mechanism behind [Event Iterator](/docs/event-iterator)), `OpenAPIHandler` periodically sends a ping comment to the client. You can configure this behavior using the following options:
93+
94+
- `eventSourcePingEnabled` (default: `true`) – Enables or disables pings.
95+
- `eventSourcePingInterval` (default: `5000`) – Time between pings (in milliseconds).
96+
- `eventSourcePingContent` (default: `''`) – Custom content for ping messages.
97+
98+
```ts
99+
const result = await handler.handle(request, {
100+
eventSourcePingEnabled: true,
101+
eventSourcePingInterval: 5000, // 5 seconds
102+
eventSourcePingContent: '',
103+
})
104+
```

apps/content/docs/rpc-handler.md

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,3 +65,19 @@ export default async function fetch(request: Request) {
6565
return new Response('Not Found', { status: 404 })
6666
}
6767
```
68+
69+
## Event-Source Ping Interval
70+
71+
To keep EventSource connections alive (the mechanism behind [Event Iterator](/docs/event-iterator)), `RPCHandler` periodically sends a ping comment to the client. You can configure this behavior using the following options:
72+
73+
- `eventSourcePingEnabled` (default: `true`) – Enables or disables pings.
74+
- `eventSourcePingInterval` (default: `5000`) – Time between pings (in milliseconds).
75+
- `eventSourcePingContent` (default: `''`) – Custom content for ping messages.
76+
77+
```ts
78+
const result = await handler.handle(request, {
79+
eventSourcePingEnabled: true,
80+
eventSourcePingInterval: 5000, // 5 seconds
81+
eventSourcePingContent: '',
82+
})
83+
```

packages/client/src/adapters/fetch/rpc-link.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import type { Value } from '@orpc/shared'
22
import type { StandardBody } from '@orpc/standard-server'
3+
import type { ToFetchBodyOptions } from '@orpc/standard-server-fetch'
34
import type { ClientContext, ClientLink, ClientOptionsOut } from '../../types'
45
import type { FetchWithContext } from './types'
56
import { isAsyncIteratorObject, stringifyJSON, trim, value } from '@orpc/shared'
@@ -12,7 +13,7 @@ type HTTPMethod = 'GET' | 'POST' | 'PUT' | 'DELETE' | 'PATCH'
1213

1314
export class InvalidEventSourceRetryResponse extends Error { }
1415

15-
export interface RPCLinkOptions<TClientContext extends ClientContext> {
16+
export interface RPCLinkOptions<TClientContext extends ClientContext> extends ToFetchBodyOptions {
1617
/**
1718
* Base url for all requests.
1819
*/
@@ -105,6 +106,7 @@ export class RPCLink<TClientContext extends ClientContext> implements ClientLink
105106
private readonly eventSourceMaxNumberOfRetries: Exclude<RPCLinkOptions<TClientContext>['eventSourceMaxNumberOfRetries'], undefined>
106107
private readonly eventSourceRetryDelay: Exclude<RPCLinkOptions<TClientContext>['eventSourceRetryDelay'], undefined>
107108
private readonly eventSourceRetry: Exclude<RPCLinkOptions<TClientContext>['eventSourceRetry'], undefined>
109+
private readonly toFetchBodyOptions: ToFetchBodyOptions
108110

109111
constructor(options: RPCLinkOptions<TClientContext>) {
110112
this.fetch = options.fetch ?? globalThis.fetch.bind(globalThis)
@@ -120,6 +122,8 @@ export class RPCLink<TClientContext extends ClientContext> implements ClientLink
120122

121123
this.eventSourceRetryDelay = options.eventSourceRetryDelay
122124
?? (({ retryTimes, lastRetry }) => lastRetry ?? (1000 * 2 ** retryTimes))
125+
126+
this.toFetchBodyOptions = options
123127
}
124128

125129
async call(path: readonly string[], input: unknown, options: ClientOptionsOut<TClientContext>): Promise<unknown> {
@@ -160,7 +164,7 @@ export class RPCLink<TClientContext extends ClientContext> implements ClientLink
160164
): Promise<unknown> {
161165
const encoded = await this.encodeRequest(path, input, options)
162166

163-
const fetchBody = toFetchBody(encoded.body, encoded.headers)
167+
const fetchBody = toFetchBody(encoded.body, encoded.headers, this.toFetchBodyOptions)
164168

165169
if (options.lastEventId !== undefined) {
166170
encoded.headers.set('last-event-id', options.lastEventId)

packages/openapi/src/adapters/fetch/openapi-handler.test.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,8 @@ describe('openAPIHandler', () => {
4444
},
4545
})
4646

47-
const result = await handler.handle(request, { prefix: '/api/v1', context: { db: 'postgres' } })
47+
const options = { prefix: '/api/v1', context: { db: 'postgres' } } as const
48+
const result = await handler.handle(request, options)
4849

4950
expect(result).toEqual({
5051
matched: true,
@@ -54,7 +55,7 @@ describe('openAPIHandler', () => {
5455
expect(handle).toHaveBeenCalledOnce()
5556
expect(handle).toHaveBeenCalledWith(
5657
vi.mocked(toStandardRequest).mock.results[0]!.value,
57-
{ prefix: '/api/v1', context: { db: 'postgres' } },
58+
options,
5859
)
5960

6061
expect(vi.mocked(toStandardRequest)).toHaveBeenCalledOnce()
@@ -65,7 +66,7 @@ describe('openAPIHandler', () => {
6566
status: 200,
6667
headers: {},
6768
body: '__body__',
68-
})
69+
}, options)
6970
})
7071

7172
it('on mismatch', async () => {

packages/openapi/src/adapters/fetch/openapi-handler.ts

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import type { Context, Router } from '@orpc/server'
22
import type { FetchHandler, FetchHandleResult } from '@orpc/server/fetch'
33
import type { StandardHandleOptions } from '@orpc/server/standard'
44
import type { MaybeOptionalOptions } from '@orpc/shared'
5+
import type { ToFetchResponseOptions } from '@orpc/standard-server-fetch'
56
import type { OpenAPIHandlerOptions } from '../standard'
67
import { StandardHandler } from '@orpc/server/standard'
78
import { toFetchResponse, toStandardRequest } from '@orpc/standard-server-fetch'
@@ -17,18 +18,21 @@ export class OpenAPIHandler<T extends Context> implements FetchHandler<T> {
1718
this.standardHandler = new StandardHandler(router, matcher, codec, options)
1819
}
1920

20-
async handle(request: Request, ...rest: MaybeOptionalOptions<StandardHandleOptions<T>>): Promise<FetchHandleResult> {
21+
async handle(
22+
request: Request,
23+
...[options]: MaybeOptionalOptions<StandardHandleOptions<T> & ToFetchResponseOptions>
24+
): Promise<FetchHandleResult> {
2125
const standardRequest = toStandardRequest(request)
2226

23-
const result = await this.standardHandler.handle(standardRequest, ...rest)
27+
const result = await this.standardHandler.handle(standardRequest, options as any)
2428

2529
if (!result.matched) {
2630
return result
2731
}
2832

2933
return {
3034
matched: true,
31-
response: toFetchResponse(result.response),
35+
response: toFetchResponse(result.response, options),
3236
}
3337
}
3438
}

packages/openapi/src/adapters/node/openapi-handler.test.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,8 @@ describe('openapiHandler', async () => {
6161
},
6262
})
6363

64-
const result = await handler.handle(req, res, { prefix: '/api/v1', context: { db: 'postgres' } })
64+
const options = { prefix: '/api/v1', context: { db: 'postgres' } } as const
65+
const result = await handler.handle(req, res, options)
6566

6667
expect(result).toEqual({
6768
matched: true,
@@ -70,7 +71,7 @@ describe('openapiHandler', async () => {
7071
expect(handle).toHaveBeenCalledOnce()
7172
expect(handle).toHaveBeenCalledWith(
7273
standardRequest,
73-
{ prefix: '/api/v1', context: { db: 'postgres' } },
74+
options,
7475
)
7576

7677
expect(toStandardRequest).toHaveBeenCalledOnce()
@@ -81,7 +82,7 @@ describe('openapiHandler', async () => {
8182
status: 200,
8283
headers: {},
8384
body: '__body__',
84-
})
85+
}, options)
8586
})
8687

8788
it('on mismatch', async () => {

packages/openapi/src/adapters/node/openapi-handler.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import type { Context, Router } from '@orpc/server'
22
import type { NodeHttpHandler, NodeHttpHandleResult, NodeHttpRequest, NodeHttpResponse } from '@orpc/server/node'
33
import type { StandardHandleOptions } from '@orpc/server/standard'
44
import type { MaybeOptionalOptions } from '@orpc/shared'
5+
import type { SendStandardResponseOptions } from '@orpc/standard-server-node'
56
import type { OpenAPIHandlerOptions } from '../standard'
67
import { StandardHandler } from '@orpc/server/standard'
78
import { sendStandardResponse, toStandardRequest } from '@orpc/standard-server-node'
@@ -20,17 +21,17 @@ export class OpenAPIHandler<T extends Context> implements NodeHttpHandler<T> {
2021
async handle(
2122
req: NodeHttpRequest,
2223
res: NodeHttpResponse,
23-
...rest: MaybeOptionalOptions<StandardHandleOptions<T>>
24+
...[options]: MaybeOptionalOptions<StandardHandleOptions<T> & SendStandardResponseOptions>
2425
): Promise<NodeHttpHandleResult> {
2526
const standardRequest = toStandardRequest(req, res)
2627

27-
const result = await this.standardHandler.handle(standardRequest, ...rest)
28+
const result = await this.standardHandler.handle(standardRequest, options as any)
2829

2930
if (!result.matched) {
3031
return { matched: false }
3132
}
3233

33-
await sendStandardResponse(res, result.response)
34+
await sendStandardResponse(res, result.response, options)
3435

3536
return { matched: true }
3637
}

packages/server/src/adapters/fetch/rpc-handler.test.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,8 @@ describe('rpcHandler', () => {
3939
body: '__body__',
4040
} })
4141

42-
const result = await handler.handle(request, { prefix: '/api/v1', context: { db: 'postgres' } })
42+
const options = { prefix: '/api/v1', context: { db: 'postgres' } } as const
43+
const result = await handler.handle(request, options)
4344

4445
expect(result).toEqual({
4546
matched: true,
@@ -49,7 +50,7 @@ describe('rpcHandler', () => {
4950
expect(handle).toHaveBeenCalledOnce()
5051
expect(handle).toHaveBeenCalledWith(
5152
vi.mocked(toStandardRequest).mock.results[0]!.value,
52-
{ prefix: '/api/v1', context: { db: 'postgres' } },
53+
options,
5354
)
5455

5556
expect(vi.mocked(toStandardRequest)).toHaveBeenCalledOnce()
@@ -60,7 +61,7 @@ describe('rpcHandler', () => {
6061
status: 200,
6162
headers: {},
6263
body: '__body__',
63-
})
64+
}, options)
6465
})
6566

6667
it('on mismatch', async () => {

packages/server/src/adapters/fetch/rpc-handler.ts

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import type { MaybeOptionalOptions } from '@orpc/shared'
2+
import type { ToFetchResponseOptions } from '@orpc/standard-server-fetch'
23
import type { Context } from '../../context'
34
import type { Router } from '../../router'
45
import type { RPCHandlerOptions, StandardHandleOptions } from '../standard'
@@ -15,18 +16,21 @@ export class RPCHandler<T extends Context> implements FetchHandler<T> {
1516
this.standardHandler = new StandardHandler(router, matcher, codec, options)
1617
}
1718

18-
async handle(request: Request, ...rest: MaybeOptionalOptions<StandardHandleOptions<T>>): Promise<FetchHandleResult> {
19+
async handle(
20+
request: Request,
21+
...[options]: MaybeOptionalOptions<StandardHandleOptions<T> & ToFetchResponseOptions>
22+
): Promise<FetchHandleResult> {
1923
const standardRequest = toStandardRequest(request)
2024

21-
const result = await this.standardHandler.handle(standardRequest, ...rest)
25+
const result = await this.standardHandler.handle(standardRequest, options as any)
2226

2327
if (!result.matched) {
2428
return result
2529
}
2630

2731
return {
2832
matched: true,
29-
response: toFetchResponse(result.response),
33+
response: toFetchResponse(result.response, options),
3034
}
3135
}
3236
}

0 commit comments

Comments
 (0)