diff --git a/.changeset/lazy-birds-wait.md b/.changeset/lazy-birds-wait.md deleted file mode 100644 index 48fd9890..00000000 --- a/.changeset/lazy-birds-wait.md +++ /dev/null @@ -1,6 +0,0 @@ ---- -"create-hypergraph": patch ---- - -improve geo connect box based on authentication state in all templates - \ No newline at end of file diff --git a/.claude/settings.local.json b/.claude/settings.local.json new file mode 100644 index 00000000..f43a83b3 --- /dev/null +++ b/.claude/settings.local.json @@ -0,0 +1,7 @@ +{ + "permissions": { + "allow": ["Bash(pnpm lint:fix:*)", "Bash(pnpm typecheck:*)", "Bash(pnpm check:*)"], + "deny": [], + "ask": [] + } +} diff --git a/apps/server-new/src/http/api.ts b/apps/server-new/src/http/api.ts index 8194564e..63320ce9 100644 --- a/apps/server-new/src/http/api.ts +++ b/apps/server-new/src/http/api.ts @@ -65,7 +65,7 @@ export class IdentityQuery extends Schema.Class('IdentityQuery')( /** * Health endpoints */ -export const statusEndpoint = HttpApiEndpoint.get('status')`/`.addSuccess(Schema.String); +export const statusEndpoint = HttpApiEndpoint.get('status')`/status`.addSuccess(Schema.String); export const healthGroup = HttpApiGroup.make('Health').add(statusEndpoint); diff --git a/apps/server-new/src/index.ts b/apps/server-new/src/index.ts index 1a7bdb7b..e52f14d4 100644 --- a/apps/server-new/src/index.ts +++ b/apps/server-new/src/index.ts @@ -26,6 +26,7 @@ const Observability = Layer.unwrapEffect( const layer = server.pipe( Layer.provide(Logger.structured), + // Layer.provide(Logger.pretty), Layer.provide(Observability), Layer.provide(PlatformConfigProvider.layerDotEnvAdd('.env')), Layer.provide(NodeContext.layer), diff --git a/apps/server-new/src/server.ts b/apps/server-new/src/server.ts index a6cbf865..0a7c32f5 100644 --- a/apps/server-new/src/server.ts +++ b/apps/server-new/src/server.ts @@ -2,16 +2,14 @@ import { createServer } from 'node:http'; import * as HttpApiScalar from '@effect/platform/HttpApiScalar'; import * as HttpLayerRouter from '@effect/platform/HttpLayerRouter'; import * as HttpMiddleware from '@effect/platform/HttpMiddleware'; -import * as HttpServerRequest from '@effect/platform/HttpServerRequest'; -import * as HttpServerResponse from '@effect/platform/HttpServerResponse'; import * as NodeHttpServer from '@effect/platform-node/NodeHttpServer'; import * as Effect from 'effect/Effect'; import * as Layer from 'effect/Layer'; -import * as Schedule from 'effect/Schedule'; -import * as Stream from 'effect/Stream'; import { serverPortConfig } from './config/server.ts'; import { hypergraphApi } from './http/api.ts'; import { HandlersLive } from './http/handlers.ts'; +import * as ConnectionsService from './services/connections.ts'; +import { WebSocketLayer } from './websocket.ts'; // Create scalar openapi browser layer at /docs. const DocsLayer = HttpApiScalar.layerHttpLayerRouter({ @@ -24,20 +22,6 @@ const ApiLayer = HttpLayerRouter.addHttpApi(hypergraphApi, { openapiPath: '/docs/openapi.json', }).pipe(Layer.provide(HandlersLive)); -// Create websocket layer at /ws. -const WebSocketLayer = HttpLayerRouter.add( - 'GET', - '/ws', - // TODO: Implement actual websocket logic here. - Stream.fromSchedule(Schedule.spaced(1000)).pipe( - Stream.map(JSON.stringify), - Stream.pipeThroughChannel(HttpServerRequest.upgradeChannel()), - Stream.decodeText(), - Stream.runForEach((_) => Effect.log(_)), - Effect.as(HttpServerResponse.empty()), - ), -); - // Merge router layers together and add the cors middleware layer. const CorsMiddleware = HttpLayerRouter.middleware(HttpMiddleware.cors()); const AppLayer = Layer.mergeAll(ApiLayer, DocsLayer, WebSocketLayer).pipe(Layer.provide(CorsMiddleware.layer)); @@ -47,4 +31,7 @@ const HttpServerLayer = serverPortConfig.pipe( Layer.unwrapEffect, ); -export const server = HttpLayerRouter.serve(AppLayer).pipe(Layer.provide(HttpServerLayer)); +export const server = HttpLayerRouter.serve(AppLayer).pipe( + Layer.provide(HttpServerLayer), + Layer.provide(ConnectionsService.layer), +); diff --git a/apps/server-new/src/services/account-inbox.ts b/apps/server-new/src/services/account-inbox.ts index 0ba6c40f..79d23a69 100644 --- a/apps/server-new/src/services/account-inbox.ts +++ b/apps/server-new/src/services/account-inbox.ts @@ -37,6 +37,16 @@ export class AccountInboxService extends Context.Tag('AccountInboxService')< Messages.InboxMessage, ResourceNotFoundError | ValidationError | AuthorizationError | DatabaseService.DatabaseError >; + readonly createAccountInbox: ( + data: Messages.RequestCreateAccountInbox, + ) => Effect.Effect; + readonly getLatestAccountInboxMessages: (params: { + inboxId: string; + since: Date; + }) => Effect.Effect; + readonly listAccountInboxes: (params: { + accountAddress: string; + }) => Effect.Effect; } >() {} @@ -258,9 +268,131 @@ export const layer = Effect.gen(function* () { return createdMessage; }); + const createAccountInbox = Effect.fn('createAccountInbox')(function* (data: Messages.RequestCreateAccountInbox) { + const { accountAddress, inboxId, isPublic, authPolicy, encryptionPublicKey, signature } = data; + + // Verify the signature is valid for the corresponding accountAddress + const signer = Inboxes.recoverAccountInboxCreatorKey(data); + const signerAccount = yield* getAppOrConnectIdentity({ + accountAddress: data.accountAddress, + signaturePublicKey: signer, + }).pipe(Effect.mapError(() => new AuthorizationError({ message: 'Invalid signature' }))); + + if (signerAccount.accountAddress !== accountAddress) { + return yield* Effect.fail(new AuthorizationError({ message: 'Invalid signature' })); + } + + // Create the inbox (will throw an error if it already exists) + const inbox = yield* use((client) => + client.accountInbox.create({ + data: { + id: inboxId, + isPublic, + authPolicy, + encryptionPublicKey, + signatureHex: signature.hex, + signatureRecovery: signature.recovery, + account: { connect: { address: accountAddress } }, + }, + }), + ); + + return { + inboxId: inbox.id, + accountAddress, + isPublic: inbox.isPublic, + authPolicy: inbox.authPolicy as Inboxes.InboxSenderAuthPolicy, + encryptionPublicKey: inbox.encryptionPublicKey, + signature: { + hex: inbox.signatureHex, + recovery: inbox.signatureRecovery, + }, + }; + }); + + const getLatestAccountInboxMessages = Effect.fn('getLatestAccountInboxMessages')(function* ({ + inboxId, + since, + }: { + inboxId: string; + since: Date; + }) { + const messages = yield* use((client) => + client.accountInboxMessage.findMany({ + where: { + accountInboxId: inboxId, + createdAt: { + gte: since, + }, + }, + orderBy: { + createdAt: 'asc', + }, + }), + ); + + return messages.map( + (msg): Messages.InboxMessage => ({ + id: msg.id, + ciphertext: msg.ciphertext, + signature: + msg.signatureHex != null && msg.signatureRecovery != null + ? { + hex: msg.signatureHex, + recovery: msg.signatureRecovery, + } + : undefined, + authorAccountAddress: msg.authorAccountAddress ?? undefined, + createdAt: msg.createdAt, + }), + ); + }); + + const listAccountInboxes = Effect.fn('listAccountInboxes')(function* ({ + accountAddress, + }: { + accountAddress: string; + }) { + const inboxes = yield* use((client) => + client.accountInbox.findMany({ + where: { accountAddress }, + select: { + id: true, + isPublic: true, + authPolicy: true, + encryptionPublicKey: true, + account: { + select: { + address: true, + }, + }, + signatureHex: true, + signatureRecovery: true, + }, + }), + ); + + return inboxes.map( + (inbox): Messages.AccountInbox => ({ + inboxId: inbox.id, + accountAddress: inbox.account.address, + isPublic: inbox.isPublic, + authPolicy: inbox.authPolicy as Inboxes.InboxSenderAuthPolicy, + encryptionPublicKey: inbox.encryptionPublicKey, + signature: { + hex: inbox.signatureHex, + recovery: inbox.signatureRecovery, + }, + }), + ); + }); + return { listPublicAccountInboxes, getAccountInbox, postAccountInboxMessage, + createAccountInbox, + getLatestAccountInboxMessages, + listAccountInboxes, } as const; }).pipe(Layer.effect(AccountInboxService), Layer.provide(DatabaseService.layer), Layer.provide(IdentityService.layer)); diff --git a/apps/server-new/src/services/connections.ts b/apps/server-new/src/services/connections.ts new file mode 100644 index 00000000..7f73855a --- /dev/null +++ b/apps/server-new/src/services/connections.ts @@ -0,0 +1,199 @@ +import { Messages } from '@graphprotocol/hypergraph'; +import { Context, Effect, Layer, Ref } from 'effect'; +import type * as Mailbox from 'effect/Mailbox'; + +type Connection = { + accountAddress: string; + appIdentityAddress: string; + mailbox: Mailbox.Mailbox; + subscribedSpaces: Set; +}; + +export class ConnectionsService extends Context.Tag('ConnectionsService')< + ConnectionsService, + { + readonly registerConnection: (params: { + accountAddress: string; + appIdentityAddress: string; + mailbox: Mailbox.Mailbox; + }) => Effect.Effect; + readonly removeConnection: (connectionId: string) => Effect.Effect; + readonly subscribeToSpace: (connectionId: string, spaceId: string) => Effect.Effect; + readonly unsubscribeFromSpace: (connectionId: string, spaceId: string) => Effect.Effect; + readonly broadcastToSpace: (params: { + spaceId: string; + message: Messages.ResponseMessage; + excludeConnectionId?: string; + }) => Effect.Effect; + readonly broadcastToAccount: (params: { + accountAddress: string; + message: Messages.ResponseMessage; + excludeConnectionId?: string; + }) => Effect.Effect; + readonly getConnection: (connectionId: string) => Effect.Effect; + } +>() {} + +export const layer = Effect.gen(function* () { + // Store connections by a unique connection ID + const connections = yield* Ref.make(new Map()); + const connectionCounter = yield* Ref.make(0); + + const registerConnection = Effect.fn('registerConnection')(function* ({ + accountAddress, + appIdentityAddress, + mailbox, + }: { + accountAddress: string; + appIdentityAddress: string; + mailbox: Mailbox.Mailbox; + }) { + const nextId = yield* Ref.updateAndGet(connectionCounter, (n) => n + 1); + yield* Effect.logInfo('Next ID', { + nextId, + }); + const connectionId = `conn-${nextId}`; + const connection: Connection = { + accountAddress, + appIdentityAddress, + mailbox, + subscribedSpaces: new Set(), + }; + + yield* Ref.update(connections, (map) => new Map(map).set(connectionId, connection)); + + yield* Effect.logInfo('Registered new connection', { + connectionId, + accountAddress, + appIdentityAddress, + }); + + return connectionId; + }); + + const removeConnection = Effect.fn('removeConnection')(function* (connectionId: string) { + const currentConnections = yield* Ref.get(connections); + const connection = currentConnections.get(connectionId); + + if (connection) { + yield* Effect.logInfo('Removing connection', { + connectionId, + accountAddress: connection.accountAddress, + subscribedSpaces: Array.from(connection.subscribedSpaces), + }); + + yield* Ref.update(connections, (map) => { + const newMap = new Map(map); + newMap.delete(connectionId); + return newMap; + }); + } + }); + + const subscribeToSpace = Effect.fn('subscribeToSpace')(function* (connectionId: string, spaceId: string) { + const currentConnections = yield* Ref.get(connections); + const connection = currentConnections.get(connectionId); + + if (connection) { + yield* Ref.update(connections, (map) => { + const newMap = new Map(map); + const conn = newMap.get(connectionId); + if (conn) { + conn.subscribedSpaces.add(spaceId); + } + return newMap; + }); + + yield* Effect.logInfo('Subscribed connection to space', { + connectionId, + spaceId, + accountAddress: connection.accountAddress, + }); + } + }); + + const unsubscribeFromSpace = Effect.fn('unsubscribeFromSpace')(function* (connectionId: string, spaceId: string) { + const currentConnections = yield* Ref.get(connections); + const connection = currentConnections.get(connectionId); + + if (connection) { + yield* Ref.update(connections, (map) => { + const newMap = new Map(map); + const conn = newMap.get(connectionId); + if (conn) { + conn.subscribedSpaces.delete(spaceId); + } + return newMap; + }); + + yield* Effect.logInfo('Unsubscribed connection from space', { + connectionId, + spaceId, + accountAddress: connection.accountAddress, + }); + } + }); + + const broadcastToSpace = Effect.fn('broadcastToSpace')(function* ({ + spaceId, + message, + excludeConnectionId, + }: { + spaceId: string; + message: Messages.ResponseMessage; + excludeConnectionId?: string; + }) { + const currentConnections = yield* Ref.get(connections); + + for (const [connectionId, connection] of currentConnections) { + // Skip if this is the excluded connection (sender) + if (excludeConnectionId && connectionId === excludeConnectionId) { + continue; + } + + // Only send to connections subscribed to this space + if (connection.subscribedSpaces.has(spaceId)) { + yield* connection.mailbox.offer(Messages.serializeV2(message)); + } + } + }); + + const broadcastToAccount = Effect.fn('broadcastToAccount')(function* ({ + accountAddress, + message, + excludeConnectionId, + }: { + accountAddress: string; + message: Messages.ResponseMessage; + excludeConnectionId?: string; + }) { + const currentConnections = yield* Ref.get(connections); + + for (const [connectionId, connection] of currentConnections) { + // Skip if this is the excluded connection (sender) + if (excludeConnectionId && connectionId === excludeConnectionId) { + continue; + } + + // Only send to connections from the same account + if (connection.accountAddress === accountAddress) { + yield* connection.mailbox.offer(Messages.serializeV2(message)); + } + } + }); + + const getConnection = Effect.fn('getConnection')(function* (connectionId: string) { + const currentConnections = yield* Ref.get(connections); + return currentConnections.get(connectionId); + }); + + return { + registerConnection, + removeConnection, + subscribeToSpace, + unsubscribeFromSpace, + broadcastToSpace, + broadcastToAccount, + getConnection, + } as const; +}).pipe(Layer.effect(ConnectionsService)); diff --git a/apps/server-new/src/services/invitations.ts b/apps/server-new/src/services/invitations.ts new file mode 100644 index 00000000..3d15c6cc --- /dev/null +++ b/apps/server-new/src/services/invitations.ts @@ -0,0 +1,62 @@ +import { type Messages, SpaceEvents } from '@graphprotocol/hypergraph'; +import { Context, Effect, Layer } from 'effect'; +import * as Schema from 'effect/Schema'; +import * as DatabaseService from './database.js'; +import * as IdentityService from './identity.js'; + +export class InvitationsService extends Context.Tag('InvitationsService')< + InvitationsService, + { + readonly listByAppIdentity: ( + appIdentityAddress: string, + ) => Effect.Effect; + } +>() {} + +const decodeSpaceState = Schema.decodeUnknownEither(SpaceEvents.SpaceState); + +export const layer = Effect.gen(function* () { + const { use } = yield* DatabaseService.DatabaseService; + + const listByAppIdentity = Effect.fn('listByAppIdentity')(function* (accountAddress: string) { + const invitations = yield* use((client) => + client.invitation.findMany({ + where: { + inviteeAccountAddress: accountAddress, + }, + include: { + space: { + include: { + events: { + orderBy: { + counter: 'desc', + }, + take: 1, + }, + }, + }, + }, + }), + ); + + const processedInvitations = []; + for (const invitation of invitations) { + const result = decodeSpaceState(JSON.parse(invitation.space.events[0].state)); + if (result._tag === 'Right') { + const state = result.right; + processedInvitations.push({ + id: invitation.id, + previousEventHash: state.lastEventHash, + spaceId: invitation.spaceId, + }); + } else { + yield* Effect.logError('Invalid space state from the DB', { error: result.left }); + } + } + return processedInvitations; + }); + + return { + listByAppIdentity, + } as const; +}).pipe(Layer.effect(InvitationsService), Layer.provide(DatabaseService.layer), Layer.provide(IdentityService.layer)); diff --git a/apps/server-new/src/services/space-inbox.ts b/apps/server-new/src/services/space-inbox.ts index 1822b862..0a9ffe95 100644 --- a/apps/server-new/src/services/space-inbox.ts +++ b/apps/server-new/src/services/space-inbox.ts @@ -33,6 +33,10 @@ export class SpaceInboxService extends Context.Tag('SpaceInboxService')< Messages.InboxMessage, ResourceNotFoundError | ValidationError | AuthorizationError | DatabaseService.DatabaseError >; + readonly getLatestSpaceInboxMessages: (params: { + inboxId: string; + since: Date; + }) => Effect.Effect; } >() {} @@ -242,9 +246,48 @@ export const layer = Effect.gen(function* () { return createdMessage; }); + const getLatestSpaceInboxMessages = Effect.fn('getLatestSpaceInboxMessages')(function* ({ + inboxId, + since, + }: { + inboxId: string; + since: Date; + }) { + const messages = yield* use((client) => + client.spaceInboxMessage.findMany({ + where: { + spaceInboxId: inboxId, + createdAt: { + gte: since, + }, + }, + orderBy: { + createdAt: 'asc', + }, + }), + ); + + return messages.map( + (msg): Messages.InboxMessage => ({ + id: msg.id, + ciphertext: msg.ciphertext, + signature: + msg.signatureHex != null && msg.signatureRecovery != null + ? { + hex: msg.signatureHex, + recovery: msg.signatureRecovery, + } + : undefined, + authorAccountAddress: msg.authorAccountAddress ?? undefined, + createdAt: msg.createdAt, + }), + ); + }); + return { listPublicSpaceInboxes, getSpaceInbox, postSpaceInboxMessage, + getLatestSpaceInboxMessages, } as const; }).pipe(Layer.effect(SpaceInboxService), Layer.provide(DatabaseService.layer), Layer.provide(IdentityService.layer)); diff --git a/apps/server-new/src/services/spaces.ts b/apps/server-new/src/services/spaces.ts index b0a8c121..b5301a9b 100644 --- a/apps/server-new/src/services/spaces.ts +++ b/apps/server-new/src/services/spaces.ts @@ -1,5 +1,7 @@ -import { Identity, type Messages, SpaceEvents, Utils } from '@graphprotocol/hypergraph'; -import { Context, Effect, Layer } from 'effect'; +import { Identity, type Inboxes, type Messages, SpaceEvents, Utils } from '@graphprotocol/hypergraph'; +import { Context, Effect, Exit, Layer } from 'effect'; +import * as Predicate from 'effect/Predicate'; +import { ResourceNotFoundError } from '../http/errors.js'; import * as DatabaseService from './database.js'; import * as IdentityService from './identity.js'; @@ -22,6 +24,46 @@ export interface SpaceInfo { }>; } +export interface GetSpaceResult { + id: string; + name: string; + events: SpaceEvents.SpaceEvent[]; + keyBoxes: Array<{ + id: string; + ciphertext: string; + nonce: string; + authorPublicKey: string; + accountAddress: string; + }>; + inboxes: Array<{ + inboxId: string; + isPublic: boolean; + authPolicy: Inboxes.InboxSenderAuthPolicy; + encryptionPublicKey: string; + secretKey: string; + }>; + updates: + | { + updates: Array<{ + accountAddress: string; + update: Uint8Array; + signature: { + hex: string; + recovery: number; + }; + updateId: string; + }>; + firstUpdateClock: number; + lastUpdateClock: number; + } + | undefined; +} + +export interface SpaceListEntry { + id: string; + name: string; +} + export interface CreateSpaceParams { accountAddress: string; event: SpaceEvents.CreateSpaceEvent; @@ -32,12 +74,25 @@ export interface CreateSpaceParams { name: string; } +export interface GetSpaceParams { + spaceId: string; + accountAddress: string; + appIdentityAddress: string; +} + export interface AddAppIdentityToSpacesParams { appIdentityAddress: string; accountAddress: string; spacesInput: Messages.RequestConnectAddAppIdentityToSpaces['spacesInput']; } +export interface ApplySpaceEventParams { + accountAddress: string; + spaceId: string; + event: SpaceEvents.SpaceEvent; + keyBoxes: Messages.KeyBoxWithKeyId[]; +} + export class SpacesService extends Context.Tag('SpacesService')< SpacesService, { @@ -48,6 +103,15 @@ export class SpacesService extends Context.Tag('SpacesService')< readonly addAppIdentityToSpaces: ( params: AddAppIdentityToSpacesParams, ) => Effect.Effect; + readonly listByAppIdentity: ( + appIdentityAddress: string, + ) => Effect.Effect; + readonly getSpace: ( + params: GetSpaceParams, + ) => Effect.Effect; + readonly applySpaceEvent: ( + params: ApplySpaceEventParams, + ) => Effect.Effect; } >() {} @@ -107,6 +171,132 @@ export const layer = Effect.gen(function* () { })); }); + const listByAppIdentity = Effect.fn('listByAppIdentity')(function* (appIdentityAddress: string) { + return yield* use((client) => + client.space.findMany({ + where: { + appIdentities: { + some: { + address: appIdentityAddress, + }, + }, + }, + include: { + appIdentities: { + select: { + address: true, + appId: true, + }, + }, + keys: { + include: { + keyBoxes: { + where: { + appIdentityAddress, + }, + }, + }, + }, + }, + }), + ); + }); + + const getSpace = Effect.fn('getSpace')(function* (params: GetSpaceParams) { + const { spaceId, accountAddress, appIdentityAddress } = params; + + const space = yield* use((client) => + client.space.findUnique({ + where: { + id: spaceId, + members: { + some: { + address: accountAddress, + }, + }, + }, + include: { + events: { + orderBy: { + counter: 'asc', + }, + }, + keys: { + include: { + keyBoxes: { + where: { + accountAddress, + appIdentityAddress, + }, + select: { + nonce: true, + ciphertext: true, + authorPublicKey: true, + }, + }, + }, + }, + updates: { + orderBy: { + clock: 'asc', + }, + }, + inboxes: { + select: { + id: true, + isPublic: true, + authPolicy: true, + encryptionPublicKey: true, + encryptedSecretKey: true, + }, + }, + }, + }), + ).pipe( + Effect.filterOrFail(Predicate.isNotNull, () => new ResourceNotFoundError({ resource: 'Space', id: spaceId })), + ); + + const keyBoxes = space.keys.flatMap((key) => { + return { + id: key.id, + nonce: key.keyBoxes[0].nonce, + ciphertext: key.keyBoxes[0].ciphertext, + accountAddress, + authorPublicKey: key.keyBoxes[0].authorPublicKey, + }; + }); + + return { + id: space.id, + name: space.name, + events: space.events.map((wrapper) => JSON.parse(wrapper.event)), + keyBoxes, + inboxes: space.inboxes.map((inbox) => ({ + inboxId: inbox.id, + isPublic: inbox.isPublic, + authPolicy: inbox.authPolicy as Inboxes.InboxSenderAuthPolicy, + encryptionPublicKey: inbox.encryptionPublicKey, + secretKey: inbox.encryptedSecretKey, + })), + updates: + space.updates.length > 0 + ? { + updates: space.updates.map((update) => ({ + accountAddress: update.accountAddress, + update: new Uint8Array(update.content), + signature: { + hex: update.signatureHex, + recovery: update.signatureRecovery, + }, + updateId: update.updateId, + })), + firstUpdateClock: space.updates[0].clock, + lastUpdateClock: space.updates[space.updates.length - 1].clock, + } + : undefined, + }; + }); + const createSpace = Effect.fn('createSpace')(function* (params: CreateSpaceParams) { const { accountAddress, event, keyBox, infoContent, infoSignatureHex, infoSignatureRecovery, name } = params; @@ -227,9 +417,127 @@ export const layer = Effect.gen(function* () { ); }); + const applySpaceEvent = Effect.fn('applySpaceEvent')(function* (params: ApplySpaceEventParams) { + const { accountAddress, spaceId, event, keyBoxes } = params; + + if (event.transaction.type === 'create-space') { + return yield* Effect.fail(new Error('applySpaceEvent does not support create-space events.')); + } + + yield* use((client) => + client.$transaction(async (transaction) => { + if (event.transaction.type === 'accept-invitation') { + // verify that the account is the invitee + await transaction.invitation.findFirstOrThrow({ + where: { inviteeAccountAddress: event.author.accountAddress }, + }); + } else { + // verify that the account is a member of the space + // TODO verify that the account is an admin of the space + await transaction.space.findUniqueOrThrow({ + where: { + id: spaceId, + members: { some: { address: accountAddress } }, + }, + }); + } + + const lastEvent = await transaction.spaceEvent.findFirstOrThrow({ + where: { spaceId }, + orderBy: { counter: 'desc' }, + }); + + // Create the getVerifiedIdentity function for event validation + const getVerifiedIdentity = (accountAddressToFetch: string, publicKey: string) => { + // applySpaceEvent is only allowed to be called by the account that is applying the event + if (accountAddressToFetch !== accountAddress) { + return Effect.fail(new Identity.InvalidIdentityError()); + } + + return getAppOrConnectIdentity({ + accountAddress: accountAddressToFetch, + signaturePublicKey: publicKey, + }).pipe(Effect.mapError(() => new Identity.InvalidIdentityError())); + }; + + const result = await Effect.runPromiseExit( + SpaceEvents.applyEvent({ + event, + state: JSON.parse(lastEvent.state), + getVerifiedIdentity, + }), + ); + + if (Exit.isFailure(result)) { + throw new Error('Invalid event'); + } + + if (event.transaction.type === 'create-invitation') { + const inviteeAccountAddress = event.transaction.inviteeAccountAddress; + await transaction.invitation.create({ + data: { + id: event.transaction.id, + spaceId, + accountAddress: event.author.accountAddress, + inviteeAccountAddress, + }, + }); + await transaction.spaceKeyBox.createMany({ + data: keyBoxes.map((keyBox) => ({ + id: `${keyBox.id}-${inviteeAccountAddress}`, + nonce: keyBox.nonce, + ciphertext: keyBox.ciphertext, + accountAddress: inviteeAccountAddress, + authorPublicKey: keyBox.authorPublicKey, + spaceKeyId: keyBox.id, + })), + }); + } + + if (event.transaction.type === 'accept-invitation') { + await transaction.invitation.delete({ + where: { spaceId_inviteeAccountAddress: { spaceId, inviteeAccountAddress: event.author.accountAddress } }, + }); + + await transaction.space.update({ + where: { id: spaceId }, + data: { members: { connect: { address: event.author.accountAddress } } }, + }); + } + + await transaction.spaceEvent.create({ + data: { + spaceId, + counter: lastEvent.counter + 1, + event: JSON.stringify(event), + id: event.transaction.id, + state: JSON.stringify(result.value), + }, + }); + + if (event.transaction.type === 'create-space-inbox') { + await transaction.spaceInbox.create({ + data: { + id: event.transaction.inboxId, + isPublic: event.transaction.isPublic, + authPolicy: event.transaction.authPolicy, + encryptionPublicKey: event.transaction.encryptionPublicKey, + encryptedSecretKey: event.transaction.secretKey, + space: { connect: { id: spaceId } }, + spaceEvent: { connect: { id: event.transaction.id } }, + }, + }); + } + }), + ); + }); + return { listByAccount, + listByAppIdentity, + getSpace, createSpace, addAppIdentityToSpaces, + applySpaceEvent, } as const; }).pipe(Layer.effect(SpacesService), Layer.provide(DatabaseService.layer), Layer.provide(IdentityService.layer)); diff --git a/apps/server-new/src/services/updates.ts b/apps/server-new/src/services/updates.ts new file mode 100644 index 00000000..f3b735d9 --- /dev/null +++ b/apps/server-new/src/services/updates.ts @@ -0,0 +1,115 @@ +import { Context, Effect, Layer, Predicate, Schedule } from 'effect'; +import { ResourceNotFoundError } from '../http/errors.js'; +import * as DatabaseService from './database.js'; + +type CreateUpdateParams = { + accountAddress: string; + update: Uint8Array; + spaceId: string; + signatureHex: string; + signatureRecovery: number; + updateId: string; +}; + +type CreateUpdateResult = { + clock: number; + content: Uint8Array; + signatureHex: string; + signatureRecovery: number; + updateId: string; + accountAddress: string; + spaceId: string; +}; + +export class UpdatesService extends Context.Tag('UpdatesService')< + UpdatesService, + { + readonly createUpdate: ( + params: CreateUpdateParams, + ) => Effect.Effect; + } +>() {} + +// Retry with Effect's built-in retry mechanism for database lock errors +const retrySchedule = Schedule.exponential('100 millis').pipe( + Schedule.intersect(Schedule.recurs(5)), + Schedule.whileInput((error: DatabaseService.DatabaseError) => { + // Check if it's a database lock error that should trigger retry + const cause = error.cause as { code?: string; message?: string }; + const shouldRetry = + cause?.code === 'P2034' || // Prisma transaction conflict + cause?.code === 'P1008' || // Prisma connection timeout + Boolean(cause?.message?.includes('database is locked')); + return shouldRetry; + }), +); + +export const layer = Effect.gen(function* () { + const { use } = yield* DatabaseService.DatabaseService; + + const createUpdate = Effect.fn('createUpdate')(function* ({ + accountAddress, + update, + spaceId, + signatureHex, + signatureRecovery, + updateId, + }: CreateUpdateParams) { + // First verify the account is a member of the space + yield* use((client) => + client.space.findUnique({ + where: { id: spaceId, members: { some: { address: accountAddress } } }, + }), + ).pipe( + Effect.filterOrFail(Predicate.isNotNull, () => new ResourceNotFoundError({ resource: 'Space', id: spaceId })), + ); + + const result = yield* use((client) => + client.$transaction(async (prisma) => { + const lastUpdate = await prisma.update.findFirst({ + where: { spaceId }, + orderBy: { clock: 'desc' }, + }); + + const clock = lastUpdate ? lastUpdate.clock + 1 : 0; + + return await prisma.update.create({ + data: { + space: { connect: { id: spaceId } }, + clock, + content: Buffer.from(update), + signatureHex, + signatureRecovery, + updateId, + account: { connect: { address: accountAddress } }, + }, + }); + }), + ).pipe( + Effect.retry(retrySchedule), + Effect.tapError((error) => { + const cause = error.cause as { code?: string; message?: string }; + return Effect.logError('Failed to create update after retries', { + error: cause?.message || String(error), + code: cause?.code, + spaceId, + updateId, + }); + }), + ); + + return { + clock: result.clock, + content: new Uint8Array(result.content), + signatureHex: result.signatureHex, + signatureRecovery: result.signatureRecovery, + updateId: result.updateId, + accountAddress: result.accountAddress, + spaceId: result.spaceId, + }; + }); + + return { + createUpdate, + } as const; +}).pipe(Layer.effect(UpdatesService), Layer.provide(DatabaseService.layer)); diff --git a/apps/server-new/src/websocket.ts b/apps/server-new/src/websocket.ts new file mode 100644 index 00000000..7b50552c --- /dev/null +++ b/apps/server-new/src/websocket.ts @@ -0,0 +1,436 @@ +import * as HttpLayerRouter from '@effect/platform/HttpLayerRouter'; +import * as HttpServerRequest from '@effect/platform/HttpServerRequest'; +import * as HttpServerResponse from '@effect/platform/HttpServerResponse'; +import * as Socket from '@effect/platform/Socket'; +import { Messages } from '@graphprotocol/hypergraph'; +import { isArray } from 'effect/Array'; +import * as Effect from 'effect/Effect'; +import * as Mailbox from 'effect/Mailbox'; +import * as Schema from 'effect/Schema'; +import * as Stream from 'effect/Stream'; +import * as AccountInboxService from './services/account-inbox.ts'; +import * as AppIdentityService from './services/app-identity.ts'; +import * as ConnectionsService from './services/connections.ts'; +import * as IdentityService from './services/identity.ts'; +import * as InvitationsService from './services/invitations.ts'; +import * as SpaceInboxService from './services/space-inbox.ts'; +import * as SpacesService from './services/spaces.ts'; +import * as UpdatesService from './services/updates.ts'; + +const decodeRequestMessage = Schema.decodeUnknownEither(Messages.RequestMessage); + +export const WebSocketLayer = HttpLayerRouter.add( + 'GET', + '/', + Effect.gen(function* () { + const request = yield* HttpServerRequest.HttpServerRequest; + const spacesService = yield* SpacesService.SpacesService; + const invitationsService = yield* InvitationsService.InvitationsService; + const updatesService = yield* UpdatesService.UpdatesService; + const connectionsService = yield* ConnectionsService.ConnectionsService; + const accountInboxService = yield* AccountInboxService.AccountInboxService; + const spaceInboxService = yield* SpaceInboxService.SpaceInboxService; + const responseMailbox = yield* Mailbox.make(); + + const searchParams = HttpServerRequest.searchParamsFromURL(new URL(request.url, 'http://localhost')); + const token = isArray(searchParams.token) ? searchParams.token[0] : searchParams.token; + + if (!token) { + return yield* HttpServerResponse.empty({ status: 400 }); + } + + const appIdentityService = yield* AppIdentityService.AppIdentityService; + const identityService = yield* IdentityService.IdentityService; + const { accountAddress, address } = yield* appIdentityService.getBySessionToken(token).pipe(Effect.orDie); + + // Register this connection + const connectionId = yield* connectionsService.registerConnection({ + accountAddress, + appIdentityAddress: address, + mailbox: responseMailbox, + }); + + return yield* Mailbox.toStream(responseMailbox).pipe( + Stream.map(JSON.stringify), + Stream.pipeThroughChannel(HttpServerRequest.upgradeChannel()), + Stream.decodeText(), + Stream.runForEach((message) => + Effect.gen(function* () { + const json = Messages.deserialize(message); + const request = yield* decodeRequestMessage(json); + switch (request.type) { + case 'list-spaces': { + const spaces = yield* spacesService.listByAppIdentity(address); + const outgoingMessage: Messages.ResponseListSpaces = { type: 'list-spaces', spaces: spaces }; + // TODO: fix Messages.serialize + yield* responseMailbox.offer(Messages.serializeV2(outgoingMessage)); + break; + } + case 'list-invitations': { + const invitations = yield* invitationsService.listByAppIdentity(accountAddress); + const outgoingMessage: Messages.ResponseListInvitations = { + type: 'list-invitations', + invitations, + }; + yield* responseMailbox.offer(Messages.serializeV2(outgoingMessage)); + break; + } + case 'subscribe-space': { + const space = yield* spacesService.getSpace({ + spaceId: request.id, + accountAddress, + appIdentityAddress: address, + }); + + // Track this subscription + yield* connectionsService.subscribeToSpace(connectionId, request.id); + + const outgoingMessage: Messages.ResponseSpace = { + type: 'space', + ...space, + }; + yield* responseMailbox.offer(Messages.serializeV2(outgoingMessage)); + break; + } + case 'create-update': { + const signer = Messages.recoverUpdateMessageSigner(request); + const identity = yield* identityService.getAppOrConnectIdentity({ + accountAddress: request.accountAddress, + signaturePublicKey: signer, + }); + if (identity.accountAddress !== accountAddress) { + // TODO: improve error handling + return yield* Effect.die(new Error('Invalid signature')); + } + + const update = yield* updatesService.createUpdate({ + accountAddress: request.accountAddress, + update: request.update, + spaceId: request.spaceId, + signatureHex: request.signature.hex, + signatureRecovery: request.signature.recovery, + updateId: request.updateId, + }); + const outgoingMessage: Messages.ResponseUpdateConfirmed = { + type: 'update-confirmed', + updateId: request.updateId, + clock: update.clock, + spaceId: request.spaceId, + }; + yield* responseMailbox.offer(Messages.serializeV2(outgoingMessage)); + + // Broadcast the update to all subscribed clients + const updates: Messages.Updates = { + updates: [ + { + update: update.content, + accountAddress: update.accountAddress, + signature: { hex: update.signatureHex, recovery: update.signatureRecovery }, + updateId: update.updateId, + }, + ], + firstUpdateClock: update.clock, + lastUpdateClock: update.clock, + }; + + const broadcastMessage: Messages.ResponseUpdatesNotification = { + type: 'updates-notification', + updates, + spaceId: request.spaceId, + }; + + yield* connectionsService.broadcastToSpace({ + spaceId: request.spaceId, + message: broadcastMessage, + excludeConnectionId: connectionId, + }); + + break; + } + case 'create-space-event': { + // Create the new space + const spaceResult = yield* spacesService.createSpace({ + accountAddress, + event: request.event, + keyBox: request.keyBox, + infoContent: new Uint8Array(), // TODO: Get from request when available + infoSignatureHex: '', + infoSignatureRecovery: 0, + name: request.name, + }); + + // Get the full space data to send back + const space = yield* spacesService.getSpace({ + spaceId: spaceResult.id, + accountAddress, + appIdentityAddress: address, + }); + + const outgoingMessage: Messages.ResponseSpace = { + type: 'space', + ...space, + }; + yield* responseMailbox.offer(Messages.serializeV2(outgoingMessage)); + + break; + } + case 'create-invitation-event': { + // Apply the invitation event to the space + yield* spacesService.applySpaceEvent({ + accountAddress, + spaceId: request.spaceId, + event: request.event, + keyBoxes: [...request.keyBoxes], // Convert readonly array to mutable + }); + + // Get the updated space data + const space = yield* spacesService.getSpace({ + spaceId: request.spaceId, + accountAddress, + appIdentityAddress: address, + }); + + // Send the updated space back to the client + const outgoingMessage: Messages.ResponseSpace = { + type: 'space', + ...space, + }; + yield* responseMailbox.offer(Messages.serializeV2(outgoingMessage)); + + // Broadcast the space event to other subscribers + const spaceEventMessage: Messages.ResponseSpaceEvent = { + type: 'space-event', + spaceId: request.spaceId, + event: request.event, + }; + + yield* connectionsService.broadcastToSpace({ + spaceId: request.spaceId, + message: spaceEventMessage, + excludeConnectionId: connectionId, + }); + + // Notify the invitee if they're connected + if (request.event.transaction.type === 'create-invitation') { + const inviteeAccountAddress = request.event.transaction.inviteeAccountAddress; + + // Get the updated invitation list for the invitee + const invitations = yield* invitationsService.listByAppIdentity(inviteeAccountAddress); + const invitationMessage: Messages.ResponseListInvitations = { + type: 'list-invitations', + invitations, + }; + + // Broadcast to all connections for the invitee account + yield* connectionsService.broadcastToAccount({ + accountAddress: inviteeAccountAddress, + message: invitationMessage, + }); + } + + break; + } + case 'accept-invitation-event': { + // Apply the invitation acceptance event to the space + yield* spacesService.applySpaceEvent({ + accountAddress, + spaceId: request.spaceId, + event: request.event, + keyBoxes: [], // No keyBoxes needed for accepting invitations + }); + + // Get the updated space data + const space = yield* spacesService.getSpace({ + spaceId: request.spaceId, + accountAddress, + appIdentityAddress: address, + }); + + // Send the updated space back to the client + const outgoingMessage: Messages.ResponseSpace = { + type: 'space', + ...space, + }; + yield* responseMailbox.offer(Messages.serializeV2(outgoingMessage)); + + // Broadcast the space event to other subscribers + const spaceEventMessage: Messages.ResponseSpaceEvent = { + type: 'space-event', + spaceId: request.spaceId, + event: request.event, + }; + + yield* connectionsService.broadcastToSpace({ + spaceId: request.spaceId, + message: spaceEventMessage, + excludeConnectionId: connectionId, + }); + + break; + } + case 'create-space-inbox-event': { + // Apply the space inbox creation event to the space + yield* spacesService.applySpaceEvent({ + accountAddress, + spaceId: request.spaceId, + event: request.event, + keyBoxes: [], // No keyBoxes needed for creating space inboxes + }); + + // Get the updated space data + const space = yield* spacesService.getSpace({ + spaceId: request.spaceId, + accountAddress, + appIdentityAddress: address, + }); + + // Send the updated space back to the client + const outgoingMessage: Messages.ResponseSpace = { + type: 'space', + ...space, + }; + yield* responseMailbox.offer(Messages.serializeV2(outgoingMessage)); + + // Broadcast the space event to other subscribers + const spaceEventMessage: Messages.ResponseSpaceEvent = { + type: 'space-event', + spaceId: request.spaceId, + event: request.event, + }; + + yield* connectionsService.broadcastToSpace({ + spaceId: request.spaceId, + message: spaceEventMessage, + excludeConnectionId: connectionId, + }); + + break; + } + case 'create-account-inbox': { + // Validate that the account matches the authenticated user + if (request.accountAddress !== accountAddress) { + // TODO: Better error handling + return yield* Effect.fail(new Error('Invalid accountAddress')); + } + + // Create the account inbox + const inbox = yield* accountInboxService.createAccountInbox(request); + + // Broadcast the new inbox to other clients from the same account + const inboxMessage: Messages.ResponseAccountInbox = { + type: 'account-inbox', + inbox: { + accountAddress: inbox.accountAddress, + inboxId: inbox.inboxId, + isPublic: inbox.isPublic, + authPolicy: inbox.authPolicy, + encryptionPublicKey: inbox.encryptionPublicKey, + signature: inbox.signature, + }, + }; + + yield* connectionsService.broadcastToAccount({ + accountAddress, + message: inboxMessage, + excludeConnectionId: connectionId, + }); + + yield* responseMailbox.offer(Messages.serializeV2(inboxMessage)); + + break; + } + case 'get-latest-space-inbox-messages': { + // Check that the user has access to this space + yield* spacesService.getSpace({ + spaceId: request.spaceId, + accountAddress, + appIdentityAddress: address, + }); + + // Get the latest messages from the space inbox + const messages = yield* spaceInboxService.getLatestSpaceInboxMessages({ + inboxId: request.inboxId, + since: request.since, + }); + + const outgoingMessage: Messages.ResponseSpaceInboxMessages = { + type: 'space-inbox-messages', + spaceId: request.spaceId, + inboxId: request.inboxId, + messages, + }; + yield* responseMailbox.offer(Messages.serializeV2(outgoingMessage)); + + break; + } + case 'get-latest-account-inbox-messages': { + // Check that the user has access to this inbox + yield* accountInboxService.getAccountInbox({ + accountAddress, + inboxId: request.inboxId, + }); + + // Get the latest messages from the account inbox + const messages = yield* accountInboxService.getLatestAccountInboxMessages({ + inboxId: request.inboxId, + since: request.since, + }); + + const outgoingMessage: Messages.ResponseAccountInboxMessages = { + type: 'account-inbox-messages', + accountAddress, + inboxId: request.inboxId, + messages, + }; + yield* responseMailbox.offer(Messages.serializeV2(outgoingMessage)); + + break; + } + case 'get-account-inboxes': { + // List all inboxes for the authenticated account + const inboxes = yield* accountInboxService.listAccountInboxes({ accountAddress }); + + const outgoingMessage: Messages.ResponseAccountInboxes = { + type: 'account-inboxes', + inboxes, + }; + yield* responseMailbox.offer(Messages.serializeV2(outgoingMessage)); + + break; + } + } + }), + ), + Effect.catchAll((error) => + Effect.gen(function* () { + // Only log error if it's not a SocketCloseError + if (!Socket.SocketCloseError.is(error)) { + yield* Effect.logInfo('WebSocket disconnected due to error', { + error: error.message || String(error), + accountAddress, + appIdentityAddress: address, + }); + } + }), + ), + Effect.ensuring( + Effect.gen(function* () { + // Clean up the connection when it closes + yield* connectionsService.removeConnection(connectionId); + yield* Effect.logInfo('WebSocket connection closed', { + accountAddress, + appIdentityAddress: address, + }); + }), + ), + Effect.as(HttpServerResponse.empty()), + ); + }) + .pipe(Effect.provide(AppIdentityService.layer)) + .pipe(Effect.provide(SpacesService.layer)) + .pipe(Effect.provide(InvitationsService.layer)) + .pipe(Effect.provide(IdentityService.layer)) + .pipe(Effect.provide(UpdatesService.layer)) + .pipe(Effect.provide(AccountInboxService.layer)) + .pipe(Effect.provide(SpaceInboxService.layer)), +); diff --git a/packages/create-hypergraph/CHANGELOG.md b/packages/create-hypergraph/CHANGELOG.md index 88555c4b..f3258838 100644 --- a/packages/create-hypergraph/CHANGELOG.md +++ b/packages/create-hypergraph/CHANGELOG.md @@ -1,5 +1,10 @@ # create-hypergraph +## 0.5.4 +### Patch Changes + +- de0d153: improve geo connect box based on authentication state in all templates + ## 0.5.3 ### Patch Changes diff --git a/packages/create-hypergraph/package.json b/packages/create-hypergraph/package.json index 2ba4bf42..8bc16dad 100644 --- a/packages/create-hypergraph/package.json +++ b/packages/create-hypergraph/package.json @@ -1,6 +1,6 @@ { "name": "create-hypergraph", - "version": "0.5.3", + "version": "0.5.4", "description": "CLI toolchain to scaffold a Hypergraph-enabled application with a given template.", "type": "module", "bin": { diff --git a/packages/hypergraph/src/messages/serialize.ts b/packages/hypergraph/src/messages/serialize.ts index 1950fead..54e7a855 100644 --- a/packages/hypergraph/src/messages/serialize.ts +++ b/packages/hypergraph/src/messages/serialize.ts @@ -11,6 +11,21 @@ export function serialize(obj: any): string { }); } +// biome-ignore lint/suspicious/noExplicitAny: same as stringify and parse +export function serializeV2(obj: any): any { + return JSON.parse( + JSON.stringify(obj, (_key, value) => { + if (value instanceof Uint8Array) { + return { __type: 'Uint8Array', data: Array.from(value) }; + } + if (value instanceof Date) { + return { __type: 'Date', data: value.toISOString() }; + } + return value; + }), + ); +} + export function deserialize(json: string): unknown { return JSON.parse(json, (_key, value) => { if (value && value.__type === 'Uint8Array') {