Skip to content

Commit f36fb08

Browse files
committed
massive cleanup on the query API
1 parent 7112c64 commit f36fb08

File tree

13 files changed

+2200
-1573
lines changed

13 files changed

+2200
-1573
lines changed

apps/api/src/agent/core/assistant-orchestrator.ts

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import type { User } from "@databuddy/auth";
2+
import { logger } from "@databuddy/shared/logger";
23
import type { StreamingUpdate } from "@databuddy/shared/types/assistant";
34
import type { Website } from "@databuddy/shared/types/website";
45
import { createId } from "@databuddy/shared/utils/ids";
@@ -112,9 +113,9 @@ export class AssistantOrchestrator {
112113
finalResult,
113114
metrics
114115
);
115-
console.log("✅ Conversation saved successfully");
116+
logger.debug({ messageId }, "Conversation saved successfully");
116117
} catch (error) {
117-
console.error("❌ Failed to save conversation:", error);
118+
logger.error({ error, messageId }, "Failed to save conversation");
118119
}
119120
}
120121

@@ -145,9 +146,9 @@ export class AssistantOrchestrator {
145146
errorResponse,
146147
metrics
147148
);
148-
console.log("✅ Error conversation saved successfully");
149+
logger.debug({ messageId }, "Error conversation saved successfully");
149150
} catch (error) {
150-
console.error("❌ Failed to save error conversation:", error);
151+
logger.error({ error }, "Failed to save error conversation");
151152
}
152153
}
153154
}
Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
import { chQuery } from "@databuddy/db";
2+
import { QueryBuilders } from "./builders";
3+
import { SimpleQueryBuilder } from "./simple-builder";
4+
import type { QueryRequest, SimpleQueryConfig } from "./types";
5+
import { applyPlugins } from "./utils";
6+
7+
type BatchRequest = QueryRequest & { type: string };
8+
type BatchResult = { type: string; data: Record<string, unknown>[]; error?: string };
9+
type BatchOptions = { websiteDomain?: string | null; timezone?: string };
10+
11+
function getSchemaSignature(config: SimpleQueryConfig): string | null {
12+
const fields = config.meta?.output_fields;
13+
return fields?.length ? fields.map((f) => `${f.name}:${f.type}`).join(",") : null;
14+
}
15+
16+
async function runSingle(req: BatchRequest, opts?: BatchOptions): Promise<BatchResult> {
17+
const config = QueryBuilders[req.type];
18+
if (!config) {
19+
return { type: req.type, data: [], error: `Unknown query type: ${req.type}` };
20+
}
21+
22+
try {
23+
const builder = new SimpleQueryBuilder(
24+
config,
25+
{ ...req, timezone: opts?.timezone ?? req.timezone },
26+
opts?.websiteDomain
27+
);
28+
return { type: req.type, data: await builder.execute() };
29+
} catch (e) {
30+
return { type: req.type, data: [], error: e instanceof Error ? e.message : "Query failed" };
31+
}
32+
}
33+
34+
function groupBySchema(requests: BatchRequest[]): Map<string, BatchRequest[]> {
35+
const groups = new Map<string, BatchRequest[]>();
36+
37+
for (const req of requests) {
38+
const config = QueryBuilders[req.type];
39+
if (!config) {
40+
continue;
41+
}
42+
43+
const sig = getSchemaSignature(config) || `__solo_${req.type}`;
44+
const list = groups.get(sig) || [];
45+
list.push(req);
46+
groups.set(sig, list);
47+
}
48+
49+
return groups;
50+
}
51+
52+
function buildUnionQuery(requests: BatchRequest[], opts?: BatchOptions) {
53+
const queries: string[] = [];
54+
const params: Record<string, unknown> = {};
55+
const types: string[] = [];
56+
57+
for (let i = 0; i < requests.length; i++) {
58+
const req = requests[i];
59+
if (!req) {
60+
continue;
61+
}
62+
63+
const config = QueryBuilders[req.type];
64+
if (!config) {
65+
continue;
66+
}
67+
68+
const builder = new SimpleQueryBuilder(
69+
config,
70+
{ ...req, timezone: opts?.timezone ?? req.timezone },
71+
opts?.websiteDomain
72+
);
73+
74+
let { sql, params: queryParams } = builder.compile();
75+
76+
for (const [key, value] of Object.entries(queryParams)) {
77+
const prefixedKey = `q${i}_${key}`;
78+
params[prefixedKey] = value;
79+
sql = sql.replaceAll(`{${key}:`, `{${prefixedKey}:`);
80+
}
81+
82+
types.push(req.type);
83+
queries.push(`SELECT '${req.type}' as __query_type, * FROM (${sql})`);
84+
}
85+
86+
return { sql: queries.join("\nUNION ALL\n"), params, types };
87+
}
88+
89+
function splitResults(
90+
rows: Array<Record<string, unknown> & { __query_type: string }>,
91+
types: string[]
92+
): Map<string, Record<string, unknown>[]> {
93+
const byType = new Map<string, Record<string, unknown>[]>(types.map((t) => [t, []]));
94+
95+
for (const { __query_type, ...rest } of rows) {
96+
byType.get(__query_type)?.push(rest);
97+
}
98+
99+
return byType;
100+
}
101+
102+
export async function executeBatch(requests: BatchRequest[], opts?: BatchOptions): Promise<BatchResult[]> {
103+
if (requests.length === 0) {
104+
return [];
105+
}
106+
if (requests.length === 1 && requests[0]) {
107+
return [await runSingle(requests[0], opts)];
108+
}
109+
110+
const groups = groupBySchema(requests);
111+
const results: BatchResult[] = [];
112+
113+
for (const groupReqs of groups.values()) {
114+
if (groupReqs.length === 0) {
115+
continue;
116+
}
117+
118+
if (groupReqs.length === 1 && groupReqs[0]) {
119+
results.push(await runSingle(groupReqs[0], opts));
120+
continue;
121+
}
122+
123+
try {
124+
const { sql, params, types } = buildUnionQuery(groupReqs, opts);
125+
const rawRows = await chQuery(sql, params);
126+
const split = splitResults(rawRows as Array<Record<string, unknown> & { __query_type: string }>, types);
127+
128+
for (const type of types) {
129+
const config = QueryBuilders[type];
130+
const raw = split.get(type) || [];
131+
results.push({
132+
type,
133+
data: config ? applyPlugins(raw, config, opts?.websiteDomain) : raw,
134+
});
135+
}
136+
} catch {
137+
for (const req of groupReqs) {
138+
results.push(await runSingle(req, opts));
139+
}
140+
}
141+
}
142+
143+
const resultMap = new Map(results.map((r) => [r.type, r]));
144+
return requests.map((req) => resultMap.get(req.type) || { type: req.type, data: [] });
145+
}
146+
147+
export function areQueriesCompatible(type1: string, type2: string): boolean {
148+
const [c1, c2] = [QueryBuilders[type1], QueryBuilders[type2]];
149+
if (!(c1 && c2)) {
150+
return false;
151+
}
152+
const [s1, s2] = [getSchemaSignature(c1), getSchemaSignature(c2)];
153+
return Boolean(s1 && s2 && s1 === s2);
154+
}
155+
156+
export function getCompatibleQueries(type: string): string[] {
157+
const config = QueryBuilders[type];
158+
const sig = config ? getSchemaSignature(config) : null;
159+
if (!sig) {
160+
return [];
161+
}
162+
163+
return Object.entries(QueryBuilders)
164+
.filter(([t, c]) => t !== type && getSchemaSignature(c) === sig)
165+
.map(([t]) => t);
166+
}
167+
168+
export function getSchemaGroups(): Map<string, string[]> {
169+
const groups = new Map<string, string[]>();
170+
171+
for (const [type, config] of Object.entries(QueryBuilders)) {
172+
const sig = getSchemaSignature(config);
173+
if (!sig) {
174+
continue;
175+
}
176+
const list = groups.get(sig) || [];
177+
list.push(type);
178+
groups.set(sig, list);
179+
}
180+
181+
return groups;
182+
}

0 commit comments

Comments
 (0)