Skip to content
This repository was archived by the owner on Mar 13, 2025. It is now read-only.

Commit 0255b2b

Browse files
authored
Add WebSocket support to dispatchFetch/fetch and custom service bindings (#453)
* Refactor test helpers into `test-shared` directory * Add custom `Request`/`Response` classes * Implement `WebSocket` class and coupler * Implement WebSocket upgrading `fetch` * Add WebSocket coupling to loopback requests * Allow `request.cf` to be customised in `dispatchFetch` and loopbacks * Add WebSocket kitchen sink test * Fix type bundling errors * Apply PR suggestions
1 parent 3c1f95c commit 0255b2b

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

44 files changed

+3350
-3686
lines changed

package-lock.json

Lines changed: 1675 additions & 3556 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,29 +27,33 @@
2727
"prepublishOnly": "npm run lint && npm run clean && npm run build && npm run types:bundle && npm run test",
2828
"test": "npm run build && ava && rimraf ./.tmp",
2929
"types:build": "tsc",
30-
"types:bundle": "npm run types:build && node scripts/types.mjs"
30+
"___$comment:types:bundle": "api-extractor doesn't know to load index.ts instead of index.d.ts when resolving imported types",
31+
"types:bundle": "cp node_modules/@cloudflare/workers-types/experimental/index.ts node_modules/@cloudflare/workers-types/experimental/index.d.ts && npm run types:build && node scripts/types.mjs"
3132
},
3233
"devDependencies": {
34+
"@ava/get-port": "^2.0.0",
3335
"@ava/typescript": "^3.0.1",
34-
"@microsoft/api-extractor": "^7.19.4",
36+
"@microsoft/api-extractor": "^7.33.6",
3537
"@types/node": "^18.11.9",
3638
"@types/rimraf": "^3.0.2",
3739
"@types/which": "^2.0.1",
3840
"@typescript-eslint/eslint-plugin": "^5.9.1",
3941
"@typescript-eslint/parser": "^5.9.1",
4042
"ava": "^5.0.1",
4143
"capnpc-ts": "^0.7.0",
44+
"capnp-ts": "^0.7.0",
4245
"esbuild": "^0.12.20",
4346
"eslint": "^8.6.0",
4447
"eslint-config-prettier": "^8.3.0",
4548
"eslint-plugin-es": "^4.1.0",
4649
"eslint-plugin-import": "^2.24.2",
4750
"eslint-plugin-prettier": "^3.4.1",
4851
"esm": "^3.2.25",
52+
"expect-type": "^0.15.0",
4953
"patch-package": "^6.4.7",
5054
"prettier": "^2.3.2",
5155
"rimraf": "^3.0.2",
52-
"typescript": "^4.5.4",
56+
"typescript": "^4.8.4",
5357
"which": "^2.0.2"
5458
},
5559
"engines": {

packages/tre/src/http/fetch.ts

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
import http from "http";
2+
import { Headers, RequestInfo, fetch as baseFetch } from "undici";
3+
import NodeWebSocket from "ws";
4+
import { DeferredPromise } from "../shared";
5+
import { Request, RequestInit } from "./request";
6+
import { Response } from "./response";
7+
import { WebSocketPair, coupleWebSocket } from "./websocket";
8+
9+
const ignored = ["transfer-encoding", "connection", "keep-alive", "expect"];
10+
function headersFromIncomingRequest(req: http.IncomingMessage): Headers {
11+
const entries = Object.entries(req.headers).filter(
12+
(pair): pair is [string, string | string[]] => {
13+
const [name, value] = pair;
14+
return !ignored.includes(name) && value !== undefined;
15+
}
16+
);
17+
return new Headers(Object.fromEntries(entries));
18+
}
19+
20+
export async function fetch(
21+
input: RequestInfo,
22+
init?: RequestInit | Request
23+
): Promise<Response> {
24+
const request = new Request(input, init as RequestInit);
25+
26+
// Handle WebSocket upgrades
27+
if (
28+
request.method === "GET" &&
29+
request.headers.get("upgrade") === "websocket"
30+
) {
31+
const url = new URL(request.url);
32+
if (url.protocol !== "http:" && url.protocol !== "https:") {
33+
throw new TypeError(
34+
`Fetch API cannot load: ${url.toString()}.\nMake sure you're using http(s):// URLs for WebSocket requests via fetch.`
35+
);
36+
}
37+
url.protocol = url.protocol.replace("http", "ws");
38+
39+
// Normalise request headers to a format ws understands, extracting the
40+
// Sec-WebSocket-Protocol header as ws treats this differently
41+
const headers: Record<string, string> = {};
42+
let protocols: string[] | undefined;
43+
for (const [key, value] of request.headers.entries()) {
44+
if (key.toLowerCase() === "sec-websocket-protocol") {
45+
protocols = value.split(",").map((protocol) => protocol.trim());
46+
} else {
47+
headers[key] = value;
48+
}
49+
}
50+
51+
// Establish web socket connection
52+
const ws = new NodeWebSocket(url, protocols, {
53+
followRedirects: request.redirect === "follow",
54+
headers,
55+
});
56+
57+
// Get response headers from upgrade
58+
const headersPromise = new DeferredPromise<Headers>();
59+
ws.once("upgrade", (req) => {
60+
headersPromise.resolve(headersFromIncomingRequest(req));
61+
});
62+
63+
// Couple web socket with pair and resolve
64+
const [worker, client] = Object.values(new WebSocketPair());
65+
await coupleWebSocket(ws, client);
66+
return new Response(null, {
67+
status: 101,
68+
webSocket: worker,
69+
headers: await headersPromise,
70+
});
71+
}
72+
73+
const response = await baseFetch(request);
74+
return new Response(response.body, response);
75+
}

packages/tre/src/http/index.ts

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
export * from "./fetch";
2+
export * from "./request";
3+
export * from "./response";
4+
export * from "./websocket";
5+
6+
export { File, FormData, Headers } from "undici";
7+
export type {
8+
BodyInit,
9+
HeadersInit,
10+
ReferrerPolicy,
11+
RequestCache,
12+
RequestCredentials,
13+
RequestDestination,
14+
RequestDuplex,
15+
RequestInfo,
16+
RequestMode,
17+
RequestRedirect,
18+
ResponseRedirectStatus,
19+
ResponseType,
20+
} from "undici";

packages/tre/src/http/request.ts

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
import type {
2+
IncomingRequestCfProperties,
3+
RequestInitCfProperties,
4+
} from "@cloudflare/workers-types/experimental";
5+
import {
6+
Request as BaseRequest,
7+
RequestInit as BaseRequestInit,
8+
RequestInfo,
9+
} from "undici";
10+
11+
export type RequestInitCfType =
12+
| Partial<IncomingRequestCfProperties>
13+
| RequestInitCfProperties;
14+
15+
export interface RequestInit<
16+
CfType extends RequestInitCfType = RequestInitCfType
17+
> extends BaseRequestInit {
18+
cf?: CfType;
19+
}
20+
21+
const kCf = Symbol("kCf");
22+
export class Request<
23+
CfType extends RequestInitCfType = RequestInitCfType
24+
> extends BaseRequest {
25+
// We should be able to use a private `#cf` property here instead of a symbol
26+
// here, but we need to set this on a clone, which would otherwise lead to a
27+
// "Cannot write private member to an object whose class did not declare it"
28+
// error.
29+
[kCf]?: CfType;
30+
31+
constructor(input: RequestInfo, init?: RequestInit<CfType>) {
32+
super(input, init);
33+
this[kCf] = init?.cf;
34+
}
35+
36+
get cf() {
37+
return this[kCf];
38+
}
39+
40+
// JSDoc comment so retained when bundling types with api-extractor
41+
/** @ts-expect-error `clone` is actually defined as a method internally */
42+
clone(): Request<CfType> {
43+
const request = super.clone() as Request<CfType>;
44+
// Update prototype so cloning a clone clones `cf`
45+
Object.setPrototypeOf(request, Request.prototype);
46+
request[kCf] = this[kCf];
47+
return request;
48+
}
49+
}

packages/tre/src/http/response.ts

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
import {
2+
Response as BaseResponse,
3+
ResponseInit as BaseResponseInit,
4+
BodyInit,
5+
ResponseRedirectStatus,
6+
} from "undici";
7+
import { WebSocket } from "./websocket";
8+
9+
export interface ResponseInit extends BaseResponseInit {
10+
webSocket?: WebSocket | null;
11+
}
12+
13+
const kWebSocket = Symbol("kWebSocket");
14+
export class Response extends BaseResponse {
15+
// We should be able to use a private `#webSocket` property here instead of a
16+
// symbol here, but `undici` calls `this.status` in its constructor, which
17+
// causes a "Cannot read private member from an object whose class did not
18+
// declare it" error.
19+
readonly [kWebSocket]: WebSocket | null;
20+
21+
// Override BaseResponse's static methods for building Responses to return
22+
// our type instead. Ideally, we don't want to use `Object.setPrototypeOf`.
23+
// Unfortunately, `error()` and `redirect()` set the internal header guard
24+
// to "immutable".
25+
static error(): Response {
26+
const response = BaseResponse.error() as Response;
27+
Object.setPrototypeOf(response, Response.prototype);
28+
return response;
29+
}
30+
static redirect(url: string | URL, status: ResponseRedirectStatus): Response {
31+
const response = BaseResponse.redirect(url, status) as Response;
32+
Object.setPrototypeOf(response, Response.prototype);
33+
return response;
34+
}
35+
static json(data: any, init?: ResponseInit): Response {
36+
// https://fetch.spec.whatwg.org/#dom-response-json
37+
const body = JSON.stringify(data);
38+
const response = new Response(body, init);
39+
response.headers.set("Content-Type", "application/json");
40+
return response;
41+
}
42+
43+
constructor(body?: BodyInit, init?: ResponseInit) {
44+
// Status 101 Switching Protocols would normally throw a RangeError, but we
45+
// need to allow it for WebSockets
46+
if (init?.webSocket) {
47+
if (init.status !== 101) {
48+
throw new RangeError(
49+
"Responses with a WebSocket must have status code 101."
50+
);
51+
}
52+
init = { ...init, status: 200 };
53+
}
54+
55+
super(body, init);
56+
this[kWebSocket] = init?.webSocket ?? null;
57+
}
58+
59+
// JSDoc comment so retained when bundling types with api-extractor
60+
/** @ts-expect-error `status` is actually defined as a getter internally */
61+
get status() {
62+
// When passing a WebSocket, we validate that the passed status was actually
63+
// 101, but we can't store this because `undici` rightfully complains.
64+
return this[kWebSocket] ? 101 : super.status;
65+
}
66+
67+
get webSocket() {
68+
return this[kWebSocket];
69+
}
70+
71+
// JSDoc comment so retained when bundling types with api-extractor
72+
/** @ts-expect-error `clone` is actually defined as a method internally */
73+
clone(): Response {
74+
if (this[kWebSocket]) {
75+
throw new TypeError("Cannot clone a response to a WebSocket handshake.");
76+
}
77+
const response = super.clone() as Response;
78+
Object.setPrototypeOf(response, Response.prototype);
79+
return response;
80+
}
81+
}

0 commit comments

Comments
 (0)