diff --git a/.changeset/nice-colts-boil.md b/.changeset/nice-colts-boil.md new file mode 100644 index 0000000000..bd395ae9dd --- /dev/null +++ b/.changeset/nice-colts-boil.md @@ -0,0 +1,5 @@ +--- +"trigger.dev": patch +--- + +Improve warm start times by eagerly creating the child TaskRunProcess when a previous run as completed diff --git a/apps/webapp/app/components/run/RunTimeline.tsx b/apps/webapp/app/components/run/RunTimeline.tsx index 531e1187ed..27a19e9640 100644 --- a/apps/webapp/app/components/run/RunTimeline.tsx +++ b/apps/webapp/app/components/run/RunTimeline.tsx @@ -11,6 +11,7 @@ import { DateTime, DateTimeAccurate } from "../primitives/DateTime"; import { LiveTimer } from "../runs/v3/LiveTimer"; import tileBgPath from "~/assets/images/error-banner-tile@2x.png"; import { Tooltip, TooltipContent, TooltipProvider, TooltipTrigger } from "../primitives/Tooltip"; +import { getHelpTextForEvent, TimelineSpanEvent } from "~/utils/timelineSpanEvents"; // Types for the RunTimeline component export type TimelineEventState = "complete" | "error" | "inprogress" | "delayed"; @@ -674,196 +675,3 @@ export function SpanTimeline({ ); } - -export type TimelineSpanEvent = { - name: string; - offset: number; - timestamp: Date; - duration?: number; - helpText?: string; - markerVariant: TimelineEventVariant; - lineVariant: TimelineLineVariant; -}; - -export function createTimelineSpanEventsFromSpanEvents( - spanEvents: SpanEvent[], - isAdmin: boolean, - relativeStartTime?: number -): Array { - // Rest of function remains the same - if (!spanEvents) { - return []; - } - - const matchingSpanEvents = spanEvents.filter((spanEvent) => - spanEvent.name.startsWith("trigger.dev/") - ); - - if (matchingSpanEvents.length === 0) { - return []; - } - - const sortedSpanEvents = [...matchingSpanEvents].sort((a, b) => { - if (a.time === b.time) { - return a.name.localeCompare(b.name); - } - - const aTime = typeof a.time === "string" ? new Date(a.time) : a.time; - const bTime = typeof b.time === "string" ? new Date(b.time) : b.time; - - return aTime.getTime() - bTime.getTime(); - }); - - const visibleSpanEvents = sortedSpanEvents.filter( - (spanEvent) => - isAdmin || - !getAdminOnlyForEvent( - "event" in spanEvent.properties && typeof spanEvent.properties.event === "string" - ? spanEvent.properties.event - : spanEvent.name - ) - ); - - if (visibleSpanEvents.length === 0) { - return []; - } - - const firstEventTime = - typeof visibleSpanEvents[0].time === "string" - ? new Date(visibleSpanEvents[0].time) - : visibleSpanEvents[0].time; - - const $relativeStartTime = relativeStartTime ?? firstEventTime.getTime(); - - const events = visibleSpanEvents.map((spanEvent, index) => { - const timestamp = - typeof spanEvent.time === "string" ? new Date(spanEvent.time) : spanEvent.time; - - const offset = millisecondsToNanoseconds(timestamp.getTime() - $relativeStartTime); - - const duration = - "duration" in spanEvent.properties && typeof spanEvent.properties.duration === "number" - ? spanEvent.properties.duration - : undefined; - - const name = - "event" in spanEvent.properties && typeof spanEvent.properties.event === "string" - ? spanEvent.properties.event - : spanEvent.name; - - let markerVariant: TimelineEventVariant = "dot-hollow"; - - if (index === 0) { - markerVariant = "start-cap"; - } - - return { - name: getFriendlyNameForEvent(name), - offset, - timestamp, - duration, - properties: spanEvent.properties, - helpText: getHelpTextForEvent(name), - markerVariant, - lineVariant: "light" as const, - }; - }); - - // Now sort by offset, ascending - events.sort((a, b) => a.offset - b.offset); - - return events; -} - -function getFriendlyNameForEvent(event: string): string { - switch (event) { - case "dequeue": { - return "Dequeued"; - } - case "fork": { - return "Launched"; - } - case "create_attempt": { - return "Attempt created"; - } - case "import": { - return "Imported task file"; - } - case "lazy_payload": { - return "Lazy attempt initialized"; - } - case "pod_scheduled": { - return "Pod scheduled"; - } - default: { - return event; - } - } -} - -function getAdminOnlyForEvent(event: string): boolean { - switch (event) { - case "dequeue": { - return false; - } - case "fork": { - return false; - } - case "create_attempt": { - return true; - } - case "import": { - return true; - } - case "lazy_payload": { - return true; - } - case "pod_scheduled": { - return true; - } - default: { - return true; - } - } -} - -function getHelpTextForEvent(event: string): string | undefined { - switch (event) { - case "dequeue": { - return "The run was dequeued from the queue"; - } - case "fork": { - return "The process was created to run the task"; - } - case "create_attempt": { - return "An attempt was created for the run"; - } - case "import": { - return "A task file was imported"; - } - case "lazy_payload": { - return "The payload was initialized lazily"; - } - case "pod_scheduled": { - return "The Kubernetes pod was scheduled to run"; - } - case "Triggered": { - return "The run was triggered"; - } - case "Dequeued": { - return "The run was dequeued from the queue"; - } - case "Started": { - return "The run began executing"; - } - case "Finished": { - return "The run completed execution"; - } - case "Expired": { - return "The run expired before it could be started"; - } - default: { - return undefined; - } - } -} diff --git a/apps/webapp/app/presenters/v3/RunPresenter.server.ts b/apps/webapp/app/presenters/v3/RunPresenter.server.ts index 3d9186f32c..f2a8dece51 100644 --- a/apps/webapp/app/presenters/v3/RunPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/RunPresenter.server.ts @@ -1,8 +1,7 @@ import { millisecondsToNanoseconds } from "@trigger.dev/core/v3"; import { createTreeFromFlatItems, flattenTree } from "~/components/primitives/TreeView/TreeView"; -import { createTimelineSpanEventsFromSpanEvents } from "~/components/run/RunTimeline"; import { prisma, PrismaClient } from "~/db.server"; -import { redirectWithErrorMessage } from "~/models/message.server"; +import { createTimelineSpanEventsFromSpanEvents } from "~/utils/timelineSpanEvents"; import { getUsername } from "~/utils/username"; import { eventRepository } from "~/v3/eventRepository.server"; import { getTaskEventStoreTableForRun } from "~/v3/taskEventStore.server"; diff --git a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.spans.$spanParam/route.tsx b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.spans.$spanParam/route.tsx index 3d7047628b..e11a93f6d2 100644 --- a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.spans.$spanParam/route.tsx +++ b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.spans.$spanParam/route.tsx @@ -36,12 +36,7 @@ import { import { TabButton, TabContainer } from "~/components/primitives/Tabs"; import { TextLink } from "~/components/primitives/TextLink"; import { InfoIconTooltip, SimpleTooltip } from "~/components/primitives/Tooltip"; -import { - createTimelineSpanEventsFromSpanEvents, - RunTimeline, - RunTimelineEvent, - SpanTimeline, -} from "~/components/run/RunTimeline"; +import { RunTimeline, RunTimelineEvent, SpanTimeline } from "~/components/run/RunTimeline"; import { RunIcon } from "~/components/runs/v3/RunIcon"; import { RunTag } from "~/components/runs/v3/RunTag"; import { SpanEvents } from "~/components/runs/v3/SpanEvents"; @@ -76,6 +71,7 @@ import { useEnvironment } from "~/hooks/useEnvironment"; import { WaitpointStatusCombo } from "~/components/runs/v3/WaitpointStatus"; import { PacketDisplay } from "~/components/runs/v3/PacketDisplay"; import { WaitpointDetailTable } from "~/components/runs/v3/WaitpointDetails"; +import { createTimelineSpanEventsFromSpanEvents } from "~/utils/timelineSpanEvents"; export const loader = async ({ request, params }: LoaderFunctionArgs) => { const { projectParam, organizationSlug, envParam, runParam, spanParam } = diff --git a/apps/webapp/app/utils/timelineSpanEvents.ts b/apps/webapp/app/utils/timelineSpanEvents.ts new file mode 100644 index 0000000000..85d82164e1 --- /dev/null +++ b/apps/webapp/app/utils/timelineSpanEvents.ts @@ -0,0 +1,227 @@ +import { SpanEvent } from "@trigger.dev/core/v3"; +import { millisecondsToNanoseconds } from "@trigger.dev/core/v3/utils/durations"; + +export type TimelineEventState = "complete" | "error" | "inprogress" | "delayed"; + +export type TimelineLineVariant = "light" | "normal"; + +export type TimelineEventVariant = + | "start-cap" + | "dot-hollow" + | "dot-solid" + | "start-cap-thick" + | "end-cap-thick" + | "end-cap"; + +export type TimelineSpanEvent = { + name: string; + offset: number; + timestamp: Date; + duration?: number; + helpText?: string; + markerVariant: TimelineEventVariant; + lineVariant: TimelineLineVariant; +}; + +export function createTimelineSpanEventsFromSpanEvents( + spanEvents: SpanEvent[], + isAdmin: boolean, + relativeStartTime?: number +): Array { + if (!spanEvents) { + return []; + } + + const matchingSpanEvents = spanEvents.filter((spanEvent) => + spanEvent.name.startsWith("trigger.dev/") + ); + + if (matchingSpanEvents.length === 0) { + return []; + } + + // Check if there's a fork event + const hasForkEvent = matchingSpanEvents.some( + (spanEvent) => + "event" in spanEvent.properties && + typeof spanEvent.properties.event === "string" && + spanEvent.properties.event === "fork" + ); + + const sortedSpanEvents = [...matchingSpanEvents].sort((a, b) => { + if (a.time === b.time) { + return a.name.localeCompare(b.name); + } + + const aTime = typeof a.time === "string" ? new Date(a.time) : a.time; + const bTime = typeof b.time === "string" ? new Date(b.time) : b.time; + + return aTime.getTime() - bTime.getTime(); + }); + + const visibleSpanEvents = sortedSpanEvents.filter((spanEvent) => { + const eventName = + "event" in spanEvent.properties && typeof spanEvent.properties.event === "string" + ? spanEvent.properties.event + : spanEvent.name; + + // If we're admin, everything is visible + if (isAdmin) { + return true; + } + + // If there's no fork event, import events are also visible to non-admins + if (!hasForkEvent && eventName === "import") { + return true; + } + + // Otherwise use normal admin-only logic + return !getAdminOnlyForEvent(eventName); + }); + + if (visibleSpanEvents.length === 0) { + return []; + } + + const firstEventTime = + typeof visibleSpanEvents[0].time === "string" + ? new Date(visibleSpanEvents[0].time) + : visibleSpanEvents[0].time; + + const $relativeStartTime = relativeStartTime ?? firstEventTime.getTime(); + + const events = visibleSpanEvents.map((spanEvent, index) => { + const timestamp = + typeof spanEvent.time === "string" ? new Date(spanEvent.time) : spanEvent.time; + + const offset = millisecondsToNanoseconds(timestamp.getTime() - $relativeStartTime); + + const duration = + "duration" in spanEvent.properties && typeof spanEvent.properties.duration === "number" + ? spanEvent.properties.duration + : undefined; + + const name = + "event" in spanEvent.properties && typeof spanEvent.properties.event === "string" + ? spanEvent.properties.event + : spanEvent.name; + + let markerVariant: TimelineEventVariant = "dot-hollow"; + + if (index === 0) { + markerVariant = "start-cap"; + } + + return { + name: getFriendlyNameForEvent(name, spanEvent.properties), + offset, + timestamp, + duration, + properties: spanEvent.properties, + helpText: getHelpTextForEvent(name), + markerVariant, + lineVariant: "light" as const, + }; + }); + + // Now sort by offset, ascending + events.sort((a, b) => a.offset - b.offset); + + return events; +} + +function getFriendlyNameForEvent(event: string, properties?: Record): string { + switch (event) { + case "dequeue": { + return "Dequeued"; + } + case "fork": { + return "Launched"; + } + case "create_attempt": { + return "Attempt created"; + } + case "import": { + if (properties && typeof properties.file === "string") { + return `Importing ${properties.file}`; + } + return "Importing task file"; + } + case "lazy_payload": { + return "Lazy attempt initialized"; + } + case "pod_scheduled": { + return "Pod scheduled"; + } + default: { + return event; + } + } +} + +function getAdminOnlyForEvent(event: string): boolean { + switch (event) { + case "dequeue": { + return false; + } + case "fork": { + return false; + } + case "create_attempt": { + return true; + } + case "import": { + return true; + } + case "lazy_payload": { + return true; + } + case "pod_scheduled": { + return true; + } + default: { + return true; + } + } +} + +export function getHelpTextForEvent(event: string): string | undefined { + switch (event) { + case "dequeue": { + return "The run was dequeued from the queue"; + } + case "fork": { + return "The process was created to run the task"; + } + case "create_attempt": { + return "An attempt was created for the run"; + } + case "import": { + return "A task file was imported"; + } + case "lazy_payload": { + return "The payload was initialized lazily"; + } + case "pod_scheduled": { + return "The Kubernetes pod was scheduled to run"; + } + case "Triggered": { + return "The run was triggered"; + } + case "Dequeued": { + return "The run was dequeued from the queue"; + } + case "Started": { + return "The run began executing"; + } + case "Finished": { + return "The run completed execution"; + } + case "Expired": { + return "The run expired before it could be started"; + } + default: { + return undefined; + } + } +} diff --git a/apps/webapp/test/timelineSpanEvents.test.ts b/apps/webapp/test/timelineSpanEvents.test.ts new file mode 100644 index 0000000000..cf02db5c0a --- /dev/null +++ b/apps/webapp/test/timelineSpanEvents.test.ts @@ -0,0 +1,232 @@ +import { describe, test, expect } from "vitest"; +import { SpanEvent } from "@trigger.dev/core/v3"; +import { createTimelineSpanEventsFromSpanEvents } from "../app/utils/timelineSpanEvents"; +import { millisecondsToNanoseconds } from "@trigger.dev/core/v3/utils/durations"; + +describe("createTimelineSpanEventsFromSpanEvents", () => { + const sampleSpanEvents: SpanEvent[] = [ + { + name: "trigger.dev/start", + time: new Date("2025-04-04T08:39:27.046Z"), + properties: { event: "fork", duration: 127 }, + }, + { + name: "trigger.dev/start", + time: new Date("2025-04-04T08:39:26.985Z"), + properties: { event: "create_attempt", duration: 56 }, + }, + { + name: "trigger.dev/start", + time: new Date("2025-04-04T08:39:26.980Z"), + properties: { event: "dequeue", duration: 0 }, + }, + { + name: "trigger.dev/start", + time: new Date("2025-04-04T08:39:27.224Z"), + properties: { + file: "src/trigger/chat.ts", + event: "import", + duration: 67, + entryPoint: + "/Users/eric/code/triggerdotdev/trigger.dev/references/d3-chat/.trigger/tmp/build-AL7zTl/references/d3-chat/src/trigger/chat.mjs", + }, + }, + ]; + + // Sample events without fork event + const eventsWithoutFork: SpanEvent[] = [ + { + name: "trigger.dev/start", + time: new Date("2025-04-04T08:39:26.980Z"), + properties: { event: "dequeue", duration: 0 }, + }, + { + name: "trigger.dev/start", + time: new Date("2025-04-04T08:39:26.985Z"), + properties: { event: "create_attempt", duration: 56 }, + }, + { + name: "trigger.dev/start", + time: new Date("2025-04-04T08:39:27.224Z"), + properties: { + file: "src/trigger/chat.ts", + event: "import", + duration: 67, + entryPoint: + "/Users/eric/code/triggerdotdev/trigger.dev/references/d3-chat/.trigger/tmp/build-AL7zTl/references/d3-chat/src/trigger/chat.mjs", + }, + }, + ]; + + test("should filter non-admin events when isAdmin is false", () => { + const result = createTimelineSpanEventsFromSpanEvents(sampleSpanEvents, false); + + // Only dequeue and fork events should be visible for non-admins + expect(result.length).toBe(2); + expect(result.some((event) => event.name === "Dequeued")).toBe(true); + expect(result.some((event) => event.name === "Launched")).toBe(true); + expect(result.some((event) => event.name === "Attempt created")).toBe(false); + expect(result.some((event) => event.name.includes("Importing"))).toBe(false); + }); + + test("should include all events when isAdmin is true", () => { + const result = createTimelineSpanEventsFromSpanEvents(sampleSpanEvents, true); + + expect(result.length).toBe(4); + expect(result.some((event) => event.name === "Dequeued")).toBe(true); + expect(result.some((event) => event.name === "Launched")).toBe(true); + expect(result.some((event) => event.name === "Attempt created")).toBe(true); + expect(result.some((event) => event.name === "Importing src/trigger/chat.ts")).toBe(true); + }); + + test("should sort events by timestamp", () => { + const result = createTimelineSpanEventsFromSpanEvents(sampleSpanEvents, true); + + // Events should be sorted by time (offset) + expect(result[0].name).toBe("Dequeued"); + expect(result[1].name).toBe("Attempt created"); + expect(result[2].name).toBe("Launched"); + expect(result[3].name).toBe("Importing src/trigger/chat.ts"); + }); + + test("should calculate offsets correctly from the first event", () => { + const result = createTimelineSpanEventsFromSpanEvents(sampleSpanEvents, true); + + // First event (dequeue) should have offset 0 + const firstEventTime = new Date("2025-04-04T08:39:26.980Z").getTime(); + + expect(result[0].offset).toBe(0); + expect(result[1].offset).toBe( + millisecondsToNanoseconds(new Date("2025-04-04T08:39:26.985Z").getTime() - firstEventTime) + ); + expect(result[2].offset).toBe( + millisecondsToNanoseconds(new Date("2025-04-04T08:39:27.046Z").getTime() - firstEventTime) + ); + expect(result[3].offset).toBe( + millisecondsToNanoseconds(new Date("2025-04-04T08:39:27.224Z").getTime() - firstEventTime) + ); + }); + + test("should use the provided relativeStartTime when specified", () => { + const customStartTime = new Date("2025-04-04T08:39:26.900Z").getTime(); + const result = createTimelineSpanEventsFromSpanEvents(sampleSpanEvents, true, customStartTime); + + // Offsets should be calculated relative to customStartTime + expect(result[0].offset).toBe( + millisecondsToNanoseconds(new Date("2025-04-04T08:39:26.980Z").getTime() - customStartTime) + ); + expect(result[1].offset).toBe( + millisecondsToNanoseconds(new Date("2025-04-04T08:39:26.985Z").getTime() - customStartTime) + ); + expect(result[2].offset).toBe( + millisecondsToNanoseconds(new Date("2025-04-04T08:39:27.046Z").getTime() - customStartTime) + ); + expect(result[3].offset).toBe( + millisecondsToNanoseconds(new Date("2025-04-04T08:39:27.224Z").getTime() - customStartTime) + ); + }); + + test("should handle empty span events array", () => { + const result = createTimelineSpanEventsFromSpanEvents([], true); + expect(result).toEqual([]); + }); + + test("should handle undefined span events", () => { + const result = createTimelineSpanEventsFromSpanEvents( + undefined as unknown as SpanEvent[], + true + ); + expect(result).toEqual([]); + }); + + test("should handle non-matching span events", () => { + const nonMatchingEvents: SpanEvent[] = [ + { + name: "non-trigger.dev/event", + time: new Date("2025-04-04T08:39:27.046Z"), + properties: { event: "something", duration: 127 }, + }, + ]; + + const result = createTimelineSpanEventsFromSpanEvents(nonMatchingEvents, true); + expect(result).toEqual([]); + }); + + test("should set marker variant correctly", () => { + const result = createTimelineSpanEventsFromSpanEvents(sampleSpanEvents, true); + + // First event should have start-cap marker + expect(result[0].markerVariant).toBe("start-cap"); + + // Other events should have dot-hollow marker + for (let i = 1; i < result.length; i++) { + expect(result[i].markerVariant).toBe("dot-hollow"); + } + }); + + test("should include helpText for known events", () => { + const result = createTimelineSpanEventsFromSpanEvents(sampleSpanEvents, true); + + expect(result.find((e) => e.name === "Dequeued")?.helpText).toBe( + "The run was dequeued from the queue" + ); + expect(result.find((e) => e.name === "Launched")?.helpText).toBe( + "The process was created to run the task" + ); + expect(result.find((e) => e.name === "Attempt created")?.helpText).toBe( + "An attempt was created for the run" + ); + expect(result.find((e) => e.name === "Importing src/trigger/chat.ts")?.helpText).toBe( + "A task file was imported" + ); + }); + + test("should preserve duration from span events", () => { + const result = createTimelineSpanEventsFromSpanEvents(sampleSpanEvents, true); + + expect(result.find((e) => e.name === "Dequeued")?.duration).toBe(0); + expect(result.find((e) => e.name === "Launched")?.duration).toBe(127); + expect(result.find((e) => e.name === "Attempt created")?.duration).toBe(56); + expect(result.find((e) => e.name === "Importing src/trigger/chat.ts")?.duration).toBe(67); + }); + + test("should use fallback name for import event without file property", () => { + const eventsWithoutFile: SpanEvent[] = [ + { + name: "trigger.dev/start", + time: new Date("2025-04-04T08:39:27.224Z"), + properties: { + event: "import", + duration: 67, + }, + }, + ]; + + const result = createTimelineSpanEventsFromSpanEvents(eventsWithoutFile, true); + + expect(result.length).toBe(1); + expect(result[0].name).toBe("Importing task file"); + }); + + test("should include import events for non-admin when no fork event exists", () => { + const result = createTimelineSpanEventsFromSpanEvents(eventsWithoutFork, false); + + // Without fork event, import should also be visible for non-admins + expect(result.length).toBe(2); + expect(result.some((event) => event.name === "Dequeued")).toBe(true); + expect(result.some((event) => event.name === "Importing src/trigger/chat.ts")).toBe(true); + + // create_attempt should still be admin-only + expect(result.some((event) => event.name === "Attempt created")).toBe(false); + }); + + test("should filter import events for non-admin when fork event exists", () => { + const result = createTimelineSpanEventsFromSpanEvents(sampleSpanEvents, false); + + // With fork event, import should be hidden for non-admins + expect(result.length).toBe(2); + expect(result.some((event) => event.name === "Dequeued")).toBe(true); + expect(result.some((event) => event.name === "Launched")).toBe(true); + expect(result.some((event) => event.name.includes("Importing"))).toBe(false); + }); +}); diff --git a/packages/cli-v3/src/entryPoints/dev-run-controller.ts b/packages/cli-v3/src/entryPoints/dev-run-controller.ts index 8807915d53..847152d48c 100644 --- a/packages/cli-v3/src/entryPoints/dev-run-controller.ts +++ b/packages/cli-v3/src/entryPoints/dev-run-controller.ts @@ -622,6 +622,15 @@ export class DevRunController { version: this.opts.worker.serverWorker?.version, engine: "V2", }, + machine: execution.machine, + }).initialize(); + + logger.debug("executing task run process", { + attemptId: execution.attempt.id, + runId: execution.run.id, + }); + + const completion = await this.taskRunProcess.execute({ payload: { execution, traceContext: execution.run.traceContext ?? {}, @@ -630,15 +639,6 @@ export class DevRunController { messageId: run.friendlyId, }); - await this.taskRunProcess.initialize(); - - logger.debug("executing task run process", { - attemptId: execution.attempt.id, - runId: execution.run.id, - }); - - const completion = await this.taskRunProcess.execute(); - logger.debug("Completed run", completion); try { diff --git a/packages/cli-v3/src/entryPoints/dev-run-worker.ts b/packages/cli-v3/src/entryPoints/dev-run-worker.ts index add6077614..76c892720c 100644 --- a/packages/cli-v3/src/entryPoints/dev-run-worker.ts +++ b/packages/cli-v3/src/entryPoints/dev-run-worker.ts @@ -36,6 +36,7 @@ import { logLevels, ManagedRuntimeManager, OtelTaskLogger, + populateEnv, StandardLifecycleHooksManager, StandardLocalsManager, StandardMetadataManager, @@ -103,7 +104,6 @@ localsAPI.setGlobalLocalsManager(standardLocalsManager); const standardRunTimelineMetricsManager = new StandardRunTimelineMetricsManager(); runTimelineMetrics.setGlobalManager(standardRunTimelineMetricsManager); -standardRunTimelineMetricsManager.seedMetricsFromEnvironment(); const standardLifecycleHooksManager = new StandardLifecycleHooksManager(); lifecycleHooks.setGlobalLifecycleHooksManager(standardLifecycleHooksManager); @@ -238,7 +238,13 @@ const zodIpc = new ZodIpcConnection({ emitSchema: ExecutorToWorkerMessageCatalog, process, handlers: { - EXECUTE_TASK_RUN: async ({ execution, traceContext, metadata, metrics }, sender) => { + EXECUTE_TASK_RUN: async ({ execution, traceContext, metadata, metrics, env }, sender) => { + if (env) { + populateEnv(env, { + override: true, + }); + } + log(`[${new Date().toISOString()}] Received EXECUTE_TASK_RUN`, execution); standardRunTimelineMetricsManager.registerMetricsFromExecution(metrics); @@ -302,6 +308,7 @@ const zodIpc = new ZodIpcConnection({ "import", { entryPoint: taskManifest.entryPoint, + file: taskManifest.filePath, }, async () => { const beforeImport = performance.now(); diff --git a/packages/cli-v3/src/entryPoints/managed-run-controller.ts b/packages/cli-v3/src/entryPoints/managed-run-controller.ts index 59d9923318..f12e807dd3 100644 --- a/packages/cli-v3/src/entryPoints/managed-run-controller.ts +++ b/packages/cli-v3/src/entryPoints/managed-run-controller.ts @@ -836,6 +836,18 @@ class ManagedRunController { this.exitProcess(this.successExitCode); } + if (this.taskRunProcess) { + logger.debug("waitForNextRun: eagerly recreating task run process with options"); + this.taskRunProcess = new TaskRunProcess({ + ...this.taskRunProcess.options, + isWarmStart: true, + }).initialize(); + } else { + logger.debug( + "waitForNextRun: no existing task run process, so we can't eagerly recreate it" + ); + } + // Check the service is up and get additional warm start config const connect = await this.warmStartClient.connect(); @@ -904,6 +916,9 @@ class ManagedRunController { private exitProcess(code?: number): never { logger.log("Exiting process", { code }); + if (this.taskRunProcess?.isPreparedForNextRun) { + this.taskRunProcess.forceExit(); + } process.exit(code); } @@ -980,30 +995,33 @@ class ManagedRunController { }: WorkloadRunAttemptStartResponseBody) { this.snapshotPoller.start(); - this.taskRunProcess = new TaskRunProcess({ - workerManifest: this.workerManifest, - env: envVars, - serverWorker: { - id: "unmanaged", - contentHash: env.TRIGGER_CONTENT_HASH, - version: env.TRIGGER_DEPLOYMENT_VERSION, - engine: "V2", - }, - payload: { - execution, - traceContext: execution.run.traceContext ?? {}, - }, - messageId: run.friendlyId, - }); - - await this.taskRunProcess.initialize(); + if (!this.taskRunProcess || !this.taskRunProcess.isPreparedForNextRun) { + this.taskRunProcess = new TaskRunProcess({ + workerManifest: this.workerManifest, + env: envVars, + serverWorker: { + id: "unmanaged", + contentHash: env.TRIGGER_CONTENT_HASH, + version: env.TRIGGER_DEPLOYMENT_VERSION, + engine: "V2", + }, + machine: execution.machine, + }).initialize(); + } logger.log("executing task run process", { attemptId: execution.attempt.id, runId: execution.run.id, }); - const completion = await this.taskRunProcess.execute(); + const completion = await this.taskRunProcess.execute({ + payload: { + execution, + traceContext: execution.run.traceContext ?? {}, + }, + messageId: run.friendlyId, + env: envVars, + }); logger.log("Completed run", completion); diff --git a/packages/cli-v3/src/entryPoints/managed-run-worker.ts b/packages/cli-v3/src/entryPoints/managed-run-worker.ts index 6c2ef1a674..56f81c2aac 100644 --- a/packages/cli-v3/src/entryPoints/managed-run-worker.ts +++ b/packages/cli-v3/src/entryPoints/managed-run-worker.ts @@ -36,6 +36,7 @@ import { logLevels, ManagedRuntimeManager, OtelTaskLogger, + populateEnv, ProdUsageManager, StandardLifecycleHooksManager, StandardLocalsManager, @@ -110,7 +111,6 @@ lifecycleHooks.setGlobalLifecycleHooksManager(standardLifecycleHooksManager); const standardRunTimelineMetricsManager = new StandardRunTimelineMetricsManager(); runTimelineMetrics.setGlobalManager(standardRunTimelineMetricsManager); -standardRunTimelineMetricsManager.seedMetricsFromEnvironment(); const devUsageManager = new DevUsageManager(); const prodUsageManager = new ProdUsageManager(devUsageManager, { @@ -248,7 +248,16 @@ const zodIpc = new ZodIpcConnection({ emitSchema: ExecutorToWorkerMessageCatalog, process, handlers: { - EXECUTE_TASK_RUN: async ({ execution, traceContext, metadata, metrics }, sender) => { + EXECUTE_TASK_RUN: async ( + { execution, traceContext, metadata, metrics, env, isWarmStart }, + sender + ) => { + if (env) { + populateEnv(env, { + override: true, + }); + } + standardRunTimelineMetricsManager.registerMetricsFromExecution(metrics); console.log(`[${new Date().toISOString()}] Received EXECUTE_TASK_RUN`, execution); @@ -312,6 +321,7 @@ const zodIpc = new ZodIpcConnection({ "import", { entryPoint: taskManifest.entryPoint, + file: taskManifest.filePath, }, async () => { const beforeImport = performance.now(); @@ -383,6 +393,7 @@ const zodIpc = new ZodIpcConnection({ tracingSDK, consoleInterceptor, retries: config.retries, + isWarmStart, }); try { @@ -461,12 +472,35 @@ const zodIpc = new ZodIpcConnection({ async function flushAll(timeoutInMs: number = 10_000) { const now = performance.now(); - await Promise.all([ + const results = await Promise.allSettled([ flushUsage(timeoutInMs), flushTracingSDK(timeoutInMs), flushMetadata(timeoutInMs), ]); + const successfulFlushes = results + .filter((result) => result.status === "fulfilled") + .map((result) => result.value.flushed); + const failedFlushes = ["usage", "tracingSDK", "runMetadata"].filter( + (flushed) => !successfulFlushes.includes(flushed) + ); + + if (failedFlushes.length > 0) { + console.error(`Failed to flush ${failedFlushes.join(", ")}`); + } + + const errorMessages = results + .filter((result) => result.status === "rejected") + .map((result) => result.reason); + + if (errorMessages.length > 0) { + console.error(errorMessages.join("\n")); + } + + for (const flushed of successfulFlushes) { + console.log(`Flushed ${flushed} successfully`); + } + const duration = performance.now() - now; console.log(`Flushed all in ${duration}ms`); @@ -480,6 +514,11 @@ async function flushUsage(timeoutInMs: number = 10_000) { const duration = performance.now() - now; console.log(`Flushed usage in ${duration}ms`); + + return { + flushed: "usage", + durationMs: duration, + }; } async function flushTracingSDK(timeoutInMs: number = 10_000) { @@ -490,6 +529,11 @@ async function flushTracingSDK(timeoutInMs: number = 10_000) { const duration = performance.now() - now; console.log(`Flushed tracingSDK in ${duration}ms`); + + return { + flushed: "tracingSDK", + durationMs: duration, + }; } async function flushMetadata(timeoutInMs: number = 10_000) { @@ -500,6 +544,11 @@ async function flushMetadata(timeoutInMs: number = 10_000) { const duration = performance.now() - now; console.log(`Flushed runMetadata in ${duration}ms`); + + return { + flushed: "runMetadata", + durationMs: duration, + }; } const managedWorkerRuntime = new ManagedRuntimeManager(zodIpc, true); diff --git a/packages/cli-v3/src/executions/taskRunProcess.ts b/packages/cli-v3/src/executions/taskRunProcess.ts index fdffc8dda7..df46da8ff5 100644 --- a/packages/cli-v3/src/executions/taskRunProcess.ts +++ b/packages/cli-v3/src/executions/taskRunProcess.ts @@ -1,6 +1,7 @@ import { CompletedWaitpoint, ExecutorToWorkerMessageCatalog, + MachinePreset, ServerBackgroundWorker, TaskRunErrorCodes, TaskRunExecution, @@ -48,10 +49,15 @@ export type TaskRunProcessOptions = { workerManifest: WorkerManifest; serverWorker: ServerBackgroundWorker; env: Record; + machine: MachinePreset; + isWarmStart?: boolean; + cwd?: string; +}; + +export type TaskRunProcessExecuteParams = { payload: TaskRunExecutionPayload; messageId: string; - - cwd?: string; + env?: Record; }; export class TaskRunProcess { @@ -79,9 +85,18 @@ export class TaskRunProcess { public onWaitForBatch: Evt = new Evt(); public onWait: Evt = new Evt(); - constructor(public readonly options: TaskRunProcessOptions) {} + private _isPreparedForNextRun: boolean = false; + + constructor(public readonly options: TaskRunProcessOptions) { + this._isPreparedForNextRun = true; + } + + get isPreparedForNextRun() { + return this._isPreparedForNextRun; + } async cancel() { + this._isPreparedForNextRun = false; this._isBeingCancelled = true; try { @@ -94,6 +109,8 @@ export class TaskRunProcess { } async cleanup(kill = true) { + this._isPreparedForNextRun = false; + try { await this.#flush(); } catch (err) { @@ -105,34 +122,22 @@ export class TaskRunProcess { } } - get runId() { - return this.options.payload.execution.run.id; - } + initialize() { + const { env: $env, workerManifest, cwd, machine } = this.options; - get isTest() { - return this.options.payload.execution.run.isTest; - } - - get payload(): TaskRunExecutionPayload { - return this.options.payload; - } - - async initialize() { - const { env: $env, workerManifest, cwd, messageId, payload } = this.options; - - const maxOldSpaceSize = nodeOptionsWithMaxOldSpaceSize(undefined, payload.execution.machine); + const maxOldSpaceSize = nodeOptionsWithMaxOldSpaceSize(undefined, machine); const fullEnv = { - ...(this.isTest ? { TRIGGER_LOG_LEVEL: "debug" } : {}), ...$env, OTEL_IMPORT_HOOK_INCLUDES: workerManifest.otelImportHook?.include?.join(","), // TODO: this will probably need to use something different for bun (maybe --preload?) NODE_OPTIONS: execOptionsForRuntime(workerManifest.runtime, workerManifest, maxOldSpaceSize), PATH: process.env.PATH, TRIGGER_PROCESS_FORK_START_TIME: String(Date.now()), + TRIGGER_WARM_START: this.options.isWarmStart ? "true" : "false", }; - logger.debug(`[${this.runId}] initializing task run process`, { + logger.debug(`initializing task run process`, { env: fullEnv, path: workerManifest.workerEntryPoint, cwd, @@ -175,13 +180,13 @@ export class TaskRunProcess { resolver(result); }, - READY_TO_DISPOSE: async (message) => { - logger.debug(`[${this.runId}] task run process is ready to dispose`); + READY_TO_DISPOSE: async () => { + logger.debug(`task run process is ready to dispose`); this.onReadyToDispose.post(this); }, TASK_HEARTBEAT: async (message) => { - this.onTaskRunHeartbeat.post(messageId); + this.onTaskRunHeartbeat.post(message.id); }, WAIT_FOR_TASK: async (message) => { this.onWaitForTask.post(message); @@ -190,7 +195,7 @@ export class TaskRunProcess { this.onWaitForBatch.post(message); }, UNCAUGHT_EXCEPTION: async (message) => { - logger.debug(`[${this.runId}] uncaught exception in task run process`, { ...message }); + logger.debug("uncaught exception in task run process", { ...message }); }, }, }); @@ -198,6 +203,8 @@ export class TaskRunProcess { this._child.on("exit", this.#handleExit.bind(this)); this._child.stdout?.on("data", this.#handleLog.bind(this)); this._child.stderr?.on("data", this.#handleStdErr.bind(this)); + + return this; } async #flush(timeoutInMs: number = 5_000) { @@ -206,7 +213,9 @@ export class TaskRunProcess { await this._ipc?.sendWithAck("FLUSH", { timeoutInMs }, timeoutInMs + 1_000); } - async execute(): Promise { + async execute(params: TaskRunProcessExecuteParams): Promise { + this._isPreparedForNextRun = false; + let resolver: (value: TaskRunExecutionResult) => void; let rejecter: (err?: any) => void; @@ -215,19 +224,19 @@ export class TaskRunProcess { rejecter = reject; }); - this._attemptStatuses.set(this.payload.execution.attempt.id, "PENDING"); + this._attemptStatuses.set(params.payload.execution.attempt.id, "PENDING"); // @ts-expect-error - We know that the resolver and rejecter are defined - this._attemptPromises.set(this.payload.execution.attempt.id, { resolver, rejecter }); + this._attemptPromises.set(params.payload.execution.attempt.id, { resolver, rejecter }); - const { execution, traceContext, metrics } = this.payload; + const { execution, traceContext, metrics } = params.payload; this._currentExecution = execution; if (this._child?.connected && !this._isBeingKilled && !this._child.killed) { logger.debug( `[${new Date().toISOString()}][${ - this.runId + params.payload.execution.run.id }] sending EXECUTE_TASK_RUN message to task run process`, { pid: this.pid, @@ -239,6 +248,8 @@ export class TaskRunProcess { traceContext, metadata: this.options.serverWorker, metrics, + env: params.env, + isWarmStart: this.options.isWarmStart, }); } @@ -398,7 +409,7 @@ export class TaskRunProcess { } async kill(signal?: number | NodeJS.Signals, timeoutInMs?: number) { - logger.debug(`[${this.runId}] killing task run process`, { + logger.debug(`killing task run process`, { signal, timeoutInMs, pid: this.pid, @@ -417,6 +428,16 @@ export class TaskRunProcess { } } + forceExit() { + try { + this._isBeingKilled = true; + + this._child?.kill("SIGKILL"); + } catch (error) { + logger.debug("forceExit: failed to kill child process", { error }); + } + } + get isBeingKilled() { return this._isBeingKilled || this._child?.killed; } diff --git a/packages/core/src/v3/runTimelineMetrics/runTimelineMetricsManager.ts b/packages/core/src/v3/runTimelineMetrics/runTimelineMetricsManager.ts index cf6c210b29..d521749657 100644 --- a/packages/core/src/v3/runTimelineMetrics/runTimelineMetricsManager.ts +++ b/packages/core/src/v3/runTimelineMetrics/runTimelineMetricsManager.ts @@ -14,6 +14,8 @@ export class StandardRunTimelineMetricsManager implements RunTimelineMetricsMana } registerMetricsFromExecution(metrics?: TaskRunExecutionMetrics): void { + this.#seedMetricsFromEnvironment(); + if (metrics) { metrics.forEach((metric) => { this.registerMetric({ @@ -28,10 +30,12 @@ export class StandardRunTimelineMetricsManager implements RunTimelineMetricsMana } } - seedMetricsFromEnvironment() { + #seedMetricsFromEnvironment() { const forkStartTime = getEnvVar("TRIGGER_PROCESS_FORK_START_TIME"); + const warmStart = getEnvVar("TRIGGER_WARM_START"); + const isWarmStart = warmStart === "true"; - if (typeof forkStartTime === "string") { + if (typeof forkStartTime === "string" && !isWarmStart) { const forkStartTimeMs = parseInt(forkStartTime, 10); this.registerMetric({ diff --git a/packages/core/src/v3/schemas/messages.ts b/packages/core/src/v3/schemas/messages.ts index 6322831676..edbdfac3de 100644 --- a/packages/core/src/v3/schemas/messages.ts +++ b/packages/core/src/v3/schemas/messages.ts @@ -215,6 +215,8 @@ export const WorkerToExecutorMessageCatalog = { traceContext: z.record(z.unknown()), metadata: ServerBackgroundWorker, metrics: TaskRunExecutionMetrics.optional(), + env: z.record(z.string()).optional(), + isWarmStart: z.boolean().optional(), }), }, TASK_RUN_COMPLETED_NOTIFICATION: { diff --git a/packages/core/src/v3/schemas/style.ts b/packages/core/src/v3/schemas/style.ts index a2a4f0de2e..eab62c5b41 100644 --- a/packages/core/src/v3/schemas/style.ts +++ b/packages/core/src/v3/schemas/style.ts @@ -1,8 +1,10 @@ import { z } from "zod"; export const PRIMARY_VARIANT = "primary"; +export const WARM_VARIANT = "warm"; +export const COLD_VARIANT = "cold"; -const Variant = z.enum([PRIMARY_VARIANT]); +const Variant = z.enum([PRIMARY_VARIANT, WARM_VARIANT, COLD_VARIANT]); export type Variant = z.infer; const AccessoryItem = z.object({ diff --git a/packages/core/src/v3/semanticInternalAttributes.ts b/packages/core/src/v3/semanticInternalAttributes.ts index c23d1c2ef8..3ff44eb518 100644 --- a/packages/core/src/v3/semanticInternalAttributes.ts +++ b/packages/core/src/v3/semanticInternalAttributes.ts @@ -23,6 +23,7 @@ export const SemanticInternalAttributes = { MACHINE_PRESET_CPU: "ctx.machine.cpu", MACHINE_PRESET_MEMORY: "ctx.machine.memory", MACHINE_PRESET_CENTS_PER_MS: "ctx.machine.centsPerMs", + SKIP_SPAN_PARTIAL: "$span.skip_partial", SPAN_PARTIAL: "$span.partial", SPAN_ID: "$span.span_id", ENTITY_TYPE: "$entity.type", @@ -58,4 +59,5 @@ export const SemanticInternalAttributes = { SPAN_ATTEMPT: "$span.attempt", METRIC_EVENTS: "$metrics.events", EXECUTION_ENVIRONMENT: "exec_env", + WARM_START: "warm_start", }; diff --git a/packages/core/src/v3/taskContext/index.ts b/packages/core/src/v3/taskContext/index.ts index 8edd6859dc..77598ff6cf 100644 --- a/packages/core/src/v3/taskContext/index.ts +++ b/packages/core/src/v3/taskContext/index.ts @@ -31,11 +31,16 @@ export class TaskContextAPI { return this.#getTaskContext()?.worker; } + get isWarmStart(): boolean | undefined { + return this.#getTaskContext()?.isWarmStart; + } + get attributes(): Attributes { if (this.ctx) { return { ...this.contextAttributes, ...this.workerAttributes, + [SemanticInternalAttributes.WARM_START]: !!this.isWarmStart, }; } diff --git a/packages/core/src/v3/taskContext/otelProcessors.ts b/packages/core/src/v3/taskContext/otelProcessors.ts index 123de426fd..ff96b34cc8 100644 --- a/packages/core/src/v3/taskContext/otelProcessors.ts +++ b/packages/core/src/v3/taskContext/otelProcessors.ts @@ -29,15 +29,12 @@ export class TaskContextSpanProcessor implements SpanProcessor { ); } - if (!isPartialSpan(span)) { + if (!isPartialSpan(span) && !skipPartialSpan(span)) { const partialSpan = createPartialSpan(this._tracer, span, parentContext); - - this._innerProcessor.onStart(span, parentContext); - partialSpan.end(); - } else { - this._innerProcessor.onStart(span, parentContext); } + + this._innerProcessor.onStart(span, parentContext); } // Delegate the rest of the methods to the wrapped processor @@ -59,6 +56,10 @@ function isPartialSpan(span: Span) { return span.attributes[SemanticInternalAttributes.SPAN_PARTIAL] === true; } +function skipPartialSpan(span: Span) { + return span.attributes[SemanticInternalAttributes.SKIP_SPAN_PARTIAL] === true; +} + function createPartialSpan(tracer: Tracer, span: Span, parentContext: Context) { const partialSpan = tracer.startSpan( span.name, diff --git a/packages/core/src/v3/taskContext/types.ts b/packages/core/src/v3/taskContext/types.ts index 71606d9478..e60b54ad2b 100644 --- a/packages/core/src/v3/taskContext/types.ts +++ b/packages/core/src/v3/taskContext/types.ts @@ -3,4 +3,5 @@ import { ServerBackgroundWorker, TaskRunContext } from "../schemas/index.js"; export type TaskContext = { ctx: TaskRunContext; worker: ServerBackgroundWorker; + isWarmStart?: boolean; }; diff --git a/packages/core/src/v3/tracer.ts b/packages/core/src/v3/tracer.ts index d7f5d70f12..4adf8268b5 100644 --- a/packages/core/src/v3/tracer.ts +++ b/packages/core/src/v3/tracer.ts @@ -80,11 +80,20 @@ export class TriggerTracer { let spanEnded = false; + const createPartialSpanWithEvents = options?.events && options.events.length > 0; + return this.tracer.startActiveSpan( name, { ...options, - attributes, + attributes: { + ...attributes, + ...(createPartialSpanWithEvents + ? { + [SemanticInternalAttributes.SKIP_SPAN_PARTIAL]: true, + } + : {}), + }, startTime: clock.preciseNow(), }, parentContext, @@ -97,6 +106,29 @@ export class TriggerTracer { } }); + if (taskContext.ctx && createPartialSpanWithEvents) { + const partialSpan = this.tracer.startSpan( + name, + { + ...options, + attributes: { + ...attributes, + [SemanticInternalAttributes.SPAN_PARTIAL]: true, + [SemanticInternalAttributes.SPAN_ID]: span.spanContext().spanId, + }, + }, + parentContext + ); + + if (options?.events) { + for (const event of options.events) { + partialSpan.addEvent(event.name, event.attributes, event.startTime); + } + } + + partialSpan.end(); + } + if (options?.events) { for (const event of options.events) { span.addEvent(event.name, event.attributes, event.startTime); diff --git a/packages/core/src/v3/workers/index.ts b/packages/core/src/v3/workers/index.ts index 39c166823b..de89b5587b 100644 --- a/packages/core/src/v3/workers/index.ts +++ b/packages/core/src/v3/workers/index.ts @@ -27,3 +27,4 @@ export { StandardRunTimelineMetricsManager } from "../runTimelineMetrics/runTime export { WarmStartClient, type WarmStartClientOptions } from "../workers/warmStartClient.js"; export { StandardLifecycleHooksManager } from "../lifecycleHooks/manager.js"; export { StandardLocalsManager } from "../locals/manager.js"; +export { populateEnv } from "./populateEnv.js"; diff --git a/packages/core/src/v3/workers/populateEnv.ts b/packages/core/src/v3/workers/populateEnv.ts new file mode 100644 index 0000000000..68fafddf12 --- /dev/null +++ b/packages/core/src/v3/workers/populateEnv.ts @@ -0,0 +1,50 @@ +/** + * Options for environment variable population + */ +interface PopulateEnvOptions { + /** + * Whether to override existing environment variables + * @default false + */ + override?: boolean; + + /** + * Whether to enable debug logging + * @default false + */ + debug?: boolean; +} + +/** + * Populates process.env with values from the provided object + * + * @param envObject - Object containing environment variables to set + * @param options - Optional configuration + */ +export function populateEnv( + envObject: Record, + options: PopulateEnvOptions = {} +): void { + const { override = false, debug = false } = options; + + if (!envObject || typeof envObject !== "object") { + return; + } + + // Set process.env values + for (const key of Object.keys(envObject)) { + if (Object.prototype.hasOwnProperty.call(process.env, key)) { + if (override) { + process.env[key] = envObject[key]; + + if (debug) { + console.log(`"${key}" is already defined and WAS overwritten`); + } + } else if (debug) { + console.log(`"${key}" is already defined and was NOT overwritten`); + } + } else { + process.env[key] = envObject[key]; + } + } +} diff --git a/packages/core/src/v3/workers/taskExecutor.ts b/packages/core/src/v3/workers/taskExecutor.ts index 6ecef7f842..6ec68b6bb8 100644 --- a/packages/core/src/v3/workers/taskExecutor.ts +++ b/packages/core/src/v3/workers/taskExecutor.ts @@ -27,6 +27,7 @@ import { import { recordSpanException, TracingSDK } from "../otel/index.js"; import { runTimelineMetrics } from "../run-timeline-metrics-api.js"; import { + COLD_VARIANT, RetryOptions, ServerBackgroundWorker, TaskRunContext, @@ -34,6 +35,7 @@ import { TaskRunExecution, TaskRunExecutionResult, TaskRunExecutionRetry, + WARM_VARIANT, } from "../schemas/index.js"; import { SemanticInternalAttributes } from "../semanticInternalAttributes.js"; import { taskContext } from "../task-context-api.js"; @@ -57,6 +59,7 @@ export type TaskExecutorOptions = { enabledInDev?: boolean; default?: RetryOptions; }; + isWarmStart?: boolean; }; export class TaskExecutor { @@ -69,6 +72,7 @@ export class TaskExecutor { default?: RetryOptions; } | undefined; + private _isWarmStart: boolean | undefined; constructor( public task: TaskMetadataWithFunctions, @@ -78,6 +82,7 @@ export class TaskExecutor { this._tracer = options.tracer; this._consoleInterceptor = options.consoleInterceptor; this._retries = options.retries; + this._isWarmStart = options.isWarmStart; } async execute( @@ -97,6 +102,7 @@ export class TaskExecutor { taskContext.setGlobalTaskContext({ ctx, worker, + isWarmStart: this._isWarmStart, }); if (execution.run.metadata) { @@ -297,10 +303,18 @@ export class TaskExecutor { kind: SpanKind.CONSUMER, attributes: { [SemanticInternalAttributes.STYLE_ICON]: "attempt", + [SemanticInternalAttributes.ENTITY_TYPE]: "attempt", [SemanticInternalAttributes.SPAN_ATTEMPT]: true, ...(execution.attempt.number === 1 ? runTimelineMetrics.convertMetricsToSpanAttributes() : {}), + ...(execution.environment.type !== "DEVELOPMENT" + ? { + [SemanticInternalAttributes.STYLE_VARIANT]: this._isWarmStart + ? WARM_VARIANT + : COLD_VARIANT, + } + : {}), }, events: execution.attempt.number === 1 diff --git a/references/d3-chat/package.json b/references/d3-chat/package.json index 30527c797f..d8de7d79c7 100644 --- a/references/d3-chat/package.json +++ b/references/d3-chat/package.json @@ -8,6 +8,7 @@ "start": "next start", "lint": "next lint", "dev:trigger": "trigger dev", + "deploy": "trigger deploy", "tunnel": "ngrok http --url=d3-demo.ngrok.dev 3000", "python:install-requirements": "uv pip sync requirements.txt", "python:compile-requirements": "uv pip compile requirements.in -o requirements.txt", diff --git a/references/d3-chat/src/trigger/chat.ts b/references/d3-chat/src/trigger/chat.ts index 21b34fa082..184613b36f 100644 --- a/references/d3-chat/src/trigger/chat.ts +++ b/references/d3-chat/src/trigger/chat.ts @@ -1,31 +1,16 @@ -import { openai } from "@ai-sdk/openai"; import { anthropic } from "@ai-sdk/anthropic"; -import { python } from "@trigger.dev/python"; +import { openai } from "@ai-sdk/openai"; import { ai } from "@trigger.dev/sdk/ai"; -import { metadata, schemaTask, wait } from "@trigger.dev/sdk/v3"; +import { logger, metadata, schemaTask, wait } from "@trigger.dev/sdk/v3"; import { sql } from "@vercel/postgres"; import { streamText, TextStreamPart, tool } from "ai"; import { nanoid } from "nanoid"; import { z } from "zod"; import { sendSQLApprovalMessage } from "../lib/slack"; +import { crawler } from "./crawler"; import { chartTool } from "./sandbox"; import { QueryApproval } from "./schemas"; -const crawlerTask = schemaTask({ - id: "crawler", - description: "Crawl a URL and return the markdown", - schema: z.object({ - url: z.string().describe("The URL to crawl"), - }), - run: async ({ url }) => { - const results = await python.runScript("./src/trigger/python/crawler.py", [url]); - - return results.stdout; - }, -}); - -const crawler = ai.tool(crawlerTask); - const queryApprovalTask = schemaTask({ id: "query-approval", description: "Get approval for a SQL query from an admin", @@ -52,6 +37,8 @@ const queryApprovalTask = schemaTask({ // result.ok === false if the token timed out if (!result.ok) { + logger.debug("queryApproval: token timed out"); + return { approved: false, }; diff --git a/references/d3-chat/src/trigger/crawler.ts b/references/d3-chat/src/trigger/crawler.ts new file mode 100644 index 0000000000..a077353d65 --- /dev/null +++ b/references/d3-chat/src/trigger/crawler.ts @@ -0,0 +1,19 @@ +import { python } from "@trigger.dev/python"; +import { ai } from "@trigger.dev/sdk/ai"; +import { schemaTask } from "@trigger.dev/sdk"; +import { z } from "zod"; + +const crawlerTask = schemaTask({ + id: "crawler", + description: "Crawl a URL and return the markdown", + schema: z.object({ + url: z.string().describe("The URL to crawl"), + }), + run: async ({ url }) => { + const results = await python.runScript("./src/trigger/python/crawler.py", [url]); + + return results.stdout; + }, +}); + +export const crawler = ai.tool(crawlerTask);