Skip to content

Commit 374df93

Browse files
committed
chore: migrate last span queries from postgres to clickhouse (feature flagged)
1 parent 3faa046 commit 374df93

File tree

12 files changed

+543
-29
lines changed

12 files changed

+543
-29
lines changed

packages/core/src/data-access/traces/countByDocument.ts

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@ import { Workspace } from '../../schema/models/types/Workspace'
55
import { Commit } from '../../schema/models/types/Commit'
66
import { CommitsRepository } from '../../repositories'
77
import { LogSources } from '@latitude-data/constants'
8+
import { isFeatureEnabledByName } from '../../services/workspaceFeatures/isFeatureEnabledByName'
9+
import { countDistinctTracesByDocument } from '../../queries/clickhouse/spans/countByDocument'
10+
11+
const CLICKHOUSE_SPANS_READ_FLAG = 'clickhouse-spans-read'
812

913
export async function countTracesByDocument(
1014
{
@@ -20,8 +24,26 @@ export async function countTracesByDocument(
2024
},
2125
db = database,
2226
) {
27+
const clickhouseEnabledResult = await isFeatureEnabledByName(
28+
workspace.id,
29+
CLICKHOUSE_SPANS_READ_FLAG,
30+
db,
31+
)
32+
const shouldUseClickHouse =
33+
clickhouseEnabledResult.ok && clickhouseEnabledResult.value
34+
2335
const commitsRepo = new CommitsRepository(workspace.id, db)
2436
const commits = await commitsRepo.getCommitsHistory({ commit })
37+
38+
if (shouldUseClickHouse) {
39+
return countDistinctTracesByDocument({
40+
workspaceId: workspace.id,
41+
documentUuid,
42+
commitUuids: commits.map((c) => c.uuid),
43+
logSources,
44+
})
45+
}
46+
2547
let filters = [
2648
eq(spans.workspaceId, workspace.id),
2749
eq(spans.documentUuid, documentUuid),

packages/core/src/data-access/traces/hasProductionTraces.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,10 @@ import { and, eq, inArray, sql, SQL } from 'drizzle-orm'
22
import { database } from '../../client'
33
import { MAIN_SPAN_TYPES, RUN_SOURCES, RunSourceGroup } from '../../constants'
44
import { spans } from '../../schema/models/spans'
5+
import { isFeatureEnabledByName } from '../../services/workspaceFeatures/isFeatureEnabledByName'
6+
import { hasProductionTraces as chHasProductionTraces } from '../../queries/clickhouse/spans/hasProductionTraces'
7+
8+
const CLICKHOUSE_SPANS_READ_FLAG = 'clickhouse-spans-read'
59

610
export async function hasProductionTraces(
711
{
@@ -13,6 +17,18 @@ export async function hasProductionTraces(
1317
},
1418
db = database,
1519
): Promise<boolean> {
20+
const clickhouseEnabledResult = await isFeatureEnabledByName(
21+
workspaceId,
22+
CLICKHOUSE_SPANS_READ_FLAG,
23+
db,
24+
)
25+
const shouldUseClickHouse =
26+
clickhouseEnabledResult.ok && clickhouseEnabledResult.value
27+
28+
if (shouldUseClickHouse) {
29+
return chHasProductionTraces({ workspaceId, projectId })
30+
}
31+
1632
const conditions: SQL<unknown>[] = [
1733
eq(spans.workspaceId, workspaceId),
1834
inArray(spans.type, Array.from(MAIN_SPAN_TYPES)),

packages/core/src/data-access/weeklyEmail/logs/index.ts

Lines changed: 72 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import {
44
countDistinct,
55
desc,
66
eq,
7+
inArray,
78
isNotNull,
89
sql,
910
} from 'drizzle-orm'
@@ -16,6 +17,13 @@ import { Workspace } from '../../../schema/models/types/Workspace'
1617
import { SureDateRange } from '../../../constants'
1718
import { getDateRangeOrLastWeekRange } from '../utils'
1819
import { hasProductionTraces } from '../../traces/hasProductionTraces'
20+
import { isFeatureEnabledByName } from '../../../services/workspaceFeatures/isFeatureEnabledByName'
21+
import {
22+
getGlobalLogsStats as chGetGlobalLogsStats,
23+
getTopProjectsLogsStats as chGetTopProjectsLogsStats,
24+
} from '../../../queries/clickhouse/spans/weeklyEmailLogs'
25+
26+
const CLICKHOUSE_SPANS_READ_FLAG = 'clickhouse-spans-read'
1927

2028
async function getGlobalLogsStats(
2129
{
@@ -26,7 +34,22 @@ async function getGlobalLogsStats(
2634
range: SureDateRange
2735
},
2836
db = database,
37+
useClickHouse = false,
2938
) {
39+
if (useClickHouse) {
40+
const stats = await chGetGlobalLogsStats({
41+
workspaceId: workspace.id,
42+
from: range.from,
43+
to: range.to,
44+
})
45+
46+
return {
47+
logsCount: stats.logsCount,
48+
tokensSpent: stats.tokensSpent,
49+
tokensCost: stats.totalCostInMillicents / 100000,
50+
}
51+
}
52+
3053
const logsCountResult = await db
3154
.select({ count: countDistinct(spans.traceId) })
3255
.from(spans)
@@ -71,7 +94,42 @@ async function getTopProjectsLogsStats(
7194
projectsLimit: number
7295
},
7396
db = database,
97+
useClickHouse = false,
7498
) {
99+
if (useClickHouse) {
100+
const stats = await chGetTopProjectsLogsStats({
101+
workspaceId: workspace.id,
102+
from: range.from,
103+
to: range.to,
104+
projectsLimit,
105+
})
106+
107+
if (stats.length === 0) return []
108+
109+
const namesByProjectId = await db
110+
.select({ id: projects.id, name: projects.name })
111+
.from(projects)
112+
.where(
113+
and(
114+
eq(projects.workspaceId, workspace.id),
115+
inArray(
116+
projects.id,
117+
stats.map((stat) => stat.projectId),
118+
),
119+
),
120+
)
121+
.then((rows) => new Map(rows.map((row) => [row.id, row.name])))
122+
123+
return stats.map((stat) => ({
124+
projectId: stat.projectId,
125+
projectName:
126+
namesByProjectId.get(stat.projectId) ?? `Project ${stat.projectId}`,
127+
logsCount: stat.logsCount,
128+
tokensSpent: stat.tokensSpent,
129+
tokensCost: stat.totalCostInMillicents / 100000,
130+
}))
131+
}
132+
75133
// Pre-filter spans by date range in subqueries for better performance
76134
const spansInRangeSubquery = db
77135
.select({
@@ -133,17 +191,30 @@ export async function getLogsData(
133191
},
134192
db = database,
135193
): Promise<LogStats> {
194+
const clickhouseEnabledResult = await isFeatureEnabledByName(
195+
workspace.id,
196+
CLICKHOUSE_SPANS_READ_FLAG,
197+
db,
198+
)
199+
const shouldUseClickHouse =
200+
clickhouseEnabledResult.ok && clickhouseEnabledResult.value
201+
136202
const usedInProduction = await hasProductionTraces(
137203
{ workspaceId: workspace.id },
138204
db,
139205
)
140206

141207
const range = getDateRangeOrLastWeekRange(dateRange)
142208

143-
const globalStats = await getGlobalLogsStats({ workspace, range }, db)
209+
const globalStats = await getGlobalLogsStats(
210+
{ workspace, range },
211+
db,
212+
shouldUseClickHouse,
213+
)
144214
const topProjects = await getTopProjectsLogsStats(
145215
{ workspace, range, projectsLimit },
146216
db,
217+
shouldUseClickHouse,
147218
)
148219

149220
return {
packages/core/src/queries/clickhouse/spans/countByDocument.ts

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
import { LogSources } from '@latitude-data/constants'
2+
import { clickhouseClient } from '../../../client/clickhouse'
3+
import { SPANS_TABLE } from '../../../clickhouse/models/spans'
4+
5+
/**
 * Counts the distinct `trace_id` values stored in the ClickHouse spans table
 * for one document across a set of commits.
 *
 * @param workspaceId - Workspace whose spans are queried.
 * @param documentUuid - Document the spans must belong to.
 * @param commitUuids - Commits to include. An empty list returns 0 without
 *   issuing a query.
 * @param logSources - Optional source filter. When omitted or empty, no
 *   `source` condition is added to the query.
 * @returns The distinct trace count as a number (0 when no row comes back).
 */
export async function countDistinctTracesByDocument({
  workspaceId,
  documentUuid,
  commitUuids,
  logSources,
}: {
  workspaceId: number
  documentUuid: string
  commitUuids: string[]
  logSources?: LogSources[]
}) {
  // Nothing can match an empty commit list, so skip the round trip.
  if (commitUuids.length === 0) return 0

  // Evaluate the "has sources" condition once and reuse it for both the SQL
  // fragment and the bound parameters so the two can never disagree.
  const filterBySource = (logSources?.length ?? 0) > 0

  const params: Record<string, unknown> = filterBySource
    ? { workspaceId, documentUuid, commitUuids, logSources }
    : { workspaceId, documentUuid, commitUuids }

  const sourceClause = filterBySource
    ? 'AND source IN ({logSources: Array(String)})'
    : ''

  const response = await clickhouseClient().query({
    query: `
      SELECT count(DISTINCT trace_id) AS cnt
      FROM ${SPANS_TABLE}
      WHERE workspace_id = {workspaceId: UInt64}
        AND document_uuid = {documentUuid: String}
        AND commit_uuid IN ({commitUuids: Array(String)})
        ${sourceClause}
    `,
    format: 'JSONEachRow',
    query_params: params,
  })

  // The count is read as a string and converted to a number here.
  const [first] = await response.json<{ cnt: string }>()
  return Number(first?.cnt ?? 0)
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
import { MAIN_SPAN_TYPES } from '@latitude-data/constants'
2+
import { clickhouseClient } from '../../../client/clickhouse'
3+
import { SPANS_TABLE } from '../../../clickhouse/models/spans'
4+
import { toClickHouseDateTime } from '../../../clickhouse/insert'
5+
6+
/**
 * Counts the spans of any main span type that started at or after a given
 * instant for a workspace, reading from the ClickHouse spans table.
 *
 * @param workspaceId - Workspace whose spans are counted.
 * @param since - Inclusive lower bound applied to `started_at`.
 * @returns The number of matching spans (0 when no row comes back).
 */
export async function countMainTypesSince({
  workspaceId,
  since,
}: {
  workspaceId: number
  since: Date
}) {
  const sql = `
      SELECT count() AS cnt
      FROM ${SPANS_TABLE}
      WHERE workspace_id = {workspaceId: UInt64}
        AND type IN ({spanTypes: Array(String)})
        AND started_at >= {since: DateTime64(6, 'UTC')}
    `

  const response = await clickhouseClient().query({
    query: sql,
    format: 'JSONEachRow',
    query_params: {
      workspaceId,
      spanTypes: Array.from(MAIN_SPAN_TYPES),
      since: toClickHouseDateTime(since),
    },
  })

  // The count is read as a string and converted to a number here.
  const [first] = await response.json<{ cnt: string }>()
  return Number(first?.cnt ?? 0)
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
import { SpanType } from '@latitude-data/constants'
2+
import { clickhouseClient } from '../../../client/clickhouse'
3+
import { SPANS_TABLE } from '../../../clickhouse/models/spans'
4+
import { toClickHouseDateTime } from '../../../clickhouse/insert'
5+
6+
/**
 * Fetches the identifiers of the most recent prompt spans of a document that
 * started strictly before a given instant and have no experiment UUID set.
 *
 * @param workspaceId - Workspace whose spans are queried.
 * @param documentUuid - Document the spans must belong to.
 * @param before - Exclusive upper bound applied to `started_at`.
 * @param limit - Maximum number of rows to return.
 * @returns Rows of `{ span_id, trace_id }`, newest `started_at` first.
 */
export async function getExperimentPromptSpansBefore({
  workspaceId,
  documentUuid,
  before,
  limit,
}: {
  workspaceId: number
  documentUuid: string
  before: Date
  limit: number
}) {
  const sql = `
      SELECT span_id, trace_id
      FROM ${SPANS_TABLE}
      WHERE workspace_id = {workspaceId: UInt64}
        AND document_uuid = {documentUuid: String}
        AND started_at < {before: DateTime64(6, 'UTC')}
        AND experiment_uuid IS NULL
        AND type = {promptType: String}
      ORDER BY started_at DESC
      LIMIT {limit: UInt32}
    `

  const boundParams = {
    workspaceId,
    documentUuid,
    before: toClickHouseDateTime(before),
    promptType: SpanType.Prompt,
    limit,
  }

  const response = await clickhouseClient().query({
    query: sql,
    format: 'JSONEachRow',
    query_params: boundParams,
  })

  return response.json<{ span_id: string; trace_id: string }>()
}
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
import { SpanType } from '@latitude-data/constants'
2+
import { clickhouseClient } from '../../../client/clickhouse'
3+
import { SPANS_TABLE } from '../../../clickhouse/models/spans'
4+
5+
/** Aggregated run figures for a single experiment, as plain numbers. */
export type ExperimentRunMetadataResult = {
  /** Distinct traces that contain at least one prompt or chat span. */
  count: number
  /** Sum of `duration_ms` over prompt and chat spans. */
  totalDuration: number
  /** Sum of `cost` over completion spans. */
  totalCost: number
  /** Sum of prompt, cached, reasoning and completion tokens over completion spans. */
  totalTokens: number
}

/**
 * Aggregates run metadata for an experiment from the ClickHouse spans table.
 *
 * The query first collects every trace that has a span tagged with the
 * experiment UUID, then aggregates over all workspace spans in those traces.
 *
 * @param workspaceId - Workspace whose spans are queried.
 * @param experimentUuid - Experiment whose traces are aggregated.
 * @returns Totals converted to numbers; every field is 0 when no row is returned.
 */
export async function getExperimentRunMetadata({
  workspaceId,
  experimentUuid,
}: {
  workspaceId: number
  experimentUuid: string
}): Promise<ExperimentRunMetadataResult> {
  type RawRow = {
    trace_count: string
    total_duration: string
    total_cost: string
    total_tokens: string
  }

  const sql = `
      WITH experiment_trace_ids AS (
        SELECT DISTINCT trace_id
        FROM ${SPANS_TABLE}
        WHERE workspace_id = {workspaceId: UInt64}
          AND experiment_uuid = {experimentUuid: String}
      )
      SELECT
        countDistinctIf(trace_id, type IN ({runTypes: Array(String)})) AS trace_count,
        coalesce(sumIf(duration_ms, type IN ({runTypes: Array(String)})), 0) AS total_duration,
        coalesce(sumIf(cost, type = {completionType: String}), 0) AS total_cost,
        coalesce(
          sumIf(
            coalesce(tokens_prompt, 0) +
            coalesce(tokens_cached, 0) +
            coalesce(tokens_reasoning, 0) +
            coalesce(tokens_completion, 0),
            type = {completionType: String}
          ),
          0
        ) AS total_tokens
      FROM ${SPANS_TABLE}
      WHERE workspace_id = {workspaceId: UInt64}
        AND trace_id IN (SELECT trace_id FROM experiment_trace_ids)
    `

  const response = await clickhouseClient().query({
    query: sql,
    format: 'JSONEachRow',
    query_params: {
      workspaceId,
      experimentUuid,
      runTypes: [SpanType.Prompt, SpanType.Chat],
      completionType: SpanType.Completion,
    },
  })

  const [aggregate] = await response.json<RawRow>()

  // Values are read as strings; a missing row or field falls back to 0.
  const asNumber = (raw: string | undefined) => Number(raw ?? 0)

  return {
    count: asNumber(aggregate?.trace_count),
    totalDuration: asNumber(aggregate?.total_duration),
    totalCost: asNumber(aggregate?.total_cost),
    totalTokens: asNumber(aggregate?.total_tokens),
  }
}

0 commit comments

Comments
 (0)