Skip to content

Commit 05ce130

Browse files
committed
cleanup the types
1 parent 90a6034 commit 05ce130

File tree

7 files changed

+362
-409
lines changed

7 files changed

+362
-409
lines changed

apps/webapp/app/services/runsReplicationService.server.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import type { ClickHouse, RawTaskRunPayloadV1, TaskRunV2 } from "@internal/clickhouse";
1+
import type { ClickHouse } from "@internal/clickhouse";
22
import { TASK_RUN_COLUMNS, PAYLOAD_COLUMNS } from "@internal/clickhouse";
33
import { type RedisOptions } from "@internal/redis";
44
import {
@@ -915,12 +915,12 @@ export class RunsReplicationService {
915915
run.idempotencyKey ?? "", // idempotency_key
916916
run.ttl ?? "", // expiration_ttl
917917
run.isTest ?? false, // is_test
918+
_version.toString(), // _version
919+
event === "delete" ? 1 : 0, // _is_deleted
918920
run.concurrencyKey ?? "", // concurrency_key
919921
run.bulkActionGroupIds ?? [], // bulk_action_group_ids
920922
run.masterQueue ?? "", // worker_queue
921923
run.maxDurationInSeconds ?? 0, // max_duration_in_seconds
922-
_version.toString(), // _version
923-
event === "delete" ? 1 : 0, // _is_deleted
924924
];
925925
}
926926

internal-packages/clickhouse/src/client/client.ts

Lines changed: 102 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,11 @@ import {
66
createClient,
77
type ResultSet,
88
type Row,
9+
type BaseQueryParams,
10+
type InsertResult,
911
} from "@clickhouse/client";
1012
import { recordSpanError, Span, startSpan, trace, Tracer } from "@internal/tracing";
11-
import { flattenAttributes, tryCatch } from "@trigger.dev/core/v3";
13+
import { flattenAttributes, tryCatch, type Result } from "@trigger.dev/core/v3";
1214
import { z } from "zod";
1315
import { InsertError, QueryError } from "./errors.js";
1416
import type {
@@ -797,6 +799,105 @@ export class ClickhouseClient implements ClickhouseReader, ClickhouseWriter {
797799
});
798800
};
799801
}
802+
803+
public insertCompactRaw(req: {
804+
name: string;
805+
table: string;
806+
columns: readonly string[];
807+
settings?: ClickHouseSettings;
808+
}): (
809+
events: readonly any[][] | any[],
810+
options?: {
811+
attributes?: Record<string, string | number | boolean>;
812+
params?: BaseQueryParams;
813+
}
814+
) => Promise<Result<InsertResult, InsertError>> {
815+
return async (events, options) => {
816+
const queryId = randomUUID();
817+
818+
return await startSpan(this.tracer, "insert", async (span) => {
819+
// Check if events is a single row (array) or multiple rows (array of arrays)
820+
// If first element is not an array, treat as single row
821+
const isSingleRow = events.length > 0 && !Array.isArray(events[0]);
822+
const eventsArray: readonly any[][] = isSingleRow
823+
? [events as any[]]
824+
: (events as readonly any[][]);
825+
826+
this.logger.debug("Inserting into clickhouse (compact raw)", {
827+
clientName: this.name,
828+
name: req.name,
829+
table: req.table,
830+
events: eventsArray.length,
831+
settings: req.settings,
832+
attributes: options?.attributes,
833+
options,
834+
queryId,
835+
});
836+
837+
span.setAttributes({
838+
"clickhouse.clientName": this.name,
839+
"clickhouse.tableName": req.table,
840+
"clickhouse.operationName": req.name,
841+
"clickhouse.queryId": queryId,
842+
"clickhouse.format": "JSONCompactEachRowWithNames",
843+
...flattenAttributes(req.settings, "clickhouse.settings"),
844+
...flattenAttributes(options?.attributes),
845+
});
846+
847+
// Build compact format: [columns, ...rows]
848+
// Data is already in array format, no conversion needed
849+
const compactData: any[] = [Array.from(req.columns), ...eventsArray];
850+
851+
const [clickhouseError, result] = await tryCatch(
852+
this.client.insert({
853+
table: req.table,
854+
format: "JSONCompactEachRowWithNames",
855+
values: compactData,
856+
query_id: queryId,
857+
...options?.params,
858+
clickhouse_settings: {
859+
...req.settings,
860+
...options?.params?.clickhouse_settings,
861+
},
862+
})
863+
);
864+
865+
if (clickhouseError) {
866+
this.logger.error("Error inserting into clickhouse", {
867+
name: req.name,
868+
error: clickhouseError,
869+
table: req.table,
870+
});
871+
872+
recordClickhouseError(span, clickhouseError);
873+
return [new InsertError(clickhouseError.message), null];
874+
}
875+
876+
this.logger.debug("Inserted into clickhouse", {
877+
clientName: this.name,
878+
name: req.name,
879+
table: req.table,
880+
result,
881+
queryId,
882+
});
883+
884+
span.setAttributes({
885+
"clickhouse.query_id": result.query_id,
886+
"clickhouse.executed": result.executed,
887+
"clickhouse.summary.read_rows": result.summary?.read_rows,
888+
"clickhouse.summary.read_bytes": result.summary?.read_bytes,
889+
"clickhouse.summary.written_rows": result.summary?.written_rows,
890+
"clickhouse.summary.written_bytes": result.summary?.written_bytes,
891+
"clickhouse.summary.total_rows_to_read": result.summary?.total_rows_to_read,
892+
"clickhouse.summary.result_rows": result.summary?.result_rows,
893+
"clickhouse.summary.result_bytes": result.summary?.result_bytes,
894+
"clickhouse.summary.elapsed_ns": result.summary?.elapsed_ns,
895+
});
896+
897+
return [null, result];
898+
});
899+
};
900+
}
800901
}
801902

