Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 106 additions & 0 deletions apps/events/src/routes/playground.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { DebugSpaceEvents } from '@/components/debug-space-events';
import { DebugSpaceState } from '@/components/debug-space-state';
import { Button } from '@/components/ui/button';
import { assertExhaustive } from '@/lib/assertExhaustive';
import { uuid } from '@automerge/automerge';
import { bytesToHex, hexToBytes } from '@noble/hashes/utils';
import { createFileRoute } from '@tanstack/react-router';
import { Effect, Exit } from 'effect';
Expand All @@ -12,6 +13,7 @@ import type {
RequestAcceptInvitationEvent,
RequestCreateInvitationEvent,
RequestCreateSpaceEvent,
RequestCreateUpdate,
RequestListInvitations,
RequestListSpaces,
RequestSubscribeToSpace,
Expand Down Expand Up @@ -57,6 +59,8 @@ type SpaceStorageEntry = {
events: SpaceEvent[];
state: SpaceState | undefined;
keys: { id: string; key: string }[];
updates: string[];
lastUpdateClock: number;
};

const decodeResponseMessage = Schema.decodeUnknownEither(ResponseMessage);
Expand All @@ -79,6 +83,7 @@ const App = ({
const [websocketConnection, setWebsocketConnection] = useState<WebSocket>();
const [spaces, setSpaces] = useState<SpaceStorageEntry[]>([]);
const [invitations, setInvitations] = useState<Invitation[]>([]);
const [updatesInFlight, setUpdatesInFlight] = useState<string[]>([]);

// Create a stable WebSocket connection that only depends on accountId
useEffect(() => {
Expand Down Expand Up @@ -128,6 +133,8 @@ const App = ({
events: existingSpace?.events ?? [],
state: existingSpace?.state,
keys: existingSpace?.keys ?? [],
updates: existingSpace?.updates ?? [],
lastUpdateClock: existingSpace?.lastUpdateClock ?? -1,
};
});
});
Expand Down Expand Up @@ -163,12 +170,29 @@ const App = ({
setSpaces((spaces) =>
spaces.map((space) => {
if (space.id === response.id) {
let lastUpdateClock = space.lastUpdateClock;
const updates = [];
if (space.updates) {
updates.push(...space.updates);
}
if (response.updates) {
console.log('response.updates', response.updates, lastUpdateClock);
if (response.updates.firstUpdateClock === lastUpdateClock + 1) {
lastUpdateClock = response.updates.lastUpdateClock;
updates.push(...response.updates.updates);
} else {
// TODO request missing updates from server
}
}

// TODO fix readonly type issue
return {
...space,
events: response.events as SpaceEvent[],
state: newState,
keys,
lastUpdateClock,
updates,
};
}
return space;
Expand Down Expand Up @@ -207,6 +231,40 @@ const App = ({
setInvitations(response.invitations.map((invitation) => invitation));
break;
}
case 'update-confirmed': {
setSpaces((spaces) =>
spaces.map((space) => {
if (space.id === response.spaceId && space.lastUpdateClock + 1 === response.clock) {
return { ...space, lastUpdateClock: response.clock };
}
return space;
}),
);
setUpdatesInFlight((updatesInFlight) => updatesInFlight.filter((id) => id !== response.ephemeralId));
break;
}
case 'updates-notification': {
setSpaces((spaces) =>
spaces.map((space) => {
if (space.id === response.spaceId) {
let lastUpdateClock = space.lastUpdateClock;
if (response.updates.firstUpdateClock === space.lastUpdateClock + 1) {
lastUpdateClock = response.updates.lastUpdateClock;
} else {
// TODO request missing updates from server
}

return {
...space,
updates: [...space.updates, ...response.updates.updates],
lastUpdateClock,
};
}
return space;
}),
);
break;
}
default:
assertExhaustive(response);
}
Expand Down Expand Up @@ -379,8 +437,56 @@ const App = ({
</Button>
);
})}
<h3>Updates</h3>
<Button
onClick={() => {
const ephemeralId = uuid();
setUpdatesInFlight((updatesInFlight) => [...updatesInFlight, ephemeralId]);
setSpaces((currentSpaces) =>
currentSpaces.map((currentSpace) => {
if (space.id === currentSpace.id) {
return { ...currentSpace, updates: [...currentSpace.updates, 'a'] };
}
return currentSpace;
}),
);
const message: RequestCreateUpdate = {
type: 'create-update',
ephemeralId,
update: 'a',
spaceId: space.id,
};
websocketConnection?.send(JSON.stringify(message));
}}
>
Create an update
</Button>
<h3>Updates Content</h3>
<p>last update clock: {space.lastUpdateClock}</p>
<p className="text-xs">
{space.updates.map((update, index) => {
return (
// biome-ignore lint/suspicious/noArrayIndexKey: we need a unique identifier here
<span key={`${update}-${index}`} className="border border-gray-300">
{update}
</span>
);
})}
</p>
<h3>Updates in flight</h3>
<ul className="text-xs">
{updatesInFlight.map((updateInFlight) => {
return (
<li key={updateInFlight} className="border border-gray-300">
{updateInFlight}
</li>
);
})}
</ul>
<hr />
<h3>State</h3>
<DebugSpaceState state={space.state} />
<hr />
<h3>Events</h3>
<DebugSpaceEvents events={space.events} />
<hr />
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
-- CreateTable
CREATE TABLE "Update" (
"spaceId" TEXT NOT NULL,
"clock" INTEGER NOT NULL,
"content" BLOB NOT NULL,

PRIMARY KEY ("spaceId", "clock"),
CONSTRAINT "Update_spaceId_fkey" FOREIGN KEY ("spaceId") REFERENCES "Space" ("id") ON DELETE RESTRICT ON UPDATE CASCADE
);
10 changes: 10 additions & 0 deletions apps/server/prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ model Space {
members Account[]
invitations Invitation[]
keys SpaceKey[]
updates Update[]
}

model SpaceKey {
Expand Down Expand Up @@ -68,3 +69,12 @@ model Invitation {

@@unique([spaceId, inviteeAccountId])
}

model Update {
space Space @relation(fields: [spaceId], references: [id])
spaceId String
clock Int
content Bytes

@@id([spaceId, clock])
}
65 changes: 65 additions & 0 deletions apps/server/src/handlers/createUpdate.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import { prisma } from '../prisma.js';

type Params = {
accountId: string;
update: string;
spaceId: string;
};

export const createUpdate = async ({ accountId, update, spaceId }: Params) => {
// throw error if account is not a member of the space
await prisma.space.findUniqueOrThrow({
where: { id: spaceId, members: { some: { id: accountId } } },
});

let success = false;
let retries = 0;
const maxRetries = 5;
const retryDelay = 100; // milliseconds
let result:
| {
spaceId: string;
clock: number;
content: Buffer;
}
| undefined = undefined;

while (!success && retries < maxRetries) {
try {
result = await prisma.$transaction(async (prisma) => {
const lastUpdate = await prisma.update.findFirst({
where: { spaceId },
orderBy: { clock: 'desc' },
});

const clock = lastUpdate ? lastUpdate.clock + 1 : 0;

return await prisma.update.create({
data: {
spaceId,
clock,
content: Buffer.from(update),
},
});
});
success = true;
return result;
} catch (error) {
const dbError = error as { code?: string; message?: string };
if (dbError.code === 'P2034' || dbError.code === 'P1008' || dbError.message?.includes('database is locked')) {
retries += 1;
console.warn(`Database is busy, retrying (${retries}/${maxRetries})...`);
await new Promise((resolve) => setTimeout(resolve, retryDelay));
} else {
console.error('Database error:', error);
break;
}
}
}

if (!result) {
throw new Error('Failed to create update');
}

return result;
};
13 changes: 13 additions & 0 deletions apps/server/src/handlers/getSpace.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ export const getSpace = async ({ spaceId, accountId }: Params) => {
},
},
},
updates: {
orderBy: {
clock: 'asc',
},
},
},
});

Expand All @@ -52,5 +57,13 @@ export const getSpace = async ({ spaceId, accountId }: Params) => {
id: space.id,
events: space.events.map((wrapper) => JSON.parse(wrapper.event)),
keyBoxes,
updates:
space.updates.length > 0
? {
updates: space.updates.map((update) => update.content.toString()),
firstUpdateClock: space.updates[0].clock,
lastUpdateClock: space.updates[space.updates.length - 1].clock,
}
: undefined,
};
};
26 changes: 26 additions & 0 deletions apps/server/src/handlers/listUpdates.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import { prisma } from '../prisma.js';

