Skip to content

Commit b3b2553

Browse files
authored
fix(otel): propagate the task event store to run descendants (#2583)
1 parent cdd1a88 commit b3b2553

File tree

8 files changed

+138
-127
lines changed

8 files changed

+138
-127
lines changed

apps/webapp/app/runEngine/concerns/idempotencyKeys.server.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,10 @@ export class IdempotencyKeyConcern {
1717
private readonly traceEventConcern: TraceEventConcern
1818
) {}
1919

20-
async handleTriggerRequest(request: TriggerTaskRequest): Promise<IdempotencyKeyConcernResult> {
20+
async handleTriggerRequest(
21+
request: TriggerTaskRequest,
22+
parentStore: string | undefined
23+
): Promise<IdempotencyKeyConcernResult> {
2124
const idempotencyKey = request.options?.idempotencyKey ?? request.body.options?.idempotencyKey;
2225
const idempotencyKeyExpiresAt =
2326
request.options?.idempotencyKeyExpiresAt ??
@@ -83,6 +86,7 @@ export class IdempotencyKeyConcern {
8386
if (associatedWaitpoint && resumeParentOnCompletion && parentRunId) {
8487
await this.traceEventConcern.traceIdempotentRun(
8588
request,
89+
parentStore,
8690
{
8791
existingRun,
8892
idempotencyKey,

apps/webapp/app/runEngine/concerns/traceEvents.server.ts

Lines changed: 10 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,39 +1,26 @@
1-
import { EventRepository } from "~/v3/eventRepository/eventRepository.server";
2-
import { TracedEventSpan, TraceEventConcern, TriggerTaskRequest } from "../types";
31
import { SemanticInternalAttributes } from "@trigger.dev/core/v3/semanticInternalAttributes";
42
import { TaskRun } from "@trigger.dev/database";
5-
import { getTaskEventStore } from "~/v3/taskEventStore.server";
6-
import { ClickhouseEventRepository } from "~/v3/eventRepository/clickhouseEventRepository.server";
73
import { IEventRepository } from "~/v3/eventRepository/eventRepository.types";
8-
import { FEATURE_FLAG, flags } from "~/v3/featureFlags.server";
9-
import { env } from "~/env.server";
104
import { getEventRepository } from "~/v3/eventRepository/index.server";
5+
import { TracedEventSpan, TraceEventConcern, TriggerTaskRequest } from "../types";
116

127
export class DefaultTraceEventsConcern implements TraceEventConcern {
13-
private readonly eventRepository: EventRepository;
14-
private readonly clickhouseEventRepository: ClickhouseEventRepository;
15-
16-
constructor(
17-
eventRepository: EventRepository,
18-
clickhouseEventRepository: ClickhouseEventRepository
19-
) {
20-
this.eventRepository = eventRepository;
21-
this.clickhouseEventRepository = clickhouseEventRepository;
22-
}
23-
248
async #getEventRepository(
25-
request: TriggerTaskRequest
9+
request: TriggerTaskRequest,
10+
parentStore: string | undefined
2611
): Promise<{ repository: IEventRepository; store: string }> {
2712
return await getEventRepository(
28-
request.environment.organization.featureFlags as Record<string, unknown>
13+
request.environment.organization.featureFlags as Record<string, unknown>,
14+
parentStore
2915
);
3016
}
3117

3218
async traceRun<T>(
3319
request: TriggerTaskRequest,
20+
parentStore: string | undefined,
3421
callback: (span: TracedEventSpan, store: string) => Promise<T>
3522
): Promise<T> {
36-
const { repository, store } = await this.#getEventRepository(request);
23+
const { repository, store } = await this.#getEventRepository(request, parentStore);
3724

3825
return await repository.traceEvent(
3926
request.taskId,
@@ -73,6 +60,7 @@ export class DefaultTraceEventsConcern implements TraceEventConcern {
7360

7461
async traceIdempotentRun<T>(
7562
request: TriggerTaskRequest,
63+
parentStore: string | undefined,
7664
options: {
7765
existingRun: TaskRun;
7866
idempotencyKey: string;
@@ -82,7 +70,7 @@ export class DefaultTraceEventsConcern implements TraceEventConcern {
8270
callback: (span: TracedEventSpan, store: string) => Promise<T>
8371
): Promise<T> {
8472
const { existingRun, idempotencyKey, incomplete, isError } = options;
85-
const { repository, store } = await this.#getEventRepository(request);
73+
const { repository, store } = await this.#getEventRepository(request, parentStore);
8674

8775
return await repository.traceEvent(
8876
`${request.taskId} (cached)`,
@@ -107,7 +95,7 @@ export class DefaultTraceEventsConcern implements TraceEventConcern {
10795
},
10896
async (event, traceContext, traceparent) => {
10997
//log a message
110-
await this.eventRepository.recordEvent(
98+
await repository.recordEvent(
11199
`There's an existing run for idempotencyKey: ${idempotencyKey}`,
112100
{
113101
taskSlug: request.taskId,

apps/webapp/app/runEngine/services/triggerTask.server.ts

Lines changed: 100 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,8 @@ export class RunEngineTriggerTaskService {
197197
}
198198

199199
const idempotencyKeyConcernResult = await this.idempotencyKeyConcern.handleTriggerRequest(
200-
triggerRequest
200+
triggerRequest,
201+
parentRun?.taskEventStore
201202
);
202203

203204
if (idempotencyKeyConcernResult.isCached) {
@@ -266,105 +267,109 @@ export class RunEngineTriggerTaskService {
266267
const workerQueue = await this.queueConcern.getWorkerQueue(environment, body.options?.region);
267268

268269
try {
269-
return await this.traceEventConcern.traceRun(triggerRequest, async (event, store) => {
270-
const result = await this.runNumberIncrementer.incrementRunNumber(
271-
triggerRequest,
272-
async (num) => {
273-
event.setAttribute("queueName", queueName);
274-
span.setAttribute("queueName", queueName);
275-
event.setAttribute("runId", runFriendlyId);
276-
span.setAttribute("runId", runFriendlyId);
277-
278-
const payloadPacket = await this.payloadProcessor.process(triggerRequest);
279-
280-
const taskRun = await this.engine.trigger(
281-
{
282-
number: num,
283-
friendlyId: runFriendlyId,
284-
environment: environment,
285-
idempotencyKey,
286-
idempotencyKeyExpiresAt: idempotencyKey ? idempotencyKeyExpiresAt : undefined,
287-
taskIdentifier: taskId,
288-
payload: payloadPacket.data ?? "",
289-
payloadType: payloadPacket.dataType,
290-
context: body.context,
291-
traceContext: this.#propagateExternalTraceContext(
292-
event.traceContext,
293-
parentRun?.traceContext,
294-
event.traceparent?.spanId
295-
),
296-
traceId: event.traceId,
297-
spanId: event.spanId,
298-
parentSpanId:
299-
options.parentAsLinkType === "replay" ? undefined : event.traceparent?.spanId,
300-
replayedFromTaskRunFriendlyId: options.replayedFromTaskRunFriendlyId,
301-
lockedToVersionId: lockedToBackgroundWorker?.id,
302-
taskVersion: lockedToBackgroundWorker?.version,
303-
sdkVersion: lockedToBackgroundWorker?.sdkVersion,
304-
cliVersion: lockedToBackgroundWorker?.cliVersion,
305-
concurrencyKey: body.options?.concurrencyKey,
306-
queue: queueName,
307-
lockedQueueId,
308-
workerQueue,
309-
isTest: body.options?.test ?? false,
310-
delayUntil,
311-
queuedAt: delayUntil ? undefined : new Date(),
312-
maxAttempts: body.options?.maxAttempts,
313-
taskEventStore: store,
314-
ttl,
315-
tags,
316-
oneTimeUseToken: options.oneTimeUseToken,
317-
parentTaskRunId: parentRun?.id,
318-
rootTaskRunId: parentRun?.rootTaskRunId ?? parentRun?.id,
319-
batch: options?.batchId
320-
? {
321-
id: options.batchId,
322-
index: options.batchIndex ?? 0,
323-
}
324-
: undefined,
325-
resumeParentOnCompletion: body.options?.resumeParentOnCompletion,
326-
depth,
327-
metadata: metadataPacket?.data,
328-
metadataType: metadataPacket?.dataType,
329-
seedMetadata: metadataPacket?.data,
330-
seedMetadataType: metadataPacket?.dataType,
331-
maxDurationInSeconds: body.options?.maxDuration
332-
? clampMaxDuration(body.options.maxDuration)
333-
: undefined,
334-
machine: body.options?.machine,
335-
priorityMs: body.options?.priority ? body.options.priority * 1_000 : undefined,
336-
queueTimestamp:
337-
options.queueTimestamp ??
338-
(parentRun && body.options?.resumeParentOnCompletion
339-
? parentRun.queueTimestamp ?? undefined
340-
: undefined),
341-
scheduleId: options.scheduleId,
342-
scheduleInstanceId: options.scheduleInstanceId,
343-
createdAt: options.overrideCreatedAt,
344-
bulkActionId: body.options?.bulkActionId,
345-
planType,
346-
},
347-
this.prisma
348-
);
349-
350-
const error = taskRun.error ? TaskRunError.parse(taskRun.error) : undefined;
351-
352-
if (error) {
353-
event.failWithError(error);
270+
return await this.traceEventConcern.traceRun(
271+
triggerRequest,
272+
parentRun?.taskEventStore,
273+
async (event, store) => {
274+
const result = await this.runNumberIncrementer.incrementRunNumber(
275+
triggerRequest,
276+
async (num) => {
277+
event.setAttribute("queueName", queueName);
278+
span.setAttribute("queueName", queueName);
279+
event.setAttribute("runId", runFriendlyId);
280+
span.setAttribute("runId", runFriendlyId);
281+
282+
const payloadPacket = await this.payloadProcessor.process(triggerRequest);
283+
284+
const taskRun = await this.engine.trigger(
285+
{
286+
number: num,
287+
friendlyId: runFriendlyId,
288+
environment: environment,
289+
idempotencyKey,
290+
idempotencyKeyExpiresAt: idempotencyKey ? idempotencyKeyExpiresAt : undefined,
291+
taskIdentifier: taskId,
292+
payload: payloadPacket.data ?? "",
293+
payloadType: payloadPacket.dataType,
294+
context: body.context,
295+
traceContext: this.#propagateExternalTraceContext(
296+
event.traceContext,
297+
parentRun?.traceContext,
298+
event.traceparent?.spanId
299+
),
300+
traceId: event.traceId,
301+
spanId: event.spanId,
302+
parentSpanId:
303+
options.parentAsLinkType === "replay" ? undefined : event.traceparent?.spanId,
304+
replayedFromTaskRunFriendlyId: options.replayedFromTaskRunFriendlyId,
305+
lockedToVersionId: lockedToBackgroundWorker?.id,
306+
taskVersion: lockedToBackgroundWorker?.version,
307+
sdkVersion: lockedToBackgroundWorker?.sdkVersion,
308+
cliVersion: lockedToBackgroundWorker?.cliVersion,
309+
concurrencyKey: body.options?.concurrencyKey,
310+
queue: queueName,
311+
lockedQueueId,
312+
workerQueue,
313+
isTest: body.options?.test ?? false,
314+
delayUntil,
315+
queuedAt: delayUntil ? undefined : new Date(),
316+
maxAttempts: body.options?.maxAttempts,
317+
taskEventStore: store,
318+
ttl,
319+
tags,
320+
oneTimeUseToken: options.oneTimeUseToken,
321+
parentTaskRunId: parentRun?.id,
322+
rootTaskRunId: parentRun?.rootTaskRunId ?? parentRun?.id,
323+
batch: options?.batchId
324+
? {
325+
id: options.batchId,
326+
index: options.batchIndex ?? 0,
327+
}
328+
: undefined,
329+
resumeParentOnCompletion: body.options?.resumeParentOnCompletion,
330+
depth,
331+
metadata: metadataPacket?.data,
332+
metadataType: metadataPacket?.dataType,
333+
seedMetadata: metadataPacket?.data,
334+
seedMetadataType: metadataPacket?.dataType,
335+
maxDurationInSeconds: body.options?.maxDuration
336+
? clampMaxDuration(body.options.maxDuration)
337+
: undefined,
338+
machine: body.options?.machine,
339+
priorityMs: body.options?.priority ? body.options.priority * 1_000 : undefined,
340+
queueTimestamp:
341+
options.queueTimestamp ??
342+
(parentRun && body.options?.resumeParentOnCompletion
343+
? parentRun.queueTimestamp ?? undefined
344+
: undefined),
345+
scheduleId: options.scheduleId,
346+
scheduleInstanceId: options.scheduleInstanceId,
347+
createdAt: options.overrideCreatedAt,
348+
bulkActionId: body.options?.bulkActionId,
349+
planType,
350+
},
351+
this.prisma
352+
);
353+
354+
const error = taskRun.error ? TaskRunError.parse(taskRun.error) : undefined;
355+
356+
if (error) {
357+
event.failWithError(error);
358+
}
359+
360+
return { run: taskRun, error, isCached: false };
354361
}
362+
);
355363

356-
return { run: taskRun, error, isCached: false };
364+
if (result?.error) {
365+
throw new ServiceValidationError(
366+
taskRunErrorToString(taskRunErrorEnhancer(result.error))
367+
);
357368
}
358-
);
359369

360-
if (result?.error) {
361-
throw new ServiceValidationError(
362-
taskRunErrorToString(taskRunErrorEnhancer(result.error))
363-
);
370+
return result;
364371
}
365-
366-
return result;
367-
});
372+
);
368373
} catch (error) {
369374
if (error instanceof RunDuplicateIdempotencyKeyError) {
370375
//retry calling this function, because this time it will return the idempotent run

apps/webapp/app/runEngine/types.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,10 +143,12 @@ export type TracedEventSpan = {
143143
export interface TraceEventConcern {
144144
traceRun<T>(
145145
request: TriggerTaskRequest,
146+
parentStore: string | undefined,
146147
callback: (span: TracedEventSpan, store: string) => Promise<T>
147148
): Promise<T>;
148149
traceIdempotentRun<T>(
149150
request: TriggerTaskRequest,
151+
parentStore: string | undefined,
150152
options: {
151153
existingRun: TaskRun;
152154
idempotencyKey: string;

apps/webapp/app/v3/eventRepository/index.server.ts

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,17 @@ export function resolveEventRepositoryForStore(store: string | undefined): IEven
1818
}
1919

2020
export async function getEventRepository(
21-
featureFlags: Record<string, unknown> | undefined
21+
featureFlags: Record<string, unknown> | undefined,
22+
parentStore: string | undefined
2223
): Promise<{ repository: IEventRepository; store: string }> {
24+
if (typeof parentStore === "string") {
25+
if (parentStore === "clickhouse") {
26+
return { repository: clickhouseEventRepository, store: "clickhouse" };
27+
} else {
28+
return { repository: eventRepository, store: getTaskEventStore() };
29+
}
30+
}
31+
2332
const taskEventRepository = await resolveTaskEventRepositoryFlag(featureFlags);
2433

2534
if (taskEventRepository === "clickhouse") {

apps/webapp/app/v3/services/triggerTask.server.ts

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { TriggerTaskRequestBody } from "@trigger.dev/core/v3";
22
import { RunEngineVersion, TaskRun } from "@trigger.dev/database";
3+
import { env } from "~/env.server";
34
import { IdempotencyKeyConcern } from "~/runEngine/concerns/idempotencyKeys.server";
45
import { DefaultPayloadProcessor } from "~/runEngine/concerns/payloads.server";
56
import { DefaultQueueManager } from "~/runEngine/concerns/queues.server";
@@ -9,12 +10,9 @@ import { RunEngineTriggerTaskService } from "~/runEngine/services/triggerTask.se
910
import { DefaultTriggerTaskValidator } from "~/runEngine/validators/triggerTaskValidator";
1011
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
1112
import { determineEngineVersion } from "../engineVersion.server";
12-
import { eventRepository } from "../eventRepository/eventRepository.server";
1313
import { tracer } from "../tracer.server";
1414
import { WithRunEngine } from "./baseService.server";
1515
import { TriggerTaskServiceV1 } from "./triggerTaskV1.server";
16-
import { env } from "~/env.server";
17-
import { clickhouseEventRepository } from "../eventRepository/clickhouseEventRepositoryInstance.server";
1816

1917
export type TriggerTaskServiceOptions = {
2018
idempotencyKey?: string;
@@ -94,10 +92,7 @@ export class TriggerTaskService extends WithRunEngine {
9492
body: TriggerTaskRequestBody,
9593
options: TriggerTaskServiceOptions = {}
9694
): Promise<TriggerTaskServiceResult | undefined> {
97-
const traceEventConcern = new DefaultTraceEventsConcern(
98-
eventRepository,
99-
clickhouseEventRepository
100-
);
95+
const traceEventConcern = new DefaultTraceEventsConcern();
10196

10297
const service = new RunEngineTriggerTaskService({
10398
prisma: this._prisma,

0 commit comments

Comments
 (0)