802903
function recordClickhouseError(span: Span, error: Error) {

internal-packages/clickhouse/src/client/noop.ts

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,4 +188,32 @@ export class NoopClient implements ClickhouseReader, ClickhouseWriter {
188188
];
189189
};
190190
}
191+
192+
public insertCompactRaw(req: {
193+
name: string;
194+
table: string;
195+
columns: readonly string[];
196+
settings?: ClickHouseSettings;
197+
}): (events: readonly any[][] | any[]) => Promise<Result<InsertResult, InsertError>> {
198+
return async (events: readonly any[][] | any[]) => {
199+
return [
200+
null,
201+
{
202+
executed: true,
203+
query_id: "noop",
204+
summary: {
205+
read_rows: "0",
206+
read_bytes: "0",
207+
written_rows: "0",
208+
written_bytes: "0",
209+
total_rows_to_read: "0",
210+
result_rows: "0",
211+
result_bytes: "0",
212+
elapsed_ns: "0",
213+
},
214+
response_headers: {},
215+
},
216+
];
217+
};
218+
}
191219
}

internal-packages/clickhouse/src/client/types.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,5 +228,18 @@ export interface ClickhouseWriter {
228228
settings?: ClickHouseSettings;
229229
}): ClickhouseInsertFunction<TRecord>;
230230

231+
insertCompactRaw(req: {
232+
name: string;
233+
table: string;
234+
columns: readonly string[];
235+
settings?: ClickHouseSettings;
236+
}): (
237+
events: readonly any[][] | any[],
238+
options?: {
239+
attributes?: Record<string, string | number | boolean>;
240+
params?: BaseQueryParams;
241+
}
242+
) => Promise<Result<InsertResult, InsertError>>;
243+
231244
close(): Promise<void>;
232245
}

internal-packages/clickhouse/src/index.ts

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,7 @@ import { ClickhouseClient } from "./client/client.js";
33
import { ClickhouseReader, ClickhouseWriter } from "./client/types.js";
44
import { NoopClient } from "./client/noop.js";
55
import {
6-
insertTaskRuns,
7-
insertTaskRunsUnsafe,
8-
insertTaskRunsCompact,
96
insertTaskRunsCompactArrays,
10-
insertRawTaskRunPayloads,
11-
insertRawTaskRunPayloadsUnsafe,
12-
insertRawTaskRunPayloadsCompact,
137
insertRawTaskRunPayloadsCompactArrays,
148
getTaskRunsQueryBuilder,
159
getTaskActivityQueryBuilder,
@@ -174,13 +168,7 @@ export class ClickHouse {
174168

175169
get taskRuns() {
176170
return {
177-
insert: insertTaskRuns(this.writer),
178-
insertUnsafe: insertTaskRunsUnsafe(this.writer),
179-
insertCompact: insertTaskRunsCompact(this.writer),
180171
insertCompactArrays: insertTaskRunsCompactArrays(this.writer),
181-
insertPayloads: insertRawTaskRunPayloads(this.writer),
182-
insertPayloadsUnsafe: insertRawTaskRunPayloadsUnsafe(this.writer),
183-
insertPayloadsCompact: insertRawTaskRunPayloadsCompact(this.writer),
184172
insertPayloadsCompactArrays: insertRawTaskRunPayloadsCompactArrays(this.writer),
185173
queryBuilder: getTaskRunsQueryBuilder(this.reader),
186174
countQueryBuilder: getTaskRunsCountQueryBuilder(this.reader),

0 commit comments

Comments
 (0)