Skip to content

Commit 16ffe5a

Browse files
committed
add updates sync
1 parent 62add9b commit 16ffe5a

File tree

8 files changed

+312
-0
lines changed

8 files changed

+312
-0
lines changed

apps/events/src/routes/playground.tsx

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import { DebugSpaceEvents } from '@/components/debug-space-events';
33
import { DebugSpaceState } from '@/components/debug-space-state';
44
import { Button } from '@/components/ui/button';
55
import { assertExhaustive } from '@/lib/assertExhaustive';
6+
import { uuid } from '@automerge/automerge';
67
import { bytesToHex, hexToBytes } from '@noble/hashes/utils';
78
import { createFileRoute } from '@tanstack/react-router';
89
import { Effect, Exit } from 'effect';
@@ -12,6 +13,7 @@ import type {
1213
RequestAcceptInvitationEvent,
1314
RequestCreateInvitationEvent,
1415
RequestCreateSpaceEvent,
16+
RequestCreateUpdate,
1517
RequestListInvitations,
1618
RequestListSpaces,
1719
RequestSubscribeToSpace,
@@ -57,6 +59,8 @@ type SpaceStorageEntry = {
5759
events: SpaceEvent[];
5860
state: SpaceState | undefined;
5961
keys: { id: string; key: string }[];
62+
updates: string[];
63+
lastUpdateClock: number;
6064
};
6165

6266
const decodeResponseMessage = Schema.decodeUnknownEither(ResponseMessage);
@@ -79,6 +83,7 @@ const App = ({
7983
const [websocketConnection, setWebsocketConnection] = useState<WebSocket>();
8084
const [spaces, setSpaces] = useState<SpaceStorageEntry[]>([]);
8185
const [invitations, setInvitations] = useState<Invitation[]>([]);
86+
const [updatesInFlight, setUpdatesInFlight] = useState<string[]>([]);
8287

8388
// Create a stable WebSocket connection that only depends on accountId
8489
useEffect(() => {
@@ -128,6 +133,8 @@ const App = ({
128133
events: existingSpace?.events ?? [],
129134
state: existingSpace?.state,
130135
keys: existingSpace?.keys ?? [],
136+
updates: existingSpace?.updates ?? [],
137+
lastUpdateClock: existingSpace?.lastUpdateClock ?? -1,
131138
};
132139
});
133140
});
@@ -163,12 +170,29 @@ const App = ({
163170
setSpaces((spaces) =>
164171
spaces.map((space) => {
165172
if (space.id === response.id) {
173+
let lastUpdateClock = space.lastUpdateClock;
174+
const updates = [];
175+
if (space.updates) {
176+
updates.push(...space.updates);
177+
}
178+
if (response.updates) {
179+
console.log('response.updates', response.updates, lastUpdateClock);
180+
if (response.updates.firstUpdateClock === lastUpdateClock + 1) {
181+
lastUpdateClock = response.updates.lastUpdateClock;
182+
updates.push(...response.updates.updates);
183+
} else {
184+
// TODO request missing updates from server
185+
}
186+
}
187+
166188
// TODO fix readonly type issue
167189
return {
168190
...space,
169191
events: response.events as SpaceEvent[],
170192
state: newState,
171193
keys,
194+
lastUpdateClock,
195+
updates,
172196
};
173197
}
174198
return space;
@@ -207,6 +231,40 @@ const App = ({
207231
setInvitations(response.invitations.map((invitation) => invitation));
208232
break;
209233
}
234+
case 'update-confirmed': {
235+
setSpaces((spaces) =>
236+
spaces.map((space) => {
237+
if (space.id === response.spaceId && space.lastUpdateClock + 1 === response.clock) {
238+
return { ...space, lastUpdateClock: response.clock };
239+
}
240+
return space;
241+
}),
242+
);
243+
setUpdatesInFlight((updatesInFlight) => updatesInFlight.filter((id) => id !== response.ephemeralId));
244+
break;
245+
}
246+
case 'updates-notification': {
247+
setSpaces((spaces) =>
248+
spaces.map((space) => {
249+
if (space.id === response.spaceId) {
250+
let lastUpdateClock = space.lastUpdateClock;
251+
if (response.updates.firstUpdateClock === space.lastUpdateClock + 1) {
252+
lastUpdateClock = response.updates.lastUpdateClock;
253+
} else {
254+
// TODO request missing updates from server
255+
}
256+
257+
return {
258+
...space,
259+
updates: [...space.updates, ...response.updates.updates],
260+
lastUpdateClock,
261+
};
262+
}
263+
return space;
264+
}),
265+
);
266+
break;
267+
}
210268
default:
211269
assertExhaustive(response);
212270
}
@@ -379,8 +437,56 @@ const App = ({
379437
</Button>
380438
);
381439
})}
440+
<h3>Updates</h3>
441+
<Button
442+
onClick={() => {
443+
const ephemeralId = uuid();
444+
setUpdatesInFlight((updatesInFlight) => [...updatesInFlight, ephemeralId]);
445+
setSpaces((currentSpaces) =>
446+
currentSpaces.map((currentSpace) => {
447+
if (space.id === currentSpace.id) {
448+
return { ...currentSpace, updates: [...currentSpace.updates, 'a'] };
449+
}
450+
return currentSpace;
451+
}),
452+
);
453+
const message: RequestCreateUpdate = {
454+
type: 'create-update',
455+
ephemeralId,
456+
update: 'a',
457+
spaceId: space.id,
458+
};
459+
websocketConnection?.send(JSON.stringify(message));
460+
}}
461+
>
462+
Create an update
463+
</Button>
464+
<h3>Updates Content</h3>
465+
<p>last update clock: {space.lastUpdateClock}</p>
466+
<p className="text-xs">
467+
{space.updates.map((update, index) => {
468+
return (
469+
// biome-ignore lint/suspicious/noArrayIndexKey: we need a unique identifier here
470+
<span key={`${update}-${index}`} className="border border-gray-300">
471+
{update}
472+
</span>
473+
);
474+
})}
475+
</p>
476+
<h3>Updates in flight</h3>
477+
<ul className="text-xs">
478+
{updatesInFlight.map((updateInFlight) => {
479+
return (
480+
<li key={updateInFlight} className="border border-gray-300">
481+
{updateInFlight}
482+
</li>
483+
);
484+
})}
485+
</ul>
486+
<hr />
382487
<h3>State</h3>
383488
<DebugSpaceState state={space.state} />
489+
<hr />
384490
<h3>Events</h3>
385491
<DebugSpaceEvents events={space.events} />
386492
<hr />
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
-- CreateTable
2+
CREATE TABLE "Update" (
3+
"spaceId" TEXT NOT NULL,
4+
"clock" INTEGER NOT NULL,
5+
"content" BLOB NOT NULL,
6+
7+
PRIMARY KEY ("spaceId", "clock"),
8+
CONSTRAINT "Update_spaceId_fkey" FOREIGN KEY ("spaceId") REFERENCES "Space" ("id") ON DELETE RESTRICT ON UPDATE CASCADE
9+
);

apps/server/prisma/schema.prisma

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ model Space {
2828
members Account[]
2929
invitations Invitation[]
3030
keys SpaceKey[]
31+
updates Update[]
3132
}
3233

3334
model SpaceKey {
@@ -68,3 +69,12 @@ model Invitation {
6869
6970
@@unique([spaceId, inviteeAccountId])
7071
}
72+
73+
model Update {
74+
space Space @relation(fields: [spaceId], references: [id])
75+
spaceId String
76+
clock Int
77+
content Bytes
78+
79+
@@id([spaceId, clock])
80+
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
import { prisma } from '../prisma.js';
2+
3+
type Params = {
4+
accountId: string;
5+
update: string;
6+
spaceId: string;
7+
};
8+
9+
export const createUpdate = async ({ accountId, update, spaceId }: Params) => {
10+
// throw error if account is not a member of the space
11+
await prisma.space.findUniqueOrThrow({
12+
where: { id: spaceId, members: { some: { id: accountId } } },
13+
});
14+
15+
let success = false;
16+
let retries = 0;
17+
const maxRetries = 5;
18+
const retryDelay = 100; // milliseconds
19+
let result:
20+
| {
21+
spaceId: string;
22+
clock: number;
23+
content: Buffer;
24+
}
25+
| undefined = undefined;
26+
27+
while (!success && retries < maxRetries) {
28+
try {
29+
result = await prisma.$transaction(async (prisma) => {
30+
const lastUpdate = await prisma.update.findFirst({
31+
where: { spaceId },
32+
orderBy: { clock: 'desc' },
33+
});
34+
35+
const clock = lastUpdate ? lastUpdate.clock + 1 : 0;
36+
37+
return await prisma.update.create({
38+
data: {
39+
spaceId,
40+
clock,
41+
content: Buffer.from(update),
42+
},
43+
});
44+
});
45+
success = true;
46+
return result;
47+
} catch (error) {
48+
const dbError = error as { code?: string; message?: string };
49+
if (dbError.code === 'P2034' || dbError.code === 'P1008' || dbError.message?.includes('database is locked')) {
50+
retries += 1;
51+
console.warn(`Database is busy, retrying (${retries}/${maxRetries})...`);
52+
await new Promise((resolve) => setTimeout(resolve, retryDelay));
53+
} else {
54+
console.error('Database error:', error);
55+
break;
56+
}
57+
}
58+
}
59+
60+
if (!result) {
61+
throw new Error('Failed to create update');
62+
}
63+
64+
return result;
65+
};

apps/server/src/handlers/getSpace.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,11 @@ export const getSpace = async ({ spaceId, accountId }: Params) => {
3535
},
3636
},
3737
},
38+
updates: {
39+
orderBy: {
40+
clock: 'asc',
41+
},
42+
},
3843
},
3944
});
4045

@@ -52,5 +57,13 @@ export const getSpace = async ({ spaceId, accountId }: Params) => {
5257
id: space.id,
5358
events: space.events.map((wrapper) => JSON.parse(wrapper.event)),
5459
keyBoxes,
60+
updates:
61+
space.updates.length > 0
62+
? {
63+
updates: space.updates.map((update) => update.content.toString()),
64+
firstUpdateClock: space.updates[0].clock,
65+
lastUpdateClock: space.updates[space.updates.length - 1].clock,
66+
}
67+
: undefined,
5568
};
5669
};
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
import { prisma } from '../prisma.js';
2+
3+
type Params = {
4+
accountId: string;
5+
spaceId: string;
6+
after?: number;
7+
};
8+
9+
export const listUpdates = async ({ spaceId, accountId, after }: Params) => {
10+
// throw error if account is not a member of the space
11+
await prisma.space.findUniqueOrThrow({
12+
where: { id: spaceId, members: { some: { id: accountId } } },
13+
});
14+
15+
return await prisma.update.findMany({
16+
where: after
17+
? {
18+
spaceId,
19+
clock: { gt: after },
20+
}
21+
: { spaceId },
22+
orderBy: {
23+
clock: 'desc',
24+
},
25+
});
26+
};

apps/server/src/index.ts

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,17 @@ import type {
88
ResponseListSpaces,
99
ResponseSpace,
1010
ResponseSpaceEvent,
11+
ResponseUpdateConfirmed,
12+
ResponseUpdatesNotification,
13+
Updates,
1114
} from 'graph-framework-messages';
1215
import { RequestMessage } from 'graph-framework-messages';
1316
import type { SpaceEvent } from 'graph-framework-space-events';
1417
import { applyEvent } from 'graph-framework-space-events';
1518
import WebSocket, { WebSocketServer } from 'ws';
1619
import { applySpaceEvent } from './handlers/applySpaceEvent.js';
1720
import { createSpace } from './handlers/createSpace.js';
21+
import { createUpdate } from './handlers/createUpdate.js';
1822
import { getSpace } from './handlers/getSpace.js';
1923
import { listInvitations } from './handlers/listInvitations.js';
2024
import { listSpaces } from './handlers/listSpaces.js';
@@ -66,6 +70,25 @@ function broadcastSpaceEvents({
6670
}
6771
}
6872

73+
function broadcastUpdates({
74+
spaceId,
75+
updates,
76+
currentClient,
77+
}: { spaceId: string; updates: Updates; currentClient: CustomWebSocket }) {
78+
for (const client of webSocketServer.clients as Set<CustomWebSocket>) {
79+
if (currentClient === client) continue;
80+
81+
const outgoingMessage: ResponseUpdatesNotification = {
82+
type: 'updates-notification',
83+
updates,
84+
spaceId,
85+
};
86+
if (client.readyState === WebSocket.OPEN && client.subscribedSpaces.has(spaceId)) {
87+
client.send(JSON.stringify(outgoingMessage));
88+
}
89+
}
90+
}
91+
6992
webSocketServer.on('connection', async (webSocket: CustomWebSocket, request: Request) => {
7093
const params = parse(request.url, true);
7194
if (!params.query.accountId || typeof params.query.accountId !== 'string') {
@@ -159,6 +182,27 @@ webSocketServer.on('connection', async (webSocket: CustomWebSocket, request: Req
159182
broadcastSpaceEvents({ spaceId: data.spaceId, event: data.event, currentClient: webSocket });
160183
break;
161184
}
185+
case 'create-update': {
186+
const update = await createUpdate({ accountId, spaceId: data.spaceId, update: data.update });
187+
const outgoingMessage: ResponseUpdateConfirmed = {
188+
type: 'update-confirmed',
189+
ephemeralId: data.ephemeralId,
190+
clock: update.clock,
191+
spaceId: data.spaceId,
192+
};
193+
webSocket.send(JSON.stringify(outgoingMessage));
194+
195+
broadcastUpdates({
196+
spaceId: data.spaceId,
197+
updates: {
198+
updates: [update.content.toString()],
199+
firstUpdateClock: update.clock,
200+
lastUpdateClock: update.clock,
201+
},
202+
currentClient: webSocket,
203+
});
204+
break;
205+
}
162206
default:
163207
assertExhaustive(data);
164208
break;

0 commit comments

Comments
 (0)