Skip to content

Commit 67e53d2

Browse files
committed
feat: migrate ch remaining repos to new pattern
1 parent f2d86f4 commit 67e53d2

File tree

9 files changed

+226
-323
lines changed

9 files changed

+226
-323
lines changed

apps/ingest/src/routes/traces.ts

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
import { createSpanClickhouseRepository } from "@platform/db-clickhouse"
1+
import { OrganizationId } from "@domain/shared"
2+
import { SpanRepository } from "@domain/spans"
3+
import { ChSqlClientLive, SpanRepositoryLive } from "@platform/db-clickhouse"
24
import { Effect } from "effect"
35
import type { Hono } from "hono"
46
import { getClickhouseClient } from "../clients.ts"
@@ -36,8 +38,16 @@ export const registerTracesRoute = ({ app }: TracesRouteContext) => {
3638
})
3739

3840
if (spans.length > 0) {
39-
const spanRepository = createSpanClickhouseRepository(getClickhouseClient())
40-
await Effect.runPromise(spanRepository.insert(spans))
41+
const orgId = OrganizationId(c.get("organizationId"))
42+
await Effect.runPromise(
43+
Effect.gen(function* () {
44+
const repo = yield* SpanRepository
45+
yield* repo.insert(spans)
46+
}).pipe(
47+
Effect.provide(SpanRepositoryLive),
48+
Effect.provide(ChSqlClientLive(getClickhouseClient(), orgId)),
49+
),
50+
)
4151
}
4252

4353
return c.json({})

apps/web/src/domains/spans/spans.functions.ts

Lines changed: 27 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { NotFoundError, OrganizationId, ProjectId, SpanId, TraceId } from "@domain/shared"
2-
import type { Span, SpanDetail, SpanRepository, Trace, TraceRepository } from "@domain/spans"
3-
import { createSpanClickhouseRepository, createTraceClickhouseRepository } from "@platform/db-clickhouse"
2+
import type { Span, SpanDetail, Trace } from "@domain/spans"
3+
import { SpanRepository, TraceRepository } from "@domain/spans"
4+
import { SpanRepositoryLive, TraceRepositoryLive, withClickHouse } from "@platform/db-clickhouse"
45
import { createServerFn } from "@tanstack/react-start"
56
import { Effect } from "effect"
67
import { z } from "zod"
@@ -62,22 +63,6 @@ export interface SpanDetailRecord extends SpanRecord {
6263
readonly toolDefinitions: string
6364
}
6465

