diff --git a/packages/collab/package.json b/packages/collab/package.json index c24a0a9..6c58ad8 100644 --- a/packages/collab/package.json +++ b/packages/collab/package.json @@ -8,6 +8,11 @@ "start": "wrangler dev", "cf-typegen": "wrangler types" }, + "dependencies": { + "yjs": "^13.6.20", + "y-protocols": "^1.0.6", + "lib0": "^0.2.99" + }, "devDependencies": { "typescript": "^5.5.2", "wrangler": "^4.54.0" diff --git a/packages/collab/src/index.ts b/packages/collab/src/index.ts index dfb539c..bc9f3b5 100644 --- a/packages/collab/src/index.ts +++ b/packages/collab/src/index.ts @@ -1,83 +1,87 @@ import { DurableObject } from "cloudflare:workers"; +import * as Y from "yjs"; +import * as encoding from "lib0/encoding"; +import * as decoding from "lib0/decoding"; +import * as syncProtocol from "y-protocols/sync"; +import * as awarenessProtocol from "y-protocols/awareness"; interface Env { COLLAB_DOCUMENT: DurableObjectNamespace; } -interface Message { - type: "init" | "change" | "cursor" | "cursor-leave"; - content?: string; - changes?: { - from: number; - to: number; - insert: string; - }; - position?: number; - selection?: { - from: number; - to: number; - }; - connectionId?: string; - user?: { - name: string; - image?: string; - }; - userName?: string; - userImage?: string; -} +// Message types for Yjs protocol +const MESSAGE_SYNC = 0; +const MESSAGE_AWARENESS = 1; -const MAX_MESSAGE_SIZE = 65536; // 64KB - generous limit for collaborative text editor +const MAX_MESSAGE_SIZE = 1048576; // 1MB - generous limit for Yjs updates // Cleanup inactive documents after 48 hours of no activity const CLEANUP_AFTER_MS = 2 * 24 * 60 * 60 * 1000; +// Checkpoint interval +const CHECKPOINT_INTERVAL_MS = 30000; + export class CollaborativeDocument extends DurableObject { - private content: string = ""; + private doc: Y.Doc; + private awareness: awarenessProtocol.Awareness; private checkpointScheduled: boolean = false; + private documentInitialized: boolean = false; constructor(ctx: DurableObjectState, env: Env) { super(ctx, env); - // Initialize SQLite storage for document content + // Initialize Yjs document + this.doc = new Y.Doc(); + this.awareness = new awarenessProtocol.Awareness(this.doc); + + // Set up awareness change handler + this.awareness.on( + "update", + ({ added, updated, removed }: { added: number[]; updated: number[]; removed: number[] }) => { + const changedClients = added.concat(updated, removed); + this.broadcastAwarenessUpdate(changedClients); + }, + ); + + // Initialize storage and load existing document state ctx.blockConcurrencyWhile(async () => { - // Create table if it doesn't exist (with last_activity for cleanup scheduling) + // Create table if it doesn't exist this.ctx.storage.sql.exec(` CREATE TABLE IF NOT EXISTS document ( id INTEGER PRIMARY KEY CHECK (id = 1), - content TEXT NOT NULL DEFAULT '', + state BLOB, last_activity INTEGER NOT NULL DEFAULT 0 ) `); - // Migration: add last_activity column if it doesn't exist (for existing documents) - const columns = this.ctx.storage.sql - .exec<{ name: string }>("PRAGMA table_info(document)") - .toArray() - .map((col) => col.name); - - if (!columns.includes("last_activity")) { - this.ctx.storage.sql.exec( - "ALTER TABLE document ADD COLUMN last_activity INTEGER NOT NULL DEFAULT 0", - ); - // Set last_activity to now for existing documents - this.ctx.storage.sql.exec("UPDATE document SET last_activity = ? WHERE id = 1", Date.now()); - } - - // Load existing content or initialize with empty + // Load existing state const result = this.ctx.storage.sql - .exec<{ content: string }>("SELECT content FROM document WHERE id = 1") + .exec<{ state: ArrayBuffer | null }>("SELECT state FROM document WHERE id = 1") .toArray(); - if (result.length > 0) { - this.content = result[0].content; + if (result.length > 0 && result[0].state) { + try { + const state = new Uint8Array(result[0].state); + Y.applyUpdate(this.doc, state); + // Document has existing content, mark as initialized + this.documentInitialized = true; + } catch (error) { + console.error("Failed to load document state:", error); + } } else { - // Insert initial empty document with current timestamp + // Insert initial empty document this.ctx.storage.sql.exec( - "INSERT INTO document (id, content, last_activity) VALUES (1, ?, ?)", - "", + "INSERT INTO document (id, state, last_activity) VALUES (1, NULL, ?)", Date.now(), ); - this.content = ""; + } + }); + + // Listen for document updates to schedule checkpoints + this.doc.on("update", () => { + if (!this.checkpointScheduled) { + this.ctx.storage.setAlarm(Date.now() + CHECKPOINT_INTERVAL_MS); + this.checkpointScheduled = true; } }); } @@ -94,25 +98,65 @@ export class CollaborativeDocument extends DurableObject { const userImage = request.headers.get("X-User-Image"); const userId = request.headers.get("X-User-Id"); + // Extract initial content if provided (for initialization) + const initialContent = request.headers.get("X-Initial-Content"); + // Create WebSocket pair const pair = new WebSocketPair(); const [client, server] = Object.values(pair); - // Accept the WebSocket with Hibernation API and store user info + // Accept the WebSocket with Hibernation API const tags: string[] = []; if (userName) tags.push(userName); if (userImage) tags.push(userImage); if (userId) tags.push(userId); this.ctx.acceptWebSocket(server, tags); + // Generate a unique connection ID for debugging/logging + const connectionId = crypto.randomUUID(); + + // Store attachment with connection info + server.serializeAttachment({ + connectionId, + userName: userName ? this.decodeUserName(userName) : undefined, + userImage: userImage || undefined, + userId: userId || undefined, + }); + + // If document is empty and initial content is provided, initialize it + // Use documentInitialized flag to prevent race conditions with multiple connections + if (!this.documentInitialized && initialContent) { + const text = this.doc.getText("content"); + if (text.length === 0) { + try { + const decoded = decodeURIComponent(escape(atob(initialContent))); + text.insert(0, decoded); + this.documentInitialized = true; + } catch { + // Ignore initialization errors + } + } else { + // Document has content now (from concurrent insert), mark as initialized + this.documentInitialized = true; + } + } + return new Response(null, { status: 101, webSocket: client, }); } + private decodeUserName(encoded: string): string { + try { + return decodeURIComponent(escape(atob(encoded))); + } catch { + return encoded; + } + } + async webSocketMessage(ws: WebSocket, message: string | ArrayBuffer) { - // Check message size before parsing to prevent memory exhaustion + // Check message size const messageSize = typeof message === "string" ? message.length : message.byteLength; if (messageSize > MAX_MESSAGE_SIZE) { ws.close(1009, "Message too big"); @@ -120,107 +164,43 @@ export class CollaborativeDocument extends DurableObject { } try { - const data = JSON.parse(message.toString()) as Message; - - switch (data.type) { - case "init": { - // Generate a unique connection ID for this client - const connectionId = crypto.randomUUID(); - - // Get user info from websocket tags (set during accept) - const tags = this.ctx.getTags(ws); - const userNameEncoded = tags[0]; - const userImage = tags[1]; - - // Decode base64 user name - let userName: string | undefined; - if (userNameEncoded) { - try { - userName = decodeURIComponent(escape(atob(userNameEncoded))); - } catch { - userName = userNameEncoded; // Fallback to encoded value - } + // Handle binary Yjs messages + const data = + message instanceof ArrayBuffer + ? new Uint8Array(message) + : new TextEncoder().encode(message as string); + + const decoder = decoding.createDecoder(data); + const messageType = decoding.readVarUint(decoder); + + switch (messageType) { + case MESSAGE_SYNC: { + // Handle sync protocol + const encoder = encoding.createEncoder(); + encoding.writeVarUint(encoder, MESSAGE_SYNC); + const syncMessageType = syncProtocol.readSyncMessage(decoder, encoder, this.doc, null); + + // If we have a response to send + if (encoding.length(encoder) > 1) { + ws.send(encoding.toUint8Array(encoder)); } - ws.serializeAttachment({ - connectionId, - userName: userName || undefined, - userImage: userImage || undefined, - }); - - // If DO has no content and client sent content, initialize with it - if (!this.content && data.content) { - this.content = data.content; - this.ctx.storage.sql.exec("UPDATE document SET content = ? WHERE id = 1", this.content); - } - - // Send current document state to client - ws.send( - JSON.stringify({ - type: "init", - content: this.content, - connectionId, - }), - ); - break; - } - - case "change": { - // Apply and broadcast document changes - const { changes } = data; - if (changes) { - // Apply change to content - const before = this.content.slice(0, changes.from); - const after = this.content.slice(changes.to); - this.content = before + changes.insert + after; - - // Schedule checkpoint alarm if not already scheduled - if (!this.checkpointScheduled) { - this.ctx.storage.setAlarm(Date.now() + 30000); - this.checkpointScheduled = true; - } - - // Get connection ID from attachment - const attachment = ws.deserializeAttachment() as { - connectionId: string; - userName?: string; - userImage?: string; - } | null; - - // Broadcast to all clients (including sender) with connectionId - this.broadcast( - { - type: "change", - changes, - connectionId: attachment?.connectionId, - }, - ws, - ); + // If this was a sync step 2 or update, broadcast to other clients + if (syncMessageType === syncProtocol.messageYjsUpdate) { + this.broadcastUpdate(data, ws); } break; } - case "cursor": { - // Broadcast cursor position to all other clients with connection ID and user info - if (data.position !== undefined) { - const attachment = ws.deserializeAttachment() as { - connectionId: string; - userName?: string; - userImage?: string; - } | null; - - this.broadcast( - { - type: "cursor", - position: data.position, - selection: data.selection, - connectionId: attachment?.connectionId, - userName: data.userName || attachment?.userName, - userImage: data.userImage || attachment?.userImage, - }, - ws, - ); - } + case MESSAGE_AWARENESS: { + // Handle awareness protocol + // The awareness protocol handles client presence automatically + // Clients send their own state updates and cleanup is handled via timeouts + awarenessProtocol.applyAwarenessUpdate( + this.awareness, + decoding.readVarUint8Array(decoder), + ws, + ); break; } } @@ -229,32 +209,41 @@ export class CollaborativeDocument extends DurableObject { } } - async webSocketClose(ws: WebSocket) { - const attachment = ws.deserializeAttachment() as { - connectionId: string; - userName?: string; - userImage?: string; - } | null; - - if (attachment?.connectionId) { - this.broadcast({ - type: "cursor-leave", - connectionId: attachment.connectionId, - }); + async webSocketOpen(ws: WebSocket) { + // Send sync step 1 (document state) to new client + const encoder = encoding.createEncoder(); + encoding.writeVarUint(encoder, MESSAGE_SYNC); + syncProtocol.writeSyncStep1(encoder, this.doc); + ws.send(encoding.toUint8Array(encoder)); + + // Send current awareness state + const awarenessStates = this.awareness.getStates(); + if (awarenessStates.size > 0) { + const awarenessEncoder = encoding.createEncoder(); + encoding.writeVarUint(awarenessEncoder, MESSAGE_AWARENESS); + encoding.writeVarUint8Array( + awarenessEncoder, + awarenessProtocol.encodeAwarenessUpdate(this.awareness, Array.from(awarenessStates.keys())), + ); + ws.send(encoding.toUint8Array(awarenessEncoder)); } + } + + async webSocketClose(ws: WebSocket) { + // Note: We don't manually remove awareness states here. + // The Yjs awareness protocol handles cleanup automatically: + // 1. Clients send awareness updates with null state before disconnecting + // 2. The awareness protocol has built-in timeout mechanisms for stale clients + // Manual cleanup would require knowing the client's Yjs clientID, which is + // generated client-side and not accessible from server-assigned connection IDs. // Check if this was the last client const remainingClients = this.ctx.getWebSockets().filter((c) => c !== ws); if (remainingClients.length === 0) { - // Persist content and last activity to SQLite before hibernation - this.ctx.storage.sql.exec( - "UPDATE document SET content = ?, last_activity = ? WHERE id = 1", - this.content, - Date.now(), - ); - this.checkpointScheduled = false; + // Persist state before hibernation + await this.checkpoint(); - // Schedule cleanup alarm for inactive document deletion + // Schedule cleanup alarm this.ctx.storage.setAlarm(Date.now() + CLEANUP_AFTER_MS); } } @@ -269,15 +258,11 @@ export class CollaborativeDocument extends DurableObject { if (connectedClients > 0) { // Clients connected: this is a checkpoint alarm - this.ctx.storage.sql.exec( - "UPDATE document SET content = ?, last_activity = ? WHERE id = 1", - this.content, - Date.now(), - ); + await this.checkpoint(); this.checkpointScheduled = false; // Reschedule checkpoint - this.ctx.storage.setAlarm(Date.now() + 30000); + this.ctx.storage.setAlarm(Date.now() + CHECKPOINT_INTERVAL_MS); this.checkpointScheduled = true; } else { // No clients: check if document is inactive and should be cleaned up @@ -301,20 +286,49 @@ export class CollaborativeDocument extends DurableObject { } } - private broadcast(message: Message, exclude?: WebSocket) { - const payload = JSON.stringify(message); + private async checkpoint() { + const state = Y.encodeStateAsUpdate(this.doc); + this.ctx.storage.sql.exec( + "UPDATE document SET state = ?, last_activity = ? WHERE id = 1", + state, + Date.now(), + ); + } + private broadcastUpdate(message: Uint8Array, exclude?: WebSocket) { for (const client of this.ctx.getWebSockets()) { if (client === exclude) continue; if (client.readyState === WebSocket.OPEN) { try { - client.send(payload); + client.send(message); } catch (error) { console.error("Broadcast error:", error); } } } } + + private broadcastAwarenessUpdate(changedClients: number[]) { + if (changedClients.length === 0) return; + + const encoder = encoding.createEncoder(); + encoding.writeVarUint(encoder, MESSAGE_AWARENESS); + encoding.writeVarUint8Array( + encoder, + awarenessProtocol.encodeAwarenessUpdate(this.awareness, changedClients), + ); + const message = encoding.toUint8Array(encoder); + + for (const client of this.ctx.getWebSockets()) { + if (client.readyState === WebSocket.OPEN) { + try { + client.send(message); + } catch (error) { + console.error("Awareness broadcast error:", error); + } + } + } + } } export default { diff --git a/packages/ui/package.json b/packages/ui/package.json index 7eee5b0..508815f 100644 --- a/packages/ui/package.json +++ b/packages/ui/package.json @@ -19,6 +19,10 @@ }, "devDependencies": { "@codemirror/commands": "^6.10.0", + "y-codemirror.next": "^0.3.5", + "y-protocols": "^1.0.6", + "yjs": "^13.6.20", + "lib0": "^0.2.99", "@codemirror/lang-markdown": "^6.5.0", "@codemirror/state": "^6.5.2", "@codemirror/view": "^6.39.4", diff --git a/packages/ui/src/lib/__tests__/codemirror.test.ts b/packages/ui/src/lib/__tests__/codemirror.test.ts index ace45c4..7f8f41c 100644 --- a/packages/ui/src/lib/__tests__/codemirror.test.ts +++ b/packages/ui/src/lib/__tests__/codemirror.test.ts @@ -119,58 +119,6 @@ describe("CodeMirror", () => { expect(cmContent?.textContent).toContain("bold"); }); - it("handles empty remoteCursors and remoteSelections", async () => { - const { container } = render(CodeMirror, { - props: { - value: "Test content", - remoteCursors: [], - remoteSelections: [], - }, - }); - - await vi.waitFor(() => { - const cmContent = container.querySelector(".cm-content"); - expect(cmContent).not.toBeNull(); - }); - }); - - it("handles remote cursors with position and color", async () => { - const { container } = render(CodeMirror, { - props: { - value: "Hello World", - remoteCursors: [ - { position: 5, color: "#ff0000", userName: "Alice" }, - { position: 8, color: "#00ff00", userName: "Bob" }, - ], - }, - }); - - await vi.waitFor(() => { - const cmContent = container.querySelector(".cm-content"); - expect(cmContent).not.toBeNull(); - }); - - // Remote cursors should be rendered as decorations - // This is a basic test - remote cursor widgets are created via CodeMirror extensions - }); - - it("handles remote selections array prop", async () => { - // Test that the component accepts remoteSelections prop - // Note: Remote selections are applied via CodeMirror state effects after initialization - const { container } = render(CodeMirror, { - props: { - value: "Select this text here for testing", - remoteSelections: [], // Start with empty, selections applied via effects - }, - }); - - await vi.waitFor(() => { - const cmContent = container.querySelector(".cm-content"); - expect(cmContent).not.toBeNull(); - expect(cmContent?.textContent).toContain("Select this text"); - }); - }); - it("updates content when value prop changes", async () => { const { container, rerender } = render(CodeMirror, { props: { @@ -191,4 +139,7 @@ describe("CodeMirror", () => { expect(cmContent?.textContent).toContain("Updated content"); }); }); + + // Note: Remote cursor/selection tests removed as they are now handled + // by Yjs awareness protocol via y-codemirror.next extension }); diff --git a/packages/ui/src/lib/components/editor/CodeMirror.svelte b/packages/ui/src/lib/components/editor/CodeMirror.svelte index f4665f9..beb1870 100644 --- a/packages/ui/src/lib/components/editor/CodeMirror.svelte +++ b/packages/ui/src/lib/components/editor/CodeMirror.svelte @@ -1,24 +1,14 @@
diff --git a/packages/ui/src/lib/yjs-provider.ts b/packages/ui/src/lib/yjs-provider.ts new file mode 100644 index 0000000..8267a6a --- /dev/null +++ b/packages/ui/src/lib/yjs-provider.ts @@ -0,0 +1,214 @@ +import * as Y from "yjs"; +import * as encoding from "lib0/encoding"; +import * as decoding from "lib0/decoding"; +import * as syncProtocol from "y-protocols/sync"; +import * as awarenessProtocol from "y-protocols/awareness"; + +// Message types matching the server +const MESSAGE_SYNC = 0; +const MESSAGE_AWARENESS = 1; + +export interface YjsProviderOptions { + url: string; + doc: Y.Doc; + awareness: awarenessProtocol.Awareness; + initialContent?: string; + onConnect?: () => void; + onDisconnect?: () => void; + onSynced?: () => void; +} + +export class YjsWebSocketProvider { + private ws: WebSocket | null = null; + private doc: Y.Doc; + private awareness: awarenessProtocol.Awareness; + private url: string; + private initialContent?: string; + private synced = false; + private reconnectTimeout: ReturnType | null = null; + private connected = false; + private destroyed = false; + + public onConnect?: () => void; + public onDisconnect?: () => void; + public onSynced?: () => void; + + constructor(options: YjsProviderOptions) { + this.url = options.url; + this.doc = options.doc; + this.awareness = options.awareness; + this.initialContent = options.initialContent; + this.onConnect = options.onConnect; + this.onDisconnect = options.onDisconnect; + this.onSynced = options.onSynced; + + // Set up document update handler + this.doc.on("update", this.handleDocUpdate); + + // Set up awareness update handler + this.awareness.on("update", this.handleAwarenessUpdate); + + // Connect + this.connect(); + } + + private connect() { + if (this.destroyed) return; + + try { + this.ws = new WebSocket(this.url); + this.ws.binaryType = "arraybuffer"; + + this.ws.onopen = () => { + this.connected = true; + this.onConnect?.(); + + // Send sync step 1 (request state from server) + const encoder = encoding.createEncoder(); + encoding.writeVarUint(encoder, MESSAGE_SYNC); + syncProtocol.writeSyncStep1(encoder, this.doc); + this.ws?.send(encoding.toUint8Array(encoder)); + + // Send local awareness state + const awarenessEncoder = encoding.createEncoder(); + encoding.writeVarUint(awarenessEncoder, MESSAGE_AWARENESS); + encoding.writeVarUint8Array( + awarenessEncoder, + awarenessProtocol.encodeAwarenessUpdate(this.awareness, [this.doc.clientID]), + ); + this.ws?.send(encoding.toUint8Array(awarenessEncoder)); + }; + + this.ws.onmessage = (event) => { + const data = new Uint8Array(event.data); + const decoder = decoding.createDecoder(data); + const messageType = decoding.readVarUint(decoder); + + switch (messageType) { + case MESSAGE_SYNC: { + const encoder = encoding.createEncoder(); + encoding.writeVarUint(encoder, MESSAGE_SYNC); + const syncMessageType = syncProtocol.readSyncMessage(decoder, encoder, this.doc, this); + + // Send response if any + if (encoding.length(encoder) > 1) { + this.ws?.send(encoding.toUint8Array(encoder)); + } + + // If this was sync step 2, we're now synced + if (syncMessageType === syncProtocol.messageYjsSyncStep2) { + if (!this.synced) { + this.synced = true; + this.onSynced?.(); + } + } + break; + } + + case MESSAGE_AWARENESS: { + awarenessProtocol.applyAwarenessUpdate( + this.awareness, + decoding.readVarUint8Array(decoder), + this, + ); + break; + } + } + }; + + this.ws.onclose = () => { + this.connected = false; + this.synced = false; + this.onDisconnect?.(); + + // Reconnect after delay + if (!this.destroyed) { + this.reconnectTimeout = setTimeout(() => { + this.connect(); + }, 2000); + } + }; + + this.ws.onerror = (error) => { + console.error("WebSocket error:", error); + }; + } catch (error) { + console.error("Failed to connect:", error); + // Retry connection + if (!this.destroyed) { + this.reconnectTimeout = setTimeout(() => { + this.connect(); + }, 2000); + } + } + } + + private handleDocUpdate = (update: Uint8Array, origin: unknown) => { + // Don't send updates that came from the WebSocket + if (origin === this) return; + + if (this.ws?.readyState === WebSocket.OPEN) { + const encoder = encoding.createEncoder(); + encoding.writeVarUint(encoder, MESSAGE_SYNC); + syncProtocol.writeUpdate(encoder, update); + this.ws.send(encoding.toUint8Array(encoder)); + } + }; + + private handleAwarenessUpdate = ({ + added, + updated, + removed, + }: { + added: number[]; + updated: number[]; + removed: number[]; + }) => { + const changedClients = added.concat(updated, removed); + + if (this.ws?.readyState === WebSocket.OPEN) { + const encoder = encoding.createEncoder(); + encoding.writeVarUint(encoder, MESSAGE_AWARENESS); + encoding.writeVarUint8Array( + encoder, + awarenessProtocol.encodeAwarenessUpdate(this.awareness, changedClients), + ); + this.ws.send(encoding.toUint8Array(encoder)); + } + }; + + public isConnected(): boolean { + return this.connected; + } + + public isSynced(): boolean { + return this.synced; + } + + public destroy() { + this.destroyed = true; + + // Clean up timeout + if (this.reconnectTimeout) { + clearTimeout(this.reconnectTimeout); + } + + // Remove awareness state + awarenessProtocol.removeAwarenessStates( + this.awareness, + [this.doc.clientID], + "provider destroyed", + ); + + // Remove event listeners + this.doc.off("update", this.handleDocUpdate); + this.awareness.off("update", this.handleAwarenessUpdate); + + // Close WebSocket + if (this.ws) { + this.ws.onclose = null; // Prevent reconnect + this.ws.close(); + this.ws = null; + } + } +} diff --git a/packages/ui/src/routes/[org]/[repo]/blob/[...rest]/+page.svelte b/packages/ui/src/routes/[org]/[repo]/blob/[...rest]/+page.svelte index 27ffae5..1788674 100644 --- a/packages/ui/src/routes/[org]/[repo]/blob/[...rest]/+page.svelte +++ b/packages/ui/src/routes/[org]/[repo]/blob/[...rest]/+page.svelte @@ -7,8 +7,10 @@ import { Input } from "$lib/components/ui/input"; import { Textarea } from "$lib/components/ui/textarea"; import { onDestroy } from "svelte"; - import { SvelteMap } from "svelte/reactivity"; import CodeMirror, { type SelectionInfo } from "$lib/components/editor/CodeMirror.svelte"; + import * as Y from "yjs"; + import { Awareness } from "y-protocols/awareness"; + import { YjsWebSocketProvider } from "$lib/yjs-provider"; import FrontmatterEditor from "$lib/components/editor/FrontmatterEditor.svelte"; import { updateContributionState, @@ -337,27 +339,21 @@ let fileData = $state(null); let content = $state(""); let originalContent = $state(""); - let ws = $state(null); let wsConnectionStatus = $state<"connected" | "connecting" | "disconnected">("disconnected"); - let isRemoteUpdate = $state(false); - let lastValue = $state(""); let hasStartedEditing = $state(false); + // Yjs collaborative editing state + let yjsDoc = $state(undefined); + let yjsProvider = $state(undefined); + let awareness = $state(undefined); + let yText = $state(undefined); + // Frontmatter state let frontmatterFields = $state([]); let hasFrontmatter = $state(false); let bodyContent = $state(""); // Content without frontmatter // Editor content - what's shown in CodeMirror (bodyContent when hasFrontmatter, else content) let editorContent = $state(""); - const remoteCursors = new SvelteMap< - string, - { position: number; color: string; userName?: string } - >(); - const remoteSelections = new SvelteMap< - string, - { from: number; to: number; color: string; userName?: string } - >(); - let myConnectionId = $state(""); let editorRef: ReturnType | null = $state(null); let hasUnsavedChanges = $derived(content !== originalContent); @@ -395,7 +391,6 @@ fileData = result.fileData; content = result.fileData.content; originalContent = result.fileData.content; - lastValue = result.fileData.content; // Parse frontmatter if present const parsed = parseFrontmatter(result.fileData.content); @@ -422,16 +417,17 @@ lastPath = data.path; - // Close existing WebSocket connection before switching files - if (ws) { - ws.onclose = null; // Prevent reconnect logic - ws.close(); - ws = null; + // Clean up existing Yjs provider before switching files + if (yjsProvider) { + yjsProvider.destroy(); + yjsProvider = undefined; } - - // Clear remote cursors/selections from previous file - remoteCursors.clear(); - remoteSelections.clear(); + if (yjsDoc) { + yjsDoc.destroy(); + yjsDoc = undefined; + } + yText = undefined; + awareness = undefined; // Reset comment state selectedThread = null; @@ -449,7 +445,6 @@ fileData = fileResult.fileData; content = fileResult.fileData.content; originalContent = fileResult.fileData.content; - lastValue = fileResult.fileData.content; // Parse frontmatter if present const parsedFm = parseFrontmatter(fileResult.fileData.content); @@ -458,13 +453,12 @@ bodyContent = parsedFm.content; editorContent = parsedFm.hasFrontmatter ? parsedFm.content : fileResult.fileData.content; - // Reconnect WebSocket for new file (will happen via the session effect) + // Reconnect Yjs for new file (will happen via the session effect) } else if (fileResult.error) { // Handle error - reset file data fileData = null; content = ""; originalContent = ""; - lastValue = ""; hasFrontmatter = false; frontmatterFields = []; bodyContent = ""; @@ -647,7 +641,7 @@ $effect(() => { // Re-connect when session becomes available (if not already connected) - if (fileData && $session.data && !ws) { + if (fileData && $session.data && !yjsProvider) { connect(); } }); @@ -658,125 +652,98 @@ // Only connect if user is logged in if (!$session.data) return; - // Close existing connection before creating a new one - if (ws) { - ws.onclose = null; // Prevent reconnect logic - ws.close(); - ws = null; + // Clean up existing provider before creating a new one + if (yjsProvider) { + yjsProvider.destroy(); + yjsProvider = undefined; } + if (yjsDoc) { + yjsDoc.destroy(); + } + + // Create new Yjs document + yjsDoc = new Y.Doc(); + yText = yjsDoc.getText("content"); + awareness = new Awareness(yjsDoc); + + // Set local awareness state with user info and color + const userColor = getUserColor($session.data.user.id || $session.data.user.name || "anonymous"); + awareness.setLocalStateField("user", { + name: $session.data.user.name || "Anonymous", + color: userColor.color, + colorLight: userColor.light, + }); + + // Listen for Y.Text changes to sync with content state + yText.observe(() => { + const newContent = yText!.toString(); + if (newContent !== content) { + content = newContent; + // Update frontmatter state + const parsed = parseFrontmatter(newContent); + hasFrontmatter = parsed.hasFrontmatter; + frontmatterFields = parsed.frontmatter; + bodyContent = parsed.content; + editorContent = parsed.hasFrontmatter ? parsed.content : newContent; + + // Sync to sandbox for HMR + scheduleSyncToSandbox(newContent); + } + }); const protocol = window.location.protocol === "https:" ? "wss:" : "ws:"; const wsUrl = `${protocol}//${window.location.host}/${org}/${repo}/blob/${branch}/${path}/ws`; wsConnectionStatus = "connecting"; - ws = new WebSocket(wsUrl); - - ws.onopen = () => { - wsConnectionStatus = "connected"; - ws?.send(JSON.stringify({ type: "init", content: content })); - }; - ws.onmessage = (event) => { - const data = JSON.parse(event.data); - - switch (data.type) { - case "init": { - if (data.connectionId) { - myConnectionId = data.connectionId; - } - - // Sync to server content if it differs (collaborative changes on reload) - if (data.content !== undefined && data.content !== content) { - isRemoteUpdate = true; - content = data.content; - lastValue = data.content; - - // Update all editor state to reflect server content - const parsed = parseFrontmatter(data.content); - hasFrontmatter = parsed.hasFrontmatter; - frontmatterFields = parsed.frontmatter; - bodyContent = parsed.content; - editorContent = parsed.hasFrontmatter ? parsed.content : data.content; - - isRemoteUpdate = false; - } - break; + yjsProvider = new YjsWebSocketProvider({ + url: wsUrl, + doc: yjsDoc, + awareness: awareness, + initialContent: content, + onConnect: () => { + wsConnectionStatus = "connected"; + }, + onDisconnect: () => { + wsConnectionStatus = "disconnected"; + }, + onSynced: () => { + // When synced, update content state from Y.Text if different + const syncedContent = yText!.toString(); + if (syncedContent && syncedContent !== content) { + content = syncedContent; + const parsed = parseFrontmatter(syncedContent); + hasFrontmatter = parsed.hasFrontmatter; + frontmatterFields = parsed.frontmatter; + bodyContent = parsed.content; + editorContent = parsed.hasFrontmatter ? parsed.content : syncedContent; } + }, + }); + } - case "change": { - // Skip our own changes - if (data.connectionId && data.connectionId === myConnectionId) { - break; - } - - const { from, to, insert } = data.changes; - // Apply incremental change directly to CodeMirror - // This also updates the bound content value - editorRef?.applyRemoteChange({ from, to, insert }); - lastValue = content; - - // Sync remote changes to sandbox for HMR - scheduleSyncToSandbox(content); - break; - } - - case "cursor": { - if ( - data.position !== undefined && - data.connectionId && - data.connectionId !== myConnectionId - ) { - const color = getCursorColor(data.connectionId); - remoteCursors.set(data.connectionId, { - position: data.position, - color, - userName: data.userName, - }); - - // If selection data is included - if (data.selection && data.selection.from !== data.selection.to) { - remoteSelections.set(data.connectionId, { - from: data.selection.from, - to: data.selection.to, - color, - userName: data.userName, - }); - } else { - // No selection, remove it - remoteSelections.delete(data.connectionId); - } - } - break; - } - - case "cursor-leave": { - if (data.connectionId) { - remoteCursors.delete(data.connectionId); - remoteSelections.delete(data.connectionId); - } - break; - } - } - }; - - ws.onclose = () => { - wsConnectionStatus = "disconnected"; - // Only reconnect if still logged in - if ($session.data) { - setTimeout(connect, 2000); - } - }; + // Helper function to get a consistent color for a user + function getUserColor(userId: string): { color: string; light: string } { + const colors = [ + { color: "#3b82f6", light: "#3b82f620" }, // blue + { color: "#ef4444", light: "#ef444420" }, // red + { color: "#10b981", light: "#10b98120" }, // green + { color: "#f59e0b", light: "#f59e0b20" }, // orange + { color: "#8b5cf6", light: "#8b5cf620" }, // purple + { color: "#ec4899", light: "#ec489920" }, // pink + { color: "#14b8a6", light: "#14b8a620" }, // teal + ]; - ws.onerror = (error) => { - wsConnectionStatus = "disconnected"; - console.error("WebSocket error:", error); - }; + let hash = 0; + for (let i = 0; i < userId.length; i++) { + hash = userId.charCodeAt(i) + ((hash << 5) - hash); + } + return colors[Math.abs(hash) % colors.length]; } function handleEditorChange(newValue: string) { - if (isRemoteUpdate || !ws || ws.readyState !== WebSocket.OPEN) { - return; - } + // In Yjs mode, changes are automatically synced through Y.Text + // This handler is only used for PostHog tracking and non-Yjs content sync // Capture PostHog event when user starts editing if (!hasStartedEditing) { @@ -786,71 +753,28 @@ }); } - // Simple diff: find where the change occurred - let from = 0; - while ( - from < lastValue.length && - from < newValue.length && - lastValue[from] === newValue[from] - ) { - from++; - } - - let lastEnd = lastValue.length; - let newEnd = newValue.length; - while (lastEnd > from && newEnd > from && lastValue[lastEnd - 1] === newValue[newEnd - 1]) { - lastEnd--; - newEnd--; - } - - const insert = newValue.slice(from, newEnd); - - // Send change - ws.send( - JSON.stringify({ - type: "change", - changes: { from, to: lastEnd, insert }, - }), - ); - - lastValue = newValue; - - // Sync to sandbox for HMR if preview is running - scheduleSyncToSandbox(newValue); + // Update content state (Yjs observer will handle the rest) + content = newValue; } - function handleCursorChange(position: number, selection?: { from: number; to: number }) { - if (!ws || ws.readyState !== WebSocket.OPEN) { - return; - } - - ws.send( - JSON.stringify({ - type: "cursor", - position, - selection, - userName: $session.data?.user.name, - }), - ); - } + // Note: Cursor changes are now handled automatically by Yjs awareness protocol + // via the y-codemirror.next extension function handleFrontmatterChange(fields: FrontmatterField[]) { frontmatterFields = fields; // Recombine the document with updated frontmatter const newContent = combineDocument(fields, bodyContent); - content = newContent; - lastValue = newContent; - - // Broadcast the change to other connected clients - if (ws && ws.readyState === WebSocket.OPEN) { - ws.send( - JSON.stringify({ - type: "change", - changes: { from: 0, to: lastValue.length, insert: newContent }, - }), - ); + + // Update Y.Text with the new full content + if (yText) { + yjsDoc?.transact(() => { + yText!.delete(0, yText!.length); + yText!.insert(0, newContent); + }); } + content = newContent; + // Sync to sandbox for HMR if preview is running scheduleSyncToSandbox(newContent); } @@ -867,24 +791,6 @@ } } - function getCursorColor(connectionId: string): string { - const colors = [ - "rgba(59, 130, 246, 0.8)", // blue - "rgba(239, 68, 68, 0.8)", // red - "rgba(16, 185, 129, 0.8)", // green - "rgba(245, 158, 11, 0.8)", // orange - "rgba(139, 92, 246, 0.8)", // purple - "rgba(236, 72, 153, 0.8)", // pink - "rgba(20, 184, 166, 0.8)", // teal - ]; - - let hash = 0; - for (let i = 0; i < connectionId.length; i++) { - hash = connectionId.charCodeAt(i) + ((hash << 5) - hash); - } - return colors[Math.abs(hash) % colors.length]; - } - function handleEditBlocked() { if (!$session.data) { // User is not logged in @@ -901,9 +807,15 @@ "Are you sure you want to reset to the original GitHub content? All unsaved changes will be lost.", ) ) { - isRemoteUpdate = true; + // Update Y.Text with the original content (will sync to all clients via Yjs) + if (yText && yjsDoc) { + yjsDoc.transact(() => { + yText!.delete(0, yText!.length); + yText!.insert(0, originalContent); + }); + } + content = originalContent; - lastValue = originalContent; // Re-parse frontmatter from original content const parsed = parseFrontmatter(originalContent); @@ -911,18 +823,6 @@ frontmatterFields = parsed.frontmatter; bodyContent = parsed.content; editorContent = parsed.hasFrontmatter ? parsed.content : originalContent; - - isRemoteUpdate = false; - - // Broadcast the reset to other connected clients - if (ws && ws.readyState === WebSocket.OPEN) { - ws.send( - JSON.stringify({ - type: "change", - changes: { from: 0, to: lastValue.length, insert: originalContent }, - }), - ); - } } } @@ -983,8 +883,11 @@ } onDestroy(() => { - if (ws) { - ws.close(); + if (yjsProvider) { + yjsProvider.destroy(); + } + if (yjsDoc) { + yjsDoc.destroy(); } if (syncTimeoutId) { clearTimeout(syncTimeoutId); @@ -1315,15 +1218,14 @@ : newValue, ); }} - oncursorchange={handleCursorChange} oneditblocked={handleEditBlocked} - remoteCursors={Array.from(remoteCursors.values())} - remoteSelections={Array.from(remoteSelections.values())} {prComments} onCommentClick={handleCommentClick} onselectionchange={handleSelectionChange} canAddComments={!!existingPR && !!$session.data} readonly={!$session.data || !canEdit} + {yText} + {awareness} /> {/await} @@ -1379,15 +1281,14 @@ hasFrontmatter ? combineDocument(frontmatterFields, newValue) : newValue, ); }} - oncursorchange={handleCursorChange} oneditblocked={handleEditBlocked} - remoteCursors={Array.from(remoteCursors.values())} - remoteSelections={Array.from(remoteSelections.values())} {prComments} onCommentClick={handleCommentClick} onselectionchange={handleSelectionChange} canAddComments={!!existingPR && !!$session.data} readonly={!$session.data || !canEdit} + {yText} + {awareness} /> {/await} @@ -1436,15 +1337,14 @@ hasFrontmatter ? combineDocument(frontmatterFields, newValue) : newValue, ); }} - oncursorchange={handleCursorChange} oneditblocked={handleEditBlocked} - remoteCursors={Array.from(remoteCursors.values())} - remoteSelections={Array.from(remoteSelections.values())} {prComments} onCommentClick={handleCommentClick} onselectionchange={handleSelectionChange} canAddComments={!!existingPR && !!$session.data} readonly={!$session.data || !canEdit} + {yText} + {awareness} /> {/await} diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 8159846..e2c0943 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -48,6 +48,16 @@ importers: version: 8.49.0(eslint@9.39.2(jiti@2.6.1))(typescript@5.9.3) packages/collab: + dependencies: + lib0: + specifier: ^0.2.99 + version: 0.2.117 + y-protocols: + specifier: ^1.0.6 + version: 1.0.7(yjs@13.6.29) + yjs: + specifier: ^13.6.20 + version: 13.6.29 devDependencies: typescript: specifier: ^5.5.2 @@ -139,6 +149,9 @@ importers: codemirror: specifier: ^6.0.2 version: 6.0.2 + lib0: + specifier: ^0.2.99 + version: 0.2.117 mode-watcher: specifier: ^1.1.0 version: 1.1.0(svelte@5.46.0) @@ -199,6 +212,15 @@ importers: wrangler: specifier: ^4.54.0 version: 4.54.0(@cloudflare/workers-types@4.20251213.0) + y-codemirror.next: + specifier: ^0.3.5 + version: 0.3.5(@codemirror/state@6.5.2)(@codemirror/view@6.39.4)(yjs@13.6.29) + y-protocols: + specifier: ^1.0.6 + version: 1.0.7(yjs@13.6.29) + yjs: + specifier: ^13.6.20 + version: 13.6.29 packages: @@ -2402,6 +2424,9 @@ packages: isexe@2.0.0: resolution: {integrity: sha512-RHxMLp9lnKHGHRng9QFhRCMbYAcVpn69smSGcq3f36xjgVVWThj4qqLbTLlq7Ssj8B+fIQ1EuCEGI2lKsyQeIw==} + isomorphic.js@0.2.5: + resolution: {integrity: sha512-PIeMbHqMt4DnUP3MA/Flc0HElYjMXArsw1qwJZcm9sqR8mq3l8NYizFMty0pWwE/tzIGH3EKK5+jes5mAr85yw==} + jiti@2.6.1: resolution: {integrity: sha512-ekilCSN1jwRvIbgeg/57YFh8qQDNbwDb9xT/qu2DAHbFFZUicIl4ygVaAvzveMhMVr3LnpSKTNnwt8PoOfmKhQ==} hasBin: true @@ -2457,6 +2482,11 @@ packages: resolution: {integrity: sha512-+bT2uH4E5LGE7h/n3evcS/sQlJXCpIp6ym8OWJ5eV6+67Dsql/LaaT7qJBAt2rzfoa/5QBGBhxDix1dMt2kQKQ==} engines: {node: '>= 0.8.0'} + lib0@0.2.117: + resolution: {integrity: sha512-DeXj9X5xDCjgKLU/7RR+/HQEVzuuEUiwldwOGsHK/sfAfELGWEyTcf0x+uOvCvK3O2zPmZePXWL85vtia6GyZw==} + engines: {node: '>=16'} + hasBin: true + lightningcss-android-arm64@1.30.2: resolution: {integrity: sha512-BH9sEdOCahSgmkVhBLeU7Hc9DWeZ1Eb6wNS6Da8igvUwAe0sqROHddIlvU06q3WyXVEOYDZ6ykBZQnjTbmo4+A==} engines: {node: '>= 12.0.0'} @@ -3462,10 +3492,27 @@ packages: utf-8-validate: optional: true + y-codemirror.next@0.3.5: + resolution: {integrity: sha512-VluNu3e5HfEXybnypnsGwKAj+fKLd4iAnR7JuX1Sfyydmn1jCBS5wwEL/uS04Ch2ib0DnMAOF6ZRR/8kK3wyGw==} + peerDependencies: + '@codemirror/state': ^6.0.0 + '@codemirror/view': ^6.0.0 + yjs: ^13.5.6 + + y-protocols@1.0.7: + resolution: {integrity: sha512-YSVsLoXxO67J6eE/nV4AtFtT3QEotZf5sK5BHxFBXso7VDUT3Tx07IfA6hsu5Q5OmBdMkQVmFZ9QOA7fikWvnw==} + engines: {node: '>=16.0.0', npm: '>=8.0.0'} + peerDependencies: + yjs: ^13.0.0 + yaml@1.10.2: resolution: {integrity: sha512-r3vXyErRCYJ7wg28yvBY5VSoAF8ZvlcW9/BwUzEtUsjvX/DKs24dIkuwjtuprwJJHsbyUbLApepYTR1BN4uHrg==} engines: {node: '>= 6'} + yjs@13.6.29: + resolution: {integrity: sha512-kHqDPdltoXH+X4w1lVmMtddE3Oeqq48nM40FD5ojTd8xYhQpzIDcfE2keMSU5bAgRPJBe225WTUdyUgj1DtbiQ==} + engines: {node: '>=16.0.0', npm: '>=8.0.0'} + yocto-queue@0.1.0: resolution: {integrity: sha512-rVksvsnNCdJ/ohGc6xgPwyN8eheCxsiLM8mxuE/t/mOVqJewPuO1miLpTHQiRgTKCLexL4MeAFVagts7HmNZ2Q==} engines: {node: '>=10'} @@ -5632,6 +5679,8 @@ snapshots: isexe@2.0.0: {} + isomorphic.js@0.2.5: {} + jiti@2.6.1: {} jose@6.1.3: {} @@ -5679,6 +5728,10 @@ snapshots: prelude-ls: 1.2.1 type-check: 0.4.0 + lib0@0.2.117: + dependencies: + isomorphic.js: 0.2.5 + lightningcss-android-arm64@1.30.2: optional: true @@ -6632,8 +6685,24 @@ snapshots: ws@8.18.3: {} + y-codemirror.next@0.3.5(@codemirror/state@6.5.2)(@codemirror/view@6.39.4)(yjs@13.6.29): + dependencies: + '@codemirror/state': 6.5.2 + '@codemirror/view': 6.39.4 + lib0: 0.2.117 + yjs: 13.6.29 + + y-protocols@1.0.7(yjs@13.6.29): + dependencies: + lib0: 0.2.117 + yjs: 13.6.29 + yaml@1.10.2: {} + yjs@13.6.29: + dependencies: + lib0: 0.2.117 + yocto-queue@0.1.0: {} youch-core@0.3.3: