Skip to content

Commit b902551

Browse files
authored
feat: add deduplicate interceptor for request deduplication (#4679)
1 parent 80756ca commit b902551

File tree

9 files changed

+1597
-2
lines changed

9 files changed

+1597
-2
lines changed

docs/docs/api/DiagnosticsChannel.md

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -268,3 +268,46 @@ diagnosticsChannel.channel('undici:proxy:connected').subscribe(({ socket, connec
268268
// const { origin, port, path, signal, headers, servername } = connectParams
269269
})
270270
```
271+
272+
## `undici:request:pending-requests`
273+
274+
This message is published when the deduplicate interceptor's pending request map changes. This is useful for monitoring and debugging request deduplication behavior.
275+
276+
The deduplicate interceptor automatically deduplicates concurrent requests for the same resource. When multiple identical requests are made while one is already in-flight, only one request is sent to the origin server, and all waiting handlers receive the same response.
277+
278+
```js
279+
import diagnosticsChannel from 'diagnostics_channel'
280+
281+
diagnosticsChannel.channel('undici:request:pending-requests').subscribe(({ type, size, key }) => {
282+
console.log(type) // 'added' or 'removed'
283+
console.log(size) // current number of pending requests
284+
console.log(key) // the deduplication key for this request
285+
})
286+
```
287+
288+
### Event Properties
289+
290+
- `type` (`string`): Either `'added'` when a new pending request is registered, or `'removed'` when a pending request completes (successfully or with an error).
291+
- `size` (`number`): The current number of pending requests after the change.
292+
- `key` (`string`): The deduplication key for the request, composed of the origin, method, path, and request headers.
293+
294+
### Example: Monitoring Request Deduplication
295+
296+
```js
297+
import diagnosticsChannel from 'diagnostics_channel'
298+
299+
const channel = diagnosticsChannel.channel('undici:request:pending-requests')
300+
301+
channel.subscribe(({ type, size, key }) => {
302+
if (type === 'added') {
303+
console.log(`New pending request: ${key} (${size} total pending)`)
304+
} else {
305+
console.log(`Request completed: ${key} (${size} remaining)`)
306+
}
307+
})
308+
```
309+
310+
This can be useful for:
311+
- Verifying that request deduplication is working as expected
312+
- Monitoring the number of concurrent in-flight requests
313+
- Debugging deduplication behavior in production environments

docs/docs/api/Dispatcher.md

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1213,6 +1213,44 @@ The `cache` interceptor implements client-side response caching as described in
12131213
- `cacheByDefault` - The default expiration time to cache responses by if they don't have an explicit expiration and cannot have an heuristic expiry computed. If this isn't present, responses neither with an explicit expiration nor heuristically cacheable will not be cached. Default `undefined`.
12141214
- `type` - The [type of cache](https://developer.mozilla.org/en-US/docs/Web/HTTP/Guides/Caching#types_of_caches) for Undici to act as. Can be `shared` or `private`. Default `shared`. `private` implies privately cacheable responses will be cached and potentially shared with other users of your application.
12151215

1216+
##### `Deduplicate Interceptor`
1217+
1218+
The `deduplicate` interceptor deduplicates concurrent identical requests. When multiple identical requests are made while one is already in-flight, only one request is sent to the origin server, and all waiting handlers receive the same response. This reduces server load and improves performance.
1219+
1220+
**Options**
1221+
1222+
- `methods` - The [**safe** HTTP methods](https://www.rfc-editor.org/rfc/rfc9110#section-9.2.1) to deduplicate. Default `['GET']`.
1223+
- `skipHeaderNames` - Header names that, if present in a request, will cause the request to skip deduplication entirely. Useful for headers like `idempotency-key` where presence indicates unique processing. Header name matching is case-insensitive. Default `[]`.
1224+
- `excludeHeaderNames` - Header names to exclude from the deduplication key. Requests with different values for these headers will still be deduplicated together. Useful for headers like `x-request-id` that vary per request but shouldn't affect deduplication. Header name matching is case-insensitive. Default `[]`.
1225+
1226+
**Usage**
1227+
1228+
```js
1229+
const { Client, interceptors } = require("undici");
1230+
const { deduplicate, cache } = interceptors;
1231+
1232+
// Deduplicate only
1233+
const client = new Client("http://example.com").compose(
1234+
deduplicate()
1235+
);
1236+
1237+
// Deduplicate with caching
1238+
const clientWithCache = new Client("http://example.com").compose(
1239+
deduplicate(),
1240+
cache()
1241+
);
1242+
```
1243+
1244+
Requests are considered identical if they have the same:
1245+
- Origin
1246+
- HTTP method
1247+
- Path
1248+
- Request headers (excluding any headers specified in `excludeHeaderNames`)
1249+
1250+
All deduplicated requests receive the complete response including status code, headers, and body.
1251+
1252+
For observability, request deduplication events are published to the `undici:request:pending-requests` [diagnostic channel](/docs/docs/api/DiagnosticsChannel.md#undicirequestpending-requests).
1253+
12161254
## Instance Events
12171255

12181256
### Event: `'connect'`

index.js

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,8 @@ module.exports.interceptors = {
4949
dump: require('./lib/interceptor/dump'),
5050
dns: require('./lib/interceptor/dns'),
5151
cache: require('./lib/interceptor/cache'),
52-
decompress: require('./lib/interceptor/decompress')
52+
decompress: require('./lib/interceptor/decompress'),
53+
deduplicate: require('./lib/interceptor/deduplicate')
5354
}
5455

5556
module.exports.cacheStores = {
Lines changed: 216 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,216 @@
1+
'use strict'
2+
3+
/**
4+
* @typedef {import('../../types/dispatcher.d.ts').default.DispatchHandler} DispatchHandler
5+
*/
6+
7+
/**
8+
* Handler that buffers response data and notifies multiple waiting handlers.
9+
* Used for request deduplication.
10+
*
11+
* @implements {DispatchHandler}
12+
*/
13+
class DeduplicationHandler {
14+
/**
15+
* @type {DispatchHandler}
16+
*/
17+
#primaryHandler
18+
19+
/**
20+
* @type {DispatchHandler[]}
21+
*/
22+
#waitingHandlers = []
23+
24+
/**
25+
* @type {Buffer[]}
26+
*/
27+
#chunks = []
28+
29+
/**
30+
* @type {number}
31+
*/
32+
#statusCode = 0
33+
34+
/**
35+
* @type {Record<string, string | string[]>}
36+
*/
37+
#headers = {}
38+
39+
/**
40+
* @type {string}
41+
*/
42+
#statusMessage = ''
43+
44+
/**
45+
* @type {boolean}
46+
*/
47+
#aborted = false
48+
49+
/**
50+
* @type {import('../../types/dispatcher.d.ts').default.DispatchController | null}
51+
*/
52+
#controller = null
53+
54+
/**
55+
* @type {(() => void) | null}
56+
*/
57+
#onComplete = null
58+
59+
/**
60+
* @param {DispatchHandler} primaryHandler The primary handler
61+
* @param {() => void} onComplete Callback when request completes
62+
*/
63+
constructor (primaryHandler, onComplete) {
64+
this.#primaryHandler = primaryHandler
65+
this.#onComplete = onComplete
66+
}
67+
68+
/**
69+
* Add a waiting handler that will receive the buffered response
70+
* @param {DispatchHandler} handler
71+
*/
72+
addWaitingHandler (handler) {
73+
this.#waitingHandlers.push(handler)
74+
}
75+
76+
/**
77+
* @param {() => void} abort
78+
* @param {any} context
79+
*/
80+
onRequestStart (controller, context) {
81+
this.#controller = controller
82+
this.#primaryHandler.onRequestStart?.(controller, context)
83+
}
84+
85+
/**
86+
* @param {import('../../types/dispatcher.d.ts').default.DispatchController} controller
87+
* @param {number} statusCode
88+
* @param {import('../../types/header.d.ts').IncomingHttpHeaders} headers
89+
* @param {Socket} socket
90+
*/
91+
onRequestUpgrade (controller, statusCode, headers, socket) {
92+
this.#primaryHandler.onRequestUpgrade?.(controller, statusCode, headers, socket)
93+
}
94+
95+
/**
96+
* @param {import('../../types/dispatcher.d.ts').default.DispatchController} controller
97+
* @param {number} statusCode
98+
* @param {Record<string, string | string[]>} headers
99+
* @param {string} statusMessage
100+
*/
101+
onResponseStart (controller, statusCode, headers, statusMessage) {
102+
this.#statusCode = statusCode
103+
this.#headers = headers
104+
this.#statusMessage = statusMessage
105+
this.#primaryHandler.onResponseStart?.(controller, statusCode, headers, statusMessage)
106+
}
107+
108+
/**
109+
* @param {import('../../types/dispatcher.d.ts').default.DispatchController} controller
110+
* @param {Buffer} chunk
111+
*/
112+
onResponseData (controller, chunk) {
113+
// Buffer the chunk for waiting handlers
114+
this.#chunks.push(Buffer.from(chunk))
115+
this.#primaryHandler.onResponseData?.(controller, chunk)
116+
}
117+
118+
/**
119+
* @param {import('../../types/dispatcher.d.ts').default.DispatchController} controller
120+
* @param {object} trailers
121+
*/
122+
onResponseEnd (controller, trailers) {
123+
this.#primaryHandler.onResponseEnd?.(controller, trailers)
124+
this.#notifyWaitingHandlers()
125+
this.#onComplete?.()
126+
}
127+
128+
/**
129+
* @param {import('../../types/dispatcher.d.ts').default.DispatchController} controller
130+
* @param {Error} err
131+
*/
132+
onResponseError (controller, err) {
133+
this.#aborted = true
134+
this.#primaryHandler.onResponseError?.(controller, err)
135+
this.#notifyWaitingHandlersError(err)
136+
this.#onComplete?.()
137+
}
138+
139+
/**
140+
* Notify all waiting handlers with the buffered response
141+
*/
142+
#notifyWaitingHandlers () {
143+
const body = Buffer.concat(this.#chunks)
144+
145+
for (const handler of this.#waitingHandlers) {
146+
// Create a simple controller for each waiting handler
147+
const waitingController = {
148+
resume () {},
149+
pause () {},
150+
get paused () { return false },
151+
get aborted () { return false },
152+
get reason () { return null },
153+
abort () {}
154+
}
155+
156+
try {
157+
handler.onRequestStart?.(waitingController, null)
158+
159+
if (waitingController.aborted) {
160+
continue
161+
}
162+
163+
handler.onResponseStart?.(
164+
waitingController,
165+
this.#statusCode,
166+
this.#headers,
167+
this.#statusMessage
168+
)
169+
170+
if (waitingController.aborted) {
171+
continue
172+
}
173+
174+
if (body.length > 0) {
175+
handler.onResponseData?.(waitingController, body)
176+
}
177+
178+
handler.onResponseEnd?.(waitingController, {})
179+
} catch {
180+
// Ignore errors from waiting handlers
181+
}
182+
}
183+
184+
this.#waitingHandlers = []
185+
this.#chunks = []
186+
}
187+
188+
/**
189+
* Notify all waiting handlers of an error
190+
* @param {Error} err
191+
*/
192+
#notifyWaitingHandlersError (err) {
193+
for (const handler of this.#waitingHandlers) {
194+
const waitingController = {
195+
resume () {},
196+
pause () {},
197+
get paused () { return false },
198+
get aborted () { return true },
199+
get reason () { return err },
200+
abort () {}
201+
}
202+
203+
try {
204+
handler.onRequestStart?.(waitingController, null)
205+
handler.onResponseError?.(waitingController, err)
206+
} catch {
207+
// Ignore errors from waiting handlers
208+
}
209+
}
210+
211+
this.#waitingHandlers = []
212+
this.#chunks = []
213+
}
214+
}
215+
216+
module.exports = DeduplicationHandler

0 commit comments

Comments
 (0)