diff --git a/apps/events/src/routes/playground.tsx b/apps/events/src/routes/playground.tsx index d298163c..ab2eb011 100644 --- a/apps/events/src/routes/playground.tsx +++ b/apps/events/src/routes/playground.tsx @@ -3,6 +3,7 @@ import { DebugSpaceEvents } from '@/components/debug-space-events'; import { DebugSpaceState } from '@/components/debug-space-state'; import { Button } from '@/components/ui/button'; import { assertExhaustive } from '@/lib/assertExhaustive'; +import { uuid } from '@automerge/automerge'; import { bytesToHex, hexToBytes } from '@noble/hashes/utils'; import { createFileRoute } from '@tanstack/react-router'; import { Effect, Exit } from 'effect'; @@ -12,6 +13,7 @@ import type { RequestAcceptInvitationEvent, RequestCreateInvitationEvent, RequestCreateSpaceEvent, + RequestCreateUpdate, RequestListInvitations, RequestListSpaces, RequestSubscribeToSpace, @@ -57,6 +59,8 @@ type SpaceStorageEntry = { events: SpaceEvent[]; state: SpaceState | undefined; keys: { id: string; key: string }[]; + updates: string[]; + lastUpdateClock: number; }; const decodeResponseMessage = Schema.decodeUnknownEither(ResponseMessage); @@ -79,6 +83,7 @@ const App = ({ const [websocketConnection, setWebsocketConnection] = useState(); const [spaces, setSpaces] = useState([]); const [invitations, setInvitations] = useState([]); + const [updatesInFlight, setUpdatesInFlight] = useState([]); // Create a stable WebSocket connection that only depends on accountId useEffect(() => { @@ -128,6 +133,8 @@ const App = ({ events: existingSpace?.events ?? [], state: existingSpace?.state, keys: existingSpace?.keys ?? [], + updates: existingSpace?.updates ?? [], + lastUpdateClock: existingSpace?.lastUpdateClock ?? -1, }; }); }); @@ -163,12 +170,29 @@ const App = ({ setSpaces((spaces) => spaces.map((space) => { if (space.id === response.id) { + let lastUpdateClock = space.lastUpdateClock; + const updates = []; + if (space.updates) { + updates.push(...space.updates); + } + if (response.updates) { + console.log('response.updates', response.updates, lastUpdateClock); + if (response.updates.firstUpdateClock === lastUpdateClock + 1) { + lastUpdateClock = response.updates.lastUpdateClock; + updates.push(...response.updates.updates); + } else { + // TODO request missing updates from server + } + } + // TODO fix readonly type issue return { ...space, events: response.events as SpaceEvent[], state: newState, keys, + lastUpdateClock, + updates, }; } return space; @@ -207,6 +231,40 @@ const App = ({ setInvitations(response.invitations.map((invitation) => invitation)); break; } + case 'update-confirmed': { + setSpaces((spaces) => + spaces.map((space) => { + if (space.id === response.spaceId && space.lastUpdateClock + 1 === response.clock) { + return { ...space, lastUpdateClock: response.clock }; + } + return space; + }), + ); + setUpdatesInFlight((updatesInFlight) => updatesInFlight.filter((id) => id !== response.ephemeralId)); + break; + } + case 'updates-notification': { + setSpaces((spaces) => + spaces.map((space) => { + if (space.id === response.spaceId) { + let lastUpdateClock = space.lastUpdateClock; + if (response.updates.firstUpdateClock === space.lastUpdateClock + 1) { + lastUpdateClock = response.updates.lastUpdateClock; + } else { + // TODO request missing updates from server + } + + return { + ...space, + updates: [...space.updates, ...response.updates.updates], + lastUpdateClock, + }; + } + return space; + }), + ); + break; + } default: assertExhaustive(response); } @@ -379,8 +437,56 @@ const App = ({ ); })} +

Updates

+ +

Updates Content

+

last update clock: {space.lastUpdateClock}

+

+ {space.updates.map((update, index) => { + return ( + // biome-ignore lint/suspicious/noArrayIndexKey: we need a unique identifier here + + {update} + + ); + })} +

+

Updates in flight

+
    + {updatesInFlight.map((updateInFlight) => { + return ( +
  • + {updateInFlight} +
  • + ); + })} +
+

State

+

Events


diff --git a/apps/server/prisma/migrations/20241122132737_add_update/migration.sql b/apps/server/prisma/migrations/20241122132737_add_update/migration.sql new file mode 100644 index 00000000..b7d98cf3 --- /dev/null +++ b/apps/server/prisma/migrations/20241122132737_add_update/migration.sql @@ -0,0 +1,9 @@ +-- CreateTable +CREATE TABLE "Update" ( + "spaceId" TEXT NOT NULL, + "clock" INTEGER NOT NULL, + "content" BLOB NOT NULL, + + PRIMARY KEY ("spaceId", "clock"), + CONSTRAINT "Update_spaceId_fkey" FOREIGN KEY ("spaceId") REFERENCES "Space" ("id") ON DELETE RESTRICT ON UPDATE CASCADE +); diff --git a/apps/server/prisma/schema.prisma b/apps/server/prisma/schema.prisma index e0cea4ad..3df7958b 100644 --- a/apps/server/prisma/schema.prisma +++ b/apps/server/prisma/schema.prisma @@ -28,6 +28,7 @@ model Space { members Account[] invitations Invitation[] keys SpaceKey[] + updates Update[] } model SpaceKey { @@ -68,3 +69,12 @@ model Invitation { @@unique([spaceId, inviteeAccountId]) } + +model Update { + space Space @relation(fields: [spaceId], references: [id]) + spaceId String + clock Int + content Bytes + + @@id([spaceId, clock]) +} diff --git a/apps/server/src/handlers/createUpdate.ts b/apps/server/src/handlers/createUpdate.ts new file mode 100644 index 00000000..6cbcf3be --- /dev/null +++ b/apps/server/src/handlers/createUpdate.ts @@ -0,0 +1,65 @@ +import { prisma } from '../prisma.js'; + +type Params = { + accountId: string; + update: string; + spaceId: string; +}; + +export const createUpdate = async ({ accountId, update, spaceId }: Params) => { + // throw error if account is not a member of the space + await prisma.space.findUniqueOrThrow({ + where: { id: spaceId, members: { some: { id: accountId } } }, + }); + + let success = false; + let retries = 0; + const maxRetries = 5; + const retryDelay = 100; // milliseconds + let result: + | { + spaceId: string; + clock: number; + content: Buffer; + } + | undefined = undefined; + + while (!success && retries < maxRetries) { + try { + result = await prisma.$transaction(async (prisma) => { + const lastUpdate = await prisma.update.findFirst({ + where: { spaceId }, + orderBy: { clock: 'desc' }, + }); + + const clock = lastUpdate ? lastUpdate.clock + 1 : 0; + + return await prisma.update.create({ + data: { + spaceId, + clock, + content: Buffer.from(update), + }, + }); + }); + success = true; + return result; + } catch (error) { + const dbError = error as { code?: string; message?: string }; + if (dbError.code === 'P2034' || dbError.code === 'P1008' || dbError.message?.includes('database is locked')) { + retries += 1; + console.warn(`Database is busy, retrying (${retries}/${maxRetries})...`); + await new Promise((resolve) => setTimeout(resolve, retryDelay)); + } else { + console.error('Database error:', error); + break; + } + } + } + + if (!result) { + throw new Error('Failed to create update'); + } + + return result; +}; diff --git a/apps/server/src/handlers/getSpace.ts b/apps/server/src/handlers/getSpace.ts index 9ed63271..14acc7f1 100644 --- a/apps/server/src/handlers/getSpace.ts +++ b/apps/server/src/handlers/getSpace.ts @@ -35,6 +35,11 @@ export const getSpace = async ({ spaceId, accountId }: Params) => { }, }, }, + updates: { + orderBy: { + clock: 'asc', + }, + }, }, }); @@ -52,5 +57,13 @@ export const getSpace = async ({ spaceId, accountId }: Params) => { id: space.id, events: space.events.map((wrapper) => JSON.parse(wrapper.event)), keyBoxes, + updates: + space.updates.length > 0 + ? { + updates: space.updates.map((update) => update.content.toString()), + firstUpdateClock: space.updates[0].clock, + lastUpdateClock: space.updates[space.updates.length - 1].clock, + } + : undefined, }; }; diff --git a/apps/server/src/handlers/listUpdates.ts b/apps/server/src/handlers/listUpdates.ts new file mode 100644 index 00000000..ea43ad57 --- /dev/null +++ b/apps/server/src/handlers/listUpdates.ts @@ -0,0 +1,26 @@ +import { prisma } from '../prisma.js'; + +type Params = { + accountId: string; + spaceId: string; + after?: number; +}; + +export const listUpdates = async ({ spaceId, accountId, after }: Params) => { + // throw error if account is not a member of the space + await prisma.space.findUniqueOrThrow({ + where: { id: spaceId, members: { some: { id: accountId } } }, + }); + + return await prisma.update.findMany({ + where: after + ? { + spaceId, + clock: { gt: after }, + } + : { spaceId }, + orderBy: { + clock: 'desc', + }, + }); +}; diff --git a/apps/server/src/index.ts b/apps/server/src/index.ts index b07bdde4..45f0dbc0 100755 --- a/apps/server/src/index.ts +++ b/apps/server/src/index.ts @@ -8,6 +8,9 @@ import type { ResponseListSpaces, ResponseSpace, ResponseSpaceEvent, + ResponseUpdateConfirmed, + ResponseUpdatesNotification, + Updates, } from 'graph-framework-messages'; import { RequestMessage } from 'graph-framework-messages'; import type { SpaceEvent } from 'graph-framework-space-events'; @@ -15,6 +18,7 @@ import { applyEvent } from 'graph-framework-space-events'; import WebSocket, { WebSocketServer } from 'ws'; import { applySpaceEvent } from './handlers/applySpaceEvent.js'; import { createSpace } from './handlers/createSpace.js'; +import { createUpdate } from './handlers/createUpdate.js'; import { getSpace } from './handlers/getSpace.js'; import { listInvitations } from './handlers/listInvitations.js'; import { listSpaces } from './handlers/listSpaces.js'; @@ -66,6 +70,25 @@ function broadcastSpaceEvents({ } } +function broadcastUpdates({ + spaceId, + updates, + currentClient, +}: { spaceId: string; updates: Updates; currentClient: CustomWebSocket }) { + for (const client of webSocketServer.clients as Set) { + if (currentClient === client) continue; + + const outgoingMessage: ResponseUpdatesNotification = { + type: 'updates-notification', + updates, + spaceId, + }; + if (client.readyState === WebSocket.OPEN && client.subscribedSpaces.has(spaceId)) { + client.send(JSON.stringify(outgoingMessage)); + } + } +} + webSocketServer.on('connection', async (webSocket: CustomWebSocket, request: Request) => { const params = parse(request.url, true); if (!params.query.accountId || typeof params.query.accountId !== 'string') { @@ -159,6 +182,27 @@ webSocketServer.on('connection', async (webSocket: CustomWebSocket, request: Req broadcastSpaceEvents({ spaceId: data.spaceId, event: data.event, currentClient: webSocket }); break; } + case 'create-update': { + const update = await createUpdate({ accountId, spaceId: data.spaceId, update: data.update }); + const outgoingMessage: ResponseUpdateConfirmed = { + type: 'update-confirmed', + ephemeralId: data.ephemeralId, + clock: update.clock, + spaceId: data.spaceId, + }; + webSocket.send(JSON.stringify(outgoingMessage)); + + broadcastUpdates({ + spaceId: data.spaceId, + updates: { + updates: [update.content.toString()], + firstUpdateClock: update.clock, + lastUpdateClock: update.clock, + }, + currentClient: webSocket, + }); + break; + } default: assertExhaustive(data); break; diff --git a/packages/graph-framework-messages/src/types.ts b/packages/graph-framework-messages/src/types.ts index a262fd38..67b4fd72 100644 --- a/packages/graph-framework-messages/src/types.ts +++ b/packages/graph-framework-messages/src/types.ts @@ -6,6 +6,14 @@ import { SpaceEvent, } from 'graph-framework-space-events'; +export const Updates = Schema.Struct({ + updates: Schema.Array(Schema.String), + firstUpdateClock: Schema.Number, + lastUpdateClock: Schema.Number, +}); + +export type Updates = Schema.Schema.Type; + export const KeyBox = Schema.Struct({ accountId: Schema.String, ciphertext: Schema.String, @@ -52,6 +60,7 @@ export type RequestAcceptInvitationEvent = Schema.Schema.Type; @@ -68,6 +77,15 @@ export const RequestListInvitations = Schema.Struct({ export type RequestListInvitations = Schema.Schema.Type; +export const RequestCreateUpdate = Schema.Struct({ + type: Schema.Literal('create-update'), + update: Schema.String, + spaceId: Schema.String, + ephemeralId: Schema.String, // used to identify the confirmation message +}); + +export type RequestCreateUpdate = Schema.Schema.Type; + export const RequestMessage = Schema.Union( RequestCreateSpaceEvent, RequestCreateInvitationEvent, @@ -75,6 +93,7 @@ export const RequestMessage = Schema.Union( RequestSubscribeToSpace, RequestListSpaces, RequestListInvitations, + RequestCreateUpdate, ); export type RequestMessage = Schema.Schema.Type; @@ -118,15 +137,35 @@ export const ResponseSpace = Schema.Struct({ id: Schema.String, events: Schema.Array(SpaceEvent), keyBoxes: Schema.Array(KeyBoxWithKeyId), + updates: Schema.optional(Updates), }); export type ResponseSpace = Schema.Schema.Type; +export const ResponseUpdateConfirmed = Schema.Struct({ + type: Schema.Literal('update-confirmed'), + ephemeralId: Schema.String, + clock: Schema.Number, + spaceId: Schema.String, +}); + +export type ResponseUpdateConfirmed = Schema.Schema.Type; + +export const ResponseUpdatesNotification = Schema.Struct({ + type: Schema.Literal('updates-notification'), + updates: Updates, + spaceId: Schema.String, +}); + +export type ResponseUpdatesNotification = Schema.Schema.Type; + export const ResponseMessage = Schema.Union( ResponseListSpaces, ResponseListInvitations, ResponseSpace, ResponseSpaceEvent, + ResponseUpdateConfirmed, + ResponseUpdatesNotification, ); export type ResponseMessage = Schema.Schema.Type;