Skip to content

Commit 5cc006c

Browse files
committed
feat: migrate rest of span queries to clickhouse
1 parent 4261d9e commit 5cc006c

File tree

14 files changed

+1101
-24
lines changed

14 files changed

+1101
-24
lines changed

apps/web/src/app/api/projects/[projectId]/commits/[commitUuid]/documents/[documentUuid]/traces/aggregations/route.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ export const GET = errorHandler(
3434
.then((r) => r.unwrap())
3535

3636
const result = await computeDocumentTracesAggregations({
37+
workspaceId: workspace.id,
3738
documentUuid: document.documentUuid,
3839
commitUuid: headCommit?.uuid === commitUuid ? undefined : commitUuid,
3940
}).then((r) => r.unwrap())

apps/web/src/app/api/projects/[projectId]/commits/[commitUuid]/documents/[documentUuid]/traces/daily-count/route.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ export const GET = errorHandler(
3838
.then((r) => r.unwrap())
3939

4040
const result = await computeDocumentTracesDailyCount({
41+
workspaceId: workspace.id,
4142
documentUuid: document.documentUuid,
4243
commitUuid: headCommit?.uuid === commitUuid ? undefined : commitUuid,
4344
days,

packages/core/src/data-access/conversations/fetchConversation.ts

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@ import { OkType, Result } from '../../lib/Result'
55
import { spans } from '../../schema/models/spans'
66
import { Workspace } from '../../schema/models/types/Workspace'
77
import { conversationAggregateFields } from './shared'
8+
import { isFeatureEnabledByName } from '../../services/workspaceFeatures/isFeatureEnabledByName'
9+
import { fetchConversation as chFetchConversation } from '../../queries/clickhouse/spans/fetchConversation'
10+
11+
const CLICKHOUSE_SPANS_READ_FLAG = 'clickhouse-spans-read'
812

913
export type Conversation = OkType<typeof fetchConversation>
1014

@@ -20,6 +24,28 @@ export async function fetchConversation(
2024
},
2125
db = database,
2226
) {
27+
const clickhouseEnabledResult = await isFeatureEnabledByName(
28+
workspace.id,
29+
CLICKHOUSE_SPANS_READ_FLAG,
30+
db,
31+
)
32+
const shouldUseClickHouse =
33+
clickhouseEnabledResult.ok && clickhouseEnabledResult.value
34+
35+
if (shouldUseClickHouse) {
36+
const result = await chFetchConversation({
37+
workspaceId: workspace.id,
38+
documentLogUuid,
39+
documentUuid,
40+
})
41+
42+
if (!result) {
43+
return Result.error(new NotFoundError('Conversation not found'))
44+
}
45+
46+
return Result.ok(result)
47+
}
48+
2349
const conditions = [
2450
eq(spans.workspaceId, workspace.id),
2551
eq(spans.documentLogUuid, documentLogUuid),

packages/core/src/data-access/conversations/fetchConversations.ts

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@ import {
2121
shouldFallbackToAllTime,
2222
} from '../../services/spans/defaultCreatedAtWindow'
2323
import { conversationAggregateFields } from './shared'
24+
import { isFeatureEnabledByName } from '../../services/workspaceFeatures/isFeatureEnabledByName'
25+
import { fetchConversations as chFetchConversations } from '../../queries/clickhouse/spans/fetchConversations'
26+
27+
const CLICKHOUSE_SPANS_READ_FLAG = 'clickhouse-spans-read'
2428

2529
export type ConversationFilters = {
2630
commitUuids: string[]
@@ -186,6 +190,58 @@ export async function fetchConversations(
186190
}: FetchConversationsParams,
187191
db = database,
188192
) {
193+
const clickhouseEnabledResult = await isFeatureEnabledByName(
194+
workspace.id,
195+
CLICKHOUSE_SPANS_READ_FLAG,
196+
db,
197+
)
198+
const shouldUseClickHouse =
199+
clickhouseEnabledResult.ok && clickhouseEnabledResult.value
200+
201+
if (shouldUseClickHouse) {
202+
const normalizedCreatedAt = normalizeCreatedAtRange(filters.createdAt)
203+
const defaultCreatedAt = applyDefaultSpansCreatedAtRange({
204+
createdAt: normalizedCreatedAt,
205+
hasCursor: Boolean(from),
206+
})
207+
208+
const queryParams = {
209+
workspaceId: workspace.id,
210+
documentUuid,
211+
filters,
212+
from,
213+
limit,
214+
}
215+
216+
const firstPage = await chFetchConversations({
217+
...queryParams,
218+
createdAt: defaultCreatedAt,
219+
})
220+
221+
if (
222+
!shouldFallbackToAllTime({
223+
hasCursor: Boolean(from),
224+
normalizedCreatedAt,
225+
itemCount: firstPage.items.length,
226+
})
227+
) {
228+
return Result.ok<FetchConversationsResult>({
229+
...firstPage,
230+
didFallbackToAllTime: undefined,
231+
})
232+
}
233+
234+
const allTime = await chFetchConversations({
235+
...queryParams,
236+
createdAt: undefined,
237+
})
238+
239+
return Result.ok<FetchConversationsResult>({
240+
...allTime,
241+
didFallbackToAllTime: true,
242+
})
243+
}
244+
189245
const normalizedCreatedAt = normalizeCreatedAtRange(filters.createdAt)
190246
const defaultCreatedAt = applyDefaultSpansCreatedAtRange({
191247
createdAt: normalizedCreatedAt,
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
import { clickhouseClient } from '../../../client/clickhouse'
2+
import { SPANS_TABLE } from '../../../clickhouse/models/spans'
3+
import { TracesAggregations } from '../../../schema/models/types/Span'
4+
5+
/**
 * Computes trace aggregations for a document from the ClickHouse spans table.
 *
 * Two queries are issued:
 * - a count of distinct `trace_id` values over every span of the document, and
 * - token / cost / duration statistics restricted to `type = 'completion'` spans.
 *
 * Both queries share the same workspace / document (and optional commit)
 * filter and are independent of each other, so they run concurrently.
 *
 * @param workspaceId - Workspace the spans belong to.
 * @param documentUuid - Document whose spans are aggregated.
 * @param commitUuid - When provided (non-empty), restricts both queries to that commit.
 * @returns Aggregated totals, averages and medians; every field is `0` when
 * no span matches.
 */
export async function computeDocumentTracesAggregations({
  workspaceId,
  documentUuid,
  commitUuid,
}: {
  workspaceId: number
  documentUuid: string
  commitUuid?: string
}): Promise<TracesAggregations> {
  // With JSON output ClickHouse may return 64-bit integers as strings, floats
  // as numbers, and NULL / NaN as `null`, so the row type has to allow all three.
  type Scalar = string | number | null
  type AggregationRow = {
    total_tokens: Scalar
    total_cost_in_millicents: Scalar
    average_tokens: Scalar
    average_cost_in_millicents: Scalar
    median_cost_in_millicents: Scalar
    average_duration: Scalar
    median_duration: Scalar
  }

  // Converts a raw ClickHouse scalar to a number, collapsing anything that is
  // missing or not finite (undefined, "nan", "inf", ...) to 0. `coalesce` in
  // the SQL only covers NULL, not the NaN that float aggregates yield on an
  // empty input, so the guard is repeated here.
  const toFiniteNumber = (value: unknown): number => {
    const parsed = Number(value)
    return Number.isFinite(parsed) ? parsed : 0
  }

  const params: Record<string, unknown> = {
    workspaceId,
    documentUuid,
  }

  const countConditions = [
    `workspace_id = {workspaceId: UInt64}`,
    `document_uuid = {documentUuid: String}`,
  ]

  if (commitUuid) {
    params.commitUuid = commitUuid
    countConditions.push(`commit_uuid = {commitUuid: String}`)
  }

  // Statistics only consider completion spans; the trace count considers all.
  const aggregationConditions = [...countConditions, `type = 'completion'`]

  // Per-span token total, shared by the sum and the average below.
  const tokensPerSpan = `
          coalesce(tokens_prompt, 0) +
          coalesce(tokens_cached, 0) +
          coalesce(tokens_reasoning, 0) +
          coalesce(tokens_completion, 0)
  `

  // Runs one parameterized query and returns its rows.
  const fetchRows = async <TRow>(query: string) => {
    const result = await clickhouseClient().query({
      query,
      format: 'JSONEachRow',
      query_params: params,
    })
    return result.json<TRow>()
  }

  const [countRows, aggRows] = await Promise.all([
    // Total count of distinct traces
    fetchRows<{ total_count: Scalar }>(`
      SELECT count(DISTINCT trace_id) AS total_count
      FROM ${SPANS_TABLE}
      WHERE ${countConditions.join(' AND ')}
    `),
    // Aggregations; medians are computed with quantile(0.5).
    // NOTE(review): ClickHouse documents `quantile` as approximate and
    // non-deterministic; confirm whether `quantileExact` is wanted here.
    fetchRows<AggregationRow>(`
      SELECT
        coalesce(sum(${tokensPerSpan}), 0) AS total_tokens,
        coalesce(sum(cost), 0) AS total_cost_in_millicents,
        coalesce(avg(${tokensPerSpan}), 0) AS average_tokens,
        coalesce(avg(cost), 0) AS average_cost_in_millicents,
        coalesce(quantile(0.5)(cost), 0) AS median_cost_in_millicents,
        coalesce(avg(duration_ms), 0) AS average_duration,
        coalesce(quantile(0.5)(duration_ms), 0) AS median_duration
      FROM ${SPANS_TABLE}
      WHERE ${aggregationConditions.join(' AND ')}
    `),
  ])

  const row = aggRows[0]

  return {
    totalCount: toFiniteNumber(countRows[0]?.total_count),
    totalTokens: toFiniteNumber(row?.total_tokens),
    totalCostInMillicents: toFiniteNumber(row?.total_cost_in_millicents),
    averageTokens: toFiniteNumber(row?.average_tokens),
    averageCostInMillicents: toFiniteNumber(row?.average_cost_in_millicents),
    medianCostInMillicents: toFiniteNumber(row?.median_cost_in_millicents),
    averageDuration: toFiniteNumber(row?.average_duration),
    medianDuration: toFiniteNumber(row?.median_duration),
  }
}
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
import { subDays } from 'date-fns'
2+
import { clickhouseClient } from '../../../client/clickhouse'
3+
import { SPANS_TABLE } from '../../../clickhouse/models/spans'
4+
import { toClickHouseDateTime } from '../../../clickhouse/insert'
5+
6+
/** Number of distinct traces recorded on a single day. */
export type DailyCount = {
  /** Day bucket, as produced by `toDate(started_at)` in the query. */
  date: string
  /** Distinct `trace_id` values whose spans started on that day. */
  count: number
}

/**
 * Counts distinct traces per day for a document, reading from the ClickHouse
 * spans table.
 *
 * Only spans with `started_at` on or after `now - days` are considered, and
 * only days that have at least one matching span appear in the result (the
 * query groups existing rows; it does not fill gaps).
 *
 * @param workspaceId - Workspace the spans belong to.
 * @param documentUuid - Document whose spans are counted.
 * @param commitUuid - When provided (non-empty), restricts the count to that commit.
 * @param days - Size of the look-back window in days; defaults to 30.
 * @returns One entry per day, ordered by date ascending.
 */
export async function computeDocumentTracesDailyCount({
  workspaceId,
  documentUuid,
  commitUuid,
  days = 30,
}: {
  workspaceId: number
  documentUuid: string
  commitUuid?: string
  days?: number
}): Promise<DailyCount[]> {
  const windowStart = subDays(new Date(), days)

  // The commit filter and its bound parameter are added together or not at all.
  const queryParams: Record<string, unknown> = {
    workspaceId,
    documentUuid,
    startDate: toClickHouseDateTime(windowStart),
    ...(commitUuid ? { commitUuid } : {}),
  }

  const whereClause = [
    `workspace_id = {workspaceId: UInt64}`,
    `document_uuid = {documentUuid: String}`,
    `started_at >= {startDate: DateTime64(6, 'UTC')}`,
    ...(commitUuid ? [`commit_uuid = {commitUuid: String}`] : []),
  ].join(' AND ')

  const resultSet = await clickhouseClient().query({
    query: `
      SELECT
        toDate(started_at) AS date,
        count(DISTINCT trace_id) AS count
      FROM ${SPANS_TABLE}
      WHERE ${whereClause}
      GROUP BY date
      ORDER BY date
    `,
    format: 'JSONEachRow',
    query_params: queryParams,
  })

  const rawRows = await resultSet.json<{ date: string; count: string }>()

  // Counts arrive as strings in the JSON payload; convert them to numbers.
  return rawRows.map(({ date, count }) => ({ date, count: Number(count) }))
}

0 commit comments

Comments
 (0)