Skip to content

Commit d2082f6

Browse files
committed
much better type safety
1 parent ff1aa00 commit d2082f6

File tree

4 files changed

+175
-30
lines changed

4 files changed

+175
-30
lines changed

apps/webapp/app/services/runsReplicationService.server.ts

Lines changed: 20 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
import type { ClickHouse } from "@internal/clickhouse";
1+
import type { ClickHouse, TaskRunInsertArray, PayloadInsertArray } from "@internal/clickhouse";
2+
import { TASK_RUN_INDEX, PAYLOAD_INDEX } from "@internal/clickhouse";
23
import { type RedisOptions } from "@internal/redis";
34
import {
45
LogicalReplicationClient,
@@ -80,7 +81,9 @@ type TaskRunInsert = {
8081

8182
export type RunsReplicationServiceEvents = {
8283
message: [{ lsn: string; message: PgoutputMessage; service: RunsReplicationService }];
83-
batchFlushed: [{ flushId: string; taskRunInserts: any[][]; payloadInserts: any[][] }];
84+
batchFlushed: [
85+
{ flushId: string; taskRunInserts: TaskRunInsertArray[]; payloadInserts: PayloadInsertArray[] },
86+
];
8487
};
8588

8689
export class RunsReplicationService {
@@ -576,35 +579,29 @@ export class RunsReplicationService {
576579
.filter(Boolean)
577580
// batch inserts in clickhouse are more performant if the items
578581
// are pre-sorted by the primary key
579-
// Array indices: [0]=environment_id, [1]=organization_id, [2]=project_id, [3]=run_id, [5]=created_at
580582
.sort((a, b) => {
581-
if (a[1] !== b[1]) {
582-
// organization_id
583-
return a[1] < b[1] ? -1 : 1;
583+
if (a[TASK_RUN_INDEX.organization_id] !== b[TASK_RUN_INDEX.organization_id]) {
584+
return a[TASK_RUN_INDEX.organization_id] < b[TASK_RUN_INDEX.organization_id] ? -1 : 1;
584585
}
585-
if (a[2] !== b[2]) {
586-
// project_id
587-
return a[2] < b[2] ? -1 : 1;
586+
if (a[TASK_RUN_INDEX.project_id] !== b[TASK_RUN_INDEX.project_id]) {
587+
return a[TASK_RUN_INDEX.project_id] < b[TASK_RUN_INDEX.project_id] ? -1 : 1;
588588
}
589-
if (a[0] !== b[0]) {
590-
// environment_id
591-
return a[0] < b[0] ? -1 : 1;
589+
if (a[TASK_RUN_INDEX.environment_id] !== b[TASK_RUN_INDEX.environment_id]) {
590+
return a[TASK_RUN_INDEX.environment_id] < b[TASK_RUN_INDEX.environment_id] ? -1 : 1;
592591
}
593-
if (a[5] !== b[5]) {
594-
// created_at
595-
return a[5] - b[5];
592+
if (a[TASK_RUN_INDEX.created_at] !== b[TASK_RUN_INDEX.created_at]) {
593+
return a[TASK_RUN_INDEX.created_at] - b[TASK_RUN_INDEX.created_at];
596594
}
597-
return a[3] < b[3] ? -1 : 1; // run_id
595+
return a[TASK_RUN_INDEX.run_id] < b[TASK_RUN_INDEX.run_id] ? -1 : 1;
598596
});
599597

600598
const payloadInserts = preparedInserts
601599
.map(({ payloadInsert }) => payloadInsert)
602600
.filter(Boolean)
603601
// batch inserts in clickhouse are more performant if the items
604602
// are pre-sorted by the primary key
605-
// Array indices: [0]=run_id
606603
.sort((a, b) => {
607-
return a[0] < b[0] ? -1 : 1; // run_id
604+
return a[PAYLOAD_INDEX.run_id] < b[PAYLOAD_INDEX.run_id] ? -1 : 1;
608605
});
609606

610607
span.setAttribute("task_run_inserts", taskRunInserts.length);
@@ -769,7 +766,7 @@ export class RunsReplicationService {
769766
};
770767
}
771768

772-
async #insertTaskRunInserts(taskRunInserts: any[][], attempt: number) {
769+
async #insertTaskRunInserts(taskRunInserts: TaskRunInsertArray[], attempt: number) {
773770
return await startSpan(this._tracer, "insertTaskRunsInserts", async (span) => {
774771
const [insertError, insertResult] =
775772
await this.options.clickhouse.taskRuns.insertCompactArrays(taskRunInserts, {
@@ -792,7 +789,7 @@ export class RunsReplicationService {
792789
});
793790
}
794791

795-
async #insertPayloadInserts(payloadInserts: any[][], attempt: number) {
792+
async #insertPayloadInserts(payloadInserts: PayloadInsertArray[], attempt: number) {
796793
return await startSpan(this._tracer, "insertPayloadInserts", async (span) => {
797794
const [insertError, insertResult] =
798795
await this.options.clickhouse.taskRuns.insertPayloadsCompactArrays(payloadInserts, {
@@ -817,7 +814,7 @@ export class RunsReplicationService {
817814

818815
async #prepareRunInserts(
819816
batchedRun: TaskRunInsert
820-
): Promise<{ taskRunInsert?: any[]; payloadInsert?: any[] }> {
817+
): Promise<{ taskRunInsert?: TaskRunInsertArray; payloadInsert?: PayloadInsertArray }> {
821818
this.logger.debug("Preparing run", {
822819
batchedRun,
823820
});
@@ -854,7 +851,7 @@ export class RunsReplicationService {
854851
environmentType: string,
855852
event: "insert" | "update" | "delete",
856853
_version: bigint
857-
): Promise<any[]> {
854+
): Promise<TaskRunInsertArray> {
858855
const output = await this.#prepareJson(run.output, run.outputType);
859856

860857
// Return array matching TASK_RUN_COLUMNS order
@@ -907,7 +904,7 @@ export class RunsReplicationService {
907904
];
908905
}
909906

910-
async #preparePayloadInsert(run: TaskRun, _version: bigint): Promise<any[]> {
907+
async #preparePayloadInsert(run: TaskRun, _version: bigint): Promise<PayloadInsertArray> {
911908
const payload = await this.#prepareJson(run.payload, run.payloadType);
912909

913910
// Return array matching PAYLOAD_COLUMNS order

apps/webapp/test/runsReplicationService.part2.test.ts

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { ClickHouse } from "@internal/clickhouse";
1+
import { ClickHouse, TASK_RUN_INDEX } from "@internal/clickhouse";
22
import { containerTest } from "@internal/testcontainers";
33
import { Logger } from "@trigger.dev/core/logger";
44
import { readFile } from "node:fs/promises";
@@ -889,12 +889,15 @@ describe("RunsReplicationService (part 2/2)", () => {
889889
await setTimeout(1000);
890890

891891
expect(batchFlushedEvents?.[0].taskRunInserts).toHaveLength(2);
892-
// taskRunInserts are now arrays, not objects
893-
// Index 3 is run_id, Index 6 is status
894-
expect(batchFlushedEvents?.[0].taskRunInserts[0][3]).toEqual(run.id); // run_id
895-
expect(batchFlushedEvents?.[0].taskRunInserts[0][6]).toEqual("PENDING_VERSION"); // status
896-
expect(batchFlushedEvents?.[0].taskRunInserts[1][3]).toEqual(run.id); // run_id
897-
expect(batchFlushedEvents?.[0].taskRunInserts[1][6]).toEqual("COMPLETED_SUCCESSFULLY"); // status
892+
// Use TASK_RUN_INDEX for type-safe array access
893+
expect(batchFlushedEvents?.[0].taskRunInserts[0][TASK_RUN_INDEX.run_id]).toEqual(run.id);
894+
expect(batchFlushedEvents?.[0].taskRunInserts[0][TASK_RUN_INDEX.status]).toEqual(
895+
"PENDING_VERSION"
896+
);
897+
expect(batchFlushedEvents?.[0].taskRunInserts[1][TASK_RUN_INDEX.run_id]).toEqual(run.id);
898+
expect(batchFlushedEvents?.[0].taskRunInserts[1][TASK_RUN_INDEX.status]).toEqual(
899+
"COMPLETED_SUCCESSFULLY"
900+
);
898901

899902
await runsReplicationService.stop();
900903
}

internal-packages/clickhouse/src/index.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,14 @@ export type * from "./taskRuns.js";
3131
export type * from "./taskEvents.js";
3232
export type * from "./client/queryBuilder.js";
3333

34+
// Re-export column constants and indices for type-safe array access
35+
export {
36+
TASK_RUN_COLUMNS,
37+
TASK_RUN_INDEX,
38+
PAYLOAD_COLUMNS,
39+
PAYLOAD_INDEX,
40+
} from "./taskRuns.js";
41+
3442
// TSQL query execution
3543
export {
3644
executeTSQL,

internal-packages/clickhouse/src/taskRuns.ts

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,72 @@ export const TASK_RUN_COLUMNS = [
101101
"max_duration_in_seconds",
102102
] as const;
103103

104+
// Type-safe column indices derived from TASK_RUN_COLUMNS
105+
// This ensures indices stay in sync with column order
106+
export const TASK_RUN_INDEX = {
107+
environment_id: 0,
108+
organization_id: 1,
109+
project_id: 2,
110+
run_id: 3,
111+
updated_at: 4,
112+
created_at: 5,
113+
status: 6,
114+
environment_type: 7,
115+
friendly_id: 8,
116+
attempt: 9,
117+
engine: 10,
118+
task_identifier: 11,
119+
queue: 12,
120+
schedule_id: 13,
121+
batch_id: 14,
122+
completed_at: 15,
123+
started_at: 16,
124+
executed_at: 17,
125+
delay_until: 18,
126+
queued_at: 19,
127+
expired_at: 20,
128+
usage_duration_ms: 21,
129+
cost_in_cents: 22,
130+
base_cost_in_cents: 23,
131+
output: 24,
132+
error: 25,
133+
tags: 26,
134+
task_version: 27,
135+
sdk_version: 28,
136+
cli_version: 29,
137+
machine_preset: 30,
138+
root_run_id: 31,
139+
parent_run_id: 32,
140+
depth: 33,
141+
span_id: 34,
142+
trace_id: 35,
143+
idempotency_key: 36,
144+
expiration_ttl: 37,
145+
is_test: 38,
146+
_version: 39,
147+
_is_deleted: 40,
148+
concurrency_key: 41,
149+
bulk_action_group_ids: 42,
150+
worker_queue: 43,
151+
max_duration_in_seconds: 44,
152+
} as const satisfies Record<(typeof TASK_RUN_COLUMNS)[number], number>;
153+
154+
export type TaskRunColumnName = (typeof TASK_RUN_COLUMNS)[number];
155+
156+
// Runtime assertion to verify TASK_RUN_INDEX matches TASK_RUN_COLUMNS order
157+
// This will throw at module load time if there's a mismatch
158+
(function verifyTaskRunColumnIndices() {
159+
for (let i = 0; i < TASK_RUN_COLUMNS.length; i++) {
160+
const column = TASK_RUN_COLUMNS[i];
161+
const index = TASK_RUN_INDEX[column];
162+
if (index !== i) {
163+
throw new Error(
164+
`TASK_RUN_INDEX mismatch: column "${column}" has index ${index} but should be ${i}`
165+
);
166+
}
167+
}
168+
})();
169+
104170
export function insertTaskRunsCompactArrays(ch: ClickhouseWriter, settings?: ClickHouseSettings) {
105171
return ch.insertCompactRaw({
106172
name: "insertTaskRunsCompactArrays",
@@ -124,6 +190,77 @@ export type RawTaskRunPayloadV1 = z.infer<typeof RawTaskRunPayloadV1>;
124190

125191
export const PAYLOAD_COLUMNS = ["run_id", "created_at", "payload"] as const;
126192

193+
// Type-safe column indices for payload columns
194+
export const PAYLOAD_INDEX = {
195+
run_id: 0,
196+
created_at: 1,
197+
payload: 2,
198+
} as const satisfies Record<(typeof PAYLOAD_COLUMNS)[number], number>;
199+
200+
export type PayloadColumnName = (typeof PAYLOAD_COLUMNS)[number];
201+
202+
/**
203+
* Type-safe tuple representing a task run insert array.
204+
* Order matches TASK_RUN_COLUMNS exactly.
205+
*/
206+
export type TaskRunInsertArray = [
207+
environment_id: string,
208+
organization_id: string,
209+
project_id: string,
210+
run_id: string,
211+
updated_at: number,
212+
created_at: number,
213+
status: string,
214+
environment_type: string,
215+
friendly_id: string,
216+
attempt: number,
217+
engine: string,
218+
task_identifier: string,
219+
queue: string,
220+
schedule_id: string,
221+
batch_id: string,
222+
completed_at: number | null,
223+
started_at: number | null,
224+
executed_at: number | null,
225+
delay_until: number | null,
226+
queued_at: number | null,
227+
expired_at: number | null,
228+
usage_duration_ms: number,
229+
cost_in_cents: number,
230+
base_cost_in_cents: number,
231+
output: { data: unknown },
232+
error: { data: unknown },
233+
tags: string[],
234+
task_version: string,
235+
sdk_version: string,
236+
cli_version: string,
237+
machine_preset: string,
238+
root_run_id: string,
239+
parent_run_id: string,
240+
depth: number,
241+
span_id: string,
242+
trace_id: string,
243+
idempotency_key: string,
244+
expiration_ttl: string,
245+
is_test: boolean,
246+
_version: string,
247+
_is_deleted: number,
248+
concurrency_key: string,
249+
bulk_action_group_ids: string[],
250+
worker_queue: string,
251+
max_duration_in_seconds: number | null,
252+
];
253+
254+
/**
255+
* Type-safe tuple representing a payload insert array.
256+
* Order matches PAYLOAD_COLUMNS exactly.
257+
*/
258+
export type PayloadInsertArray = [
259+
run_id: string,
260+
created_at: number,
261+
payload: { data: unknown },
262+
];
263+
127264
export function insertRawTaskRunPayloadsCompactArrays(
128265
ch: ClickhouseWriter,
129266
settings?: ClickHouseSettings

0 commit comments

Comments
 (0)