65-
let spanRepoInstance: SpanRepository | undefined
66-
const getSpanRepository = () => {
67-
if (!spanRepoInstance) {
68-
spanRepoInstance = createSpanClickhouseRepository(getClickhouseClient())
69-
}
70-
return spanRepoInstance
71-
}
72-
73-
let traceRepoInstance: TraceRepository | undefined
74-
const getTraceRepository = () => {
75-
if (!traceRepoInstance) {
76-
traceRepoInstance = createTraceClickhouseRepository(getClickhouseClient())
77-
}
78-
return traceRepoInstance
79-
}
80-
8166
const serializeSpan = (span: Span): SpanRecord => ({
8267
organizationId: span.organizationId,
8368
projectId: span.projectId,
@@ -138,9 +123,12 @@ export const listSpansByTrace = createServerFn({ method: "GET" })
138123
.inputValidator(z.object({ traceId: z.string() }))
139124
.handler(async ({ data }): Promise<SpanRecord[]> => {
140125
const { organizationId } = await requireSession()
141-
const repo = getSpanRepository()
126+
const orgId = OrganizationId(organizationId)
142127
const spans = await Effect.runPromise(
143-
repo.findByTraceId({ organizationId: OrganizationId(organizationId), traceId: TraceId(data.traceId) }),
128+
Effect.gen(function* () {
129+
const repo = yield* SpanRepository
130+
return yield* repo.findByTraceId({ organizationId: orgId, traceId: TraceId(data.traceId) })
131+
}).pipe(withClickHouse(SpanRepositoryLive, getClickhouseClient(), orgId)),
144132
)
145133
return spans.map(serializeSpan)
146134
})
@@ -150,13 +138,16 @@ export const getSpanDetail = createServerFn({ method: "GET" })
150138
.inputValidator(z.object({ traceId: z.string(), spanId: z.string() }))
151139
.handler(async ({ data }): Promise<SpanDetailRecord> => {
152140
const { organizationId } = await requireSession()
153-
const repo = getSpanRepository()
141+
const orgId = OrganizationId(organizationId)
154142
const span = await Effect.runPromise(
155-
repo.findBySpanId({
156-
organizationId: OrganizationId(organizationId),
157-
traceId: TraceId(data.traceId),
158-
spanId: SpanId(data.spanId),
159-
}),
143+
Effect.gen(function* () {
144+
const repo = yield* SpanRepository
145+
return yield* repo.findBySpanId({
146+
organizationId: orgId,
147+
traceId: TraceId(data.traceId),
148+
spanId: SpanId(data.spanId),
149+
})
150+
}).pipe(withClickHouse(SpanRepositoryLive, getClickhouseClient(), orgId)),
160151
)
161152
if (!span) {
162153
throw new NotFoundError({ entity: "Span", id: data.spanId })
@@ -223,13 +214,17 @@ export const listTracesByProject = createServerFn({ method: "GET" })
223214
.inputValidator(z.object({ projectId: z.string() }))
224215
.handler(async ({ data }): Promise<TraceRecord[]> => {
225216
const { organizationId } = await requireSession()
226-
const repo = getTraceRepository()
217+
const orgId = OrganizationId(organizationId)
227218
const traces = await Effect.runPromise(
228-
repo.findByProjectId({
229-
organizationId: OrganizationId(organizationId),
230-
projectId: ProjectId(data.projectId),
231-
options: { limit: 200 },
232-
}),
219+
Effect.gen(function* () {
220+
const repo = yield* TraceRepository
221+
return yield* repo.findByProjectId({
222+
organizationId: orgId,
223+
projectId: ProjectId(data.projectId),
224+
options: { limit: 200 },
225+
})
226+
}).pipe(withClickHouse(TraceRepositoryLive, getClickhouseClient(), orgId)),
233227
)
228+
234229
return traces.map(serializeTrace)
235230
})

packages/domain/spans/src/index.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
export type { Span, SpanDetail, SpanKind, SpanStatusCode } from "./entities/span.ts"
22
export type { Trace, TraceDetail, TraceStatus } from "./entities/trace.ts"
3-
export type { SpanListOptions, SpanRepository } from "./ports/span-repository.ts"
4-
export type { TraceListOptions, TraceRepository } from "./ports/trace-repository.ts"
3+
export type { SpanListOptions, SpanRepositoryShape } from "./ports/span-repository.ts"
4+
export { SpanRepository } from "./ports/span-repository.ts"
5+
export type { TraceListOptions, TraceRepositoryShape } from "./ports/trace-repository.ts"
6+
export { TraceRepository } from "./ports/trace-repository.ts"

packages/domain/spans/src/ports/span-repository.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
import type { OrganizationId, ProjectId, RepositoryError, SpanId, TraceId } from "@domain/shared"
2-
import type { Effect } from "effect"
2+
import { type Effect, ServiceMap } from "effect"
33
import type { Span, SpanDetail } from "../entities/span.ts"
44

55
/**
66
* Repository port for spans (ClickHouse).
77
*/
8-
export interface SpanRepository {
8+
export interface SpanRepositoryShape {
99
insert(spans: readonly SpanDetail[]): Effect.Effect<void, RepositoryError>
1010

1111
findByTraceId(input: {
@@ -32,3 +32,7 @@ export interface SpanListOptions {
3232
readonly limit?: number
3333
readonly offset?: number
3434
}
35+
36+
export class SpanRepository extends ServiceMap.Service<SpanRepository, SpanRepositoryShape>()(
37+
"@domain/spans/SpanRepository",
38+
) {}

packages/domain/spans/src/ports/trace-repository.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import type { OrganizationId, ProjectId, RepositoryError, TraceId } from "@domain/shared"
2-
import type { Effect } from "effect"
2+
import { type Effect, ServiceMap } from "effect"
33
import type { Trace, TraceDetail } from "../entities/trace.ts"
44

55
/**
@@ -8,7 +8,7 @@ import type { Trace, TraceDetail } from "../entities/trace.ts"
88
* No insert method — the traces table is populated automatically
99
* by a materialized view on each insert into spans.
1010
*/
11-
export interface TraceRepository {
11+
export interface TraceRepositoryShape {
1212
findByProjectId(input: {
1313
readonly organizationId: OrganizationId
1414
readonly projectId: ProjectId
@@ -28,3 +28,7 @@ export interface TraceListOptions {
2828
readonly limit?: number
2929
readonly offset?: number
3030
}
31+
32+
export class TraceRepository extends ServiceMap.Service<TraceRepository, TraceRepositoryShape>()(
33+
"@domain/spans/TraceRepository",
34+
) {}

packages/platform/db-clickhouse/src/index.ts

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,6 @@ export { healthcheckClickhouse } from "./health.ts"
1010
export { commandClickhouse, insertJsonEachRow, queryClickhouse } from "./sql.ts"
1111
export { ChSqlClientLive } from "./ch-sql-client.ts"
1212
export { withClickHouse } from "./with-clickhouse.ts"
13-
export {
14-
createDatasetRowClickHouseRepository,
15-
DatasetRowRepositoryLive,
16-
} from "./repositories/dataset-row-repository.ts"
17-
export { createSpanClickhouseRepository } from "./repositories/span-repository.ts"
18-
export { createTraceClickhouseRepository } from "./repositories/trace-repository.ts"
13+
export { DatasetRowRepositoryLive } from "./repositories/dataset-row-repository.ts"
14+
export { SpanRepositoryLive } from "./repositories/span-repository.ts"
15+
export { TraceRepositoryLive } from "./repositories/trace-repository.ts"

packages/platform/db-clickhouse/src/repositories/dataset-row-repository.ts

Lines changed: 1 addition & 156 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
11
import type { ClickHouseClient } from "@clickhouse/client"
22
import type { DatasetRow } from "@domain/datasets"
33
import { DatasetRowRepository, RowNotFoundError } from "@domain/datasets"
4-
import { ChSqlClient, type ChSqlClientShape, DatasetId, DatasetRowId, toRepositoryError } from "@domain/shared"
4+
import { ChSqlClient, type ChSqlClientShape, DatasetId, DatasetRowId } from "@domain/shared"
55
import { safeParseJson, safeStringifyJson } from "@repo/utils"
66
import { Effect, Layer } from "effect"
7-
import { insertJsonEachRow, queryClickhouse } from "../sql.ts"
87

98
type DatasetRowCH = {
109
row_id: string
@@ -35,160 +34,6 @@ const buildVersionClause = (version: number | undefined) =>
3534

3635
const INSERT_BATCH_SIZE = 500
3736

38-
export const createDatasetRowClickHouseRepository = (client: ClickHouseClient) => ({
39-
insertBatch: (args: {
40-
organizationId: string
41-
datasetId: string
42-
version: number
43-
rows: readonly {
44-
readonly id: DatasetRowId
45-
readonly input: Record<string, unknown>
46-
readonly output?: Record<string, unknown>
47-
readonly metadata?: Record<string, unknown>
48-
}[]
49-
}) =>
50-
Effect.gen(function* () {
51-
const values = args.rows.map((row) => ({
52-
organization_id: args.organizationId,
53-
dataset_id: args.datasetId,
54-
row_id: row.id,
55-
xact_id: args.version,
56-
input: safeStringifyJson(row.input),
57-
output: safeStringifyJson(row.output ?? {}),
58-
metadata: safeStringifyJson(row.metadata ?? {}),
59-
}))
60-
61-
for (let i = 0; i < values.length; i += INSERT_BATCH_SIZE) {
62-
const batch = values.slice(i, i + INSERT_BATCH_SIZE)
63-
yield* insertJsonEachRow(client, "dataset_rows", batch).pipe(
64-
Effect.mapError((e) => toRepositoryError(e, "insertBatch")),
65-
)
66-
}
67-
68-
return args.rows.map((r) => r.id)
69-
}),
70-
71-
list: (args: {
72-
organizationId: string
73-
datasetId: string
74-
version?: number
75-
search?: string
76-
limit?: number
77-
offset?: number
78-
}) =>
79-
Effect.gen(function* () {
80-
const limit = args.limit ?? 50
81-
const offset = args.offset ?? 0
82-
const params: Record<string, unknown> = {
83-
organizationId: args.organizationId,
84-
datasetId: args.datasetId,
85-
limit,
86-
offset,
87-
}
88-
89-
if (args.version !== undefined) params.version = args.version
90-
if (args.search) params.search = args.search
91-
92-
const versionClause = buildVersionClause(args.version)
93-
const searchClause = buildSearchClause(args.search)
94-
95-
const dataQuery = `
96-
SELECT
97-
row_id,
98-
argMax(input, xact_id) AS input,
99-
argMax(output, xact_id) AS output,
100-
argMax(metadata, xact_id) AS metadata,
101-
min(created_at) AS created_at,
102-
max(xact_id) AS latest_xact_id
103-
FROM dataset_rows
104-
WHERE organization_id = {organizationId:String}
105-
AND dataset_id = {datasetId:String}
106-
${versionClause}
107-
GROUP BY row_id
108-
HAVING argMax(_object_delete, xact_id) = false
109-
${searchClause}
110-
ORDER BY created_at DESC
111-
LIMIT {limit:UInt32} OFFSET {offset:UInt32}
112-
`
113-
114-
const countQuery = `
115-
SELECT count() AS total FROM (
116-
SELECT
117-
row_id,
118-
argMax(input, xact_id) AS input,
119-
argMax(output, xact_id) AS output
120-
FROM dataset_rows
121-
WHERE organization_id = {organizationId:String}
122-
AND dataset_id = {datasetId:String}
123-
${versionClause}
124-
GROUP BY row_id
125-
HAVING argMax(_object_delete, xact_id) = false
126-
${searchClause}
127-
)
128-
`
129-
130-
const [rows, countResult] = yield* Effect.all([
131-
queryClickhouse<DatasetRowCH>(client, dataQuery, params).pipe(
132-
Effect.mapError((e) => toRepositoryError(e, "list")),
133-
),
134-
queryClickhouse<{ total: string }>(client, countQuery, params).pipe(
135-
Effect.mapError((e) => toRepositoryError(e, "list:count")),
136-
),
137-
])
138-
139-
return {
140-
rows: rows.map((row) => toDomainRow(row, args.datasetId)),
141-
total: Number(countResult[0]?.total ?? 0),
142-
} as const
143-
}),
144-
145-
findById: (args: {
146-
organizationId: string
147-
datasetId: string
148-
rowId: string
149-
version?: number
150-
}) =>
151-
Effect.gen(function* () {
152-
const params: Record<string, unknown> = {
153-
organizationId: args.organizationId,
154-
datasetId: args.datasetId,
155-
rowId: args.rowId,
156-
}
157-
158-
if (args.version !== undefined) params.version = args.version
159-
160-
const versionClause = buildVersionClause(args.version)
161-
162-
const query = `
163-
SELECT
164-
row_id,
165-
argMax(input, xact_id) AS input,
166-
argMax(output, xact_id) AS output,
167-
argMax(metadata, xact_id) AS metadata,
168-
min(created_at) AS created_at,
169-
max(xact_id) AS latest_xact_id
170-
FROM dataset_rows
171-
WHERE organization_id = {organizationId:String}
172-
AND dataset_id = {datasetId:String}
173-
AND row_id = {rowId:String}
174-
${versionClause}
175-
GROUP BY row_id
176-
HAVING argMax(_object_delete, xact_id) = false
177-
LIMIT 1
178-
`
179-
180-
const rows = yield* queryClickhouse<DatasetRowCH>(client, query, params).pipe(
181-
Effect.mapError((e) => toRepositoryError(e, "findById")),
182-
)
183-
184-
if (rows.length === 0) {
185-
return yield* new RowNotFoundError({ rowId: args.rowId })
186-
}
187-
188-
return toDomainRow(rows[0] ?? [], args.datasetId)
189-
}),
190-
})
191-
19237
export const DatasetRowRepositoryLive = Layer.effect(
19338
DatasetRowRepository,
19439
Effect.gen(function* () {

0 commit comments

Comments
 (0)