Skip to content

Commit bbf33fe

Browse files
committed
Merge branch 'develop' of github.com:codervisor/devlog into develop
2 parents dc6caa2 + 3fbcdc9 commit bbf33fe

File tree

18 files changed

+2311
-11
lines changed

18 files changed

+2311
-11
lines changed
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/**
2+
* Chat Session Events API Endpoint
3+
*
4+
* GET /api/chat-sessions/[sessionId]/events - Get session events
5+
*/
6+
7+
import { NextRequest, NextResponse } from 'next/server';
8+
import { getPrismaClient } from '@codervisor/devlog-core/server';
9+
10+
// Mark route as dynamic
11+
export const dynamic = 'force-dynamic';
12+
13+
/**
14+
* GET /api/chat-sessions/:sessionId/events - Get events for a chat session
15+
*
16+
* Returns all agent events associated with the specified chat session,
17+
* ordered chronologically.
18+
*/
19+
export async function GET(
20+
request: NextRequest,
21+
{ params }: { params: { sessionId: string } }
22+
) {
23+
try {
24+
const { sessionId } = params;
25+
26+
// Validate UUID format
27+
const uuidRegex = /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i;
28+
if (!uuidRegex.test(sessionId)) {
29+
return NextResponse.json(
30+
{ error: 'Invalid session ID format' },
31+
{ status: 400 }
32+
);
33+
}
34+
35+
// Get Prisma client
36+
const prisma = getPrismaClient();
37+
38+
// Fetch events for the session
39+
const events = await prisma.agentEvent.findMany({
40+
where: { sessionId },
41+
orderBy: { timestamp: 'asc' },
42+
include: {
43+
session: {
44+
include: {
45+
workspace: {
46+
include: {
47+
machine: true,
48+
project: true,
49+
},
50+
},
51+
},
52+
},
53+
},
54+
});
55+
56+
return NextResponse.json({
57+
sessionId,
58+
events,
59+
count: events.length,
60+
});
61+
} catch (error) {
62+
console.error('[GET /api/chat-sessions/:sessionId/events] Error:', error);
63+
return NextResponse.json(
64+
{
65+
error: error instanceof Error ? error.message : 'Failed to get session events',
66+
},
67+
{ status: 500 }
68+
);
69+
}
70+
}
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/**
2+
* Chat Session API Endpoint
3+
*
4+
* POST /api/chat-sessions - Create/update chat session
5+
*/
6+
7+
import { NextRequest, NextResponse } from 'next/server';
8+
import { getPrismaClient } from '@codervisor/devlog-core/server';
9+
import { ChatSessionCreateSchema } from '@/schemas/hierarchy';
10+
import { ApiValidator } from '@/schemas/validation';
11+
12+
// Mark route as dynamic
13+
export const dynamic = 'force-dynamic';
14+
15+
/**
16+
* POST /api/chat-sessions - Create/update chat session
17+
*
18+
* Creates a new chat session or updates an existing one based on sessionId.
19+
* Supports updating message count, token count, and end time.
20+
*/
21+
export async function POST(request: NextRequest) {
22+
try {
23+
// Validate request body
24+
const validation = await ApiValidator.validateJsonBody(
25+
request,
26+
ChatSessionCreateSchema
27+
);
28+
29+
if (!validation.success) {
30+
return validation.response;
31+
}
32+
33+
const data = validation.data;
34+
35+
// Get Prisma client
36+
const prisma = getPrismaClient();
37+
38+
// Upsert chat session
39+
const session = await prisma.chatSession.upsert({
40+
where: { sessionId: data.sessionId },
41+
create: {
42+
sessionId: data.sessionId,
43+
workspaceId: data.workspaceId,
44+
agentType: data.agentType,
45+
modelId: data.modelId,
46+
startedAt: data.startedAt,
47+
endedAt: data.endedAt,
48+
messageCount: data.messageCount,
49+
totalTokens: data.totalTokens,
50+
},
51+
update: {
52+
endedAt: data.endedAt,
53+
messageCount: data.messageCount,
54+
totalTokens: data.totalTokens,
55+
},
56+
});
57+
58+
return NextResponse.json(session, { status: 200 });
59+
} catch (error) {
60+
console.error('[POST /api/chat-sessions] Error:', error);
61+
return NextResponse.json(
62+
{
63+
error: error instanceof Error ? error.message : 'Failed to upsert session',
64+
},
65+
{ status: 500 }
66+
);
67+
}
68+
}
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
/**
2+
* Batch Event Creation API Endpoint
3+
*
4+
* POST /api/events/batch - Batch create agent events
5+
*/
6+
7+
import { NextRequest, NextResponse } from 'next/server';
8+
import { getPrismaClient } from '@codervisor/devlog-core/server';
9+
import { BatchEventsCreateSchema } from '@/schemas/hierarchy';
10+
import { ApiValidator } from '@/schemas/validation';
11+
12+
// Mark route as dynamic
13+
export const dynamic = 'force-dynamic';
14+
15+
/**
16+
* POST /api/events/batch - Batch create events
17+
*
18+
* Creates multiple agent events in a single transaction.
19+
* Maximum 1000 events per request for performance.
20+
*/
21+
export async function POST(request: NextRequest) {
22+
try {
23+
// Validate request body
24+
const validation = await ApiValidator.validateJsonBody(
25+
request,
26+
BatchEventsCreateSchema
27+
);
28+
29+
if (!validation.success) {
30+
return validation.response;
31+
}
32+
33+
const events = validation.data;
34+
35+
if (events.length === 0) {
36+
return NextResponse.json(
37+
{ error: 'At least one event is required' },
38+
{ status: 400 }
39+
);
40+
}
41+
42+
// Get Prisma client
43+
const prisma = getPrismaClient();
44+
45+
// Use createMany for better performance
46+
const result = await prisma.agentEvent.createMany({
47+
data: events.map((event) => ({
48+
timestamp: event.timestamp,
49+
eventType: event.eventType,
50+
agentId: event.agentId,
51+
agentVersion: event.agentVersion,
52+
sessionId: event.sessionId,
53+
projectId: event.projectId,
54+
context: event.context,
55+
data: event.data,
56+
metrics: event.metrics,
57+
parentEventId: event.parentEventId,
58+
relatedEventIds: event.relatedEventIds,
59+
tags: event.tags,
60+
severity: event.severity,
61+
})),
62+
skipDuplicates: true, // Skip events with duplicate IDs
63+
});
64+
65+
return NextResponse.json(
66+
{
67+
created: result.count,
68+
requested: events.length,
69+
},
70+
{ status: 201 }
71+
);
72+
} catch (error) {
73+
console.error('[POST /api/events/batch] Error:', error);
74+
75+
// Handle specific Prisma errors
76+
if (error instanceof Error) {
77+
if (error.message.includes('Foreign key constraint')) {
78+
return NextResponse.json(
79+
{
80+
error: 'Invalid reference: session or project not found',
81+
details: error.message,
82+
},
83+
{ status: 400 }
84+
);
85+
}
86+
}
87+
88+
return NextResponse.json(
89+
{
90+
error: error instanceof Error ? error.message : 'Failed to create events',
91+
},
92+
{ status: 500 }
93+
);
94+
}
95+
}

