Skip to content

Commit 41457c5

Browse files
authored
Merge pull request #46 from geobrowser/websocket-updates
Broadcast space events via Websocket
2 parents e844f0a + a53a866 commit 41457c5

File tree

4 files changed

+145
-54
lines changed

4 files changed

+145
-54
lines changed

apps/events/src/routes/playground.tsx

Lines changed: 68 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@ import { createFileRoute } from '@tanstack/react-router';
88
import { Effect, Exit } from 'effect';
99
import * as Schema from 'effect/Schema';
1010
import type {
11-
EventMessage,
1211
Invitation,
12+
RequestAcceptInvitationEvent,
1313
RequestCreateInvitationEvent,
1414
RequestCreateSpaceEvent,
1515
RequestListInvitations,
@@ -80,13 +80,40 @@ const App = ({
8080
const [spaces, setSpaces] = useState<SpaceStorageEntry[]>([]);
8181
const [invitations, setInvitations] = useState<Invitation[]>([]);
8282

83+
// Create a stable WebSocket connection that only depends on accountId
8384
useEffect(() => {
84-
// temporary until we have a way to create accounts and authenticate them
8585
const websocketConnection = new WebSocket(`ws://localhost:3030/?accountId=${accountId}`);
8686
setWebsocketConnection(websocketConnection);
8787

88+
const onOpen = () => {
89+
console.log('websocket connected');
90+
};
91+
92+
const onError = (event: Event) => {
93+
console.log('websocket error', event);
94+
};
95+
96+
const onClose = (event: CloseEvent) => {
97+
console.log('websocket close', event);
98+
};
99+
100+
websocketConnection.addEventListener('open', onOpen);
101+
websocketConnection.addEventListener('error', onError);
102+
websocketConnection.addEventListener('close', onClose);
103+
104+
return () => {
105+
websocketConnection.removeEventListener('open', onOpen);
106+
websocketConnection.removeEventListener('error', onError);
107+
websocketConnection.removeEventListener('close', onClose);
108+
websocketConnection.close();
109+
};
110+
}, [accountId]); // Only recreate when accountId changes
111+
112+
// Handle WebSocket messages in a separate effect
113+
useEffect(() => {
114+
if (!websocketConnection) return;
115+
88116
const onMessage = async (event: MessageEvent) => {
89-
console.log('message received', event.data);
90117
const data = JSON.parse(event.data);
91118
const message = decodeResponseMessage(data);
92119
if (message._tag === 'Right') {
@@ -157,8 +184,31 @@ const App = ({
157184
);
158185
break;
159186
}
160-
case 'event': {
161-
console.log('event', response);
187+
case 'space-event': {
188+
const space = spaces.find((s) => s.id === response.spaceId);
189+
if (!space) {
190+
console.error('Space not found', response.spaceId);
191+
return;
192+
}
193+
if (!space.state) {
194+
console.error('Space has no state', response.spaceId);
195+
return;
196+
}
197+
198+
const applyEventResult = await Effect.runPromiseExit(
199+
applyEvent({ event: response.event, state: space.state }),
200+
);
201+
if (Exit.isSuccess(applyEventResult)) {
202+
setSpaces((spaces) =>
203+
spaces.map((space) => {
204+
if (space.id === response.spaceId) {
205+
return { ...space, state: applyEventResult.value, events: [...space.events, response.event] };
206+
}
207+
return space;
208+
}),
209+
);
210+
}
211+
162212
break;
163213
}
164214
case 'list-invitations': {
@@ -170,31 +220,13 @@ const App = ({
170220
}
171221
}
172222
};
173-
websocketConnection.addEventListener('message', onMessage);
174-
175-
const onOpen = () => {
176-
console.log('websocket connected');
177-
};
178-
websocketConnection.addEventListener('open', onOpen);
179-
180-
const onError = (event: Event) => {
181-
console.log('websocket error', event);
182-
};
183-
websocketConnection.addEventListener('error', onError);
184223

185-
const onClose = (event: CloseEvent) => {
186-
console.log('websocket close', event);
187-
};
188-
websocketConnection.addEventListener('close', onClose);
224+
websocketConnection.addEventListener('message', onMessage);
189225

190226
return () => {
191227
websocketConnection.removeEventListener('message', onMessage);
192-
websocketConnection.removeEventListener('open', onOpen);
193-
websocketConnection.removeEventListener('error', onError);
194-
websocketConnection.removeEventListener('close', onClose);
195-
websocketConnection.close();
196228
};
197-
}, [accountId, encryptionPrivateKey]);
229+
}, [websocketConnection, encryptionPrivateKey, spaces]);
198230

199231
return (
200232
<>
@@ -268,8 +300,18 @@ const App = ({
268300
console.error('Failed to accept invitation', spaceEvent);
269301
return;
270302
}
271-
const message: EventMessage = { type: 'event', event: spaceEvent.value, spaceId: invitation.spaceId };
303+
const message: RequestAcceptInvitationEvent = {
304+
type: 'accept-invitation-event',
305+
event: spaceEvent.value,
306+
spaceId: invitation.spaceId,
307+
};
272308
websocketConnection?.send(JSON.stringify(message));
309+
310+
// temporary until we have define a strategy for accepting invitations response
311+
setTimeout(() => {
312+
const message2: RequestListInvitations = { type: 'list-invitations' };
313+
websocketConnection?.send(JSON.stringify(message2));
314+
}, 1000);
273315
}}
274316
/>
275317
<h2 className="text-lg">Spaces</h2>

apps/server/src/index.ts

Lines changed: 58 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -3,18 +3,27 @@ import 'dotenv/config';
33
import { parse } from 'node:url';
44
import { Effect, Exit, Schema } from 'effect';
55
import express from 'express';
6-
import type { ResponseListInvitations, ResponseListSpaces, ResponseSpace } from 'graph-framework-messages';
6+
import type {
7+
ResponseListInvitations,
8+
ResponseListSpaces,
9+
ResponseSpace,
10+
ResponseSpaceEvent,
11+
} from 'graph-framework-messages';
712
import { RequestMessage } from 'graph-framework-messages';
13+
import type { SpaceEvent } from 'graph-framework-space-events';
814
import { applyEvent } from 'graph-framework-space-events';
9-
import type WebSocket from 'ws';
10-
import { WebSocketServer } from 'ws';
15+
import WebSocket, { WebSocketServer } from 'ws';
1116
import { applySpaceEvent } from './handlers/applySpaceEvent.js';
1217
import { createSpace } from './handlers/createSpace.js';
1318
import { getSpace } from './handlers/getSpace.js';
1419
import { listInvitations } from './handlers/listInvitations.js';
1520
import { listSpaces } from './handlers/listSpaces.js';
1621
import { tmpInitAccount } from './handlers/tmpInitAccount.js';
1722
import { assertExhaustive } from './utils/assertExhaustive.js';
23+
interface CustomWebSocket extends WebSocket {
24+
accountId: string;
25+
subscribedSpaces: Set<string>;
26+
}
1827

1928
const decodeRequestMessage = Schema.decodeUnknownEither(RequestMessage);
2029

@@ -38,13 +47,34 @@ const server = app.listen(PORT, () => {
3847
console.log(`Listening on port ${PORT}`);
3948
});
4049

41-
webSocketServer.on('connection', async (webSocket: WebSocket, request: Request) => {
50+
function broadcastSpaceEvents({
51+
spaceId,
52+
event,
53+
currentClient,
54+
}: { spaceId: string; event: SpaceEvent; currentClient: CustomWebSocket }) {
55+
for (const client of webSocketServer.clients as Set<CustomWebSocket>) {
56+
if (currentClient === client) continue;
57+
58+
const outgoingMessage: ResponseSpaceEvent = {
59+
type: 'space-event',
60+
spaceId,
61+
event,
62+
};
63+
if (client.readyState === WebSocket.OPEN && client.subscribedSpaces.has(spaceId)) {
64+
client.send(JSON.stringify(outgoingMessage));
65+
}
66+
}
67+
}
68+
69+
webSocketServer.on('connection', async (webSocket: CustomWebSocket, request: Request) => {
4270
const params = parse(request.url, true);
4371
if (!params.query.accountId || typeof params.query.accountId !== 'string') {
4472
webSocket.close();
4573
return;
4674
}
4775
const accountId = params.query.accountId;
76+
webSocket.accountId = accountId;
77+
webSocket.subscribedSpaces = new Set();
4878

4979
console.log('Connection established', accountId);
5080
webSocket.on('message', async (message) => {
@@ -59,6 +89,7 @@ webSocketServer.on('connection', async (webSocket: WebSocket, request: Request)
5989
...space,
6090
type: 'space',
6191
};
92+
webSocket.subscribedSpaces.add(data.id);
6293
webSocket.send(JSON.stringify(outgoingMessage));
6394
break;
6495
}
@@ -96,29 +127,36 @@ webSocketServer.on('connection', async (webSocket: WebSocket, request: Request)
96127
keyBoxes: data.keyBoxes.map((keyBox) => keyBox),
97128
});
98129
const spaceWithEvents = await getSpace({ accountId, spaceId: data.spaceId });
130+
// TODO send back confirmation instead of the entire space
99131
const outgoingMessage: ResponseSpace = {
100132
...spaceWithEvents,
101133
type: 'space',
102134
};
103135
webSocket.send(JSON.stringify(outgoingMessage));
104-
break;
105-
}
106-
case 'event': {
107-
switch (data.event.transaction.type) {
108-
case 'delete-space': {
109-
break;
110-
}
111-
case 'accept-invitation': {
112-
await applySpaceEvent({ accountId, spaceId: data.spaceId, event: data.event, keyBoxes: [] });
113-
const spaceWithEvents = await getSpace({ accountId, spaceId: data.spaceId });
114-
const outgoingMessage: ResponseSpace = {
115-
...spaceWithEvents,
116-
type: 'space',
117-
};
118-
webSocket.send(JSON.stringify(outgoingMessage));
119-
break;
136+
for (const client of webSocketServer.clients as Set<CustomWebSocket>) {
137+
if (
138+
client.readyState === WebSocket.OPEN &&
139+
client.accountId === data.event.transaction.signaturePublicKey
140+
) {
141+
const invitations = await listInvitations({ accountId: client.accountId });
142+
const outgoingMessage: ResponseListInvitations = { type: 'list-invitations', invitations: invitations };
143+
// for now sending the entire list of invitations to the client - we could send only a single one
144+
client.send(JSON.stringify(outgoingMessage));
120145
}
121146
}
147+
148+
broadcastSpaceEvents({ spaceId: data.spaceId, event: data.event, currentClient: webSocket });
149+
break;
150+
}
151+
case 'accept-invitation-event': {
152+
await applySpaceEvent({ accountId, spaceId: data.spaceId, event: data.event, keyBoxes: [] });
153+
const spaceWithEvents = await getSpace({ accountId, spaceId: data.spaceId });
154+
const outgoingMessage: ResponseSpace = {
155+
...spaceWithEvents,
156+
type: 'space',
157+
};
158+
webSocket.send(JSON.stringify(outgoingMessage));
159+
broadcastSpaceEvents({ spaceId: data.spaceId, event: data.event, currentClient: webSocket });
122160
break;
123161
}
124162
default:

packages/graph-framework-messages/src/types.ts

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ import {
33
AcceptInvitationEvent,
44
CreateInvitationEvent,
55
CreateSpaceEvent,
6-
DeleteSpaceEvent,
76
SpaceEvent,
87
} from 'graph-framework-space-events';
98

@@ -42,13 +41,13 @@ export const RequestCreateInvitationEvent = Schema.Struct({
4241

4342
export type RequestCreateInvitationEvent = Schema.Schema.Type<typeof RequestCreateInvitationEvent>;
4443

45-
export const EventMessage = Schema.Struct({
46-
type: Schema.Literal('event'),
44+
export const RequestAcceptInvitationEvent = Schema.Struct({
45+
type: Schema.Literal('accept-invitation-event'),
4746
spaceId: Schema.String,
48-
event: Schema.Union(DeleteSpaceEvent, AcceptInvitationEvent),
47+
event: AcceptInvitationEvent,
4948
});
5049

51-
export type EventMessage = Schema.Schema.Type<typeof EventMessage>;
50+
export type RequestAcceptInvitationEvent = Schema.Schema.Type<typeof RequestAcceptInvitationEvent>;
5251

5352
export const RequestSubscribeToSpace = Schema.Struct({
5453
type: Schema.Literal('subscribe-space'),
@@ -72,7 +71,7 @@ export type RequestListInvitations = Schema.Schema.Type<typeof RequestListInvita
7271
export const RequestMessage = Schema.Union(
7372
RequestCreateSpaceEvent,
7473
RequestCreateInvitationEvent,
75-
EventMessage,
74+
RequestAcceptInvitationEvent,
7675
RequestSubscribeToSpace,
7776
RequestListSpaces,
7877
RequestListInvitations,
@@ -106,6 +105,14 @@ export const ResponseListInvitations = Schema.Struct({
106105

107106
export type ResponseListInvitations = Schema.Schema.Type<typeof ResponseListInvitations>;
108107

108+
export const ResponseSpaceEvent = Schema.Struct({
109+
type: Schema.Literal('space-event'),
110+
spaceId: Schema.String,
111+
event: SpaceEvent,
112+
});
113+
114+
export type ResponseSpaceEvent = Schema.Schema.Type<typeof ResponseSpaceEvent>;
115+
109116
export const ResponseSpace = Schema.Struct({
110117
type: Schema.Literal('space'),
111118
id: Schema.String,
@@ -115,6 +122,11 @@ export const ResponseSpace = Schema.Struct({
115122

116123
export type ResponseSpace = Schema.Schema.Type<typeof ResponseSpace>;
117124

118-
export const ResponseMessage = Schema.Union(EventMessage, ResponseListSpaces, ResponseListInvitations, ResponseSpace);
125+
export const ResponseMessage = Schema.Union(
126+
ResponseListSpaces,
127+
ResponseListInvitations,
128+
ResponseSpace,
129+
ResponseSpaceEvent,
130+
);
119131

120132
export type ResponseMessage = Schema.Schema.Type<typeof ResponseMessage>;

packages/graph-framework-space-events/src/apply-event.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@ export const applyEvent = ({
3434
return Effect.fail(new InvalidEventError());
3535
}
3636
if (event.transaction.previousEventHash !== state.lastEventHash) {
37-
console.log('WEEEEE', event.transaction.previousEventHash, state.lastEventHash);
3837
return Effect.fail(new InvalidEventError());
3938
}
4039
}

0 commit comments

Comments
 (0)