type Params = {
accountId: string;
spaceId: string;
after?: number;
};

export const listUpdates = async ({ spaceId, accountId, after }: Params) => {
// throw error if account is not a member of the space
await prisma.space.findUniqueOrThrow({
where: { id: spaceId, members: { some: { id: accountId } } },
});

return await prisma.update.findMany({
where: after
? {
spaceId,
clock: { gt: after },
}
: { spaceId },
orderBy: {
clock: 'desc',
},
});
};
44 changes: 44 additions & 0 deletions apps/server/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,17 @@ import type {
ResponseListSpaces,
ResponseSpace,
ResponseSpaceEvent,
ResponseUpdateConfirmed,
ResponseUpdatesNotification,
Updates,
} from 'graph-framework-messages';
import { RequestMessage } from 'graph-framework-messages';
import type { SpaceEvent } from 'graph-framework-space-events';
import { applyEvent } from 'graph-framework-space-events';
import WebSocket, { WebSocketServer } from 'ws';
import { applySpaceEvent } from './handlers/applySpaceEvent.js';
import { createSpace } from './handlers/createSpace.js';
import { createUpdate } from './handlers/createUpdate.js';
import { getSpace } from './handlers/getSpace.js';
import { listInvitations } from './handlers/listInvitations.js';
import { listSpaces } from './handlers/listSpaces.js';
Expand Down Expand Up @@ -66,6 +70,25 @@ function broadcastSpaceEvents({
}
}