apps/web/app/api/events/stream/route.ts

Lines changed: 97 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,56 +2,142 @@
22
* Server-Sent Events (SSE) endpoint for real-time updates
33
*
44
* Provides a persistent connection that streams updates about:
5-
* - New agent sessions
6-
* - Session status changes
75
* - New agent events
6+
* - Session status changes
87
* - Dashboard metrics updates
8+
*
9+
* Supports hierarchy-based filtering:
10+
* - projectId: Filter events by project
11+
* - machineId: Filter events by machine
12+
* - workspaceId: Filter events by workspace
913
*/
1014

1115
import { NextRequest } from 'next/server';
12-
import { EventBroadcaster } from '@/lib/realtime/event-broadcaster';
16+
import { getPrismaClient } from '@codervisor/devlog-core/server';
1317

1418
// Mark this route as dynamic to prevent static generation
1519
export const dynamic = 'force-dynamic';
1620
export const runtime = 'nodejs';
1721

1822
// Keep-alive interval in milliseconds
1923
const KEEP_ALIVE_INTERVAL = 30000; // 30 seconds
24+
// Polling interval for new events
25+
const POLL_INTERVAL = 5000; // 5 seconds
2026

2127
export async function GET(request: NextRequest) {
22-
const broadcaster = EventBroadcaster.getInstance();
28+
// Parse filter parameters
29+
const { searchParams } = new URL(request.url);
30+
const projectId = searchParams.get('projectId');
31+
const machineId = searchParams.get('machineId');
32+
const workspaceId = searchParams.get('workspaceId');
33+
34+
// Build filter for events
35+
const filters = {
36+
projectId: projectId ? parseInt(projectId, 10) : undefined,
37+
machineId: machineId ? parseInt(machineId, 10) : undefined,
38+
workspaceId: workspaceId ? parseInt(workspaceId, 10) : undefined,
39+
};
2340

2441
// Create a readable stream for SSE
2542
const stream = new ReadableStream({
2643
start(controller) {
2744
const encoder = new TextEncoder();
45+
let lastTimestamp = new Date();
2846

2947
// Send initial connection message
3048
const connectionMessage = `event: connected\ndata: ${JSON.stringify({
3149
timestamp: new Date().toISOString(),
32-
clientCount: broadcaster.getClientCount() + 1
50+
filters,
3351
})}\n\n`;
3452
controller.enqueue(encoder.encode(connectionMessage));
3553

36-
// Add this client to the broadcaster
37-
broadcaster.addClient(controller);
38-
3954
// Set up keep-alive heartbeat
4055
const keepAliveInterval = setInterval(() => {
4156
try {
4257
const heartbeat = `: heartbeat ${Date.now()}\n\n`;
4358
controller.enqueue(encoder.encode(heartbeat));
4459
} catch (error) {
45-
console.error('Error sending heartbeat:', error);
60+
console.error('[SSE] Error sending heartbeat:', error);
4661
clearInterval(keepAliveInterval);
47-
broadcaster.removeClient(controller);
62+
clearInterval(pollInterval);
4863
}
4964
}, KEEP_ALIVE_INTERVAL);
5065

66+
// Poll for new events
67+
const pollInterval = setInterval(async () => {
68+
try {
69+
const prisma = getPrismaClient();
70+
71+
// Build where clause based on filters
72+
const where: any = {
73+
timestamp: {
74+
gte: lastTimestamp,
75+
},
76+
};
77+
78+
if (filters.projectId) {
79+
where.projectId = filters.projectId;
80+
}
81+
82+
if (filters.machineId) {
83+
where.session = {
84+
workspace: {
85+
machineId: filters.machineId,
86+
},
87+
};
88+
}
89+
90+
if (filters.workspaceId) {
91+
where.session = {
92+
...where.session,
93+
workspaceId: filters.workspaceId,
94+
};
95+
}
96+
97+
// Fetch new events
98+
const events = await prisma.agentEvent.findMany({
99+
where,
100+
orderBy: { timestamp: 'desc' },
101+
take: 50,
102+
include: {
103+
session: {
104+
include: {
105+
workspace: {
106+
include: {
107+
machine: true,
108+
project: true,
109+
},
110+
},
111+
},
112+
},
113+
},
114+
});
115+
116+
if (events.length > 0) {
117+
// Update last timestamp
118+
lastTimestamp = new Date(events[0].timestamp);
119+
120+
// Send events to client
121+
const message = `event: events\ndata: ${JSON.stringify({
122+
type: 'events',
123+
data: events,
124+
})}\n\n`;
125+
controller.enqueue(encoder.encode(message));
126+
}
127+
} catch (error) {
128+
console.error('[SSE] Error polling events:', error);
129+
const errorMessage = `event: error\ndata: ${JSON.stringify({
130+
type: 'error',
131+
error: error instanceof Error ? error.message : 'Unknown error',
132+
})}\n\n`;
133+
controller.enqueue(encoder.encode(errorMessage));
134+
}
135+
}, POLL_INTERVAL);
136+
51137
// Clean up when client disconnects
52138
request.signal.addEventListener('abort', () => {
53139
clearInterval(keepAliveInterval);
54-
broadcaster.removeClient(controller);
140+
clearInterval(pollInterval);
55141
try {
56142
controller.close();
57143
} catch (error) {

0 commit comments

Comments
 (0)