diff --git a/.trajectories/completed/2026-01/traj_v9dkdoxylyid.json b/.trajectories/completed/2026-01/traj_v9dkdoxylyid.json new file mode 100644 index 00000000..01e1c884 --- /dev/null +++ b/.trajectories/completed/2026-01/traj_v9dkdoxylyid.json @@ -0,0 +1,89 @@ +{ + "id": "traj_v9dkdoxylyid", + "version": 1, + "task": { + "title": "Implement first-class user messaging with channels and DMs", + "source": { + "system": "plain", + "id": "user-messaging-feature" + } + }, + "status": "completed", + "startedAt": "2026-01-06T17:11:57.504Z", + "agents": [ + { + "name": "default", + "role": "lead", + "joinedAt": "2026-01-06T17:12:09.617Z" + } + ], + "chapters": [ + { + "id": "chap_u095g4higo8q", + "title": "Work", + "agentName": "default", + "startedAt": "2026-01-06T17:12:09.617Z", + "events": [ + { + "ts": 1767719529619, + "type": "decision", + "content": "Chose unified relay daemon approach over WebSocket-only: Chose unified relay daemon approach over WebSocket-only", + "raw": { + "question": "Chose unified relay daemon approach over WebSocket-only", + "chosen": "Chose unified relay daemon approach over WebSocket-only", + "alternatives": [], + "reasoning": "Enables consistent message routing for both users and agents, supports future features like message persistence and cross-project messaging" + }, + "significance": "high" + }, + { + "ts": 1767719541811, + "type": "decision", + "content": "Added EntityType to protocol: Added EntityType to protocol", + "raw": { + "question": "Added EntityType to protocol", + "chosen": "Added EntityType to protocol", + "alternatives": [], + "reasoning": "Distinguishes 'user' (human) from 'agent' (AI) entities for proper routing and UI display" + }, + "significance": "high" + }, + { + "ts": 1767719549146, + "type": "decision", + "content": "DM channels use 'dm:alice:bob' naming convention with sorted names: DM channels use 'dm:alice:bob' naming convention with sorted names", + "raw": { + "question": "DM channels use 'dm:alice:bob' naming convention with sorted names", + "chosen": "DM channels use 'dm:alice:bob' naming convention with sorted names", + "alternatives": [], + "reasoning": "Ensures consistent channel naming regardless of who initiates the DM" + }, + "significance": "high" + }, + { + "ts": 1767719563124, + "type": "decision", + "content": "Created UserBridge to bridge WebSocket users to relay daemon: Created UserBridge to bridge WebSocket users to relay daemon", + "raw": { + "question": "Created UserBridge to bridge WebSocket users to relay daemon", + "chosen": "Created UserBridge to bridge WebSocket users to relay daemon", + "alternatives": [], + "reasoning": "Clean separation of concerns - dashboard server handles WebSocket, UserBridge creates relay client per user for unified messaging" + }, + "significance": "high" + } + ], + "endedAt": "2026-01-06T17:12:56.919Z" + } + ], + "commits": [], + "filesChanged": [], + "projectId": "/home/user/relay", + "tags": [], + "completedAt": "2026-01-06T17:12:56.919Z", + "retrospective": { + "summary": "Implemented first-class user messaging: EntityType protocol extension, channel join/leave/message routing in daemon, UserBridge for dashboard-relay integration, REST API endpoints, and React components (useChannels hook, ChannelSidebar, ChannelChat). All 1030 tests passing.", + "approach": "Standard approach", + "confidence": 0.85 + } +} \ No newline at end of file diff --git a/.trajectories/completed/2026-01/traj_v9dkdoxylyid.md b/.trajectories/completed/2026-01/traj_v9dkdoxylyid.md new file mode 100644 index 00000000..963e052e --- /dev/null +++ b/.trajectories/completed/2026-01/traj_v9dkdoxylyid.md @@ -0,0 +1,47 @@ +# Trajectory: Implement first-class user messaging with channels and DMs + +> **Status:** ✅ Completed +> **Task:** user-messaging-feature +> **Confidence:** 85% +> **Started:** January 6, 2026 at 05:11 PM +> **Completed:** January 6, 2026 at 05:12 PM + +--- + +## Summary + +Implemented first-class user messaging: EntityType protocol extension, channel join/leave/message routing in daemon, UserBridge for dashboard-relay integration, REST API endpoints, and React components (useChannels hook, ChannelSidebar, ChannelChat). All 1030 tests passing. + +**Approach:** Standard approach + +--- + +## Key Decisions + +### Chose unified relay daemon approach over WebSocket-only +- **Chose:** Chose unified relay daemon approach over WebSocket-only +- **Reasoning:** Enables consistent message routing for both users and agents, supports future features like message persistence and cross-project messaging + +### Added EntityType to protocol +- **Chose:** Added EntityType to protocol +- **Reasoning:** Distinguishes 'user' (human) from 'agent' (AI) entities for proper routing and UI display + +### DM channels use 'dm:alice:bob' naming convention with sorted names +- **Chose:** DM channels use 'dm:alice:bob' naming convention with sorted names +- **Reasoning:** Ensures consistent channel naming regardless of who initiates the DM + +### Created UserBridge to bridge WebSocket users to relay daemon +- **Chose:** Created UserBridge to bridge WebSocket users to relay daemon +- **Reasoning:** Clean separation of concerns - dashboard server handles WebSocket, UserBridge creates relay client per user for unified messaging + +--- + +## Chapters + +### 1. Work +*Agent: default* + +- Chose unified relay daemon approach over WebSocket-only: Chose unified relay daemon approach over WebSocket-only +- Added EntityType to protocol: Added EntityType to protocol +- DM channels use 'dm:alice:bob' naming convention with sorted names: DM channels use 'dm:alice:bob' naming convention with sorted names +- Created UserBridge to bridge WebSocket users to relay daemon: Created UserBridge to bridge WebSocket users to relay daemon diff --git a/.trajectories/index.json b/.trajectories/index.json index 2b9250ea..e4820323 100644 --- a/.trajectories/index.json +++ b/.trajectories/index.json @@ -1,6 +1,6 @@ { "version": 1, - "lastUpdated": "2026-01-06T16:24:50.254Z", + "lastUpdated": "2026-01-06T17:12:56.941Z", "trajectories": { "traj_ozd98si6a7ns": { "title": "Fix thinking indicator showing on all messages", @@ -358,6 +358,13 @@ "startedAt": "2026-01-06T16:24:13.901Z", "completedAt": "2026-01-06T16:24:50.235Z", "path": "/home/user/relay/.trajectories/completed/2026-01/traj_78ffm31jn3uk.json" + }, + "traj_v9dkdoxylyid": { + "title": "Implement first-class user messaging with channels and DMs", + "status": "completed", + "startedAt": "2026-01-06T17:11:57.504Z", + "completedAt": "2026-01-06T17:12:56.919Z", + "path": "/home/user/relay/.trajectories/completed/2026-01/traj_v9dkdoxylyid.json" } } } \ No newline at end of file diff --git a/src/cloud/api/onboarding.ts b/src/cloud/api/onboarding.ts index 063cb59c..20ffabac 100644 --- a/src/cloud/api/onboarding.ts +++ b/src/cloud/api/onboarding.ts @@ -317,7 +317,7 @@ onboardingRouter.post('/cli/:provider/complete/:sessionId', async (req: Request, try { let accessToken = token || session.token; let refreshToken = session.refreshToken; - let tokenExpiresAt = session.tokenExpiresAt; + let _tokenExpiresAt = session.tokenExpiresAt; // If using workspace delegation, forward complete request first if (session.workspaceUrl && session.workspaceSessionId) { @@ -364,7 +364,7 @@ onboardingRouter.post('/cli/:provider/complete/:sessionId', async (req: Request, accessToken = creds.token; refreshToken = creds.refreshToken; if (creds.tokenExpiresAt) { - tokenExpiresAt = new Date(creds.tokenExpiresAt); + _tokenExpiresAt = new Date(creds.tokenExpiresAt); } console.log('[onboarding] Fetched credentials from workspace:', { hasToken: !!accessToken, diff --git a/src/daemon/channels.test.ts b/src/daemon/channels.test.ts new file mode 100644 index 00000000..58056cc8 --- /dev/null +++ b/src/daemon/channels.test.ts @@ -0,0 +1,687 @@ +/** + * Unit tests for channel functionality in the Router class. + * TDD: Writing tests before implementation. + * + * Tests channel operations: + * - Channel creation and management + * - User and agent channel membership + * - Channel message routing + * - DM (direct message) support + */ + +import { describe, it, expect, beforeEach, vi } from 'vitest'; +import { Router } from './router.js'; +import type { Connection } from './connection.js'; +import type { StorageAdapter, StoredMessage } from '../storage/adapter.js'; +import type { Envelope, SendPayload, DeliverEnvelope } from '../protocol/types.js'; +import type { + ChannelJoinPayload, + ChannelLeavePayload, + ChannelMessagePayload, +} from '../protocol/channels.js'; + +/** + * Mock Connection class for testing. + */ +class MockConnection implements Pick { + id: string; + agentName: string | undefined; + sessionId: string; + entityType: 'agent' | 'user'; + sentEnvelopes: Envelope[] = []; + private sequences: Map = new Map(); + sendMock = vi.fn(); + closeMock = vi.fn(); + private sendReturnValue = true; + + constructor( + id: string, + agentName?: string, + options?: { sessionId?: string; entityType?: 'agent' | 'user' } + ) { + this.id = id; + this.agentName = agentName; + this.sessionId = options?.sessionId ?? 'session-1'; + this.entityType = options?.entityType ?? 'agent'; + } + + send(envelope: Envelope): boolean { + this.sentEnvelopes.push(envelope); + this.sendMock(envelope); + return this.sendReturnValue; + } + + setSendReturnValue(value: boolean): void { + this.sendReturnValue = value; + } + + getNextSeq(topic: string, peer: string): number { + const key = `${topic}:${peer}`; + const seq = (this.sequences.get(key) ?? 0) + 1; + this.sequences.set(key, seq); + return seq; + } + + close(): void { + this.closeMock(); + } + + clearSent(): void { + this.sentEnvelopes = []; + this.sendMock.mockClear(); + } +} + +/** + * Helper to create a channel message envelope. + */ +function createChannelMessageEnvelope( + from: string, + channel: string, + body: string +): Envelope { + return { + v: 1, + type: 'CHANNEL_MESSAGE', + id: `msg-${Date.now()}`, + ts: Date.now(), + from, + payload: { + channel, + body, + }, + }; +} + +/** + * Helper to create a channel join envelope. + */ +function createChannelJoinEnvelope( + from: string, + channel: string +): Envelope { + return { + v: 1, + type: 'CHANNEL_JOIN', + id: `join-${Date.now()}`, + ts: Date.now(), + from, + payload: { + channel, + }, + }; +} + +/** + * Helper to create a channel leave envelope. + */ +function createChannelLeaveEnvelope( + from: string, + channel: string +): Envelope { + return { + v: 1, + type: 'CHANNEL_LEAVE', + id: `leave-${Date.now()}`, + ts: Date.now(), + from, + payload: { + channel, + }, + }; +} + +describe('Router - Channel Support', () => { + let router: Router; + let storage: StorageAdapter; + let saved: StoredMessage[]; + + beforeEach(() => { + saved = []; + storage = { + init: async () => {}, + saveMessage: async (message) => { saved.push(message); }, + getMessages: async () => saved, + }; + router = new Router({ storage }); + }); + + describe('Channel Join', () => { + it('should allow a user to join a channel', () => { + const user = new MockConnection('conn-1', 'alice', { entityType: 'user' }); + router.register(user); + + const joinEnvelope = createChannelJoinEnvelope('alice', '#general'); + router.handleChannelJoin(user, joinEnvelope); + + expect(router.getChannelMembers('#general')).toContain('alice'); + }); + + it('should allow an agent to join a channel', () => { + const agent = new MockConnection('conn-1', 'CodeReviewer', { entityType: 'agent' }); + router.register(agent); + + const joinEnvelope = createChannelJoinEnvelope('CodeReviewer', '#engineering'); + router.handleChannelJoin(agent, joinEnvelope); + + expect(router.getChannelMembers('#engineering')).toContain('CodeReviewer'); + }); + + it('should allow multiple members to join the same channel', () => { + const alice = new MockConnection('conn-1', 'alice', { entityType: 'user' }); + const bob = new MockConnection('conn-2', 'bob', { entityType: 'user' }); + const agent = new MockConnection('conn-3', 'CodeReviewer', { entityType: 'agent' }); + + router.register(alice); + router.register(bob); + router.register(agent); + + router.handleChannelJoin(alice, createChannelJoinEnvelope('alice', '#general')); + router.handleChannelJoin(bob, createChannelJoinEnvelope('bob', '#general')); + router.handleChannelJoin(agent, createChannelJoinEnvelope('CodeReviewer', '#general')); + + const members = router.getChannelMembers('#general'); + expect(members).toContain('alice'); + expect(members).toContain('bob'); + expect(members).toContain('CodeReviewer'); + expect(members).toHaveLength(3); + }); + + it('should broadcast join notification to existing channel members', () => { + const alice = new MockConnection('conn-1', 'alice', { entityType: 'user' }); + const bob = new MockConnection('conn-2', 'bob', { entityType: 'user' }); + + router.register(alice); + router.register(bob); + + router.handleChannelJoin(alice, createChannelJoinEnvelope('alice', '#general')); + alice.clearSent(); + + router.handleChannelJoin(bob, createChannelJoinEnvelope('bob', '#general')); + + // Alice should receive notification that Bob joined + expect(alice.sendMock).toHaveBeenCalled(); + const notification = alice.sentEnvelopes.find(e => e.type === 'CHANNEL_JOIN'); + expect(notification).toBeDefined(); + expect(notification?.from).toBe('bob'); + }); + + it('should not duplicate join if already a member', () => { + const alice = new MockConnection('conn-1', 'alice', { entityType: 'user' }); + router.register(alice); + + router.handleChannelJoin(alice, createChannelJoinEnvelope('alice', '#general')); + router.handleChannelJoin(alice, createChannelJoinEnvelope('alice', '#general')); + + // Should only appear once + const members = router.getChannelMembers('#general'); + const aliceCount = members.filter(m => m === 'alice').length; + expect(aliceCount).toBe(1); + }); + }); + + describe('Channel Leave', () => { + it('should remove member from channel on leave', () => { + const alice = new MockConnection('conn-1', 'alice', { entityType: 'user' }); + router.register(alice); + + router.handleChannelJoin(alice, createChannelJoinEnvelope('alice', '#general')); + expect(router.getChannelMembers('#general')).toContain('alice'); + + router.handleChannelLeave(alice, createChannelLeaveEnvelope('alice', '#general')); + expect(router.getChannelMembers('#general')).not.toContain('alice'); + }); + + it('should broadcast leave notification to remaining members', () => { + const alice = new MockConnection('conn-1', 'alice', { entityType: 'user' }); + const bob = new MockConnection('conn-2', 'bob', { entityType: 'user' }); + + router.register(alice); + router.register(bob); + + router.handleChannelJoin(alice, createChannelJoinEnvelope('alice', '#general')); + router.handleChannelJoin(bob, createChannelJoinEnvelope('bob', '#general')); + bob.clearSent(); + + router.handleChannelLeave(alice, createChannelLeaveEnvelope('alice', '#general')); + + // Bob should receive notification that Alice left + expect(bob.sendMock).toHaveBeenCalled(); + const notification = bob.sentEnvelopes.find(e => e.type === 'CHANNEL_LEAVE'); + expect(notification).toBeDefined(); + expect(notification?.from).toBe('alice'); + }); + + it('should remove member from all channels on disconnect', () => { + const alice = new MockConnection('conn-1', 'alice', { entityType: 'user' }); + router.register(alice); + + router.handleChannelJoin(alice, createChannelJoinEnvelope('alice', '#general')); + router.handleChannelJoin(alice, createChannelJoinEnvelope('alice', '#engineering')); + + expect(router.getChannelMembers('#general')).toContain('alice'); + expect(router.getChannelMembers('#engineering')).toContain('alice'); + + router.unregister(alice); + + expect(router.getChannelMembers('#general')).not.toContain('alice'); + expect(router.getChannelMembers('#engineering')).not.toContain('alice'); + }); + + it('should handle leave from channel not joined gracefully', () => { + const alice = new MockConnection('conn-1', 'alice', { entityType: 'user' }); + router.register(alice); + + // Should not throw + expect(() => { + router.handleChannelLeave(alice, createChannelLeaveEnvelope('alice', '#nonexistent')); + }).not.toThrow(); + }); + }); + + describe('Channel Message Routing', () => { + it('should route message to all channel members except sender', () => { + const alice = new MockConnection('conn-1', 'alice', { entityType: 'user' }); + const bob = new MockConnection('conn-2', 'bob', { entityType: 'user' }); + const charlie = new MockConnection('conn-3', 'charlie', { entityType: 'user' }); + + router.register(alice); + router.register(bob); + router.register(charlie); + + router.handleChannelJoin(alice, createChannelJoinEnvelope('alice', '#general')); + router.handleChannelJoin(bob, createChannelJoinEnvelope('bob', '#general')); + router.handleChannelJoin(charlie, createChannelJoinEnvelope('charlie', '#general')); + + alice.clearSent(); + bob.clearSent(); + charlie.clearSent(); + + const msgEnvelope = createChannelMessageEnvelope('alice', '#general', 'Hello everyone!'); + router.routeChannelMessage(alice, msgEnvelope); + + // Alice should NOT receive her own message + expect(alice.sendMock).not.toHaveBeenCalled(); + + // Bob and Charlie should receive the message + expect(bob.sendMock).toHaveBeenCalled(); + expect(charlie.sendMock).toHaveBeenCalled(); + + const bobMsg = bob.sentEnvelopes.find(e => e.type === 'CHANNEL_MESSAGE'); + expect(bobMsg?.from).toBe('alice'); + expect((bobMsg?.payload as ChannelMessagePayload).body).toBe('Hello everyone!'); + }); + + it('should not route message to non-members', () => { + const alice = new MockConnection('conn-1', 'alice', { entityType: 'user' }); + const bob = new MockConnection('conn-2', 'bob', { entityType: 'user' }); + const outsider = new MockConnection('conn-3', 'outsider', { entityType: 'user' }); + + router.register(alice); + router.register(bob); + router.register(outsider); + + router.handleChannelJoin(alice, createChannelJoinEnvelope('alice', '#private')); + router.handleChannelJoin(bob, createChannelJoinEnvelope('bob', '#private')); + // outsider does NOT join + + alice.clearSent(); + bob.clearSent(); + outsider.clearSent(); + + const msgEnvelope = createChannelMessageEnvelope('alice', '#private', 'Secret message'); + router.routeChannelMessage(alice, msgEnvelope); + + expect(bob.sendMock).toHaveBeenCalled(); + expect(outsider.sendMock).not.toHaveBeenCalled(); + }); + + it('should route message from agent to user in same channel', () => { + const agent = new MockConnection('conn-1', 'CodeReviewer', { entityType: 'agent' }); + const user = new MockConnection('conn-2', 'alice', { entityType: 'user' }); + + router.register(agent); + router.register(user); + + router.handleChannelJoin(agent, createChannelJoinEnvelope('CodeReviewer', '#engineering')); + router.handleChannelJoin(user, createChannelJoinEnvelope('alice', '#engineering')); + + agent.clearSent(); + user.clearSent(); + + const msgEnvelope = createChannelMessageEnvelope('CodeReviewer', '#engineering', 'PR looks good!'); + router.routeChannelMessage(agent, msgEnvelope); + + expect(user.sendMock).toHaveBeenCalled(); + const received = user.sentEnvelopes.find(e => e.type === 'CHANNEL_MESSAGE'); + expect(received?.from).toBe('CodeReviewer'); + }); + + it('should route message from user to agent in same channel', () => { + const agent = new MockConnection('conn-1', 'CodeReviewer', { entityType: 'agent' }); + const user = new MockConnection('conn-2', 'alice', { entityType: 'user' }); + + router.register(agent); + router.register(user); + + router.handleChannelJoin(agent, createChannelJoinEnvelope('CodeReviewer', '#engineering')); + router.handleChannelJoin(user, createChannelJoinEnvelope('alice', '#engineering')); + + agent.clearSent(); + user.clearSent(); + + const msgEnvelope = createChannelMessageEnvelope('alice', '#engineering', 'Can you review my PR?'); + router.routeChannelMessage(user, msgEnvelope); + + expect(agent.sendMock).toHaveBeenCalled(); + const received = agent.sentEnvelopes.find(e => e.type === 'CHANNEL_MESSAGE'); + expect(received?.from).toBe('alice'); + }); + + it('should persist channel messages', async () => { + const alice = new MockConnection('conn-1', 'alice', { entityType: 'user' }); + const bob = new MockConnection('conn-2', 'bob', { entityType: 'user' }); + + router.register(alice); + router.register(bob); + + router.handleChannelJoin(alice, createChannelJoinEnvelope('alice', '#general')); + router.handleChannelJoin(bob, createChannelJoinEnvelope('bob', '#general')); + + const msgEnvelope = createChannelMessageEnvelope('alice', '#general', 'Persistent message'); + router.routeChannelMessage(alice, msgEnvelope); + + // Give async storage a moment + await new Promise(resolve => setTimeout(resolve, 10)); + + expect(saved.length).toBeGreaterThan(0); + const savedMsg = saved.find(m => m.body === 'Persistent message'); + expect(savedMsg).toBeDefined(); + expect(savedMsg?.from).toBe('alice'); + }); + }); + + describe('Direct Messages (DM)', () => { + it('should route DM between two users', () => { + const alice = new MockConnection('conn-1', 'alice', { entityType: 'user' }); + const bob = new MockConnection('conn-2', 'bob', { entityType: 'user' }); + + router.register(alice); + router.register(bob); + + // Both join the DM channel + router.handleChannelJoin(alice, createChannelJoinEnvelope('alice', 'dm:alice:bob')); + router.handleChannelJoin(bob, createChannelJoinEnvelope('bob', 'dm:alice:bob')); + + alice.clearSent(); + bob.clearSent(); + + const dmEnvelope = createChannelMessageEnvelope('alice', 'dm:alice:bob', 'Hey Bob, private message!'); + router.routeChannelMessage(alice, dmEnvelope); + + expect(bob.sendMock).toHaveBeenCalled(); + expect(alice.sendMock).not.toHaveBeenCalled(); // Sender doesn't receive own message + + const received = bob.sentEnvelopes.find(e => e.type === 'CHANNEL_MESSAGE'); + expect((received?.payload as ChannelMessagePayload).body).toBe('Hey Bob, private message!'); + }); + + it('should route DM between user and agent', () => { + const user = new MockConnection('conn-1', 'alice', { entityType: 'user' }); + const agent = new MockConnection('conn-2', 'Assistant', { entityType: 'agent' }); + + router.register(user); + router.register(agent); + + // Both join the DM channel + router.handleChannelJoin(user, createChannelJoinEnvelope('alice', 'dm:alice:Assistant')); + router.handleChannelJoin(agent, createChannelJoinEnvelope('Assistant', 'dm:alice:Assistant')); + + user.clearSent(); + agent.clearSent(); + + // User sends to agent + const dm1 = createChannelMessageEnvelope('alice', 'dm:alice:Assistant', 'Can you help me?'); + router.routeChannelMessage(user, dm1); + + expect(agent.sendMock).toHaveBeenCalled(); + + agent.clearSent(); + user.clearSent(); + + // Agent responds + const dm2 = createChannelMessageEnvelope('Assistant', 'dm:alice:Assistant', 'Of course! How can I help?'); + router.routeChannelMessage(agent, dm2); + + expect(user.sendMock).toHaveBeenCalled(); + }); + + it('should support group DM with multiple participants', () => { + const alice = new MockConnection('conn-1', 'alice', { entityType: 'user' }); + const bob = new MockConnection('conn-2', 'bob', { entityType: 'user' }); + const charlie = new MockConnection('conn-3', 'charlie', { entityType: 'user' }); + + router.register(alice); + router.register(bob); + router.register(charlie); + + const groupDm = 'dm:alice:bob:charlie'; + router.handleChannelJoin(alice, createChannelJoinEnvelope('alice', groupDm)); + router.handleChannelJoin(bob, createChannelJoinEnvelope('bob', groupDm)); + router.handleChannelJoin(charlie, createChannelJoinEnvelope('charlie', groupDm)); + + alice.clearSent(); + bob.clearSent(); + charlie.clearSent(); + + const msg = createChannelMessageEnvelope('alice', groupDm, 'Group message'); + router.routeChannelMessage(alice, msg); + + // Both Bob and Charlie should receive, but not Alice + expect(alice.sendMock).not.toHaveBeenCalled(); + expect(bob.sendMock).toHaveBeenCalled(); + expect(charlie.sendMock).toHaveBeenCalled(); + }); + }); + + describe('User Entity Registration', () => { + it('should track user entity type separately from agents', () => { + const user = new MockConnection('conn-1', 'alice', { entityType: 'user' }); + const agent = new MockConnection('conn-2', 'CodeReviewer', { entityType: 'agent' }); + + router.register(user); + router.register(agent); + + expect(router.isUser('alice')).toBe(true); + expect(router.isUser('CodeReviewer')).toBe(false); + expect(router.isAgent('alice')).toBe(false); + expect(router.isAgent('CodeReviewer')).toBe(true); + }); + + it('should return all users', () => { + const user1 = new MockConnection('conn-1', 'alice', { entityType: 'user' }); + const user2 = new MockConnection('conn-2', 'bob', { entityType: 'user' }); + const agent = new MockConnection('conn-3', 'CodeReviewer', { entityType: 'agent' }); + + router.register(user1); + router.register(user2); + router.register(agent); + + const users = router.getUsers(); + expect(users).toContain('alice'); + expect(users).toContain('bob'); + expect(users).not.toContain('CodeReviewer'); + expect(users).toHaveLength(2); + }); + + it('should return all agents (excluding users)', () => { + const user = new MockConnection('conn-1', 'alice', { entityType: 'user' }); + const agent1 = new MockConnection('conn-2', 'CodeReviewer', { entityType: 'agent' }); + const agent2 = new MockConnection('conn-3', 'TestRunner', { entityType: 'agent' }); + + router.register(user); + router.register(agent1); + router.register(agent2); + + const agents = router.getAgents(); + expect(agents).toContain('CodeReviewer'); + expect(agents).toContain('TestRunner'); + expect(agents).not.toContain('alice'); + }); + }); + + describe('Channel Listing', () => { + it('should list all channels', () => { + const alice = new MockConnection('conn-1', 'alice', { entityType: 'user' }); + router.register(alice); + + router.handleChannelJoin(alice, createChannelJoinEnvelope('alice', '#general')); + router.handleChannelJoin(alice, createChannelJoinEnvelope('alice', '#engineering')); + router.handleChannelJoin(alice, createChannelJoinEnvelope('alice', 'dm:alice:bob')); + + const channels = router.getChannels(); + expect(channels).toContain('#general'); + expect(channels).toContain('#engineering'); + expect(channels).toContain('dm:alice:bob'); + }); + + it('should list channels for a specific user', () => { + const alice = new MockConnection('conn-1', 'alice', { entityType: 'user' }); + const bob = new MockConnection('conn-2', 'bob', { entityType: 'user' }); + + router.register(alice); + router.register(bob); + + router.handleChannelJoin(alice, createChannelJoinEnvelope('alice', '#general')); + router.handleChannelJoin(alice, createChannelJoinEnvelope('alice', '#alice-only')); + router.handleChannelJoin(bob, createChannelJoinEnvelope('bob', '#general')); + router.handleChannelJoin(bob, createChannelJoinEnvelope('bob', '#bob-only')); + + const aliceChannels = router.getChannelsForMember('alice'); + expect(aliceChannels).toContain('#general'); + expect(aliceChannels).toContain('#alice-only'); + expect(aliceChannels).not.toContain('#bob-only'); + }); + + it('should delete empty channels', () => { + const alice = new MockConnection('conn-1', 'alice', { entityType: 'user' }); + router.register(alice); + + router.handleChannelJoin(alice, createChannelJoinEnvelope('alice', '#temp')); + expect(router.getChannels()).toContain('#temp'); + + router.handleChannelLeave(alice, createChannelLeaveEnvelope('alice', '#temp')); + // Empty channel should be removed + expect(router.getChannels()).not.toContain('#temp'); + }); + }); + + describe('Channel Thread Support', () => { + it('should support threaded messages in channels', () => { + const alice = new MockConnection('conn-1', 'alice', { entityType: 'user' }); + const bob = new MockConnection('conn-2', 'bob', { entityType: 'user' }); + + router.register(alice); + router.register(bob); + + router.handleChannelJoin(alice, createChannelJoinEnvelope('alice', '#general')); + router.handleChannelJoin(bob, createChannelJoinEnvelope('bob', '#general')); + + alice.clearSent(); + bob.clearSent(); + + // Parent message + const parentMsg: Envelope = { + v: 1, + type: 'CHANNEL_MESSAGE', + id: 'parent-123', + ts: Date.now(), + from: 'alice', + payload: { + channel: '#general', + body: 'Starting a thread', + }, + }; + router.routeChannelMessage(alice, parentMsg); + + bob.clearSent(); + + // Threaded reply + const threadedReply: Envelope = { + v: 1, + type: 'CHANNEL_MESSAGE', + id: 'reply-456', + ts: Date.now(), + from: 'bob', + payload: { + channel: '#general', + body: 'This is a reply', + thread: 'parent-123', + }, + }; + router.routeChannelMessage(bob, threadedReply); + + // Alice should receive the threaded reply + expect(alice.sendMock).toHaveBeenCalled(); + const received = alice.sentEnvelopes.find(e => e.type === 'CHANNEL_MESSAGE'); + expect((received?.payload as ChannelMessagePayload).thread).toBe('parent-123'); + }); + }); + + describe('Integration with existing SEND/DELIVER', () => { + it('should still support direct agent-to-agent messaging via SEND', () => { + const agent1 = new MockConnection('conn-1', 'agent1', { entityType: 'agent' }); + const agent2 = new MockConnection('conn-2', 'agent2', { entityType: 'agent' }); + + router.register(agent1); + router.register(agent2); + + // Traditional SEND message (not channel) + const sendEnvelope: Envelope = { + v: 1, + type: 'SEND', + id: 'msg-1', + ts: Date.now(), + from: 'agent1', + to: 'agent2', + payload: { + kind: 'message', + body: 'Direct message via SEND', + }, + }; + + router.route(agent1, sendEnvelope); + + expect(agent2.sendMock).toHaveBeenCalled(); + const delivered = agent2.sentEnvelopes.find(e => e.type === 'DELIVER') as DeliverEnvelope; + expect(delivered.payload.body).toBe('Direct message via SEND'); + }); + + it('should allow user to send direct message to agent via SEND', () => { + const user = new MockConnection('conn-1', 'alice', { entityType: 'user' }); + const agent = new MockConnection('conn-2', 'CodeReviewer', { entityType: 'agent' }); + + router.register(user); + router.register(agent); + + const sendEnvelope: Envelope = { + v: 1, + type: 'SEND', + id: 'msg-1', + ts: Date.now(), + from: 'alice', + to: 'CodeReviewer', + payload: { + kind: 'message', + body: 'User to agent direct message', + }, + }; + + router.route(user, sendEnvelope); + + expect(agent.sendMock).toHaveBeenCalled(); + const delivered = agent.sentEnvelopes.find(e => e.type === 'DELIVER') as DeliverEnvelope; + expect(delivered.payload.body).toBe('User to agent direct message'); + }); + }); +}); diff --git a/src/daemon/connection.ts b/src/daemon/connection.ts index 886d1d6f..f778147d 100644 --- a/src/daemon/connection.ts +++ b/src/daemon/connection.ts @@ -18,6 +18,7 @@ import { type PongPayload, type ErrorPayload, type AckPayload, + type EntityType, PROTOCOL_VERSION, } from '../protocol/types.js'; import { encodeFrame, FrameParser } from '../protocol/framing.js'; @@ -54,11 +55,14 @@ export class Connection { private _state: ConnectionState = 'CONNECTING'; private _agentName?: string; + private _entityType?: EntityType; private _cli?: string; private _program?: string; private _model?: string; private _task?: string; private _workingDirectory?: string; + private _displayName?: string; + private _avatarUrl?: string; private _sessionId: string; private _resumeToken: string; private _isResumed = false; @@ -97,6 +101,10 @@ export class Connection { return this._agentName; } + get entityType(): EntityType | undefined { + return this._entityType; + } + get cli(): string | undefined { return this._cli; } @@ -117,6 +125,14 @@ export class Connection { return this._workingDirectory; } + get displayName(): string | undefined { + return this._displayName; + } + + get avatarUrl(): string | undefined { + return this._avatarUrl; + } + get sessionId(): string { return this._sessionId; } @@ -183,11 +199,14 @@ export class Connection { } this._agentName = envelope.payload.agent; + this._entityType = envelope.payload.entityType; this._cli = envelope.payload.cli; this._program = envelope.payload.program; this._model = envelope.payload.model; this._task = envelope.payload.task; this._workingDirectory = envelope.payload.workingDirectory; + this._displayName = envelope.payload.displayName; + this._avatarUrl = envelope.payload.avatarUrl; // Check for session resume const resumeToken = envelope.payload.session?.resume_token; diff --git a/src/daemon/router.ts b/src/daemon/router.ts index 9959d709..f33d4ceb 100644 --- a/src/daemon/router.ts +++ b/src/daemon/router.ts @@ -11,8 +11,14 @@ import { type AckPayload, type ShadowConfig, type SpeakOnTrigger, + type EntityType, PROTOCOL_VERSION, } from '../protocol/types.js'; +import type { + ChannelJoinPayload, + ChannelLeavePayload, + ChannelMessagePayload, +} from '../protocol/channels.js'; import type { StorageAdapter } from '../storage/adapter.js'; import type { AgentRegistry } from './agent-registry.js'; import { routerLog } from '../utils/logger.js'; @@ -20,6 +26,8 @@ import { routerLog } from '../utils/logger.js'; export interface RoutableConnection { id: string; agentName?: string; + /** Entity type: 'agent' (default) or 'user' for human users */ + entityType?: EntityType; cli?: string; program?: string; model?: string; @@ -100,6 +108,13 @@ export class Router { /** Reverse lookup: shadowAgent -> primaryAgent (for cleanup) */ private primaryByShadow: Map = new Map(); + /** Channel membership: channel -> Set of member names */ + private channels: Map> = new Map(); + /** User entities (human users, not agents) */ + private users: Map = new Map(); + /** Reverse lookup: member name -> Set of channels they're in */ + private memberChannels: Map> = new Map(); + /** Default timeout for processing indicator (30 seconds) */ private static readonly PROCESSING_TIMEOUT_MS = 30_000; @@ -134,21 +149,34 @@ export class Router { this.connections.set(connection.id, connection); if (connection.agentName) { - // Handle existing connection with same name (disconnect old) - const existing = this.agents.get(connection.agentName); - if (existing && existing.id !== connection.id) { - existing.close(); - this.connections.delete(existing.id); + const isUser = connection.entityType === 'user'; + + if (isUser) { + // Handle existing user connection with same name (disconnect old) + const existingUser = this.users.get(connection.agentName); + if (existingUser && existingUser.id !== connection.id) { + existingUser.close(); + this.connections.delete(existingUser.id); + } + this.users.set(connection.agentName, connection); + routerLog.info(`User registered: ${connection.agentName}`); + } else { + // Handle existing agent connection with same name (disconnect old) + const existing = this.agents.get(connection.agentName); + if (existing && existing.id !== connection.id) { + existing.close(); + this.connections.delete(existing.id); + } + this.agents.set(connection.agentName, connection); + this.registry?.registerOrUpdate({ + name: connection.agentName, + cli: connection.cli, + program: connection.program, + model: connection.model, + task: connection.task, + workingDirectory: connection.workingDirectory, + }); } - this.agents.set(connection.agentName, connection); - this.registry?.registerOrUpdate({ - name: connection.agentName, - cli: connection.cli, - program: connection.program, - model: connection.model, - task: connection.task, - workingDirectory: connection.workingDirectory, - }); } } @@ -158,9 +186,18 @@ export class Router { unregister(connection: RoutableConnection): void { this.connections.delete(connection.id); if (connection.agentName) { - const current = this.agents.get(connection.agentName); - if (current?.id === connection.id) { - this.agents.delete(connection.agentName); + const isUser = connection.entityType === 'user'; + + if (isUser) { + const currentUser = this.users.get(connection.agentName); + if (currentUser?.id === connection.id) { + this.users.delete(connection.agentName); + } + } else { + const current = this.agents.get(connection.agentName); + if (current?.id === connection.id) { + this.agents.delete(connection.agentName); + } } // Remove from all subscriptions @@ -168,6 +205,9 @@ export class Router { subscribers.delete(connection.agentName); } + // Remove from all channels and notify remaining members + this.removeFromAllChannels(connection.agentName); + // Clean up shadow relationships this.unbindShadow(connection.agentName); @@ -178,6 +218,26 @@ export class Router { this.clearPendingForConnection(connection.id); } + /** + * Remove a member from all channels they're in. + */ + private removeFromAllChannels(memberName: string): void { + const memberChannelSet = this.memberChannels.get(memberName); + if (!memberChannelSet) return; + + for (const channelName of memberChannelSet) { + const members = this.channels.get(channelName); + if (members) { + members.delete(memberName); + // Clean up empty channels + if (members.size === 0) { + this.channels.delete(channelName); + } + } + } + this.memberChannels.delete(memberName); + } + /** * Subscribe an agent to a topic. */ @@ -893,4 +953,257 @@ export class Router { } } } + + // ==================== Channel Methods ==================== + + /** + * Handle a CHANNEL_JOIN message. + * Adds the member to the channel and notifies existing members. + */ + handleChannelJoin( + connection: RoutableConnection, + envelope: Envelope + ): void { + const memberName = connection.agentName; + if (!memberName) { + routerLog.warn('CHANNEL_JOIN from connection without name'); + return; + } + + const channel = envelope.payload.channel; + + // Get or create channel + let members = this.channels.get(channel); + if (!members) { + members = new Set(); + this.channels.set(channel, members); + } + + // Check if already a member + if (members.has(memberName)) { + routerLog.debug(`${memberName} already in ${channel}`); + return; + } + + // Notify existing members about the new joiner + for (const existingMember of members) { + const memberConn = this.getConnectionByName(existingMember); + if (memberConn) { + const joinNotification: Envelope = { + v: PROTOCOL_VERSION, + type: 'CHANNEL_JOIN', + id: uuid(), + ts: Date.now(), + from: memberName, + payload: envelope.payload, + }; + memberConn.send(joinNotification); + } + } + + // Add the new member + members.add(memberName); + + // Track which channels this member is in + let memberChannelSet = this.memberChannels.get(memberName); + if (!memberChannelSet) { + memberChannelSet = new Set(); + this.memberChannels.set(memberName, memberChannelSet); + } + memberChannelSet.add(channel); + + routerLog.info(`${memberName} joined ${channel} (${members.size} members)`); + } + + /** + * Handle a CHANNEL_LEAVE message. + * Removes the member from the channel and notifies remaining members. + */ + handleChannelLeave( + connection: RoutableConnection, + envelope: Envelope + ): void { + const memberName = connection.agentName; + if (!memberName) { + routerLog.warn('CHANNEL_LEAVE from connection without name'); + return; + } + + const channel = envelope.payload.channel; + const members = this.channels.get(channel); + + if (!members || !members.has(memberName)) { + routerLog.debug(`${memberName} not in ${channel}, ignoring leave`); + return; + } + + // Remove from channel + members.delete(memberName); + + // Remove from member's channel list + const memberChannelSet = this.memberChannels.get(memberName); + if (memberChannelSet) { + memberChannelSet.delete(channel); + if (memberChannelSet.size === 0) { + this.memberChannels.delete(memberName); + } + } + + // Notify remaining members + for (const remainingMember of members) { + const memberConn = this.getConnectionByName(remainingMember); + if (memberConn) { + const leaveNotification: Envelope = { + v: PROTOCOL_VERSION, + type: 'CHANNEL_LEAVE', + id: uuid(), + ts: Date.now(), + from: memberName, + payload: envelope.payload, + }; + memberConn.send(leaveNotification); + } + } + + // Clean up empty channels + if (members.size === 0) { + this.channels.delete(channel); + routerLog.debug(`Channel ${channel} deleted (empty)`); + } + + routerLog.info(`${memberName} left ${channel}`); + } + + /** + * Route a channel message to all members except the sender. + */ + routeChannelMessage( + connection: RoutableConnection, + envelope: Envelope + ): void { + const senderName = connection.agentName; + if (!senderName) { + routerLog.warn('CHANNEL_MESSAGE from connection without name'); + return; + } + + const channel = envelope.payload.channel; + const members = this.channels.get(channel); + + if (!members) { + routerLog.warn(`Message to non-existent channel ${channel}`); + return; + } + + if (!members.has(senderName)) { + routerLog.warn(`${senderName} not a member of ${channel}`); + return; + } + + // Route to all members except sender + for (const memberName of members) { + if (memberName === senderName) continue; + + const memberConn = this.getConnectionByName(memberName); + if (memberConn) { + const deliverEnvelope: Envelope = { + v: PROTOCOL_VERSION, + type: 'CHANNEL_MESSAGE', + id: uuid(), + ts: Date.now(), + from: senderName, + payload: envelope.payload, + }; + memberConn.send(deliverEnvelope); + } + } + + // Persist channel message + this.persistChannelMessage(envelope, senderName); + + routerLog.debug(`${senderName} -> ${channel}: ${envelope.payload.body.substring(0, 50)}`); + } + + /** + * Persist a channel message to storage. + */ + private persistChannelMessage( + envelope: Envelope, + from: string + ): void { + if (!this.storage) return; + + this.storage.saveMessage({ + id: envelope.id, + ts: envelope.ts, + from, + to: envelope.payload.channel, // Channel name as "to" + topic: undefined, + kind: 'message', + body: envelope.payload.body, + data: { + ...envelope.payload.data, + _isChannelMessage: true, + _channel: envelope.payload.channel, + _mentions: envelope.payload.mentions, + }, + thread: envelope.payload.thread, + status: 'unread', + is_urgent: false, + is_broadcast: true, // Channel messages are effectively broadcasts + }).catch((err) => { + routerLog.error('Failed to persist channel message', { error: String(err) }); + }); + } + + /** + * Get all members of a channel. + */ + getChannelMembers(channel: string): string[] { + const members = this.channels.get(channel); + return members ? Array.from(members) : []; + } + + /** + * Get all channels. + */ + getChannels(): string[] { + return Array.from(this.channels.keys()); + } + + /** + * Get all channels a member is in. + */ + getChannelsForMember(memberName: string): string[] { + const channels = this.memberChannels.get(memberName); + return channels ? Array.from(channels) : []; + } + + /** + * Check if a name belongs to a user (not an agent). + */ + isUser(name: string): boolean { + return this.users.has(name); + } + + /** + * Check if a name belongs to an agent (not a user). + */ + isAgent(name: string): boolean { + return this.agents.has(name); + } + + /** + * Get list of connected user names (human users only). + */ + getUsers(): string[] { + return Array.from(this.users.keys()); + } + + /** + * Get a connection by name (checks both agents and users). + */ + private getConnectionByName(name: string): RoutableConnection | undefined { + return this.agents.get(name) ?? this.users.get(name); + } } diff --git a/src/dashboard-server/server.ts b/src/dashboard-server/server.ts index 1038284f..795b82e0 100644 --- a/src/dashboard-server/server.ts +++ b/src/dashboard-server/server.ts @@ -10,6 +10,7 @@ import { fileURLToPath } from 'url'; import { SqliteStorageAdapter } from '../storage/sqlite-adapter.js'; import type { StorageAdapter, StoredMessage } from '../storage/adapter.js'; import { RelayClient } from '../wrapper/client.js'; +import { UserBridge } from './user-bridge.js'; import { computeNeedsAttention } from './needs-attention.js'; import { computeSystemMetrics, formatPrometheusMetrics } from './metrics.js'; import { MultiProjectClient } from '../bridge/multi-project-client.js'; @@ -739,6 +740,28 @@ export async function startDashboard( // Start default relay client connection (non-blocking) getRelayClient('Dashboard').catch(() => {}); + // User bridge for human-to-human and human-to-agent messaging + const userBridge = new UserBridge({ + socketPath, + createRelayClient: async (options) => { + const client = new RelayClient({ + socketPath: options.socketPath, + agentName: options.agentName, + entityType: options.entityType, + cli: 'dashboard', + reconnect: true, + maxReconnectAttempts: 5, + }); + + client.onError = (err) => { + console.error(`[user-bridge] Relay client error for ${options.agentName}:`, err.message); + }; + + await client.connect(); + return client; + }, + }); + // Bridge client for cross-project messaging let bridgeClient: MultiProjectClient | undefined; let bridgeClientConnecting = false; @@ -1930,6 +1953,11 @@ export async function startDashboard( console.log(`[dashboard] User ${username} came online`); + // Register user with relay daemon for messaging + userBridge.registerUser(username, ws, { avatarUrl }).catch((err) => { + console.error(`[dashboard] Failed to register user ${username} with relay:`, err); + }); + // Broadcast join to all other clients (only for truly new users) broadcastPresence({ type: 'presence_join', @@ -2006,6 +2034,88 @@ export async function startDashboard( avatarUrl: userState?.info.avatarUrl, isTyping: msg.isTyping, }, ws); + } else if (msg.type === 'channel_join') { + // Join a channel + if (!clientUsername) { + console.warn(`[dashboard] Security: Unauthenticated channel_join attempt`); + return; + } + if (!msg.channel || typeof msg.channel !== 'string') { + console.warn(`[dashboard] Invalid channel_join: missing channel`); + return; + } + userBridge.joinChannel(clientUsername, msg.channel).then((success) => { + ws.send(JSON.stringify({ + type: 'channel_joined', + channel: msg.channel, + success, + })); + }).catch((err) => { + console.error(`[dashboard] Channel join error:`, err); + ws.send(JSON.stringify({ + type: 'channel_joined', + channel: msg.channel, + success: false, + error: err.message, + })); + }); + } else if (msg.type === 'channel_leave') { + // Leave a channel + if (!clientUsername) { + console.warn(`[dashboard] Security: Unauthenticated channel_leave attempt`); + return; + } + if (!msg.channel || typeof msg.channel !== 'string') { + console.warn(`[dashboard] Invalid channel_leave: missing channel`); + return; + } + userBridge.leaveChannel(clientUsername, msg.channel).then((success) => { + ws.send(JSON.stringify({ + type: 'channel_left', + channel: msg.channel, + success, + })); + }).catch((err) => { + console.error(`[dashboard] Channel leave error:`, err); + }); + } else if (msg.type === 'channel_message') { + // Send message to channel + if (!clientUsername) { + console.warn(`[dashboard] Security: Unauthenticated channel_message attempt`); + return; + } + if (!msg.channel || typeof msg.channel !== 'string') { + console.warn(`[dashboard] Invalid channel_message: missing channel`); + return; + } + if (!msg.body || typeof msg.body !== 'string') { + console.warn(`[dashboard] Invalid channel_message: missing body`); + return; + } + userBridge.sendChannelMessage(clientUsername, msg.channel, msg.body, { + thread: msg.thread, + }).catch((err) => { + console.error(`[dashboard] Channel message error:`, err); + }); + } else if (msg.type === 'direct_message') { + // Send direct message to user or agent + if (!clientUsername) { + console.warn(`[dashboard] Security: Unauthenticated direct_message attempt`); + return; + } + if (!msg.to || typeof msg.to !== 'string') { + console.warn(`[dashboard] Invalid direct_message: missing 'to'`); + return; + } + if (!msg.body || typeof msg.body !== 'string') { + console.warn(`[dashboard] Invalid direct_message: missing body`); + return; + } + userBridge.sendDirectMessage(clientUsername, msg.to, msg.body, { + thread: msg.thread, + }).catch((err) => { + console.error(`[dashboard] Direct message error:`, err); + }); } } catch (err) { console.error('[dashboard] Invalid presence message:', err); @@ -2028,6 +2138,9 @@ export async function startDashboard( onlineUsers.delete(clientUsername); console.log(`[dashboard] User ${clientUsername} disconnected`); + // Unregister from relay daemon + userBridge.unregisterUser(clientUsername); + broadcastPresence({ type: 'presence_leave', username: clientUsername, @@ -2047,6 +2160,91 @@ export async function startDashboard( }); }); + // ===== Channel API ===== + /** + * GET /api/channels - Get list of channels the user has joined + */ + app.get('/api/channels', (req, res) => { + const username = req.query.username as string; + if (!username) { + return res.status(400).json({ error: 'username query param required' }); + } + const channels = userBridge.getUserChannels(username); + res.json({ channels }); + }); + + /** + * GET /api/channels/users - Get list of registered users + */ + app.get('/api/channels/users', (_req, res) => { + const users = userBridge.getRegisteredUsers(); + res.json({ users }); + }); + + /** + * POST /api/channels/join - Join a channel + */ + app.post('/api/channels/join', express.json(), async (req, res) => { + const { username, channel } = req.body; + if (!username || !channel) { + return res.status(400).json({ error: 'username and channel required' }); + } + try { + const success = await userBridge.joinChannel(username, channel); + res.json({ success, channel }); + } catch (err: any) { + res.status(500).json({ error: err.message }); + } + }); + + /** + * POST /api/channels/leave - Leave a channel + */ + app.post('/api/channels/leave', express.json(), async (req, res) => { + const { username, channel } = req.body; + if (!username || !channel) { + return res.status(400).json({ error: 'username and channel required' }); + } + try { + const success = await userBridge.leaveChannel(username, channel); + res.json({ success, channel }); + } catch (err: any) { + res.status(500).json({ error: err.message }); + } + }); + + /** + * POST /api/channels/message - Send a message to a channel + */ + app.post('/api/channels/message', express.json(), async (req, res) => { + const { username, channel, body, thread } = req.body; + if (!username || !channel || !body) { + return res.status(400).json({ error: 'username, channel, and body required' }); + } + try { + const success = await userBridge.sendChannelMessage(username, channel, body, { thread }); + res.json({ success }); + } catch (err: any) { + res.status(500).json({ error: err.message }); + } + }); + + /** + * POST /api/dm - Send a direct message + */ + app.post('/api/dm', express.json(), async (req, res) => { + const { from, to, body, thread } = req.body; + if (!from || !to || !body) { + return res.status(400).json({ error: 'from, to, and body required' }); + } + try { + const success = await userBridge.sendDirectMessage(from, to, body, { thread }); + res.json({ success }); + } catch (err: any) { + res.status(500).json({ error: err.message }); + } + }); + // ===== Health Check API ===== /** * GET /health - Health check endpoint for monitoring diff --git a/src/dashboard-server/user-bridge.test.ts b/src/dashboard-server/user-bridge.test.ts new file mode 100644 index 00000000..5713f205 --- /dev/null +++ b/src/dashboard-server/user-bridge.test.ts @@ -0,0 +1,419 @@ +/** + * Tests for the User Bridge functionality. + * TDD: Tests for bridging dashboard WebSocket users to the relay daemon. + * + * The user bridge allows human users connected via WebSocket to: + * - Register as "user" entities in the relay daemon + * - Join/leave channels + * - Send/receive messages through the relay daemon + * - Communicate with agents and other users + */ + +import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest'; +import { UserBridge } from './user-bridge.js'; + +// Mock WebSocket +class MockWebSocket { + public sentMessages: unknown[] = []; + public readyState = 1; // OPEN + private eventHandlers = new Map void)[]>(); + + send(data: string): void { + this.sentMessages.push(JSON.parse(data)); + } + + on(event: string, handler: (...args: unknown[]) => void): void { + const handlers = this.eventHandlers.get(event) || []; + handlers.push(handler); + this.eventHandlers.set(event, handlers); + } + + emit(event: string, ...args: unknown[]): void { + const handlers = this.eventHandlers.get(event) || []; + for (const handler of handlers) { + handler(...args); + } + } + + close(): void { + this.readyState = 3; // CLOSED + this.emit('close'); + } + + clearSent(): void { + this.sentMessages = []; + } +} + +// Mock RelayClient +class MockRelayClient { + public connected = false; + public agentName: string; + public entityType?: string; + public sentMessages: Array<{ to: string; body: string; kind: string; thread?: string }> = []; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + public onMessage?: (from: string, payload: any, messageId: string, meta?: any, originalTo?: string) => void; + + constructor(options: { socketPath: string; agentName: string; entityType?: string }) { + this.agentName = options.agentName; + this.entityType = options.entityType; + } + + async connect(): Promise { + this.connected = true; + } + + disconnect(): void { + this.connected = false; + } + + get state(): string { + return this.connected ? 'READY' : 'DISCONNECTED'; + } + + sendMessage( + to: string, + body: string, + kind: string = 'message', + _data?: unknown, + thread?: string + ): boolean { + this.sentMessages.push({ to, body, kind, thread }); + return true; + } + + // Test helper to simulate receiving a message + simulateIncomingMessage(from: string, body: string, envelope: unknown): void { + this.onMessage?.(from, envelope, 'test-msg-id', undefined, undefined); + } + + clearSent(): void { + this.sentMessages = []; + } +} + +describe('UserBridge', () => { + let bridge: UserBridge; + let mockWs: MockWebSocket; + let mockRelayClient: MockRelayClient; + let relayClientFactory: ReturnType; + + beforeEach(() => { + mockWs = new MockWebSocket(); + mockRelayClient = new MockRelayClient({ + socketPath: '/tmp/test.sock', + agentName: 'alice', + entityType: 'user', + }); + + relayClientFactory = vi.fn().mockResolvedValue(mockRelayClient); + + bridge = new UserBridge({ + socketPath: '/tmp/test.sock', + createRelayClient: relayClientFactory, + }); + }); + + afterEach(() => { + bridge.dispose(); + }); + + describe('User Registration', () => { + it('should register a user with the relay daemon', async () => { + await bridge.registerUser('alice', mockWs as unknown as WebSocket, { + avatarUrl: 'https://example.com/alice.png', + }); + + expect(relayClientFactory).toHaveBeenCalledWith( + expect.objectContaining({ + agentName: 'alice', + entityType: 'user', + }) + ); + expect(mockRelayClient.connected).toBe(true); + }); + + it('should track registered users', async () => { + await bridge.registerUser('alice', mockWs as unknown as WebSocket); + + expect(bridge.isUserRegistered('alice')).toBe(true); + expect(bridge.isUserRegistered('bob')).toBe(false); + }); + + it('should allow multiple users to register', async () => { + const ws1 = new MockWebSocket(); + const ws2 = new MockWebSocket(); + + const client1 = new MockRelayClient({ socketPath: '/tmp/test.sock', agentName: 'alice', entityType: 'user' }); + const client2 = new MockRelayClient({ socketPath: '/tmp/test.sock', agentName: 'bob', entityType: 'user' }); + + relayClientFactory + .mockResolvedValueOnce(client1) + .mockResolvedValueOnce(client2); + + await bridge.registerUser('alice', ws1 as unknown as WebSocket); + await bridge.registerUser('bob', ws2 as unknown as WebSocket); + + expect(bridge.isUserRegistered('alice')).toBe(true); + expect(bridge.isUserRegistered('bob')).toBe(true); + expect(bridge.getRegisteredUsers()).toContain('alice'); + expect(bridge.getRegisteredUsers()).toContain('bob'); + }); + + it('should handle re-registration of same user', async () => { + const ws1 = new MockWebSocket(); + const ws2 = new MockWebSocket(); + + await bridge.registerUser('alice', ws1 as unknown as WebSocket); + await bridge.registerUser('alice', ws2 as unknown as WebSocket); + + // Should reuse or replace, not duplicate + expect(bridge.getRegisteredUsers().filter(u => u === 'alice')).toHaveLength(1); + }); + }); + + describe('User Unregistration', () => { + it('should unregister a user and disconnect relay client', async () => { + await bridge.registerUser('alice', mockWs as unknown as WebSocket); + expect(bridge.isUserRegistered('alice')).toBe(true); + + bridge.unregisterUser('alice'); + + expect(bridge.isUserRegistered('alice')).toBe(false); + expect(mockRelayClient.connected).toBe(false); + }); + + it('should handle unregistering non-existent user gracefully', () => { + expect(() => bridge.unregisterUser('nonexistent')).not.toThrow(); + }); + }); + + describe('Channel Operations', () => { + beforeEach(async () => { + await bridge.registerUser('alice', mockWs as unknown as WebSocket); + mockRelayClient.clearSent(); + }); + + it('should send channel join to relay daemon', async () => { + await bridge.joinChannel('alice', '#general'); + + expect(mockRelayClient.sentMessages).toContainEqual( + expect.objectContaining({ + to: '#general', + kind: 'channel_join', + }) + ); + }); + + it('should send channel leave to relay daemon', async () => { + await bridge.leaveChannel('alice', '#general'); + + expect(mockRelayClient.sentMessages).toContainEqual( + expect.objectContaining({ + to: '#general', + kind: 'channel_leave', + }) + ); + }); + + it('should track user channel membership', async () => { + await bridge.joinChannel('alice', '#general'); + await bridge.joinChannel('alice', '#engineering'); + + const channels = bridge.getUserChannels('alice'); + expect(channels).toContain('#general'); + expect(channels).toContain('#engineering'); + }); + + it('should remove channel from membership on leave', async () => { + await bridge.joinChannel('alice', '#general'); + await bridge.joinChannel('alice', '#engineering'); + await bridge.leaveChannel('alice', '#general'); + + const channels = bridge.getUserChannels('alice'); + expect(channels).not.toContain('#general'); + expect(channels).toContain('#engineering'); + }); + }); + + describe('Message Sending', () => { + beforeEach(async () => { + await bridge.registerUser('alice', mockWs as unknown as WebSocket); + mockRelayClient.clearSent(); + }); + + it('should send channel message via relay client', async () => { + await bridge.sendChannelMessage('alice', '#general', 'Hello everyone!'); + + expect(mockRelayClient.sentMessages).toContainEqual( + expect.objectContaining({ + to: '#general', + body: 'Hello everyone!', + kind: 'message', + }) + ); + }); + + it('should send direct message to another user', async () => { + await bridge.sendDirectMessage('alice', 'bob', 'Hey Bob!'); + + expect(mockRelayClient.sentMessages).toContainEqual( + expect.objectContaining({ + to: 'bob', + body: 'Hey Bob!', + kind: 'message', + }) + ); + }); + + it('should send direct message to agent', async () => { + await bridge.sendDirectMessage('alice', 'CodeReviewer', 'Review my PR please'); + + expect(mockRelayClient.sentMessages).toContainEqual( + expect.objectContaining({ + to: 'CodeReviewer', + body: 'Review my PR please', + kind: 'message', + }) + ); + }); + + it('should support threaded messages', async () => { + await bridge.sendChannelMessage('alice', '#general', 'Reply to thread', { + thread: 'parent-msg-123', + }); + + expect(mockRelayClient.sentMessages).toContainEqual( + expect.objectContaining({ + to: '#general', + body: 'Reply to thread', + thread: 'parent-msg-123', + }) + ); + }); + }); + + describe('Message Receiving', () => { + beforeEach(async () => { + await bridge.registerUser('alice', mockWs as unknown as WebSocket); + mockWs.clearSent(); + }); + + it('should forward incoming channel messages to WebSocket', () => { + mockRelayClient.simulateIncomingMessage('bob', 'Hello Alice!', { + type: 'CHANNEL_MESSAGE', + payload: { + channel: '#general', + body: 'Hello Alice!', + }, + }); + + expect(mockWs.sentMessages).toContainEqual( + expect.objectContaining({ + type: 'channel_message', + channel: '#general', + from: 'bob', + body: 'Hello Alice!', + }) + ); + }); + + it('should forward incoming direct messages to WebSocket', () => { + mockRelayClient.simulateIncomingMessage('bob', 'Private message', { + type: 'DELIVER', + from: 'bob', + to: 'alice', + payload: { + body: 'Private message', + }, + }); + + expect(mockWs.sentMessages).toContainEqual( + expect.objectContaining({ + type: 'direct_message', + from: 'bob', + body: 'Private message', + }) + ); + }); + + it('should forward agent messages to WebSocket', () => { + mockRelayClient.simulateIncomingMessage('CodeReviewer', 'PR approved!', { + type: 'DELIVER', + from: 'CodeReviewer', + to: 'alice', + payload: { + body: 'PR approved!', + }, + }); + + expect(mockWs.sentMessages).toContainEqual( + expect.objectContaining({ + type: 'direct_message', + from: 'CodeReviewer', + body: 'PR approved!', + }) + ); + }); + }); + + describe('WebSocket Disconnect Handling', () => { + it('should unregister user when WebSocket closes', async () => { + await bridge.registerUser('alice', mockWs as unknown as WebSocket); + expect(bridge.isUserRegistered('alice')).toBe(true); + + mockWs.close(); + + expect(bridge.isUserRegistered('alice')).toBe(false); + }); + + it('should disconnect relay client when WebSocket closes', async () => { + await bridge.registerUser('alice', mockWs as unknown as WebSocket); + expect(mockRelayClient.connected).toBe(true); + + mockWs.close(); + + expect(mockRelayClient.connected).toBe(false); + }); + }); + + describe('Error Handling', () => { + it('should handle send to unregistered user gracefully', async () => { + const result = await bridge.sendChannelMessage('nonexistent', '#general', 'test'); + expect(result).toBe(false); + }); + + it('should handle relay client connection failure', async () => { + relayClientFactory.mockRejectedValueOnce(new Error('Connection failed')); + + await expect( + bridge.registerUser('alice', mockWs as unknown as WebSocket) + ).rejects.toThrow('Connection failed'); + + expect(bridge.isUserRegistered('alice')).toBe(false); + }); + }); + + describe('Disposal', () => { + it('should disconnect all users on dispose', async () => { + const ws1 = new MockWebSocket(); + const ws2 = new MockWebSocket(); + + const client1 = new MockRelayClient({ socketPath: '/tmp/test.sock', agentName: 'alice', entityType: 'user' }); + const client2 = new MockRelayClient({ socketPath: '/tmp/test.sock', agentName: 'bob', entityType: 'user' }); + + relayClientFactory + .mockResolvedValueOnce(client1) + .mockResolvedValueOnce(client2); + + await bridge.registerUser('alice', ws1 as unknown as WebSocket); + await bridge.registerUser('bob', ws2 as unknown as WebSocket); + + bridge.dispose(); + + expect(client1.connected).toBe(false); + expect(client2.connected).toBe(false); + expect(bridge.getRegisteredUsers()).toHaveLength(0); + }); + }); +}); diff --git a/src/dashboard-server/user-bridge.ts b/src/dashboard-server/user-bridge.ts new file mode 100644 index 00000000..c04ee3b9 --- /dev/null +++ b/src/dashboard-server/user-bridge.ts @@ -0,0 +1,306 @@ +/** + * User Bridge - Bridges dashboard WebSocket users to the relay daemon. + * + * This module allows human users connected via WebSocket to: + * - Register as "user" entities in the relay daemon + * - Join/leave channels + * - Send/receive messages through the relay daemon + * - Communicate with agents and other users + */ + +import type { WebSocket } from 'ws'; + +/** + * Relay client interface (subset of RelayClient for dependency injection) + */ +export interface IRelayClient { + connect(): Promise; + disconnect(): void; + state: string; + sendMessage( + to: string, + body: string, + kind?: string, + data?: unknown, + thread?: string + ): boolean; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + onMessage?: (from: string, payload: any, messageId: string, meta?: any, originalTo?: string) => void; +} + +/** + * Factory function type for creating relay clients + */ +export type RelayClientFactory = (options: { + socketPath: string; + agentName: string; + entityType: 'user'; + displayName?: string; + avatarUrl?: string; +}) => Promise; + +/** + * User session state + */ +interface UserSession { + username: string; + relayClient: IRelayClient; + webSocket: WebSocket; + channels: Set; + avatarUrl?: string; +} + +/** + * Options for creating a UserBridge + */ +export interface UserBridgeOptions { + socketPath: string; + createRelayClient: RelayClientFactory; +} + +/** + * Message options for sending + */ +export interface SendMessageOptions { + thread?: string; + data?: Record; +} + +/** + * UserBridge manages the connection between dashboard WebSocket users + * and the relay daemon. + */ +export class UserBridge { + private readonly socketPath: string; + private readonly createRelayClient: RelayClientFactory; + private readonly users = new Map(); + + constructor(options: UserBridgeOptions) { + this.socketPath = options.socketPath; + this.createRelayClient = options.createRelayClient; + } + + /** + * Register a user with the relay daemon. + * Creates a relay client connection for the user. + */ + async registerUser( + username: string, + webSocket: WebSocket, + options?: { avatarUrl?: string; displayName?: string } + ): Promise { + // If user already registered, unregister first + if (this.users.has(username)) { + this.unregisterUser(username); + } + + // Create relay client for this user + const relayClient = await this.createRelayClient({ + socketPath: this.socketPath, + agentName: username, + entityType: 'user', + displayName: options?.displayName, + avatarUrl: options?.avatarUrl, + }); + + // Connect to daemon + await relayClient.connect(); + + // Set up message handler to forward messages to WebSocket + relayClient.onMessage = (from, payload, _messageId, _meta, _originalTo) => { + const body = typeof payload === 'object' && payload !== null && 'body' in payload + ? (payload as { body: string }).body + : String(payload); + this.handleIncomingMessage(username, from, body, payload); + }; + + // Create session + const session: UserSession = { + username, + relayClient, + webSocket, + channels: new Set(), + avatarUrl: options?.avatarUrl, + }; + + this.users.set(username, session); + + // Set up WebSocket close handler + webSocket.on('close', () => { + this.unregisterUser(username); + }); + + console.log(`[user-bridge] User ${username} registered with relay daemon`); + } + + /** + * Unregister a user and disconnect their relay client. + */ + unregisterUser(username: string): void { + const session = this.users.get(username); + if (!session) return; + + session.relayClient.disconnect(); + this.users.delete(username); + + console.log(`[user-bridge] User ${username} unregistered from relay daemon`); + } + + /** + * Check if a user is registered. + */ + isUserRegistered(username: string): boolean { + return this.users.has(username); + } + + /** + * Get list of all registered users. + */ + getRegisteredUsers(): string[] { + return Array.from(this.users.keys()); + } + + /** + * Join a channel. + */ + async joinChannel(username: string, channel: string): Promise { + const session = this.users.get(username); + if (!session) { + console.warn(`[user-bridge] Cannot join channel - user ${username} not registered`); + return false; + } + + // Send channel join via relay client + session.relayClient.sendMessage(channel, '', 'channel_join'); + + // Track membership + session.channels.add(channel); + + console.log(`[user-bridge] User ${username} joined channel ${channel}`); + return true; + } + + /** + * Leave a channel. + */ + async leaveChannel(username: string, channel: string): Promise { + const session = this.users.get(username); + if (!session) { + console.warn(`[user-bridge] Cannot leave channel - user ${username} not registered`); + return false; + } + + // Send channel leave via relay client + session.relayClient.sendMessage(channel, '', 'channel_leave'); + + // Update membership + session.channels.delete(channel); + + console.log(`[user-bridge] User ${username} left channel ${channel}`); + return true; + } + + /** + * Get channels a user has joined. + */ + getUserChannels(username: string): string[] { + const session = this.users.get(username); + return session ? Array.from(session.channels) : []; + } + + /** + * Send a message to a channel. + */ + async sendChannelMessage( + username: string, + channel: string, + body: string, + options?: SendMessageOptions + ): Promise { + const session = this.users.get(username); + if (!session) { + console.warn(`[user-bridge] Cannot send - user ${username} not registered`); + return false; + } + + return session.relayClient.sendMessage( + channel, + body, + 'message', + options?.data, + options?.thread + ); + } + + /** + * Send a direct message to another user or agent. + */ + async sendDirectMessage( + fromUsername: string, + toName: string, + body: string, + options?: SendMessageOptions + ): Promise { + const session = this.users.get(fromUsername); + if (!session) { + console.warn(`[user-bridge] Cannot send DM - user ${fromUsername} not registered`); + return false; + } + + return session.relayClient.sendMessage( + toName, + body, + 'message', + options?.data, + options?.thread + ); + } + + /** + * Handle incoming message from relay daemon. + */ + private handleIncomingMessage( + username: string, + from: string, + body: string, + envelope: unknown + ): void { + const session = this.users.get(username); + if (!session) return; + + const ws = session.webSocket; + if (ws.readyState !== 1) return; // Not OPEN + + // Determine message type from envelope + const env = envelope as { type?: string; payload?: { channel?: string; body?: string }; from?: string; to?: string }; + + if (env.type === 'CHANNEL_MESSAGE') { + // Channel message + ws.send(JSON.stringify({ + type: 'channel_message', + channel: env.payload?.channel, + from, + body: env.payload?.body || body, + timestamp: new Date().toISOString(), + })); + } else { + // Direct message (DELIVER) + ws.send(JSON.stringify({ + type: 'direct_message', + from, + body: (env.payload as { body?: string })?.body || body, + timestamp: new Date().toISOString(), + })); + } + } + + /** + * Dispose of all user sessions. + */ + dispose(): void { + for (const [username] of this.users) { + this.unregisterUser(username); + } + console.log('[user-bridge] Disposed all user sessions'); + } +} diff --git a/src/dashboard/react-components/ChannelChat.tsx b/src/dashboard/react-components/ChannelChat.tsx new file mode 100644 index 00000000..1bfc924c --- /dev/null +++ b/src/dashboard/react-components/ChannelChat.tsx @@ -0,0 +1,269 @@ +/** + * ChannelChat Component + * + * Chat view for a channel or DM conversation. + * Displays messages and provides input for sending new messages. + */ + +import React, { useState, useCallback, useRef, useEffect } from 'react'; +import type { ChannelMessage } from './hooks/useChannels'; + +export interface ChannelChatProps { + /** Current channel name */ + channel: string; + /** Messages in this channel */ + messages: ChannelMessage[]; + /** Current user's username */ + currentUser: string; + /** Send a message */ + onSendMessage: (body: string, thread?: string) => void; + /** Online users for mentions */ + onlineUsers?: string[]; +} + +export function ChannelChat({ + channel, + messages, + currentUser, + onSendMessage, + onlineUsers = [], +}: ChannelChatProps) { + const [inputValue, setInputValue] = useState(''); + const messagesEndRef = useRef(null); + const inputRef = useRef(null); + + // Filter messages for this channel + const channelMessages = messages.filter(m => { + if (m.type === 'channel_message') { + return m.channel === channel; + } + // For DMs, check if this is the right conversation + if (m.type === 'direct_message' && channel.startsWith('dm:')) { + const participants = channel.split(':').slice(1); + return participants.includes(m.from) || participants.includes(m.to || ''); + } + return false; + }); + + // Scroll to bottom on new messages + useEffect(() => { + messagesEndRef.current?.scrollIntoView({ behavior: 'smooth' }); + }, [channelMessages.length]); + + const handleSend = useCallback(() => { + const trimmed = inputValue.trim(); + if (!trimmed) return; + + onSendMessage(trimmed); + setInputValue(''); + inputRef.current?.focus(); + }, [inputValue, onSendMessage]); + + const handleKeyDown = useCallback((e: React.KeyboardEvent) => { + if (e.key === 'Enter' && !e.shiftKey) { + e.preventDefault(); + handleSend(); + } + }, [handleSend]); + + const isDm = channel.startsWith('dm:'); + const channelDisplay = isDm + ? channel.split(':').slice(1).filter(u => u !== currentUser).join(', ') + : channel; + + return ( +
+ {/* Header */} +
+ + {isDm ? '@' : ''}{channelDisplay} + + {!isDm && ( + + Channel + + )} +
+ + {/* Messages */} +
+ {channelMessages.length === 0 ? ( +
+
+ {isDm ? '👋' : '💬'} +
+
+ {isDm + ? `Start a conversation with ${channelDisplay}` + : `Welcome to ${channel}`} +
+
+ {isDm + ? 'Send a message to get started' + : 'This is the beginning of the channel'} +
+
+ ) : ( + channelMessages.map((msg) => ( + + )) + )} +
+
+ + {/* Input */} +
+
+