diff --git a/packages/cli-v3/src/entryPoints/deploy-run-worker.ts b/packages/cli-v3/src/entryPoints/deploy-run-worker.ts index 7944a4b9ba..96038f095e 100644 --- a/packages/cli-v3/src/entryPoints/deploy-run-worker.ts +++ b/packages/cli-v3/src/entryPoints/deploy-run-worker.ts @@ -33,6 +33,7 @@ import { TracingSDK, usage, UsageTimeoutManager, + StandardMetadataManager, } from "@trigger.dev/core/v3/workers"; import { ZodIpcConnection } from "@trigger.dev/core/v3/zodIpc"; import { readFile } from "node:fs/promises"; @@ -99,6 +100,8 @@ timeout.setGlobalManager(new UsageTimeoutManager(devUsageManager)); taskCatalog.setGlobalTaskCatalog(new StandardTaskCatalog()); const durableClock = new DurableClock(); clock.setGlobalClock(durableClock); +const runMetadataManager = new StandardMetadataManager(); +runMetadata.setGlobalManager(runMetadataManager); const triggerLogLevel = getEnvVar("TRIGGER_LOG_LEVEL"); @@ -305,7 +308,7 @@ const zodIpc = new ZodIpcConnection({ _execution = execution; _isRunning = true; - runMetadata.startPeriodicFlush( + runMetadataManager.startPeriodicFlush( getNumberEnvVar("TRIGGER_RUN_METADATA_FLUSH_INTERVAL", 1000) ); @@ -437,7 +440,7 @@ async function flushTracingSDK(timeoutInMs: number = 10_000) { async function flushMetadata(timeoutInMs: number = 10_000) { const now = performance.now(); - await Promise.race([runMetadata.flush(), setTimeout(timeoutInMs)]); + await Promise.race([runMetadataManager.flush(), setTimeout(timeoutInMs)]); const duration = performance.now() - now; diff --git a/packages/cli-v3/src/entryPoints/dev-run-worker.ts b/packages/cli-v3/src/entryPoints/dev-run-worker.ts index 1298860b9f..c18dee8cb1 100644 --- a/packages/cli-v3/src/entryPoints/dev-run-worker.ts +++ b/packages/cli-v3/src/entryPoints/dev-run-worker.ts @@ -32,6 +32,7 @@ import { TracingSDK, usage, getNumberEnvVar, + StandardMetadataManager, } from "@trigger.dev/core/v3/workers"; import { ZodIpcConnection } from "@trigger.dev/core/v3/zodIpc"; import { readFile } from "node:fs/promises"; @@ -81,6 +82,8 @@ usage.setGlobalUsageManager(devUsageManager); const devRuntimeManager = new DevRuntimeManager(); runtime.setGlobalRuntimeManager(devRuntimeManager); timeout.setGlobalManager(new UsageTimeoutManager(devUsageManager)); +const runMetadataManager = new StandardMetadataManager(); +runMetadata.setGlobalManager(runMetadataManager); const triggerLogLevel = getEnvVar("TRIGGER_LOG_LEVEL"); @@ -275,7 +278,7 @@ const zodIpc = new ZodIpcConnection({ _execution = execution; _isRunning = true; - runMetadata.startPeriodicFlush( + runMetadataManager.startPeriodicFlush( getNumberEnvVar("TRIGGER_RUN_METADATA_FLUSH_INTERVAL", 1000) ); const measurement = usage.start(); @@ -350,7 +353,7 @@ const zodIpc = new ZodIpcConnection({ } }, FLUSH: async ({ timeoutInMs }, sender) => { - await Promise.allSettled([_tracingSDK?.flush(), runMetadata.flush()]); + await Promise.allSettled([_tracingSDK?.flush(), runMetadataManager.flush()]); }, }, }); diff --git a/packages/core/src/v3/runMetadata/index.ts b/packages/core/src/v3/runMetadata/index.ts index 0ea9cd2ea6..f8590bfafc 100644 --- a/packages/core/src/v3/runMetadata/index.ts +++ b/packages/core/src/v3/runMetadata/index.ts @@ -1,17 +1,15 @@ -import { dequal } from "dequal/lite"; import { DeserializedJson } from "../../schemas/json.js"; -import { apiClientManager } from "../apiClientManager-api.js"; -import { taskContext } from "../task-context-api.js"; import { getGlobal, registerGlobal } from "../utils/globals.js"; import { ApiRequestOptions } from "../zodfetch.js"; -import { JSONHeroPath } from "@jsonhero/path"; +import { NoopRunMetadataManager } from "./noopManager.js"; +import { RunMetadataManager } from "./types.js"; const API_NAME = "run-metadata"; -export class RunMetadataAPI { +const NOOP_MANAGER = new NoopRunMetadataManager(); + +export class RunMetadataAPI implements RunMetadataManager { private static _instance?: RunMetadataAPI; - private flushTimeoutId: NodeJS.Timeout | null = null; - private hasChanges: boolean = false; private constructor() {} @@ -23,138 +21,39 @@ export class RunMetadataAPI { return this._instance; } - get store(): Record | undefined { - return getGlobal(API_NAME); + setGlobalManager(manager: RunMetadataManager): boolean { + return registerGlobal(API_NAME, manager); } - set store(value: Record | undefined) { - registerGlobal(API_NAME, value, true); + #getManager(): RunMetadataManager { + return getGlobal(API_NAME) ?? NOOP_MANAGER; } public enterWithMetadata(metadata: Record): void { - registerGlobal(API_NAME, metadata); + this.#getManager().enterWithMetadata(metadata); } public current(): Record | undefined { - return this.store; + return this.#getManager().current(); } public getKey(key: string): DeserializedJson | undefined { - return this.store?.[key]; + return this.#getManager().getKey(key); } public setKey(key: string, value: DeserializedJson) { - const runId = taskContext.ctx?.run.id; - - if (!runId) { - return; - } - - let nextStore: Record | undefined = this.store - ? structuredClone(this.store) - : undefined; - - if (key.startsWith("$.")) { - const path = new JSONHeroPath(key); - path.set(nextStore, value); - } else { - nextStore = { - ...(nextStore ?? {}), - [key]: value, - }; - } - - if (!nextStore) { - return; - } - - if (!dequal(this.store, nextStore)) { - this.hasChanges = true; - } - - this.store = nextStore; + return this.#getManager().setKey(key, value); } public deleteKey(key: string) { - const runId = taskContext.ctx?.run.id; - - if (!runId) { - return; - } - - const nextStore = { ...(this.store ?? {}) }; - delete nextStore[key]; - - if (!dequal(this.store, nextStore)) { - this.hasChanges = true; - } - - this.store = nextStore; + return this.#getManager().deleteKey(key); } public update(metadata: Record): void { - const runId = taskContext.ctx?.run.id; - - if (!runId) { - return; - } - - if (!dequal(this.store, metadata)) { - this.hasChanges = true; - } - - this.store = metadata; + return this.#getManager().update(metadata); } - public async flush(requestOptions?: ApiRequestOptions): Promise { - const runId = taskContext.ctx?.run.id; - - if (!runId) { - return; - } - - if (!this.store) { - return; - } - - if (!this.hasChanges) { - return; - } - - const apiClient = apiClientManager.clientOrThrow(); - - try { - this.hasChanges = false; - await apiClient.updateRunMetadata(runId, { metadata: this.store }, requestOptions); - } catch (error) { - this.hasChanges = true; - throw error; - } - } - - public startPeriodicFlush(intervalMs: number = 1000) { - const periodicFlush = async (intervalMs: number) => { - try { - await this.flush(); - } catch (error) { - console.error("Failed to flush metadata", error); - throw error; - } finally { - scheduleNext(); - } - }; - - const scheduleNext = () => { - this.flushTimeoutId = setTimeout(() => periodicFlush(intervalMs), intervalMs); - }; - - scheduleNext(); - } - - stopPeriodicFlush(): void { - if (this.flushTimeoutId) { - clearTimeout(this.flushTimeoutId); - this.flushTimeoutId = null; - } + flush(requestOptions?: ApiRequestOptions): Promise { + return this.#getManager().flush(requestOptions); } } diff --git a/packages/core/src/v3/runMetadata/manager.ts b/packages/core/src/v3/runMetadata/manager.ts new file mode 100644 index 0000000000..ed5b59ad7f --- /dev/null +++ b/packages/core/src/v3/runMetadata/manager.ts @@ -0,0 +1,140 @@ +import { JSONHeroPath } from "@jsonhero/path"; +import { dequal } from "dequal/lite"; +import { DeserializedJson } from "../../schemas/json.js"; +import { apiClientManager } from "../apiClientManager-api.js"; +import { taskContext } from "../task-context-api.js"; +import { ApiRequestOptions } from "../zodfetch.js"; +import { RunMetadataManager } from "./types.js"; + +export class StandardMetadataManager implements RunMetadataManager { + private flushTimeoutId: NodeJS.Timeout | null = null; + private hasChanges: boolean = false; + private store: Record | undefined; + + public enterWithMetadata(metadata: Record): void { + this.store = metadata ?? {}; + } + + public current(): Record | undefined { + return this.store; + } + + public getKey(key: string): DeserializedJson | undefined { + return this.store?.[key]; + } + + public setKey(key: string, value: DeserializedJson) { + const runId = taskContext.ctx?.run.id; + + if (!runId) { + return; + } + + let nextStore: Record | undefined = this.store + ? structuredClone(this.store) + : undefined; + + if (key.startsWith("$.")) { + const path = new JSONHeroPath(key); + path.set(nextStore, value); + } else { + nextStore = { + ...(nextStore ?? {}), + [key]: value, + }; + } + + if (!nextStore) { + return; + } + + if (!dequal(this.store, nextStore)) { + this.hasChanges = true; + } + + this.store = nextStore; + } + + public deleteKey(key: string) { + const runId = taskContext.ctx?.run.id; + + if (!runId) { + return; + } + + const nextStore = { ...(this.store ?? {}) }; + delete nextStore[key]; + + if (!dequal(this.store, nextStore)) { + this.hasChanges = true; + } + + this.store = nextStore; + } + + public update(metadata: Record): void { + const runId = taskContext.ctx?.run.id; + + if (!runId) { + return; + } + + if (!dequal(this.store, metadata)) { + this.hasChanges = true; + } + + this.store = metadata; + } + + public async flush(requestOptions?: ApiRequestOptions): Promise { + const runId = taskContext.ctx?.run.id; + + if (!runId) { + return; + } + + if (!this.store) { + return; + } + + if (!this.hasChanges) { + return; + } + + const apiClient = apiClientManager.clientOrThrow(); + + try { + this.hasChanges = false; + await apiClient.updateRunMetadata(runId, { metadata: this.store }, requestOptions); + } catch (error) { + this.hasChanges = true; + throw error; + } + } + + public startPeriodicFlush(intervalMs: number = 1000) { + const periodicFlush = async (intervalMs: number) => { + try { + await this.flush(); + } catch (error) { + console.error("Failed to flush metadata", error); + throw error; + } finally { + scheduleNext(); + } + }; + + const scheduleNext = () => { + this.flushTimeoutId = setTimeout(() => periodicFlush(intervalMs), intervalMs); + }; + + scheduleNext(); + } + + stopPeriodicFlush(): void { + if (this.flushTimeoutId) { + clearTimeout(this.flushTimeoutId); + this.flushTimeoutId = null; + } + } +} diff --git a/packages/core/src/v3/runMetadata/noopManager.ts b/packages/core/src/v3/runMetadata/noopManager.ts new file mode 100644 index 0000000000..d054844ab1 --- /dev/null +++ b/packages/core/src/v3/runMetadata/noopManager.ts @@ -0,0 +1,25 @@ +import { DeserializedJson } from "../../schemas/json.js"; +import { ApiRequestOptions } from "../zodfetch.js"; +import type { RunMetadataManager } from "./types.js"; + +export class NoopRunMetadataManager implements RunMetadataManager { + flush(requestOptions?: ApiRequestOptions): Promise { + throw new Error("Method not implemented."); + } + enterWithMetadata(metadata: Record): void {} + current(): Record | undefined { + throw new Error("Method not implemented."); + } + getKey(key: string): DeserializedJson | undefined { + throw new Error("Method not implemented."); + } + setKey(key: string, value: DeserializedJson): void { + throw new Error("Method not implemented."); + } + deleteKey(key: string): void { + throw new Error("Method not implemented."); + } + update(metadata: Record): void { + throw new Error("Method not implemented."); + } +} diff --git a/packages/core/src/v3/runMetadata/types.ts b/packages/core/src/v3/runMetadata/types.ts new file mode 100644 index 0000000000..c827b4a15a --- /dev/null +++ b/packages/core/src/v3/runMetadata/types.ts @@ -0,0 +1,13 @@ +import { DeserializedJson } from "../../schemas/json.js"; +import { ApiRequestOptions } from "../zodfetch.js"; + +export interface RunMetadataManager { + // Instance Methods + enterWithMetadata(metadata: Record): void; + current(): Record | undefined; + getKey(key: string): DeserializedJson | undefined; + setKey(key: string, value: DeserializedJson): void; + deleteKey(key: string): void; + update(metadata: Record): void; + flush(requestOptions?: ApiRequestOptions): Promise; +} diff --git a/packages/core/src/v3/utils/globals.ts b/packages/core/src/v3/utils/globals.ts index 24938a2ff8..d24ffd9554 100644 --- a/packages/core/src/v3/utils/globals.ts +++ b/packages/core/src/v3/utils/globals.ts @@ -1,6 +1,7 @@ import { DeserializedJson } from "../../schemas/json.js"; import { ApiClientConfiguration } from "../apiClientManager/types.js"; import { Clock } from "../clock/clock.js"; +import { RunMetadataManager } from "../runMetadata/types.js"; import type { RuntimeManager } from "../runtime/manager.js"; import { TaskCatalog } from "../task-catalog/catalog.js"; import { TaskContext } from "../taskContext/types.js"; @@ -56,6 +57,6 @@ type TriggerDotDevGlobalAPI = { ["task-catalog"]?: TaskCatalog; ["task-context"]?: TaskContext; ["api-client"]?: ApiClientConfiguration; - ["run-metadata"]?: Record; + ["run-metadata"]?: RunMetadataManager; ["timeout"]?: TimeoutManager; }; diff --git a/packages/core/src/v3/workers/index.ts b/packages/core/src/v3/workers/index.ts index 61a62aa55f..0323c223da 100644 --- a/packages/core/src/v3/workers/index.ts +++ b/packages/core/src/v3/workers/index.ts @@ -14,3 +14,4 @@ export * from "../usage-api.js"; export { DevUsageManager } from "../usage/devUsageManager.js"; export { ProdUsageManager, type ProdUsageManagerOptions } from "../usage/prodUsageManager.js"; export { UsageTimeoutManager } from "../timeout/usageTimeoutManager.js"; +export { StandardMetadataManager } from "../runMetadata/manager.js"; diff --git a/packages/react-hooks/src/hooks/useApiClient.ts b/packages/react-hooks/src/hooks/useApiClient.ts index d37c4293e2..4ecf638b14 100644 --- a/packages/react-hooks/src/hooks/useApiClient.ts +++ b/packages/react-hooks/src/hooks/useApiClient.ts @@ -6,9 +6,11 @@ import { useTriggerAuthContext } from "../contexts.js"; export function useApiClient() { const auth = useTriggerAuthContext(); - if (!auth.baseURL || !auth.accessToken) { - throw new Error("Missing baseURL or accessToken in TriggerAuthContext"); + const baseUrl = auth.baseURL ?? "https://api.trigger.dev"; + + if (!auth.accessToken) { + throw new Error("Missing accessToken in TriggerAuthContext"); } - return new ApiClient(auth.baseURL, auth.accessToken, auth.requestOptions); + return new ApiClient(baseUrl, auth.accessToken, auth.requestOptions); } diff --git a/references/nextjs-realtime/src/app/runs/[id]/ClientRunDetails.tsx b/references/nextjs-realtime/src/app/runs/[id]/ClientRunDetails.tsx index 5f2ada7c31..7fc7063205 100644 --- a/references/nextjs-realtime/src/app/runs/[id]/ClientRunDetails.tsx +++ b/references/nextjs-realtime/src/app/runs/[id]/ClientRunDetails.tsx @@ -41,7 +41,9 @@ function RunDetailsWrapper({ runId }: { runId: string }) { export default function ClientRunDetails({ runId, jwt }: { runId: string; jwt: string }) { return ( - + ); diff --git a/references/nextjs-realtime/src/app/uploads/[id]/ClientUploadDetails.tsx b/references/nextjs-realtime/src/app/uploads/[id]/ClientUploadDetails.tsx index 71a249d62a..7c9dd8a3bd 100644 --- a/references/nextjs-realtime/src/app/uploads/[id]/ClientUploadDetails.tsx +++ b/references/nextjs-realtime/src/app/uploads/[id]/ClientUploadDetails.tsx @@ -54,7 +54,7 @@ function UploadDetailsWrapper({ fileId }: { fileId: string }) { ); @@ -69,7 +69,7 @@ export default function ClientUploadDetails({ }) { return (