Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions packages/cli-v3/src/entryPoints/deploy-run-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import {
TracingSDK,
usage,
UsageTimeoutManager,
StandardMetadataManager,
} from "@trigger.dev/core/v3/workers";
import { ZodIpcConnection } from "@trigger.dev/core/v3/zodIpc";
import { readFile } from "node:fs/promises";
Expand Down Expand Up @@ -99,6 +100,8 @@ timeout.setGlobalManager(new UsageTimeoutManager(devUsageManager));
taskCatalog.setGlobalTaskCatalog(new StandardTaskCatalog());
const durableClock = new DurableClock();
clock.setGlobalClock(durableClock);
const runMetadataManager = new StandardMetadataManager();
runMetadata.setGlobalManager(runMetadataManager);

const triggerLogLevel = getEnvVar("TRIGGER_LOG_LEVEL");

Expand Down Expand Up @@ -305,7 +308,7 @@ const zodIpc = new ZodIpcConnection({
_execution = execution;
_isRunning = true;

runMetadata.startPeriodicFlush(
runMetadataManager.startPeriodicFlush(
getNumberEnvVar("TRIGGER_RUN_METADATA_FLUSH_INTERVAL", 1000)
);

Expand Down Expand Up @@ -437,7 +440,7 @@ async function flushTracingSDK(timeoutInMs: number = 10_000) {
async function flushMetadata(timeoutInMs: number = 10_000) {
const now = performance.now();

await Promise.race([runMetadata.flush(), setTimeout(timeoutInMs)]);
await Promise.race([runMetadataManager.flush(), setTimeout(timeoutInMs)]);

const duration = performance.now() - now;

Expand Down
7 changes: 5 additions & 2 deletions packages/cli-v3/src/entryPoints/dev-run-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import {
TracingSDK,
usage,
getNumberEnvVar,
StandardMetadataManager,
} from "@trigger.dev/core/v3/workers";
import { ZodIpcConnection } from "@trigger.dev/core/v3/zodIpc";
import { readFile } from "node:fs/promises";
Expand Down Expand Up @@ -81,6 +82,8 @@ usage.setGlobalUsageManager(devUsageManager);
const devRuntimeManager = new DevRuntimeManager();
runtime.setGlobalRuntimeManager(devRuntimeManager);
timeout.setGlobalManager(new UsageTimeoutManager(devUsageManager));
const runMetadataManager = new StandardMetadataManager();
runMetadata.setGlobalManager(runMetadataManager);

const triggerLogLevel = getEnvVar("TRIGGER_LOG_LEVEL");

Expand Down Expand Up @@ -275,7 +278,7 @@ const zodIpc = new ZodIpcConnection({
_execution = execution;
_isRunning = true;

runMetadata.startPeriodicFlush(
runMetadataManager.startPeriodicFlush(
getNumberEnvVar("TRIGGER_RUN_METADATA_FLUSH_INTERVAL", 1000)
);
const measurement = usage.start();
Expand Down Expand Up @@ -350,7 +353,7 @@ const zodIpc = new ZodIpcConnection({
}
},
FLUSH: async ({ timeoutInMs }, sender) => {
await Promise.allSettled([_tracingSDK?.flush(), runMetadata.flush()]);
await Promise.allSettled([_tracingSDK?.flush(), runMetadataManager.flush()]);
},
},
});
Expand Down
135 changes: 17 additions & 118 deletions packages/core/src/v3/runMetadata/index.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
import { dequal } from "dequal/lite";
import { DeserializedJson } from "../../schemas/json.js";
import { apiClientManager } from "../apiClientManager-api.js";
import { taskContext } from "../task-context-api.js";
import { getGlobal, registerGlobal } from "../utils/globals.js";
import { ApiRequestOptions } from "../zodfetch.js";
import { JSONHeroPath } from "@jsonhero/path";
import { NoopRunMetadataManager } from "./noopManager.js";
import { RunMetadataManager } from "./types.js";

const API_NAME = "run-metadata";

export class RunMetadataAPI {
const NOOP_MANAGER = new NoopRunMetadataManager();

export class RunMetadataAPI implements RunMetadataManager {
private static _instance?: RunMetadataAPI;
private flushTimeoutId: NodeJS.Timeout | null = null;
private hasChanges: boolean = false;

private constructor() {}

Expand All @@ -23,138 +21,39 @@ export class RunMetadataAPI {
return this._instance;
}

get store(): Record<string, DeserializedJson> | undefined {
return getGlobal(API_NAME);
setGlobalManager(manager: RunMetadataManager): boolean {
return registerGlobal(API_NAME, manager);
}

set store(value: Record<string, DeserializedJson> | undefined) {
registerGlobal(API_NAME, value, true);
#getManager(): RunMetadataManager {
return getGlobal(API_NAME) ?? NOOP_MANAGER;
}

public enterWithMetadata(metadata: Record<string, DeserializedJson>): void {
registerGlobal(API_NAME, metadata);
this.#getManager().enterWithMetadata(metadata);
}

public current(): Record<string, DeserializedJson> | undefined {
return this.store;
return this.#getManager().current();
}

public getKey(key: string): DeserializedJson | undefined {
return this.store?.[key];
return this.#getManager().getKey(key);
}

public setKey(key: string, value: DeserializedJson) {
const runId = taskContext.ctx?.run.id;

if (!runId) {
return;
}

let nextStore: Record<string, DeserializedJson> | undefined = this.store
? structuredClone(this.store)
: undefined;

if (key.startsWith("$.")) {
const path = new JSONHeroPath(key);
path.set(nextStore, value);
} else {
nextStore = {
...(nextStore ?? {}),
[key]: value,
};
}

if (!nextStore) {
return;
}

if (!dequal(this.store, nextStore)) {
this.hasChanges = true;
}

this.store = nextStore;
return this.#getManager().setKey(key, value);
}

public deleteKey(key: string) {
const runId = taskContext.ctx?.run.id;

if (!runId) {
return;
}

const nextStore = { ...(this.store ?? {}) };
delete nextStore[key];

if (!dequal(this.store, nextStore)) {
this.hasChanges = true;
}

this.store = nextStore;
return this.#getManager().deleteKey(key);
}

public update(metadata: Record<string, DeserializedJson>): void {
const runId = taskContext.ctx?.run.id;

if (!runId) {
return;
}

if (!dequal(this.store, metadata)) {
this.hasChanges = true;
}

this.store = metadata;
return this.#getManager().update(metadata);
}

public async flush(requestOptions?: ApiRequestOptions): Promise<void> {
const runId = taskContext.ctx?.run.id;

if (!runId) {
return;
}

if (!this.store) {
return;
}

if (!this.hasChanges) {
return;
}

const apiClient = apiClientManager.clientOrThrow();

try {
this.hasChanges = false;
await apiClient.updateRunMetadata(runId, { metadata: this.store }, requestOptions);
} catch (error) {
this.hasChanges = true;
throw error;
}
}

public startPeriodicFlush(intervalMs: number = 1000) {
const periodicFlush = async (intervalMs: number) => {
try {
await this.flush();
} catch (error) {
console.error("Failed to flush metadata", error);
throw error;
} finally {
scheduleNext();
}
};

const scheduleNext = () => {
this.flushTimeoutId = setTimeout(() => periodicFlush(intervalMs), intervalMs);
};

scheduleNext();
}

stopPeriodicFlush(): void {
if (this.flushTimeoutId) {
clearTimeout(this.flushTimeoutId);
this.flushTimeoutId = null;
}
flush(requestOptions?: ApiRequestOptions): Promise<void> {
return this.#getManager().flush(requestOptions);
}
}
140 changes: 140 additions & 0 deletions packages/core/src/v3/runMetadata/manager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
import { JSONHeroPath } from "@jsonhero/path";
import { dequal } from "dequal/lite";
import { DeserializedJson } from "../../schemas/json.js";
import { apiClientManager } from "../apiClientManager-api.js";
import { taskContext } from "../task-context-api.js";
import { ApiRequestOptions } from "../zodfetch.js";
import { RunMetadataManager } from "./types.js";

export class StandardMetadataManager implements RunMetadataManager {
private flushTimeoutId: NodeJS.Timeout | null = null;
private hasChanges: boolean = false;
private store: Record<string, DeserializedJson> | undefined;

public enterWithMetadata(metadata: Record<string, DeserializedJson>): void {
this.store = metadata ?? {};
}

public current(): Record<string, DeserializedJson> | undefined {
return this.store;
}

public getKey(key: string): DeserializedJson | undefined {
return this.store?.[key];
}

public setKey(key: string, value: DeserializedJson) {
const runId = taskContext.ctx?.run.id;

if (!runId) {
return;
}

let nextStore: Record<string, DeserializedJson> | undefined = this.store
? structuredClone(this.store)
: undefined;

if (key.startsWith("$.")) {
const path = new JSONHeroPath(key);
path.set(nextStore, value);
} else {
nextStore = {
...(nextStore ?? {}),
[key]: value,
};
}

if (!nextStore) {
return;
}

if (!dequal(this.store, nextStore)) {
this.hasChanges = true;
}

this.store = nextStore;
}

public deleteKey(key: string) {
const runId = taskContext.ctx?.run.id;

if (!runId) {
return;
}

const nextStore = { ...(this.store ?? {}) };
delete nextStore[key];

if (!dequal(this.store, nextStore)) {
this.hasChanges = true;
}

this.store = nextStore;
}

public update(metadata: Record<string, DeserializedJson>): void {
const runId = taskContext.ctx?.run.id;

if (!runId) {
return;
}

if (!dequal(this.store, metadata)) {
this.hasChanges = true;
}

this.store = metadata;
}

public async flush(requestOptions?: ApiRequestOptions): Promise<void> {
const runId = taskContext.ctx?.run.id;

if (!runId) {
return;
}

if (!this.store) {
return;
}

if (!this.hasChanges) {
return;
}

const apiClient = apiClientManager.clientOrThrow();

try {
this.hasChanges = false;
await apiClient.updateRunMetadata(runId, { metadata: this.store }, requestOptions);
} catch (error) {
this.hasChanges = true;
throw error;
}
}

public startPeriodicFlush(intervalMs: number = 1000) {
const periodicFlush = async (intervalMs: number) => {
try {
await this.flush();
} catch (error) {
console.error("Failed to flush metadata", error);
throw error;
} finally {
scheduleNext();
}
};

const scheduleNext = () => {
this.flushTimeoutId = setTimeout(() => periodicFlush(intervalMs), intervalMs);
};

scheduleNext();
}

stopPeriodicFlush(): void {
if (this.flushTimeoutId) {
clearTimeout(this.flushTimeoutId);
this.flushTimeoutId = null;
}
}
}
Loading
Loading