diff --git a/src/content/docs/durable-objects/examples/websocket-hibernation-server.mdx b/src/content/docs/durable-objects/examples/websocket-hibernation-server.mdx index 9b7fbf4ee7f828..9693fca58d2758 100644 --- a/src/content/docs/durable-objects/examples/websocket-hibernation-server.mdx +++ b/src/content/docs/durable-objects/examples/websocket-hibernation-server.mdx @@ -24,89 +24,137 @@ WebSocket Hibernation is unavailable for outgoing WebSocket use cases. Hibernati ```ts -import { DurableObject } from "cloudflare:workers"; - -export interface Env { - WEBSOCKET_HIBERNATION_SERVER: DurableObjectNamespace; -} +import { DurableObject } from 'cloudflare:workers'; // Worker export default { - async fetch( - request: Request, - env: Env, - ctx: ExecutionContext, - ): Promise { - if (request.url.endsWith("/websocket")) { - // Expect to receive a WebSocket Upgrade request. - // If there is one, accept the request and return a WebSocket Response. - const upgradeHeader = request.headers.get("Upgrade"); - if (!upgradeHeader || upgradeHeader !== "websocket") { - return new Response("Durable Object expected Upgrade: websocket", { - status: 426, - }); - } - - // This example will refer to the same Durable Object, - // since the name "foo" is hardcoded. - let id = env.WEBSOCKET_HIBERNATION_SERVER.idFromName("foo"); - let stub = env.WEBSOCKET_HIBERNATION_SERVER.get(id); - - return stub.fetch(request); - } - - return new Response(null, { - status: 400, - statusText: "Bad Request", - headers: { - "Content-Type": "text/plain", - }, - }); - }, + async fetch(request: Request, env: Env, ctx: ExecutionContext): Promise { + if (request.url.endsWith('/websocket')) { + // Expect to receive a WebSocket Upgrade request. + // If there is one, accept the request and return a WebSocket Response. + const upgradeHeader = request.headers.get('Upgrade'); + if (!upgradeHeader || upgradeHeader !== 'websocket') { + return new Response('Worker expected Upgrade: websocket', { + status: 426, + }); + } + + if (request.method !== 'GET') { + return new Response('Worker expected GET method', { + status: 400, + }); + } + + // Since we are hard coding the Durable Object ID by providing the constant name 'foo', + // all requests to this Worker will be sent to the same Durable Object instance. + let id = env.WEBSOCKET_HIBERNATION_SERVER.idFromName('foo'); + let stub = env.WEBSOCKET_HIBERNATION_SERVER.get(id); + + return stub.fetch(request); + } + + return new Response( + `Supported endpoints: +/websocket: Expects a WebSocket upgrade request`, + { + status: 200, + headers: { + 'Content-Type': 'text/plain', + }, + } + ); + } }; // Durable Object export class WebSocketHibernationServer extends DurableObject { - async fetch(request: Request): Promise { - // Creates two ends of a WebSocket connection. - const webSocketPair = new WebSocketPair(); - const [client, server] = Object.values(webSocketPair); - - // Calling `acceptWebSocket()` informs the runtime that this WebSocket is to begin terminating - // request within the Durable Object. It has the effect of "accepting" the connection, - // and allowing the WebSocket to send and receive messages. - // Unlike `ws.accept()`, `state.acceptWebSocket(ws)` informs the Workers Runtime that the WebSocket - // is "hibernatable", so the runtime does not need to pin this Durable Object to memory while - // the connection is open. During periods of inactivity, the Durable Object can be evicted - // from memory, but the WebSocket connection will remain open. If at some later point the - // WebSocket receives a message, the runtime will recreate the Durable Object - // (run the `constructor`) and deliver the message to the appropriate handler. - this.ctx.acceptWebSocket(server); - - return new Response(null, { - status: 101, - webSocket: client, - }); - } - - async webSocketMessage(ws: WebSocket, message: ArrayBuffer | string) { - // Upon receiving a message from the client, the server replies with the same message, - // and the total number of connections with the "[Durable Object]: " prefix - ws.send( - `[Durable Object] message: ${message}, connections: ${this.ctx.getWebSockets().length}`, - ); - } - - async webSocketClose( - ws: WebSocket, - code: number, - reason: string, - wasClean: boolean, - ) { - // If the client closes the connection, the runtime will invoke the webSocketClose() handler. - ws.close(code, "Durable Object is closing WebSocket"); - } + // Keeps track of all WebSocket connections + // When the DO hibernates, gets reconstructed in the constructor + sessions: Map; + + constructor(ctx: DurableObjectState, env: Env) { + super(ctx, env); + this.sessions = new Map(); + + // As part of constructing the Durable Object, + // we wake up any hibernating WebSockets and + // place them back in the `sessions` map. + + // Get all WebSocket connections from the DO + this.ctx.getWebSockets().forEach((ws) => { + let attachment = ws.deserializeAttachment(); + if (ws.deserializeAttachment()) { + // If we previously attached state to our WebSocket, + // let's add it to `sessions` map to restore the state of the connection. + const { ...session } = attachment; + this.sessions.set(ws, { ...session }); + } + }); + + // Sets an application level auto response that does not wake hibernated WebSockets. + this.ctx.setWebSocketAutoResponse(new WebSocketRequestResponsePair('ping', 'pong')); + } + + async fetch(request: Request): Promise { + // Creates two ends of a WebSocket connection. + const webSocketPair = new WebSocketPair(); + const [client, server] = Object.values(webSocketPair); + + // Calling `acceptWebSocket()` informs the runtime that this WebSocket is to begin terminating + // request within the Durable Object. It has the effect of "accepting" the connection, + // and allowing the WebSocket to send and receive messages. + // Unlike `ws.accept()`, `this.ctx.acceptWebSocket(ws)` informs the Workers Runtime that the WebSocket + // is "hibernatable", so the runtime does not need to pin this Durable Object to memory while + // the connection is open. During periods of inactivity, the Durable Object can be evicted + // from memory, but the WebSocket connection will remain open. If at some later point the + // WebSocket receives a message, the runtime will recreate the Durable Object + // (run the `constructor`) and deliver the message to the appropriate handler. + this.ctx.acceptWebSocket(server); + + // Generate a random UUID for the session. + const id = crypto.randomUUID(); + + // Attach the session ID to the WebSocket connection and serialize it. + // This is necessary to restore the state of the connection when the Durable Object wakes up. + server.serializeAttachment({ id }); + + // Add the WebSocket connection to the map of active sessions. + this.sessions.set(server, { id }); + + return new Response(null, { + status: 101, + webSocket: client, + }); + } + + async webSocketMessage(ws: WebSocket, message: ArrayBuffer | string) { + // Get the session associated with the WebSocket connection. + const session = this.sessions.get(ws)!; + + // Upon receiving a message from the client, the server replies with the same message, the session ID of the connection, + // and the total number of connections with the "[Durable Object]: " prefix + ws.send(`[Durable Object] message: ${message}, from: ${session.id}. Total connections: ${this.sessions.size}`); + + // Send a message to all WebSocket connections, loop over all the connected WebSockets. + this.sessions.forEach((attachment, session) => { + session.send(`[Durable Object] message: ${message}, from: ${attachment.id}. Total connections: ${this.sessions.size}`); + }); + + // Send a message to all WebSocket connections except the connection (ws), + // loop over all the connected WebSockets and filter out the connection (ws). + this.sessions.forEach((attachment, session) => { + if (session !== ws) { + session.send(`[Durable Object] message: ${message}, from: ${attachment.id}. Total connections: ${this.sessions.size}`); + } + }); + } + + async webSocketClose(ws: WebSocket, code: number, reason: string, wasClean: boolean) { + // If the client closes the connection, the runtime will invoke the webSocketClose() handler. + ws.close(code, 'Durable Object is closing WebSocket'); + } } + ``` diff --git a/src/content/docs/durable-objects/examples/websocket-server.mdx b/src/content/docs/durable-objects/examples/websocket-server.mdx index 3705ebd0021512..43a15c154f69d2 100644 --- a/src/content/docs/durable-objects/examples/websocket-server.mdx +++ b/src/content/docs/durable-objects/examples/websocket-server.mdx @@ -10,7 +10,7 @@ sidebar: description: Build a WebSocket server using Durable Objects and Workers. --- -import { TabItem, Tabs, GlossaryTooltip, WranglerConfig, TypeScriptExample } from "~/components"; +import { GlossaryTooltip, TypeScriptExample, WranglerConfig } from "~/components"; This example shows how to build a WebSocket server using Durable Objects and Workers. The example exposes an endpoint to create a new WebSocket connection. This WebSocket connection echos any message while including the total number of WebSocket connections currently established. For more information, refer to [Use Durable Objects with WebSockets](/durable-objects/best-practices/websockets/). @@ -22,92 +22,121 @@ WebSocket connections pin your Durable Object to memory, and so duration charges ```ts -import { DurableObject } from "cloudflare:workers"; - -export interface Env { - WEBSOCKET_SERVER: DurableObjectNamespace; -} +import { DurableObject } from 'cloudflare:workers'; // Worker export default { - async fetch(request, env, ctx): Promise { - if (request.url.endsWith("/websocket")) { - // Expect to receive a WebSocket Upgrade request. - // If there is one, accept the request and return a WebSocket Response. - const upgradeHeader = request.headers.get("Upgrade"); - if (!upgradeHeader || upgradeHeader !== "websocket") { - return new Response("Durable Object expected Upgrade: websocket", { - status: 426, - }); - } - - // This example will refer to the same Durable Object, - // since the name "foo" is hardcoded. - let id = env.WEBSOCKET_SERVER.idFromName("foo"); - let stub = env.WEBSOCKET_SERVER.get(id); - - return stub.fetch(request); - } - - return new Response(null, { - status: 400, - statusText: "Bad Request", - headers: { - "Content-Type": "text/plain", - }, - }); - }, -} satisfies ExportedHandler; +async fetch(request: Request, env: Env, ctx: ExecutionContext): Promise { +if (request.url.endsWith('/websocket')) { +// Expect to receive a WebSocket Upgrade request. +// If there is one, accept the request and return a WebSocket Response. +const upgradeHeader = request.headers.get('Upgrade'); +if (!upgradeHeader || upgradeHeader !== 'websocket') { +return new Response('Worker expected Upgrade: websocket', { +status: 426, +}); +} + + if (request.method !== 'GET') { + return new Response('Worker expected GET method', { + status: 400, + }); + } + + // Since we are hard coding the Durable Object ID by providing the constant name 'foo', + // all requests to this Worker will be sent to the same Durable Object instance. + let id = env.WEBSOCKET_SERVER.idFromName('foo'); + let stub = env.WEBSOCKET_SERVER.get(id); + + return stub.fetch(request); + } + + return new Response( + `Supported endpoints: + +/websocket: Expects a WebSocket upgrade request`, +{ +status: 200, +headers: { +'Content-Type': 'text/plain', +}, +} +); +}, +}; // Durable Object export class WebSocketServer extends DurableObject { - currentlyConnectedWebSockets: number; - - constructor(ctx: DurableObjectState, env: Env) { - // This is reset whenever the constructor runs because - // regular WebSockets do not survive Durable Object resets. - // - // WebSockets accepted via the Hibernation API can survive - // a certain type of eviction, but we will not cover that here. - super(ctx, env); - this.currentlyConnectedWebSockets = 0; - } - - async fetch(request: Request): Promise { - // Creates two ends of a WebSocket connection. - const webSocketPair = new WebSocketPair(); - const [client, server] = Object.values(webSocketPair); - - // Calling `accept()` tells the runtime that this WebSocket is to begin terminating - // request within the Durable Object. It has the effect of "accepting" the connection, - // and allowing the WebSocket to send and receive messages. - server.accept(); - this.currentlyConnectedWebSockets += 1; - - // Upon receiving a message from the client, the server replies with the same message, - // and the total number of connections with the "[Durable Object]: " prefix - server.addEventListener("message", (event: MessageEvent) => { - server.send( - `[Durable Object] currentlyConnectedWebSockets: ${this.currentlyConnectedWebSockets}`, - ); - }); - - // If the client closes the connection, the runtime will close the connection too. - server.addEventListener("close", (cls: CloseEvent) => { - this.currentlyConnectedWebSockets -= 1; - server.close(cls.code, "Durable Object is closing WebSocket"); - }); - - return new Response(null, { - status: 101, - webSocket: client, - }); - } +// Keeps track of all WebSocket connections +sessions: Map; + + constructor(ctx: DurableObjectState, env: Env) { + super(ctx, env); + this.sessions = new Map(); + } + + async fetch(request: Request): Promise { + // Creates two ends of a WebSocket connection. + const webSocketPair = new WebSocketPair(); + const [client, server] = Object.values(webSocketPair); + + // Calling `accept()` tells the runtime that this WebSocket is to begin terminating + // request within the Durable Object. It has the effect of "accepting" the connection, + // and allowing the WebSocket to send and receive messages. + server.accept(); + + // Generate a random UUID for the session. + const id = crypto.randomUUID(); + // Add the WebSocket connection to the map of active sessions. + this.sessions.set(server, { id }); + + server.addEventListener('message', (event) => { + this.handleWebSocketMessage(server, event.data); + }); + + // If the client closes the connection, the runtime will close the connection too. + server.addEventListener('close', () => { + this.handleConnectionClose(server); + }); + + return new Response(null, { + status: 101, + webSocket: client, + }); + } + + async handleWebSocketMessage(ws: WebSocket, message: string | ArrayBuffer) { + const connection = this.sessions.get(ws)!; + + // Reply back with the same message to the connection + ws.send(`[Durable Object] message: ${message}, from: ${connection.id}`); + + // Broadcast the message to all the connections, + // except the one that sent the message. + this.sessions.forEach((k, session) => { + if (session !== ws) { + session.send(`[Durable Object] message: ${message}, from: ${connection.id}`); + } + }); + + // Broadcast the message to all the connections, + // including the one that sent the message. + this.sessions.forEach((k, session) => { + session.send(`[Durable Object] message: ${message}, from: ${connection.id}`); + }); + } + + async handleConnectionClose(ws: WebSocket) { + this.sessions.delete(ws); + ws.close(1000, 'Durable Object is closing WebSocket'); + } + } + ``` -Finally, configure your Wrangler file to include a Durable Object [binding](/durable-objects/get-started/#4-configure-durable-object-bindings) and [migration](/durable-objects/reference/durable-objects-migrations/) based on the namespace and class name chosen previously. +Finally, configure your `wrangler.toml` file to include a Durable Object [binding](/durable-objects/get-started/#4-configure-durable-object-bindings) and [migration](/durable-objects/reference/durable-objects-migrations/) based on the namespace and class name chosen previously.