Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion apps/webapp/app/runEngine/concerns/idempotencyKeys.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ export class IdempotencyKeyConcern {
private readonly traceEventConcern: TraceEventConcern
) {}

async handleTriggerRequest(request: TriggerTaskRequest): Promise<IdempotencyKeyConcernResult> {
async handleTriggerRequest(
request: TriggerTaskRequest,
parentStore: string | undefined
): Promise<IdempotencyKeyConcernResult> {
const idempotencyKey = request.options?.idempotencyKey ?? request.body.options?.idempotencyKey;
const idempotencyKeyExpiresAt =
request.options?.idempotencyKeyExpiresAt ??
Expand Down Expand Up @@ -83,6 +86,7 @@ export class IdempotencyKeyConcern {
if (associatedWaitpoint && resumeParentOnCompletion && parentRunId) {
await this.traceEventConcern.traceIdempotentRun(
request,
parentStore,
{
existingRun,
idempotencyKey,
Expand Down
32 changes: 10 additions & 22 deletions apps/webapp/app/runEngine/concerns/traceEvents.server.ts
Original file line number Diff line number Diff line change
@@ -1,39 +1,26 @@
import { EventRepository } from "~/v3/eventRepository/eventRepository.server";
import { TracedEventSpan, TraceEventConcern, TriggerTaskRequest } from "../types";
import { SemanticInternalAttributes } from "@trigger.dev/core/v3/semanticInternalAttributes";
import { TaskRun } from "@trigger.dev/database";
import { getTaskEventStore } from "~/v3/taskEventStore.server";
import { ClickhouseEventRepository } from "~/v3/eventRepository/clickhouseEventRepository.server";
import { IEventRepository } from "~/v3/eventRepository/eventRepository.types";
import { FEATURE_FLAG, flags } from "~/v3/featureFlags.server";
import { env } from "~/env.server";
import { getEventRepository } from "~/v3/eventRepository/index.server";
import { TracedEventSpan, TraceEventConcern, TriggerTaskRequest } from "../types";

export class DefaultTraceEventsConcern implements TraceEventConcern {
private readonly eventRepository: EventRepository;
private readonly clickhouseEventRepository: ClickhouseEventRepository;

constructor(
eventRepository: EventRepository,
clickhouseEventRepository: ClickhouseEventRepository
) {
this.eventRepository = eventRepository;
this.clickhouseEventRepository = clickhouseEventRepository;
}

async #getEventRepository(
request: TriggerTaskRequest
request: TriggerTaskRequest,
parentStore: string | undefined
): Promise<{ repository: IEventRepository; store: string }> {
return await getEventRepository(
request.environment.organization.featureFlags as Record<string, unknown>
request.environment.organization.featureFlags as Record<string, unknown>,
parentStore
);
}

async traceRun<T>(
request: TriggerTaskRequest,
parentStore: string | undefined,
callback: (span: TracedEventSpan, store: string) => Promise<T>
): Promise<T> {
const { repository, store } = await this.#getEventRepository(request);
const { repository, store } = await this.#getEventRepository(request, parentStore);

return await repository.traceEvent(
request.taskId,
Expand Down Expand Up @@ -73,6 +60,7 @@ export class DefaultTraceEventsConcern implements TraceEventConcern {

async traceIdempotentRun<T>(
request: TriggerTaskRequest,
parentStore: string | undefined,
options: {
existingRun: TaskRun;
idempotencyKey: string;
Expand All @@ -82,7 +70,7 @@ export class DefaultTraceEventsConcern implements TraceEventConcern {
callback: (span: TracedEventSpan, store: string) => Promise<T>
): Promise<T> {
const { existingRun, idempotencyKey, incomplete, isError } = options;
const { repository, store } = await this.#getEventRepository(request);
const { repository, store } = await this.#getEventRepository(request, parentStore);

return await repository.traceEvent(
`${request.taskId} (cached)`,
Expand All @@ -107,7 +95,7 @@ export class DefaultTraceEventsConcern implements TraceEventConcern {
},
async (event, traceContext, traceparent) => {
//log a message
await this.eventRepository.recordEvent(
await repository.recordEvent(
`There's an existing run for idempotencyKey: ${idempotencyKey}`,
{
taskSlug: request.taskId,
Expand Down
195 changes: 100 additions & 95 deletions apps/webapp/app/runEngine/services/triggerTask.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,8 @@ export class RunEngineTriggerTaskService {
}

const idempotencyKeyConcernResult = await this.idempotencyKeyConcern.handleTriggerRequest(
triggerRequest
triggerRequest,
parentRun?.taskEventStore
);

if (idempotencyKeyConcernResult.isCached) {
Expand Down Expand Up @@ -266,105 +267,109 @@ export class RunEngineTriggerTaskService {
const workerQueue = await this.queueConcern.getWorkerQueue(environment, body.options?.region);

try {
return await this.traceEventConcern.traceRun(triggerRequest, async (event, store) => {
const result = await this.runNumberIncrementer.incrementRunNumber(
triggerRequest,
async (num) => {
event.setAttribute("queueName", queueName);
span.setAttribute("queueName", queueName);
event.setAttribute("runId", runFriendlyId);
span.setAttribute("runId", runFriendlyId);

const payloadPacket = await this.payloadProcessor.process(triggerRequest);

const taskRun = await this.engine.trigger(
{
number: num,
friendlyId: runFriendlyId,
environment: environment,
idempotencyKey,
idempotencyKeyExpiresAt: idempotencyKey ? idempotencyKeyExpiresAt : undefined,
taskIdentifier: taskId,
payload: payloadPacket.data ?? "",
payloadType: payloadPacket.dataType,
context: body.context,
traceContext: this.#propagateExternalTraceContext(
event.traceContext,
parentRun?.traceContext,
event.traceparent?.spanId
),
traceId: event.traceId,
spanId: event.spanId,
parentSpanId:
options.parentAsLinkType === "replay" ? undefined : event.traceparent?.spanId,
replayedFromTaskRunFriendlyId: options.replayedFromTaskRunFriendlyId,
lockedToVersionId: lockedToBackgroundWorker?.id,
taskVersion: lockedToBackgroundWorker?.version,
sdkVersion: lockedToBackgroundWorker?.sdkVersion,
cliVersion: lockedToBackgroundWorker?.cliVersion,
concurrencyKey: body.options?.concurrencyKey,
queue: queueName,
lockedQueueId,
workerQueue,
isTest: body.options?.test ?? false,
delayUntil,
queuedAt: delayUntil ? undefined : new Date(),
maxAttempts: body.options?.maxAttempts,
taskEventStore: store,
ttl,
tags,
oneTimeUseToken: options.oneTimeUseToken,
parentTaskRunId: parentRun?.id,
rootTaskRunId: parentRun?.rootTaskRunId ?? parentRun?.id,
batch: options?.batchId
? {
id: options.batchId,
index: options.batchIndex ?? 0,
}
: undefined,
resumeParentOnCompletion: body.options?.resumeParentOnCompletion,
depth,
metadata: metadataPacket?.data,
metadataType: metadataPacket?.dataType,
seedMetadata: metadataPacket?.data,
seedMetadataType: metadataPacket?.dataType,
maxDurationInSeconds: body.options?.maxDuration
? clampMaxDuration(body.options.maxDuration)
: undefined,
machine: body.options?.machine,
priorityMs: body.options?.priority ? body.options.priority * 1_000 : undefined,
queueTimestamp:
options.queueTimestamp ??
(parentRun && body.options?.resumeParentOnCompletion
? parentRun.queueTimestamp ?? undefined
: undefined),
scheduleId: options.scheduleId,
scheduleInstanceId: options.scheduleInstanceId,
createdAt: options.overrideCreatedAt,
bulkActionId: body.options?.bulkActionId,
planType,
},
this.prisma
);

const error = taskRun.error ? TaskRunError.parse(taskRun.error) : undefined;

if (error) {
event.failWithError(error);
return await this.traceEventConcern.traceRun(
triggerRequest,
parentRun?.taskEventStore,
async (event, store) => {
const result = await this.runNumberIncrementer.incrementRunNumber(
triggerRequest,
async (num) => {
event.setAttribute("queueName", queueName);
span.setAttribute("queueName", queueName);
event.setAttribute("runId", runFriendlyId);
span.setAttribute("runId", runFriendlyId);

const payloadPacket = await this.payloadProcessor.process(triggerRequest);

const taskRun = await this.engine.trigger(
{
number: num,
friendlyId: runFriendlyId,
environment: environment,
idempotencyKey,
idempotencyKeyExpiresAt: idempotencyKey ? idempotencyKeyExpiresAt : undefined,
taskIdentifier: taskId,
payload: payloadPacket.data ?? "",
payloadType: payloadPacket.dataType,
context: body.context,
traceContext: this.#propagateExternalTraceContext(
event.traceContext,
parentRun?.traceContext,
event.traceparent?.spanId
),
traceId: event.traceId,
spanId: event.spanId,
parentSpanId:
options.parentAsLinkType === "replay" ? undefined : event.traceparent?.spanId,
replayedFromTaskRunFriendlyId: options.replayedFromTaskRunFriendlyId,
lockedToVersionId: lockedToBackgroundWorker?.id,
taskVersion: lockedToBackgroundWorker?.version,
sdkVersion: lockedToBackgroundWorker?.sdkVersion,
cliVersion: lockedToBackgroundWorker?.cliVersion,
concurrencyKey: body.options?.concurrencyKey,
queue: queueName,
lockedQueueId,
workerQueue,
isTest: body.options?.test ?? false,
delayUntil,
queuedAt: delayUntil ? undefined : new Date(),
maxAttempts: body.options?.maxAttempts,
taskEventStore: store,
ttl,
tags,
oneTimeUseToken: options.oneTimeUseToken,
parentTaskRunId: parentRun?.id,
rootTaskRunId: parentRun?.rootTaskRunId ?? parentRun?.id,
batch: options?.batchId
? {
id: options.batchId,
index: options.batchIndex ?? 0,
}
: undefined,
resumeParentOnCompletion: body.options?.resumeParentOnCompletion,
depth,
metadata: metadataPacket?.data,
metadataType: metadataPacket?.dataType,
seedMetadata: metadataPacket?.data,
seedMetadataType: metadataPacket?.dataType,
maxDurationInSeconds: body.options?.maxDuration
? clampMaxDuration(body.options.maxDuration)
: undefined,
machine: body.options?.machine,
priorityMs: body.options?.priority ? body.options.priority * 1_000 : undefined,
queueTimestamp:
options.queueTimestamp ??
(parentRun && body.options?.resumeParentOnCompletion
? parentRun.queueTimestamp ?? undefined
: undefined),
scheduleId: options.scheduleId,
scheduleInstanceId: options.scheduleInstanceId,
createdAt: options.overrideCreatedAt,
bulkActionId: body.options?.bulkActionId,
planType,
},
this.prisma
);

const error = taskRun.error ? TaskRunError.parse(taskRun.error) : undefined;

if (error) {
event.failWithError(error);
}

return { run: taskRun, error, isCached: false };
}
);

return { run: taskRun, error, isCached: false };
if (result?.error) {
throw new ServiceValidationError(
taskRunErrorToString(taskRunErrorEnhancer(result.error))
);
}
);

if (result?.error) {
throw new ServiceValidationError(
taskRunErrorToString(taskRunErrorEnhancer(result.error))
);
return result;
}

return result;
});
);
} catch (error) {
if (error instanceof RunDuplicateIdempotencyKeyError) {
//retry calling this function, because this time it will return the idempotent run
Expand Down
2 changes: 2 additions & 0 deletions apps/webapp/app/runEngine/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -143,10 +143,12 @@ export type TracedEventSpan = {
export interface TraceEventConcern {
traceRun<T>(
request: TriggerTaskRequest,
parentStore: string | undefined,
callback: (span: TracedEventSpan, store: string) => Promise<T>
): Promise<T>;
traceIdempotentRun<T>(
request: TriggerTaskRequest,
parentStore: string | undefined,
options: {
existingRun: TaskRun;
idempotencyKey: string;
Expand Down
11 changes: 10 additions & 1 deletion apps/webapp/app/v3/eventRepository/index.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,17 @@ export function resolveEventRepositoryForStore(store: string | undefined): IEven
}

export async function getEventRepository(
featureFlags: Record<string, unknown> | undefined
featureFlags: Record<string, unknown> | undefined,
parentStore: string | undefined
): Promise<{ repository: IEventRepository; store: string }> {
if (typeof parentStore === "string") {
if (parentStore === "clickhouse") {
return { repository: clickhouseEventRepository, store: "clickhouse" };
} else {
return { repository: eventRepository, store: getTaskEventStore() };
}
Comment on lines +24 to +29
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Preserve the parent event store selection

When parentStore is provided (Line 24), we intend to keep the descendant run in the same event store as its parent. The current branch discards the supplied store and falls back to getTaskEventStore(), so a parent run stored in "taskEvent" could spawn children in whatever the current default is (e.g. "taskEventPartitioned"). That breaks trace/event continuity and can move data into the wrong backend. Please return the passed-in store instead of recomputing it.

   if (typeof parentStore === "string") {
     if (parentStore === "clickhouse") {
       return { repository: clickhouseEventRepository, store: "clickhouse" };
-    } else {
-      return { repository: eventRepository, store: getTaskEventStore() };
-    }
+    }
+
+    return { repository: eventRepository, store: parentStore };
   }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if (typeof parentStore === "string") {
if (parentStore === "clickhouse") {
return { repository: clickhouseEventRepository, store: "clickhouse" };
} else {
return { repository: eventRepository, store: getTaskEventStore() };
}
if (typeof parentStore === "string") {
if (parentStore === "clickhouse") {
return { repository: clickhouseEventRepository, store: "clickhouse" };
}
return { repository: eventRepository, store: parentStore };
}
🤖 Prompt for AI Agents
In apps/webapp/app/v3/eventRepository/index.server.ts around lines 24-29, the
branch that handles a supplied parentStore currently returns repository:
eventRepository but sets store: getTaskEventStore(), discarding the passed-in
parentStore; change that branch to return the repository appropriate for
non-clickhouse parents (eventRepository) but preserve and return the passed-in
parentStore value (store: parentStore) instead of recomputing it.

}

const taskEventRepository = await resolveTaskEventRepositoryFlag(featureFlags);

if (taskEventRepository === "clickhouse") {
Expand Down
9 changes: 2 additions & 7 deletions apps/webapp/app/v3/services/triggerTask.server.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { TriggerTaskRequestBody } from "@trigger.dev/core/v3";
import { RunEngineVersion, TaskRun } from "@trigger.dev/database";
import { env } from "~/env.server";
import { IdempotencyKeyConcern } from "~/runEngine/concerns/idempotencyKeys.server";
import { DefaultPayloadProcessor } from "~/runEngine/concerns/payloads.server";
import { DefaultQueueManager } from "~/runEngine/concerns/queues.server";
Expand All @@ -9,12 +10,9 @@ import { RunEngineTriggerTaskService } from "~/runEngine/services/triggerTask.se
import { DefaultTriggerTaskValidator } from "~/runEngine/validators/triggerTaskValidator";
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
import { determineEngineVersion } from "../engineVersion.server";
import { eventRepository } from "../eventRepository/eventRepository.server";
import { tracer } from "../tracer.server";
import { WithRunEngine } from "./baseService.server";
import { TriggerTaskServiceV1 } from "./triggerTaskV1.server";
import { env } from "~/env.server";
import { clickhouseEventRepository } from "../eventRepository/clickhouseEventRepositoryInstance.server";

export type TriggerTaskServiceOptions = {
idempotencyKey?: string;
Expand Down Expand Up @@ -94,10 +92,7 @@ export class TriggerTaskService extends WithRunEngine {
body: TriggerTaskRequestBody,
options: TriggerTaskServiceOptions = {}
): Promise<TriggerTaskServiceResult | undefined> {
const traceEventConcern = new DefaultTraceEventsConcern(
eventRepository,
clickhouseEventRepository
);
const traceEventConcern = new DefaultTraceEventsConcern();

const service = new RunEngineTriggerTaskService({
prisma: this._prisma,
Expand Down
Loading