Skip to content

Commit af16c9d

Browse files
committed
ClickHouse worker_queue on task runs
1 parent e436126 commit af16c9d

File tree

3 files changed

+23
-5
lines changed

3 files changed

+23
-5
lines changed

apps/webapp/app/services/runsReplicationService.server.ts

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,13 @@ export type RunsReplicationServiceOptions = {
5959
insertMaxDelayMs?: number;
6060
};
6161

62-
type TaskRunInsert = { _version: bigint; run: TaskRun; event: "insert" | "update" | "delete" };
62+
type PostgresTaskRun = TaskRun & { masterQueue: string };
63+
64+
type TaskRunInsert = {
65+
_version: bigint;
66+
run: PostgresTaskRun;
67+
event: "insert" | "update" | "delete";
68+
};
6369

6470
export type RunsReplicationServiceEvents = {
6571
message: [{ lsn: string; message: PgoutputMessage; service: RunsReplicationService }];
@@ -243,7 +249,7 @@ export class RunsReplicationService {
243249
}
244250
}
245251

246-
async backfill(runs: TaskRun[]) {
252+
async backfill(runs: PostgresTaskRun[]) {
247253
// divide into batches of 50 to get data from Postgres
248254
const flushId = nanoid();
249255
// Use current timestamp as LSN (high enough to be above existing data)
@@ -352,7 +358,7 @@ export class RunsReplicationService {
352358
const replicationLagMs = Date.now() - Number(message.commitTime / 1000n);
353359
this._currentTransaction.commitEndLsn = message.commitEndLsn;
354360
this._currentTransaction.replicationLagMs = replicationLagMs;
355-
const transaction = this._currentTransaction as Transaction<TaskRun>;
361+
const transaction = this._currentTransaction as Transaction<PostgresTaskRun>;
356362
this._currentTransaction = null;
357363

358364
if (transaction.commitEndLsn) {
@@ -370,7 +376,7 @@ export class RunsReplicationService {
370376
}
371377
}
372378

373-
#handleTransaction(transaction: Transaction<TaskRun>) {
379+
#handleTransaction(transaction: Transaction<PostgresTaskRun>) {
374380
if (this._isShutDownComplete) return;
375381

376382
if (this._isShuttingDown) {
@@ -764,7 +770,7 @@ export class RunsReplicationService {
764770
}
765771

766772
async #prepareTaskRunInsert(
767-
run: TaskRun,
773+
run: PostgresTaskRun,
768774
organizationId: string,
769775
environmentType: string,
770776
event: "insert" | "update" | "delete",
@@ -814,6 +820,7 @@ export class RunsReplicationService {
814820
output,
815821
concurrency_key: run.concurrencyKey ?? "",
816822
bulk_action_group_ids: run.bulkActionGroupIds ?? [],
823+
worker_queue: run.masterQueue,
817824
_version: _version.toString(),
818825
_is_deleted: event === "delete" ? 1 : 0,
819826
};
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
-- +goose Up
2+
/*
3+
Add worker_queue column.
4+
*/
5+
ALTER TABLE trigger_dev.task_runs_v2
6+
ADD COLUMN worker_queue String DEFAULT '';
7+
8+
-- +goose Down
9+
ALTER TABLE trigger_dev.task_runs_v2
10+
DROP COLUMN worker_queue;

internal-packages/clickhouse/src/taskRuns.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ export const TaskRunV2 = z.object({
4444
is_test: z.boolean().default(false),
4545
concurrency_key: z.string().default(""),
4646
bulk_action_group_ids: z.array(z.string()).default([]),
47+
worker_queue: z.string().default(""),
4748
_version: z.string(),
4849
_is_deleted: z.number().int().default(0),
4950
});

0 commit comments

Comments
 (0)