Skip to content

Commit cb60ba4

Browse files
committed
WIP
1 parent 555c474 commit cb60ba4

File tree

4 files changed

+57
-10
lines changed

4 files changed

+57
-10
lines changed

.husky/pre-commit

Lines changed: 3 additions & 3 deletions
Original file line number · Diff line number · Diff line change
@@ -1,6 +1,6 @@
11
#!/usr/bin/env sh
22
set -e
33

4-
# pnpm format
5-
# pnpm check:fix
6-
# pnpm knip
4+
pnpm format
5+
pnpm check:fix
6+
pnpm knip

packages/domain/datasets/src/ports/dataset-row-repository.ts

Lines changed: 6 additions & 1 deletion
Original file line number · Diff line number · Diff line change
@@ -1,8 +1,13 @@
1-
import type { DatasetId, DatasetRowId, OrganizationId, RepositoryError } from "@domain/shared"
1+
import type { DatasetId, DatasetRowId, OrganizationId, RepositoryError, TraceId } from "@domain/shared"
22
import { type Effect, ServiceMap } from "effect"
33
import type { DatasetRow, RowFieldValue, RowNotFoundError } from "../entities/dataset-row.ts"
44

55
export interface DatasetRowRepositoryShape {
6+
findExistingTraceIds(args: {
7+
readonly organizationId: OrganizationId
8+
readonly datasetId: DatasetId
9+
readonly traceIds: readonly TraceId[]
10+
}): Effect.Effect<ReadonlySet<TraceId>, RepositoryError>
611
insertBatch(args: {
712
readonly organizationId: OrganizationId
813
readonly datasetId: DatasetId

packages/domain/datasets/src/use-cases/add-traces-to-dataset.ts

Lines changed: 18 additions & 5 deletions
Original file line number · Diff line number · Diff line change
@@ -3,6 +3,7 @@ import type { TraceDetail } from "@domain/spans"
33
import { TraceRepository } from "@domain/spans"
44
import { Effect } from "effect"
55
import { DatasetRepository } from "../ports/dataset-repository.ts"
6+
import { DatasetRowRepository } from "../ports/dataset-row-repository.ts"
67
import { createDataset } from "./create-dataset.ts"
78
import { insertRows } from "./insert-rows.ts"
89

@@ -24,15 +25,14 @@ function mapTraceToRow(t: TraceDetail) {
2425
}
2526
}
2627

27-
function fetchAndMapTraces(args: {
28+
function fetchTraces(args: {
2829
readonly organizationId: OrganizationId
2930
readonly projectId: ProjectId
3031
readonly traceIds: readonly TraceId[]
3132
}) {
3233
return Effect.gen(function* () {
3334
const repo = yield* TraceRepository
34-
const traces = yield* repo.findByTraceIds(args)
35-
return traces.map(mapTraceToRow)
35+
return yield* repo.findByTraceIds(args)
3636
})
3737
}
3838

@@ -43,7 +43,19 @@ export function addTracesToDataset(args: {
4343
readonly traceIds: readonly TraceId[]
4444
}) {
4545
return Effect.gen(function* () {
46-
const rows = yield* fetchAndMapTraces(args)
46+
const rowRepo = yield* DatasetRowRepository
47+
48+
const existingTraceIds = yield* rowRepo.findExistingTraceIds({
49+
organizationId: args.organizationId,
50+
datasetId: args.datasetId,
51+
traceIds: args.traceIds,
52+
})
53+
54+
const newTraceIds = args.traceIds.filter((id) => !existingTraceIds.has(id))
55+
if (newTraceIds.length === 0) return { versionId: "" as const, version: 0, rowIds: [] as string[] }
56+
57+
const traces = yield* fetchTraces({ ...args, traceIds: newTraceIds })
58+
const rows = traces.map(mapTraceToRow)
4759
if (rows.length === 0) return { versionId: "" as const, version: 0, rowIds: [] as string[] }
4860

4961
return yield* insertRows({
@@ -71,7 +83,8 @@ export function createDatasetFromTraces(args: {
7183
})
7284

7385
const populateDataset = Effect.gen(function* () {
74-
const rows = yield* fetchAndMapTraces(args)
86+
const traces = yield* fetchTraces(args)
87+
const rows = traces.map(mapTraceToRow)
7588
if (rows.length === 0) {
7689
return { datasetId: dataset.id, versionId: "" as const, version: 0, rowIds: [] as string[] }
7790
}

packages/platform/db-clickhouse/src/repositories/dataset-row-repository.ts

Lines changed: 30 additions & 1 deletion
Original file line number · Diff line number · Diff line change
@@ -1,7 +1,7 @@
11
import type { ClickHouseClient } from "@clickhouse/client"
22
import type { DatasetRow } from "@domain/datasets"
33
import { DatasetRowRepository, RowNotFoundError } from "@domain/datasets"
4-
import { ChSqlClient, type ChSqlClientShape, DatasetId, DatasetRowId } from "@domain/shared"
4+
import { ChSqlClient, type ChSqlClientShape, DatasetId, DatasetRowId, TraceId } from "@domain/shared"
55
import { safeParseJson, safeStringifyJson } from "@repo/utils"
66
import { Effect, Layer } from "effect"
77

@@ -47,6 +47,35 @@ export const DatasetRowRepositoryLive = Layer.effect(
4747
const chSqlClient = (yield* ChSqlClient) as ChSqlClientShape<ClickHouseClient>
4848

4949
return {
50+
findExistingTraceIds: (args) =>
51+
chSqlClient.query(async (client) => {
52+
if (args.traceIds.length === 0) return new Set<TraceId>()
53+
54+
const result = await client
55+
.query({
56+
query: `
57+
SELECT DISTINCT JSONExtractString(
58+
argMax(metadata, xact_id), 'traceId'
59+
) AS trace_id
60+
FROM dataset_rows
61+
WHERE organization_id = {organizationId:String}
62+
AND dataset_id = {datasetId:String}
63+
GROUP BY row_id
64+
HAVING argMax(_object_delete, xact_id) = false
65+
AND trace_id IN ({traceIds:Array(String)})
66+
`,
67+
query_params: {
68+
organizationId: args.organizationId as string,
69+
datasetId: args.datasetId as string,
70+
traceIds: Array.from(args.traceIds) as string[],
71+
},
72+
format: "JSONEachRow",
73+
})
74+
.then((r) => r.json<{ trace_id: string }>())
75+
76+
return new Set(result.map((r) => TraceId(r.trace_id)))
77+
}),
78+
5079
insertBatch: (args) =>
5180
chSqlClient.query(async (client) => {
5281
const values = args.rows.map((row) => ({

0 commit comments

Comments (0)