Skip to content

Commit f4ff15c

Browse files
committed
v3 task trigger works with clickhouse
1 parent 8a78d35 commit f4ff15c

File tree

3 files changed

+32
-19
lines changed

3 files changed

+32
-19
lines changed

apps/webapp/app/runEngine/concerns/traceEvents.server.ts

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import { ClickhouseEventRepository } from "~/v3/eventRepository/clickhouseEventR
77
import { IEventRepository } from "~/v3/eventRepository/eventRepository.types";
88
import { FEATURE_FLAG, flags } from "~/v3/featureFlags.server";
99
import { env } from "~/env.server";
10+
import { getEventRepository } from "~/v3/eventRepository/index.server";
1011

1112
export class DefaultTraceEventsConcern implements TraceEventConcern {
1213
private readonly eventRepository: EventRepository;
@@ -23,17 +24,9 @@ export class DefaultTraceEventsConcern implements TraceEventConcern {
2324
async #getEventRepository(
2425
request: TriggerTaskRequest
2526
): Promise<{ repository: IEventRepository; store: string }> {
26-
const taskEventRepository = await flags({
27-
key: FEATURE_FLAG.taskEventRepository,
28-
defaultValue: env.EVENT_REPOSITORY_DEFAULT_STORE,
29-
overrides: request.environment.organization.featureFlags as Record<string, unknown>,
30-
});
31-
32-
if (taskEventRepository === "clickhouse") {
33-
return { repository: this.clickhouseEventRepository, store: "clickhouse" };
34-
}
35-
36-
return { repository: this.eventRepository, store: getTaskEventStore() };
27+
return await getEventRepository(
28+
request.environment.organization.featureFlags as Record<string, unknown>
29+
);
3730
}
3831

3932
async traceRun<T>(

apps/webapp/app/v3/eventRepository/index.server.ts

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import { clickhouseEventRepository } from "./clickhouseEventRepositoryInstance.s
44
import { IEventRepository, TraceEventOptions } from "./eventRepository.types";
55
import { $replica } from "~/db.server";
66
import { logger } from "~/services/logger.server";
7+
import { FEATURE_FLAG, flags } from "../featureFlags.server";
8+
import { getTaskEventStore } from "../taskEventStore.server";
79

810
export function resolveEventRepositoryForStore(store: string | undefined): IEventRepository {
911
const taskEventStore = store ?? env.EVENT_REPOSITORY_DEFAULT_STORE;
@@ -15,6 +17,22 @@ export function resolveEventRepositoryForStore(store: string | undefined): IEven
1517
return eventRepository;
1618
}
1719

20+
export async function getEventRepository(
21+
featureFlags: Record<string, unknown> | undefined
22+
): Promise<{ repository: IEventRepository; store: string }> {
23+
const taskEventRepository = await flags({
24+
key: FEATURE_FLAG.taskEventRepository,
25+
defaultValue: env.EVENT_REPOSITORY_DEFAULT_STORE,
26+
overrides: featureFlags,
27+
});
28+
29+
if (taskEventRepository === "clickhouse") {
30+
return { repository: clickhouseEventRepository, store: "clickhouse" };
31+
}
32+
33+
return { repository: eventRepository, store: getTaskEventStore() };
34+
}
35+
1836
export async function recordRunDebugLog(
1937
runId: string,
2038
message: string,

apps/webapp/app/v3/services/triggerTaskV1.server.ts

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
11
import {
22
IOPacket,
33
packetRequiresOffloading,
4-
SemanticInternalAttributes,
5-
taskRunErrorToString,
64
taskRunErrorEnhancer,
5+
taskRunErrorToString,
76
TriggerTaskRequestBody,
87
} from "@trigger.dev/core/v3";
98
import {
@@ -12,6 +11,7 @@ import {
1211
stringifyDuration,
1312
} from "@trigger.dev/core/v3/isomorphic";
1413
import { Prisma } from "@trigger.dev/database";
14+
import { z } from "zod";
1515
import { env } from "~/env.server";
1616
import { createTag, MAX_TAGS_PER_RUN } from "~/models/taskRunTag.server";
1717
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
@@ -22,7 +22,7 @@ import { parseDelay } from "~/utils/delays";
2222
import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server";
2323
import { handleMetadataPacket } from "~/utils/packets";
2424
import { marqs } from "~/v3/marqs/index.server";
25-
import { eventRepository } from "../eventRepository/eventRepository.server";
25+
import { getEventRepository } from "../eventRepository/index.server";
2626
import { generateFriendlyId } from "../friendlyIdentifiers";
2727
import { findCurrentWorkerFromEnvironment } from "../models/workerDeployment.server";
2828
import { guardQueueSizeLimitsForEnv } from "../queueSizeLimits.server";
@@ -33,16 +33,14 @@ import { startActiveSpan } from "../tracer.server";
3333
import { clampMaxDuration } from "../utils/maxDuration";
3434
import { BaseService, ServiceValidationError } from "./baseService.server";
3535
import { EnqueueDelayedRunService } from "./enqueueDelayedRun.server";
36+
import { enqueueRun } from "./enqueueRun.server";
3637
import { ExpireEnqueuedRunService } from "./expireEnqueuedRun.server";
3738
import {
3839
MAX_ATTEMPTS,
3940
OutOfEntitlementError,
4041
TriggerTaskServiceOptions,
4142
TriggerTaskServiceResult,
4243
} from "./triggerTask.server";
43-
import { getTaskEventStore } from "../taskEventStore.server";
44-
import { enqueueRun } from "./enqueueRun.server";
45-
import { z } from "zod";
4644

4745
// This is here for backwords compatibility for v3 users
4846
const QueueOptions = z.object({
@@ -290,8 +288,12 @@ export class TriggerTaskServiceV1 extends BaseService {
290288
})
291289
: undefined;
292290

291+
const { repository, store } = await getEventRepository(
292+
environment.organization.featureFlags as Record<string, unknown>
293+
);
294+
293295
try {
294-
const result = await eventRepository.traceEvent(
296+
const result = await repository.traceEvent(
295297
taskId,
296298
{
297299
context: options.traceContext,
@@ -398,7 +400,7 @@ export class TriggerTaskServiceV1 extends BaseService {
398400
queuedAt: delayUntil ? undefined : new Date(),
399401
queueTimestamp,
400402
maxAttempts: body.options?.maxAttempts,
401-
taskEventStore: getTaskEventStore(),
403+
taskEventStore: store,
402404
ttl,
403405
tags:
404406
tagIds.length === 0

0 commit comments

Comments
 (0)