Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 68 additions & 26 deletions apps/events/src/routes/playground.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import { createFileRoute } from '@tanstack/react-router';
import { Effect, Exit } from 'effect';
import * as Schema from 'effect/Schema';
import type {
EventMessage,
Invitation,
RequestAcceptInvitationEvent,
RequestCreateInvitationEvent,
RequestCreateSpaceEvent,
RequestListInvitations,
Expand Down Expand Up @@ -80,13 +80,40 @@ const App = ({
const [spaces, setSpaces] = useState<SpaceStorageEntry[]>([]);
const [invitations, setInvitations] = useState<Invitation[]>([]);

// Create a stable WebSocket connection that only depends on accountId
useEffect(() => {
// temporary until we have a way to create accounts and authenticate them
const websocketConnection = new WebSocket(`ws://localhost:3030/?accountId=${accountId}`);
setWebsocketConnection(websocketConnection);

const onOpen = () => {
console.log('websocket connected');
};

const onError = (event: Event) => {
console.log('websocket error', event);
};

const onClose = (event: CloseEvent) => {
console.log('websocket close', event);
};

websocketConnection.addEventListener('open', onOpen);
websocketConnection.addEventListener('error', onError);
websocketConnection.addEventListener('close', onClose);

return () => {
websocketConnection.removeEventListener('open', onOpen);
websocketConnection.removeEventListener('error', onError);
websocketConnection.removeEventListener('close', onClose);
websocketConnection.close();
};
}, [accountId]); // Only recreate when accountId changes

// Handle WebSocket messages in a separate effect
useEffect(() => {
if (!websocketConnection) return;

const onMessage = async (event: MessageEvent) => {
console.log('message received', event.data);
const data = JSON.parse(event.data);
const message = decodeResponseMessage(data);
if (message._tag === 'Right') {
Expand Down Expand Up @@ -157,8 +184,31 @@ const App = ({
);
break;
}
case 'event': {
console.log('event', response);
case 'space-event': {
const space = spaces.find((s) => s.id === response.spaceId);
if (!space) {
console.error('Space not found', response.spaceId);
return;
}
if (!space.state) {
console.error('Space has no state', response.spaceId);
return;
}

const applyEventResult = await Effect.runPromiseExit(
applyEvent({ event: response.event, state: space.state }),
);
if (Exit.isSuccess(applyEventResult)) {
setSpaces((spaces) =>
spaces.map((space) => {
if (space.id === response.spaceId) {
return { ...space, state: applyEventResult.value, events: [...space.events, response.event] };
}
return space;
}),
);
}

break;
}
case 'list-invitations': {
Expand All @@ -170,31 +220,13 @@ const App = ({
}
}
};
websocketConnection.addEventListener('message', onMessage);

const onOpen = () => {
console.log('websocket connected');
};
websocketConnection.addEventListener('open', onOpen);

const onError = (event: Event) => {
console.log('websocket error', event);
};
websocketConnection.addEventListener('error', onError);

const onClose = (event: CloseEvent) => {
console.log('websocket close', event);
};
websocketConnection.addEventListener('close', onClose);
websocketConnection.addEventListener('message', onMessage);

return () => {
websocketConnection.removeEventListener('message', onMessage);
websocketConnection.removeEventListener('open', onOpen);
websocketConnection.removeEventListener('error', onError);
websocketConnection.removeEventListener('close', onClose);
websocketConnection.close();
};
}, [accountId, encryptionPrivateKey]);
}, [websocketConnection, encryptionPrivateKey, spaces]);

return (
<>
Expand Down Expand Up @@ -268,8 +300,18 @@ const App = ({
console.error('Failed to accept invitation', spaceEvent);
return;
}
const message: EventMessage = { type: 'event', event: spaceEvent.value, spaceId: invitation.spaceId };
const message: RequestAcceptInvitationEvent = {
type: 'accept-invitation-event',
event: spaceEvent.value,
spaceId: invitation.spaceId,
};
websocketConnection?.send(JSON.stringify(message));

// temporary until we have define a strategy for accepting invitations response
setTimeout(() => {
const message2: RequestListInvitations = { type: 'list-invitations' };
websocketConnection?.send(JSON.stringify(message2));
}, 1000);
}}
/>
<h2 className="text-lg">Spaces</h2>
Expand Down
78 changes: 58 additions & 20 deletions apps/server/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,27 @@ import 'dotenv/config';
import { parse } from 'node:url';
import { Effect, Exit, Schema } from 'effect';
import express from 'express';
import type { ResponseListInvitations, ResponseListSpaces, ResponseSpace } from 'graph-framework-messages';
import type {
ResponseListInvitations,
ResponseListSpaces,
ResponseSpace,
ResponseSpaceEvent,
} from 'graph-framework-messages';
import { RequestMessage } from 'graph-framework-messages';
import type { SpaceEvent } from 'graph-framework-space-events';
import { applyEvent } from 'graph-framework-space-events';
import type WebSocket from 'ws';
import { WebSocketServer } from 'ws';
import WebSocket, { WebSocketServer } from 'ws';
import { applySpaceEvent } from './handlers/applySpaceEvent.js';
import { createSpace } from './handlers/createSpace.js';
import { getSpace } from './handlers/getSpace.js';
import { listInvitations } from './handlers/listInvitations.js';
import { listSpaces } from './handlers/listSpaces.js';
import { tmpInitAccount } from './handlers/tmpInitAccount.js';
import { assertExhaustive } from './utils/assertExhaustive.js';
interface CustomWebSocket extends WebSocket {
accountId: string;
subscribedSpaces: Set<string>;
}

const decodeRequestMessage = Schema.decodeUnknownEither(RequestMessage);

Expand All @@ -38,13 +47,34 @@ const server = app.listen(PORT, () => {
console.log(`Listening on port ${PORT}`);
});

webSocketServer.on('connection', async (webSocket: WebSocket, request: Request) => {
function broadcastSpaceEvents({
spaceId,
event,
currentClient,
}: { spaceId: string; event: SpaceEvent; currentClient: CustomWebSocket }) {
for (const client of webSocketServer.clients as Set<CustomWebSocket>) {
if (currentClient === client) continue;

const outgoingMessage: ResponseSpaceEvent = {
type: 'space-event',
spaceId,
event,
};
if (client.readyState === WebSocket.OPEN && client.subscribedSpaces.has(spaceId)) {
client.send(JSON.stringify(outgoingMessage));
}
}
}

webSocketServer.on('connection', async (webSocket: CustomWebSocket, request: Request) => {
const params = parse(request.url, true);
if (!params.query.accountId || typeof params.query.accountId !== 'string') {
webSocket.close();
return;
}
const accountId = params.query.accountId;
webSocket.accountId = accountId;
webSocket.subscribedSpaces = new Set();

console.log('Connection established', accountId);
webSocket.on('message', async (message) => {
Expand All @@ -59,6 +89,7 @@ webSocketServer.on('connection', async (webSocket: WebSocket, request: Request)
...space,
type: 'space',
};
webSocket.subscribedSpaces.add(data.id);
webSocket.send(JSON.stringify(outgoingMessage));
break;
}
Expand Down Expand Up @@ -96,29 +127,36 @@ webSocketServer.on('connection', async (webSocket: WebSocket, request: Request)
keyBoxes: data.keyBoxes.map((keyBox) => keyBox),
});
const spaceWithEvents = await getSpace({ accountId, spaceId: data.spaceId });
// TODO send back confirmation instead of the entire space
const outgoingMessage: ResponseSpace = {
...spaceWithEvents,
type: 'space',
};
webSocket.send(JSON.stringify(outgoingMessage));
break;
}
case 'event': {
switch (data.event.transaction.type) {
case 'delete-space': {
break;
}
case 'accept-invitation': {
await applySpaceEvent({ accountId, spaceId: data.spaceId, event: data.event, keyBoxes: [] });
const spaceWithEvents = await getSpace({ accountId, spaceId: data.spaceId });
const outgoingMessage: ResponseSpace = {
...spaceWithEvents,
type: 'space',
};
webSocket.send(JSON.stringify(outgoingMessage));
break;
for (const client of webSocketServer.clients as Set<CustomWebSocket>) {
if (
client.readyState === WebSocket.OPEN &&
client.accountId === data.event.transaction.signaturePublicKey
) {
const invitations = await listInvitations({ accountId: client.accountId });
const outgoingMessage: ResponseListInvitations = { type: 'list-invitations', invitations: invitations };
// for now sending the entire list of invitations to the client - we could send only a single one
client.send(JSON.stringify(outgoingMessage));
}
}

broadcastSpaceEvents({ spaceId: data.spaceId, event: data.event, currentClient: webSocket });
break;
}
case 'accept-invitation-event': {
await applySpaceEvent({ accountId, spaceId: data.spaceId, event: data.event, keyBoxes: [] });
const spaceWithEvents = await getSpace({ accountId, spaceId: data.spaceId });
const outgoingMessage: ResponseSpace = {
...spaceWithEvents,
type: 'space',
};
webSocket.send(JSON.stringify(outgoingMessage));
broadcastSpaceEvents({ spaceId: data.spaceId, event: data.event, currentClient: webSocket });
break;
}
default:
Expand Down
26 changes: 19 additions & 7 deletions packages/graph-framework-messages/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import {
AcceptInvitationEvent,
CreateInvitationEvent,
CreateSpaceEvent,
DeleteSpaceEvent,
SpaceEvent,
} from 'graph-framework-space-events';

Expand Down Expand Up @@ -42,13 +41,13 @@ export const RequestCreateInvitationEvent = Schema.Struct({

export type RequestCreateInvitationEvent = Schema.Schema.Type<typeof RequestCreateInvitationEvent>;

export const EventMessage = Schema.Struct({
type: Schema.Literal('event'),
export const RequestAcceptInvitationEvent = Schema.Struct({
type: Schema.Literal('accept-invitation-event'),
spaceId: Schema.String,
event: Schema.Union(DeleteSpaceEvent, AcceptInvitationEvent),
event: AcceptInvitationEvent,
});

export type EventMessage = Schema.Schema.Type<typeof EventMessage>;
export type RequestAcceptInvitationEvent = Schema.Schema.Type<typeof RequestAcceptInvitationEvent>;

export const RequestSubscribeToSpace = Schema.Struct({
type: Schema.Literal('subscribe-space'),
Expand All @@ -72,7 +71,7 @@ export type RequestListInvitations = Schema.Schema.Type<typeof RequestListInvita
export const RequestMessage = Schema.Union(
RequestCreateSpaceEvent,
RequestCreateInvitationEvent,
EventMessage,
RequestAcceptInvitationEvent,
RequestSubscribeToSpace,
RequestListSpaces,
RequestListInvitations,
Expand Down Expand Up @@ -106,6 +105,14 @@ export const ResponseListInvitations = Schema.Struct({

export type ResponseListInvitations = Schema.Schema.Type<typeof ResponseListInvitations>;

export const ResponseSpaceEvent = Schema.Struct({
type: Schema.Literal('space-event'),
spaceId: Schema.String,
event: SpaceEvent,
});

export type ResponseSpaceEvent = Schema.Schema.Type<typeof ResponseSpaceEvent>;

export const ResponseSpace = Schema.Struct({
type: Schema.Literal('space'),
id: Schema.String,
Expand All @@ -115,6 +122,11 @@ export const ResponseSpace = Schema.Struct({

export type ResponseSpace = Schema.Schema.Type<typeof ResponseSpace>;

export const ResponseMessage = Schema.Union(EventMessage, ResponseListSpaces, ResponseListInvitations, ResponseSpace);
export const ResponseMessage = Schema.Union(
ResponseListSpaces,
ResponseListInvitations,
ResponseSpace,
ResponseSpaceEvent,
);

export type ResponseMessage = Schema.Schema.Type<typeof ResponseMessage>;
1 change: 0 additions & 1 deletion packages/graph-framework-space-events/src/apply-event.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ export const applyEvent = ({
return Effect.fail(new InvalidEventError());
}
if (event.transaction.previousEventHash !== state.lastEventHash) {
console.log('WEEEEE', event.transaction.previousEventHash, state.lastEventHash);
return Effect.fail(new InvalidEventError());
}
}
Expand Down
Loading