Skip to content

Commit 83c679d

Browse files
committed
In dev, the worker group is optional when triggering tasks (the master queue is defined by the environment). Also deprecated the TaskEvent.isDebug column and using TaskEventKind.LOG instead for debug events
1 parent e97704d commit 83c679d

File tree

6 files changed

+37
-52
lines changed

6 files changed

+37
-52
lines changed

apps/webapp/app/utils/taskEvent.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import {
1212
TaskEventStyle,
1313
unflattenAttributes,
1414
} from "@trigger.dev/core/v3";
15-
import { Prisma, TaskEvent } from "@trigger.dev/database";
15+
import { Prisma, TaskEvent, TaskEventKind } from "@trigger.dev/database";
1616
import { createTreeFromFlatItems, flattenTree } from "~/components/primitives/TreeView/TreeView";
1717
import type {
1818
PreparedEvent,
@@ -76,7 +76,7 @@ export function prepareTrace(events: TaskEvent[]): TraceSummary | undefined {
7676
level: event.level,
7777
events: event.events,
7878
environmentType: event.environmentType,
79-
isDebug: event.isDebug,
79+
isDebug: event.kind === TaskEventKind.LOG,
8080
},
8181
} satisfies SpanSummary;
8282

apps/webapp/app/v3/eventRepository.server.ts

Lines changed: 7 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import {
2020
omit,
2121
unflattenAttributes,
2222
} from "@trigger.dev/core/v3";
23-
import { Prisma, TaskEvent, TaskEventStatus, type TaskEventKind } from "@trigger.dev/database";
23+
import { Prisma, TaskEvent, TaskEventKind, TaskEventStatus } from "@trigger.dev/database";
2424
import { createHash } from "node:crypto";
2525
import { EventEmitter } from "node:stream";
2626
import { Gauge } from "prom-client";
@@ -126,10 +126,10 @@ export type QueriedEvent = Prisma.TaskEventGetPayload<{
126126
isError: true;
127127
isPartial: true;
128128
isCancelled: true;
129-
isDebug: true;
130129
level: true;
131130
events: true;
132131
environmentType: true;
132+
kind: true;
133133
};
134134
}>;
135135

@@ -186,26 +186,6 @@ export type UpdateEventOptions = {
186186
events?: SpanEvents;
187187
};
188188

