Skip to content

Commit ffc88ff

Browse files
authored
feat: store databuddy chat responses (#102)
1 parent ad2765c commit ffc88ff

File tree

10 files changed

+318
-167
lines changed

10 files changed

+318
-167
lines changed

apps/api/src/agent/handlers/chart-handler.ts

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -30,26 +30,24 @@ export interface ChartHandlerContext {
3030
aiTime: number;
3131
}
3232

33-
export async function* handleChartResponse(
33+
export async function handleChartResponse(
3434
parsedAiJson: z.infer<typeof AIResponseJsonSchema>,
3535
context: ChartHandlerContext
36-
): AsyncGenerator<StreamingUpdate> {
36+
): Promise<StreamingUpdate> {
3737
if (!parsedAiJson.sql) {
38-
yield {
38+
return {
3939
type: 'error',
4040
content: 'AI did not provide a query for the chart.',
4141
debugInfo: context.user?.role === 'ADMIN' ? context.debugInfo : undefined,
4242
};
43-
return;
4443
}
4544

4645
if (!validateSQL(parsedAiJson.sql)) {
47-
yield {
46+
return {
4847
type: 'error',
4948
content: 'Generated query failed security validation.',
5049
debugInfo: context.user?.role === 'ADMIN' ? context.debugInfo : undefined,
5150
};
52-
return;
5351
}
5452

5553
try {
@@ -64,7 +62,7 @@ export async function* handleChartResponse(
6462
};
6563
}
6664

67-
yield {
65+
return {
6866
type: 'complete',
6967
content:
7068
queryResult.data.length > 0
@@ -83,7 +81,7 @@ export async function* handleChartResponse(
8381
error: queryError instanceof Error ? queryError.message : 'Unknown error',
8482
sql: parsedAiJson.sql,
8583
});
86-
yield {
84+
return {
8785
type: 'error',
8886
content: getRandomMessage(queryFailedMessages),
8987
debugInfo: context.user?.role === 'ADMIN' ? context.debugInfo : undefined,

apps/api/src/agent/handlers/metric-handler.ts

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,19 +12,18 @@ export interface MetricHandlerContext {
1212
debugInfo: Record<string, unknown>;
1313
}
1414

15-
export async function* handleMetricResponse(
15+
export async function handleMetricResponse(
1616
parsedAiJson: z.infer<typeof AIResponseJsonSchema>,
1717
context: MetricHandlerContext
18-
): AsyncGenerator<StreamingUpdate> {
18+
): Promise<StreamingUpdate> {
1919
if (parsedAiJson.sql) {
2020
if (!validateSQL(parsedAiJson.sql)) {
21-
yield {
21+
return {
2222
type: 'error',
2323
content: 'Generated query failed security validation.',
2424
debugInfo:
2525
context.user?.role === 'ADMIN' ? context.debugInfo : undefined,
2626
};
27-
return;
2827
}
2928

3029
try {
@@ -33,21 +32,21 @@ export async function* handleMetricResponse(
3332
queryResult.data,
3433
parsedAiJson.metric_value
3534
);
36-
yield* sendMetricResponse(parsedAiJson, metricValue, context);
35+
return sendMetricResponse(parsedAiJson, metricValue, context);
3736
} catch (queryError: unknown) {
3837
console.error('❌ Metric SQL execution error', {
3938
error:
4039
queryError instanceof Error ? queryError.message : 'Unknown error',
4140
sql: parsedAiJson.sql,
4241
});
43-
yield* sendMetricResponse(
42+
return sendMetricResponse(
4443
parsedAiJson,
4544
parsedAiJson.metric_value,
4645
context
4746
);
4847
}
4948
} else {
50-
yield* sendMetricResponse(parsedAiJson, parsedAiJson.metric_value, context);
49+
return sendMetricResponse(parsedAiJson, parsedAiJson.metric_value, context);
5150
}
5251
}
5352

@@ -67,17 +66,17 @@ function extractMetricValue(
6766
return valueKey ? firstRow[valueKey] : defaultValue;
6867
}
6968

70-
async function* sendMetricResponse(
69+
function sendMetricResponse(
7170
parsedAiJson: z.infer<typeof AIResponseJsonSchema>,
7271
metricValue: unknown,
7372
context: MetricHandlerContext
74-
): AsyncGenerator<StreamingUpdate> {
73+
): StreamingUpdate {
7574
const formattedValue =
7675
typeof metricValue === 'number'
7776
? metricValue.toLocaleString()
7877
: metricValue;
7978

80-
yield {
79+
return {
8180
type: 'complete',
8281
content:
8382
parsedAiJson.text_response ||

apps/api/src/agent/processor.ts

Lines changed: 160 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,15 @@
11
import type { User } from '@databuddy/auth';
2-
import type { Website } from '@databuddy/shared';
2+
import { createId, type Website } from '@databuddy/shared';
33
import type { AssistantRequestType } from '../schemas';
44
import { handleChartResponse } from './handlers/chart-handler';
55
import { handleMetricResponse } from './handlers/metric-handler';
6+
import type
7+
{ AIResponse } from './prompts/agent';
68
import { getAICompletion } from './utils/ai-client';
9+
import {
10+
addMessageToConversation,
11+
createNewConversation,
12+
} from './utils/conversation-utils';
713
import type { StreamingUpdate } from './utils/stream-utils';
814
import { generateThinkingSteps } from './utils/stream-utils';
915

@@ -23,20 +29,125 @@ const unexpectedErrorMessages = [
2329
'Something went a bit wonky on my end. Try asking me again?',
2430
];
2531

26-
export interface AssistantRequest extends AssistantRequestType {
32+
export interface AssistantRequest extends Omit<AssistantRequestType, 'model'> {
2733
websiteHostname: string;
34+
model: NonNullable<AssistantRequestType['model']>;
2835
}
2936

3037
export interface AssistantContext {
31-
user?: User | null;
38+
user: User;
3239
website: Website;
3340
debugInfo: Record<string, unknown>;
3441
}
3542

36-
export async function* processAssistantRequest(
43+
async function processResponseByType(
44+
parsedResponse: AIResponse,
45+
context: AssistantContext,
46+
startTime: number,
47+
aiTime: number
48+
): Promise<StreamingUpdate> {
49+
switch (parsedResponse.response_type) {
50+
case 'text': {
51+
const textResult = {
52+
type: 'complete',
53+
content:
54+
parsedResponse.text_response || "Here's the answer to your question.",
55+
data: { hasVisualization: false, responseType: 'text' },
56+
debugInfo:
57+
context.user.role === 'ADMIN' ? context.debugInfo : undefined,
58+
} as const;
59+
return textResult;
60+
}
61+
62+
case 'metric': {
63+
return await handleMetricResponse(parsedResponse, context);
64+
}
65+
66+
case 'chart': {
67+
if (parsedResponse.sql) {
68+
return await handleChartResponse(parsedResponse, {
69+
...context,
70+
startTime,
71+
aiTime,
72+
});
73+
}
74+
return {
75+
type: 'error',
76+
content: 'Invalid chart configuration.',
77+
debugInfo:
78+
context.user.role === 'ADMIN' ? context.debugInfo : undefined,
79+
};
80+
}
81+
default: {
82+
return {
83+
type: 'error',
84+
content: 'Invalid response format from AI.',
85+
debugInfo:
86+
context.user.role === 'ADMIN' ? context.debugInfo : undefined,
87+
};
88+
}
89+
}
90+
}
91+
92+
function saveConversationWithResult(
93+
request: AssistantRequest,
94+
context: AssistantContext,
95+
parsedResponse: AIResponse,
96+
finalResult: StreamingUpdate,
97+
conversationId: string
98+
) {
99+
const numberOfUserMessages = request.messages.filter(
100+
(message) => message.role === 'user'
101+
).length;
102+
const isNewConversation = numberOfUserMessages === 1;
103+
104+
const conversationMessages = [
105+
{
106+
id: createId(),
107+
role: 'user',
108+
content: request.messages.at(-1)?.content as string,
109+
conversationId,
110+
modelType: request.model,
111+
},
112+
{
113+
id: createId(),
114+
role: 'assistant',
115+
content: finalResult.content,
116+
conversationId,
117+
modelType: request.model,
118+
sql: parsedResponse.sql,
119+
chartType: parsedResponse.chart_type,
120+
responseType: parsedResponse.response_type,
121+
textResponse: parsedResponse.text_response,
122+
thinkingSteps: parsedResponse.thinking_steps,
123+
hasError: finalResult.type === 'error',
124+
errorMessage: finalResult.type === 'error' ? finalResult.content : '',
125+
finalResult,
126+
},
127+
];
128+
129+
if (isNewConversation) {
130+
createNewConversation(
131+
conversationId,
132+
request.websiteId,
133+
context.user.id,
134+
'New Conversation',
135+
request.model,
136+
conversationMessages
137+
);
138+
} else {
139+
addMessageToConversation(
140+
conversationId,
141+
request.model,
142+
conversationMessages
143+
);
144+
}
145+
}
146+
147+
export async function processAssistantRequest(
37148
request: AssistantRequest,
38149
context: AssistantContext
39-
): AsyncGenerator<StreamingUpdate> {
150+
): Promise<StreamingUpdate[]> {
40151
const startTime = Date.now();
41152

42153
try {
@@ -46,7 +157,7 @@ export async function* processAssistantRequest(
46157
websiteHostname: request.websiteHostname,
47158
});
48159

49-
if (context.user?.role === 'ADMIN') {
160+
if (context.user.role === 'ADMIN') {
50161
context.debugInfo.validatedInput = {
51162
message: request.messages.at(-1),
52163
websiteId: request.websiteId,
@@ -62,13 +173,14 @@ export async function* processAssistantRequest(
62173
const parsedResponse = aiResponse.content;
63174

64175
if (!parsedResponse) {
65-
yield {
66-
type: 'error',
67-
content: getRandomMessage(parseErrorMessages),
68-
debugInfo:
69-
context.user?.role === 'ADMIN' ? context.debugInfo : undefined,
70-
};
71-
return;
176+
return [
177+
{
178+
type: 'error',
179+
content: getRandomMessage(parseErrorMessages),
180+
debugInfo:
181+
context.user.role === 'ADMIN' ? context.debugInfo : undefined,
182+
},
183+
];
72184
}
73185

74186
console.info('✅ [Assistant Processor] AI response parsed', {
@@ -77,66 +189,49 @@ export async function* processAssistantRequest(
77189
thinkingSteps: parsedResponse.thinking_steps?.length || 0,
78190
});
79191

80-
// Process thinking steps
81-
if (parsedResponse.thinking_steps?.length) {
82-
yield* generateThinkingSteps(parsedResponse.thinking_steps);
83-
}
192+
const conversationId = request.conversationId || createId();
193+
const assistantResponse: StreamingUpdate[] = [];
84194

85-
// Handle different response types
86-
switch (parsedResponse.response_type) {
87-
case 'text':
88-
yield {
89-
type: 'complete',
90-
content:
91-
parsedResponse.text_response ||
92-
"Here's the answer to your question.",
93-
data: { hasVisualization: false, responseType: 'text' },
94-
debugInfo:
95-
context.user?.role === 'ADMIN' ? context.debugInfo : undefined,
96-
};
97-
break;
98-
99-
case 'metric':
100-
yield* handleMetricResponse(parsedResponse, context);
101-
break;
102-
103-
case 'chart':
104-
if (parsedResponse.sql) {
105-
yield* handleChartResponse(parsedResponse, {
106-
...context,
107-
startTime,
108-
aiTime,
109-
});
110-
} else {
111-
yield {
112-
type: 'error',
113-
content: 'Invalid chart configuration.',
114-
debugInfo:
115-
context.user?.role === 'ADMIN' ? context.debugInfo : undefined,
116-
};
117-
}
118-
break;
119-
120-
default:
121-
yield {
122-
type: 'error',
123-
content: 'Invalid response format from AI.',
124-
debugInfo:
125-
context.user?.role === 'ADMIN' ? context.debugInfo : undefined,
126-
};
195+
if (parsedResponse.thinking_steps) {
196+
assistantResponse.push(
197+
...generateThinkingSteps(parsedResponse.thinking_steps)
198+
);
127199
}
200+
201+
const finalResult = await processResponseByType(
202+
parsedResponse,
203+
context,
204+
startTime,
205+
aiTime
206+
);
207+
208+
assistantResponse.push(finalResult);
209+
210+
setImmediate(() => {
211+
saveConversationWithResult(
212+
request,
213+
context,
214+
parsedResponse,
215+
finalResult,
216+
conversationId
217+
);
218+
});
219+
220+
return assistantResponse;
128221
} catch (error: unknown) {
129222
const errorMessage =
130223
error instanceof Error ? error.message : 'Unknown error';
131224
console.error('💥 [Assistant Processor] Processing error', {
132225
error: errorMessage,
133226
});
134227

135-
yield {
136-
type: 'error',
137-
content: getRandomMessage(unexpectedErrorMessages),
138-
debugInfo:
139-
context.user?.role === 'ADMIN' ? { error: errorMessage } : undefined,
140-
};
228+
return [
229+
{
230+
type: 'error',
231+
content: getRandomMessage(unexpectedErrorMessages),
232+
debugInfo:
233+
context.user.role === 'ADMIN' ? { error: errorMessage } : undefined,
234+
},
235+
];
141236
}
142237
}

0 commit comments

Comments (0)