diff --git a/apps/coordinator/src/checkpointer.ts b/apps/coordinator/src/checkpointer.ts index 69c51e2fb5..e7404bb0dd 100644 --- a/apps/coordinator/src/checkpointer.ts +++ b/apps/coordinator/src/checkpointer.ts @@ -277,6 +277,7 @@ export class Checkpointer { return result.checkpoint; } finally { if (opts.shouldHeartbeat) { + // @ts-ignore - Some kind of node incompatible type issue clearInterval(interval); } removeCurrentAbortController(); diff --git a/apps/webapp/app/utils/taskEvent.ts b/apps/webapp/app/utils/taskEvent.ts index 7c5be6a34e..5fa2713e16 100644 --- a/apps/webapp/app/utils/taskEvent.ts +++ b/apps/webapp/app/utils/taskEvent.ts @@ -12,7 +12,7 @@ import { TaskEventStyle, unflattenAttributes, } from "@trigger.dev/core/v3"; -import { Prisma, TaskEvent } from "@trigger.dev/database"; +import { Prisma, TaskEvent, TaskEventKind } from "@trigger.dev/database"; import { createTreeFromFlatItems, flattenTree } from "~/components/primitives/TreeView/TreeView"; import type { PreparedEvent, @@ -76,7 +76,7 @@ export function prepareTrace(events: TaskEvent[]): TraceSummary | undefined { level: event.level, events: event.events, environmentType: event.environmentType, - isDebug: event.isDebug, + isDebug: event.kind === TaskEventKind.LOG, }, } satisfies SpanSummary; diff --git a/apps/webapp/app/v3/eventRepository.server.ts b/apps/webapp/app/v3/eventRepository.server.ts index 391b3be1b9..990c6127ef 100644 --- a/apps/webapp/app/v3/eventRepository.server.ts +++ b/apps/webapp/app/v3/eventRepository.server.ts @@ -20,7 +20,7 @@ import { omit, unflattenAttributes, } from "@trigger.dev/core/v3"; -import { Prisma, TaskEvent, TaskEventStatus, type TaskEventKind } from "@trigger.dev/database"; +import { Prisma, TaskEvent, TaskEventKind, TaskEventStatus } from "@trigger.dev/database"; import { createHash } from "node:crypto"; import { EventEmitter } from "node:stream"; import { Gauge } from "prom-client"; @@ -126,10 +126,10 @@ export type QueriedEvent = Prisma.TaskEventGetPayload<{ isError: true; isPartial: true; isCancelled: true; - isDebug: true; level: true; events: true; environmentType: true; + kind: true; }; }>; @@ -186,26 +186,6 @@ export type UpdateEventOptions = { events?: SpanEvents; }; -type TaskEventSummary = Pick< - TaskEvent, - | "id" - | "spanId" - | "parentId" - | "runId" - | "idempotencyKey" - | "message" - | "style" - | "startTime" - | "duration" - | "isError" - | "isPartial" - | "isCancelled" - | "level" - | "events" - | "environmentType" - | "isDebug" ->; - export class EventRepository { private readonly _flushScheduler: DynamicFlushScheduler; private _randomIdGenerator = new RandomIdGenerator(); @@ -512,7 +492,7 @@ export class EventRepository { isError: event.isError, isPartial: ancestorCancelled ? false : event.isPartial, isCancelled: event.isCancelled === true ? true : event.isPartial && ancestorCancelled, - isDebug: event.isDebug, + isDebug: event.kind === TaskEventKind.LOG, startTime: getDateFromNanoseconds(event.startTime), level: event.level, events: event.events, @@ -569,7 +549,7 @@ export class EventRepository { isError: true, isPartial: true, isCancelled: true, - isDebug: true, + kind: true, level: true, events: true, environmentType: true, @@ -865,10 +845,8 @@ export class EventRepository { ...options.attributes.metadata, }; - const isDebug = options.attributes.isDebug; - const style = { - [SemanticInternalAttributes.STYLE_ICON]: isDebug ? "warn" : "play", + [SemanticInternalAttributes.STYLE_ICON]: options.attributes.isDebug ? "warn" : "play", }; if (!options.attributes.runId) { @@ -883,12 +861,11 @@ export class EventRepository { message: message, serviceName: "api server", serviceNamespace: "trigger.dev", - level: isDebug ? "WARN" : "TRACE", - kind: options.kind, + level: options.attributes.isDebug ? "WARN" : "TRACE", + kind: options.attributes.isDebug ? TaskEventKind.LOG : options.kind, status: "OK", startTime, isPartial: false, - isDebug, duration, // convert to nanoseconds environmentId: options.environment.id, environmentType: options.environment.type, diff --git a/apps/webapp/app/v3/services/triggerTaskV2.server.ts b/apps/webapp/app/v3/services/triggerTaskV2.server.ts index dacb7e1dee..592b92b688 100644 --- a/apps/webapp/app/v3/services/triggerTaskV2.server.ts +++ b/apps/webapp/app/v3/services/triggerTaskV2.server.ts @@ -312,21 +312,7 @@ export class TriggerTaskServiceV2 extends WithRunEngine { event.setAttribute("runId", runFriendlyId); span.setAttribute("runId", runFriendlyId); - const workerGroupService = new WorkerGroupService({ - prisma: this._prisma, - engine: this._engine, - }); - const workerGroup = await workerGroupService.getDefaultWorkerGroupForProject({ - projectId: environment.projectId, - }); - - if (!workerGroup) { - logger.error("Default worker group not found", { - projectId: environment.projectId, - }); - - return; - } + const masterQueue = await this.#getMasterQueueForEnvironment(environment); const taskRun = await this._engine.trigger( { @@ -351,7 +337,7 @@ export class TriggerTaskServiceV2 extends WithRunEngine { concurrencyKey: body.options?.concurrencyKey, queueName, queue: body.options?.queue, - masterQueue: workerGroup.masterQueue, + masterQueue: masterQueue, isTest: body.options?.test ?? false, delayUntil, queuedAt: delayUntil ? undefined : new Date(), @@ -441,6 +427,27 @@ export class TriggerTaskServiceV2 extends WithRunEngine { }); } + async #getMasterQueueForEnvironment(environment: AuthenticatedEnvironment) { + if (environment.type === "DEVELOPMENT") { + return; + } + + const workerGroupService = new WorkerGroupService({ + prisma: this._prisma, + engine: this._engine, + }); + + const workerGroup = await workerGroupService.getDefaultWorkerGroupForProject({ + projectId: environment.projectId, + }); + + if (!workerGroup) { + throw new ServiceValidationError("No worker group found"); + } + + return workerGroup.masterQueue; + } + async #getQueueName(taskId: string, environment: AuthenticatedEnvironment, queueName?: string) { if (queueName) { return queueName; diff --git a/apps/webapp/app/v3/taskEventStore.server.ts b/apps/webapp/app/v3/taskEventStore.server.ts index ec66fd8ce1..3426f98463 100644 --- a/apps/webapp/app/v3/taskEventStore.server.ts +++ b/apps/webapp/app/v3/taskEventStore.server.ts @@ -20,7 +20,7 @@ export type TraceEvent = Pick< | "level" | "events" | "environmentType" - | "isDebug" + | "kind" >; export type TaskEventStoreTable = "taskEvent" | "taskEventPartitioned"; @@ -138,7 +138,7 @@ export class TaskEventStore { level, events, "environmentType", - "isDebug" + "kind" FROM "TaskEventPartitioned" WHERE "traceId" = ${traceId} @@ -168,7 +168,7 @@ export class TaskEventStore { level, events, "environmentType", - "isDebug" + "kind" FROM "TaskEvent" WHERE "traceId" = ${traceId} ORDER BY "startTime" ASC diff --git a/internal-packages/database/prisma/schema.prisma b/internal-packages/database/prisma/schema.prisma index e38846e382..af2c88485d 100644 --- a/internal-packages/database/prisma/schema.prisma +++ b/internal-packages/database/prisma/schema.prisma @@ -2381,6 +2381,7 @@ model TaskEvent { isError Boolean @default(false) isPartial Boolean @default(false) isCancelled Boolean @default(false) + /// deprecated: don't use this, moving this to properties, this now uses TaskEventKind.LOG isDebug Boolean @default(false) serviceName String diff --git a/internal-packages/run-engine/src/engine/tests/batchTriggerAndWait.test.ts b/internal-packages/run-engine/src/engine/tests/batchTriggerAndWait.test.ts index bbe5ab2de6..6639253654 100644 --- a/internal-packages/run-engine/src/engine/tests/batchTriggerAndWait.test.ts +++ b/internal-packages/run-engine/src/engine/tests/batchTriggerAndWait.test.ts @@ -104,6 +104,7 @@ describe("RunEngine batchTriggerAndWait", () => { batchId: batch.id, environmentId: authenticatedEnvironment.id, projectId: authenticatedEnvironment.projectId, + organizationId: authenticatedEnvironment.organizationId, }); const afterBlockedByBatch = await engine.getRunExecutionData({ runId: parentRun.id }); diff --git a/internal-packages/run-engine/src/engine/types.ts b/internal-packages/run-engine/src/engine/types.ts index a708fd1269..a6bf6cf8b2 100644 --- a/internal-packages/run-engine/src/engine/types.ts +++ b/internal-packages/run-engine/src/engine/types.ts @@ -58,14 +58,14 @@ export type TriggerParams = { sdkVersion?: string; cliVersion?: string; concurrencyKey?: string; - masterQueue: string; + masterQueue?: string; queueName: string; queue?: QueueOptions; isTest: boolean; delayUntil?: Date; queuedAt?: Date; maxAttempts?: number; - taskEventStore: string; + taskEventStore?: string; priorityMs?: number; ttl?: string; tags: { id: string; name: string }[]; diff --git a/internal-packages/testcontainers/src/utils.ts b/internal-packages/testcontainers/src/utils.ts index 2f6c9e15d0..b770c7fcee 100644 --- a/internal-packages/testcontainers/src/utils.ts +++ b/internal-packages/testcontainers/src/utils.ts @@ -87,6 +87,10 @@ async function verifyRedisConnection(container: StartedRedisContainer) { }, }); + redis.on("error", (error) => { + // swallow the error + }); + try { await redis.ping(); } finally {