189-
type TaskEventSummary = Pick<
190-
TaskEvent,
191-
| "id"
192-
| "spanId"
193-
| "parentId"
194-
| "runId"
195-
| "idempotencyKey"
196-
| "message"
197-
| "style"
198-
| "startTime"
199-
| "duration"
200-
| "isError"
201-
| "isPartial"
202-
| "isCancelled"
203-
| "level"
204-
| "events"
205-
| "environmentType"
206-
| "isDebug"
207-
>;
208-
209189
export class EventRepository {
210190
private readonly _flushScheduler: DynamicFlushScheduler<CreatableEvent>;
211191
private _randomIdGenerator = new RandomIdGenerator();
@@ -512,7 +492,7 @@ export class EventRepository {
512492
isError: event.isError,
513493
isPartial: ancestorCancelled ? false : event.isPartial,
514494
isCancelled: event.isCancelled === true ? true : event.isPartial && ancestorCancelled,
515-
isDebug: event.isDebug,
495+
isDebug: event.kind === TaskEventKind.LOG,
516496
startTime: getDateFromNanoseconds(event.startTime),
517497
level: event.level,
518498
events: event.events,
@@ -569,7 +549,7 @@ export class EventRepository {
569549
isError: true,
570550
isPartial: true,
571551
isCancelled: true,
572-
isDebug: true,
552+
kind: true,
573553
level: true,
574554
events: true,
575555
environmentType: true,
@@ -865,10 +845,8 @@ export class EventRepository {
865845
...options.attributes.metadata,
866846
};
867847

868-
const isDebug = options.attributes.isDebug;
869-
870848
const style = {
871-
[SemanticInternalAttributes.STYLE_ICON]: isDebug ? "warn" : "play",
849+
[SemanticInternalAttributes.STYLE_ICON]: options.attributes.isDebug ? "warn" : "play",
872850
};
873851

874852
if (!options.attributes.runId) {
@@ -883,12 +861,11 @@ export class EventRepository {
883861
message: message,
884862
serviceName: "api server",
885863
serviceNamespace: "trigger.dev",
886-
level: isDebug ? "WARN" : "TRACE",
887-
kind: options.kind,
864+
level: options.attributes.isDebug ? "WARN" : "TRACE",
865+
kind: options.attributes.isDebug ? TaskEventKind.LOG : options.kind,
888866
status: "OK",
889867
startTime,
890868
isPartial: false,
891-
isDebug,
892869
duration, // convert to nanoseconds
893870
environmentId: options.environment.id,
894871
environmentType: options.environment.type,

apps/webapp/app/v3/services/triggerTaskV2.server.ts

Lines changed: 23 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -312,21 +312,7 @@ export class TriggerTaskServiceV2 extends WithRunEngine {
312312
event.setAttribute("runId", runFriendlyId);
313313
span.setAttribute("runId", runFriendlyId);
314314

315-
const workerGroupService = new WorkerGroupService({
316-
prisma: this._prisma,
317-
engine: this._engine,
318-
});
319-
const workerGroup = await workerGroupService.getDefaultWorkerGroupForProject({
320-
projectId: environment.projectId,
321-
});
322-
323-
if (!workerGroup) {
324-
logger.error("Default worker group not found", {
325-
projectId: environment.projectId,
326-
});
327-
328-
return;
329-
}
315+
const masterQueue = await this.#getMasterQueueForEnvironment(environment);
330316

331317
const taskRun = await this._engine.trigger(
332318
{
@@ -351,7 +337,7 @@ export class TriggerTaskServiceV2 extends WithRunEngine {
351337
concurrencyKey: body.options?.concurrencyKey,
352338
queueName,
353339
queue: body.options?.queue,
354-
masterQueue: workerGroup.masterQueue,
340+
masterQueue: masterQueue,
355341
isTest: body.options?.test ?? false,
356342
delayUntil,
357343
queuedAt: delayUntil ? undefined : new Date(),
@@ -441,6 +427,27 @@ export class TriggerTaskServiceV2 extends WithRunEngine {
441427
});
442428
}
443429

430+
async #getMasterQueueForEnvironment(environment: AuthenticatedEnvironment) {
431+
if (environment.type === "DEVELOPMENT") {
432+
return;
433+
}
434+
435+
const workerGroupService = new WorkerGroupService({
436+
prisma: this._prisma,
437+
engine: this._engine,
438+
});
439+
440+
const workerGroup = await workerGroupService.getDefaultWorkerGroupForProject({
441+
projectId: environment.projectId,
442+
});
443+
444+
if (!workerGroup) {
445+
throw new ServiceValidationError("No worker group found");
446+
}
447+
448+
return workerGroup.masterQueue;
449+
}
450+
444451
async #getQueueName(taskId: string, environment: AuthenticatedEnvironment, queueName?: string) {
445452
if (queueName) {
446453
return queueName;

apps/webapp/app/v3/taskEventStore.server.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ export type TraceEvent = Pick<
2020
| "level"
2121
| "events"
2222
| "environmentType"
23-
| "isDebug"
23+
| "kind"
2424
>;
2525

2626
export type TaskEventStoreTable = "taskEvent" | "taskEventPartitioned";
@@ -138,7 +138,7 @@ export class TaskEventStore {
138138
level,
139139
events,
140140
"environmentType",
141-
"isDebug"
141+
"kind"
142142
FROM "TaskEventPartitioned"
143143
WHERE
144144
"traceId" = ${traceId}
@@ -168,7 +168,7 @@ export class TaskEventStore {
168168
level,
169169
events,
170170
"environmentType",
171-
"isDebug"
171+
"kind"
172172
FROM "TaskEvent"
173173
WHERE "traceId" = ${traceId}
174174
ORDER BY "startTime" ASC

internal-packages/database/prisma/schema.prisma

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2381,6 +2381,7 @@ model TaskEvent {
23812381
isError Boolean @default(false)
23822382
isPartial Boolean @default(false)
23832383
isCancelled Boolean @default(false)
2384+
/// deprecated: don't use this, moving this to properties, this now uses TaskEventKind.LOG
23842385
isDebug Boolean @default(false)
23852386
23862387
serviceName String

internal-packages/run-engine/src/engine/types.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ export type TriggerParams = {
5858
sdkVersion?: string;
5959
cliVersion?: string;
6060
concurrencyKey?: string;
61-
masterQueue: string;
61+
masterQueue?: string;
6262
queueName: string;
6363
queue?: QueueOptions;
6464
isTest: boolean;

0 commit comments

Comments
 (0)