function broadcastUpdates({
spaceId,
updates,
currentClient,
}: { spaceId: string; updates: Updates; currentClient: CustomWebSocket }) {
for (const client of webSocketServer.clients as Set<CustomWebSocket>) {
if (currentClient === client) continue;

const outgoingMessage: ResponseUpdatesNotification = {
type: 'updates-notification',
updates,
spaceId,
};
if (client.readyState === WebSocket.OPEN && client.subscribedSpaces.has(spaceId)) {
client.send(JSON.stringify(outgoingMessage));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just curious, have we considered using protobuf here? May not be worth it, but curious on your thoughts

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, makes a lot of sense here

the main reason why I didn't so far: I yet haven't worked with it and didn't know which library to use or if there are best practices to consider. I created an issue and we can flesh out the details there #53

}
}
}

webSocketServer.on('connection', async (webSocket: CustomWebSocket, request: Request) => {
const params = parse(request.url, true);
if (!params.query.accountId || typeof params.query.accountId !== 'string') {
Expand Down Expand Up @@ -159,6 +182,27 @@ webSocketServer.on('connection', async (webSocket: CustomWebSocket, request: Req
broadcastSpaceEvents({ spaceId: data.spaceId, event: data.event, currentClient: webSocket });
break;
}
case 'create-update': {
const update = await createUpdate({ accountId, spaceId: data.spaceId, update: data.update });
const outgoingMessage: ResponseUpdateConfirmed = {
type: 'update-confirmed',
ephemeralId: data.ephemeralId,
clock: update.clock,
spaceId: data.spaceId,
};
webSocket.send(JSON.stringify(outgoingMessage));

broadcastUpdates({
spaceId: data.spaceId,
updates: {
updates: [update.content.toString()],
firstUpdateClock: update.clock,
lastUpdateClock: update.clock,
},
currentClient: webSocket,
});
break;
}
default:
assertExhaustive(data);
break;
Expand Down
Loading
Loading