Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
176 changes: 101 additions & 75 deletions src/content/docs/durable-objects/examples/websocket-server.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -22,87 +22,113 @@ WebSocket connections pin your Durable Object to memory, and so duration charges

<TypeScriptExample>
```ts
import { DurableObject } from "cloudflare:workers";

export interface Env {
WEBSOCKET_SERVER: DurableObjectNamespace<WebSocketServer>;
}
import { DurableObject } from 'cloudflare:workers';

// Worker
export default {
async fetch(request, env, ctx): Promise<Response> {
if (request.url.endsWith("/websocket")) {
// Expect to receive a WebSocket Upgrade request.
// If there is one, accept the request and return a WebSocket Response.
const upgradeHeader = request.headers.get("Upgrade");
if (!upgradeHeader || upgradeHeader !== "websocket") {
return new Response("Durable Object expected Upgrade: websocket", {
status: 426,
});
}

// This example will refer to the same Durable Object,
// since the name "foo" is hardcoded.
let id = env.WEBSOCKET_SERVER.idFromName("foo");
let stub = env.WEBSOCKET_SERVER.get(id);

return stub.fetch(request);
}

return new Response(null, {
status: 400,
statusText: "Bad Request",
headers: {
"Content-Type": "text/plain",
},
});
},
} satisfies ExportedHandler<Env>;
async fetch(request: Request, env: Env, ctx: ExecutionContext): Promise<Response> {
if (request.url.endsWith('/websocket')) {
// Expect to receive a WebSocket Upgrade request.
// If there is one, accept the request and return a WebSocket Response.
const upgradeHeader = request.headers.get('Upgrade');
if (!upgradeHeader || upgradeHeader !== 'websocket') {
return new Response('Worker expected Upgrade: websocket', {
status: 426,
});
}

if (request.method !== 'GET') {
return new Response('Worker expected GET method', {
status: 400,
});
}

// Since we are hard coding the Durable Object ID by providing the constant name 'foo',
// all requests to this Worker will be sent to the same Durable Object instance.
let id = env.WEBSOCKET_SERVER.idFromName('foo');
let stub = env.WEBSOCKET_SERVER.get(id);

return stub.fetch(request);
}

return new Response(
`Supported endpoints:
/websocket: Expects a WebSocket upgrade request`,
{
status: 200,
headers: {
'Content-Type': 'text/plain',
},
}
);
},
};

// Durable Object
export class WebSocketServer extends DurableObject {
currentlyConnectedWebSockets: number;

constructor(ctx: DurableObjectState, env: Env) {
// This is reset whenever the constructor runs because
// regular WebSockets do not survive Durable Object resets.
//
// WebSockets accepted via the Hibernation API can survive
// a certain type of eviction, but we will not cover that here.
super(ctx, env);
this.currentlyConnectedWebSockets = 0;
}

async fetch(request: Request): Promise<Response> {
// Creates two ends of a WebSocket connection.
const webSocketPair = new WebSocketPair();
const [client, server] = Object.values(webSocketPair);

// Calling `accept()` tells the runtime that this WebSocket is to begin terminating
// request within the Durable Object. It has the effect of "accepting" the connection,
// and allowing the WebSocket to send and receive messages.
server.accept();
this.currentlyConnectedWebSockets += 1;

// Upon receiving a message from the client, the server replies with the same message,
// and the total number of connections with the "[Durable Object]: " prefix
server.addEventListener("message", (event: MessageEvent) => {
server.send(
`[Durable Object] currentlyConnectedWebSockets: ${this.currentlyConnectedWebSockets}`,
);
});

// If the client closes the connection, the runtime will close the connection too.
server.addEventListener("close", (cls: CloseEvent) => {
this.currentlyConnectedWebSockets -= 1;
server.close(cls.code, "Durable Object is closing WebSocket");
});

return new Response(null, {
status: 101,
webSocket: client,
});
}
// Keeps track of all WebSocket connections
sessions: Map<WebSocket, { [key: string]: string }>;

constructor(ctx: DurableObjectState, env: Env) {
super(ctx, env);
this.sessions = new Map();
}

async fetch(request: Request): Promise<Response> {
// Creates two ends of a WebSocket connection.
const webSocketPair = new WebSocketPair();
const [client, server] = Object.values(webSocketPair);

// Calling `accept()` tells the runtime that this WebSocket is to begin terminating
// request within the Durable Object. It has the effect of "accepting" the connection,
// and allowing the WebSocket to send and receive messages.
server.accept();

// Generate a random UUID for the session.
const id = crypto.randomUUID();
// Add the WebSocket connection to the map of active sessions.
this.sessions.set(server, { id });

server.addEventListener('message', (event) => {
this.handleWebSocketMessage(server, event.data);
});

// If the client closes the connection, the runtime will close the connection too.
server.addEventListener('close', () => {
this.handleConnectionClose(server);
});

return new Response(null, {
status: 101,
webSocket: client,
});
}

async handleWebSocketMessage(ws: WebSocket, message: string | ArrayBuffer) {
const connection = this.sessions.get(ws)!;

// Reply back with the same message to the connection
ws.send(`[Durable Object] message: ${message}, from: ${connection.id}`);

// Broadcast the message to all the connections,
// except the one that sent the message.
this.sessions.forEach((k, session) => {
if (session !== ws) {
session.send(`[Durable Object] message: ${message}, from: ${connection.id}`);
}
});

// Broadcast the message to all the connections,
// including the one that sent the message.
this.sessions.forEach((k, session) => {
session.send(`[Durable Object] message: ${message}, from: ${connection.id}`);
});
}

async handleConnectionClose(ws: WebSocket) {
this.sessions.delete(ws);
ws.close(1000, 'Durable Object is closing WebSocket');
}
}
```
</TypeScriptExample>
Expand Down
Loading