Skip to content

Commit 17f3895

Browse files
committed
add metadata tests and a few more utilties
1 parent 9a6741e commit 17f3895

File tree

11 files changed

+348
-25
lines changed

11 files changed

+348
-25
lines changed

packages/cli-v3/src/entryPoints/deploy-run-worker.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import {
1616
timeout,
1717
runMetadata,
1818
waitUntil,
19+
apiClientManager,
1920
} from "@trigger.dev/core/v3";
2021
import { TriggerTracer } from "@trigger.dev/core/v3/tracer";
2122
import { ProdRuntimeManager } from "@trigger.dev/core/v3/prod";
@@ -103,6 +104,7 @@ taskCatalog.setGlobalTaskCatalog(new StandardTaskCatalog());
103104
const durableClock = new DurableClock();
104105
clock.setGlobalClock(durableClock);
105106
const runMetadataManager = new StandardMetadataManager(
107+
apiClientManager.clientOrThrow(),
106108
getEnvVar("TRIGGER_STREAM_URL", getEnvVar("TRIGGER_API_URL")) ?? "https://api.trigger.dev"
107109
);
108110
runMetadata.setGlobalManager(runMetadataManager);
@@ -319,6 +321,8 @@ const zodIpc = new ZodIpcConnection({
319321
_execution = execution;
320322
_isRunning = true;
321323

324+
runMetadataManager.runId = execution.run.id;
325+
322326
runMetadataManager.startPeriodicFlush(
323327
getNumberEnvVar("TRIGGER_RUN_METADATA_FLUSH_INTERVAL", 1000)
324328
);

packages/cli-v3/src/entryPoints/dev-run-worker.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import {
1616
timeout,
1717
runMetadata,
1818
waitUntil,
19+
apiClientManager,
1920
} from "@trigger.dev/core/v3";
2021
import { TriggerTracer } from "@trigger.dev/core/v3/tracer";
2122
import { DevRuntimeManager } from "@trigger.dev/core/v3/dev";
@@ -85,6 +86,7 @@ const devRuntimeManager = new DevRuntimeManager();
8586
runtime.setGlobalRuntimeManager(devRuntimeManager);
8687
timeout.setGlobalManager(new UsageTimeoutManager(devUsageManager));
8788
const runMetadataManager = new StandardMetadataManager(
89+
apiClientManager.clientOrThrow(),
8890
getEnvVar("TRIGGER_STREAM_URL", getEnvVar("TRIGGER_API_URL")) ?? "https://api.trigger.dev"
8991
);
9092
runMetadata.setGlobalManager(runMetadataManager);
@@ -289,6 +291,8 @@ const zodIpc = new ZodIpcConnection({
289291
_execution = execution;
290292
_isRunning = true;
291293

294+
runMetadataManager.runId = execution.run.id;
295+
292296
runMetadataManager.startPeriodicFlush(
293297
getNumberEnvVar("TRIGGER_RUN_METADATA_FLUSH_INTERVAL", 1000)
294298
);

packages/core/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,7 @@
209209
},
210210
"devDependencies": {
211211
"@arethetypeswrong/cli": "^0.15.4",
212+
"@epic-web/test-server": "^0.1.0",
212213
"@types/humanize-duration": "^3.27.1",
213214
"@types/node": "20.14.14",
214215
"@types/readable-stream": "^4.0.14",

packages/core/src/v3/runMetadata/index.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,18 @@ export class RunMetadataAPI implements RunMetadataManager {
4949
return this.#getManager().deleteKey(key);
5050
}
5151

52+
public incrementKey(key: string, value: number): void {
53+
return this.#getManager().incrementKey(key, value);
54+
}
55+
56+
decrementKey(key: string, value: number): void {
57+
return this.#getManager().decrementKey(key, value);
58+
}
59+
60+
appendKey(key: string, value: DeserializedJson): void {
61+
return this.#getManager().appendKey(key, value);
62+
}
63+
5264
public update(metadata: Record<string, DeserializedJson>): void {
5365
return this.#getManager().update(metadata);
5466
}

packages/core/src/v3/runMetadata/manager.ts

Lines changed: 87 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
11
import { JSONHeroPath } from "@jsonhero/path";
22
import { dequal } from "dequal/lite";
33
import { DeserializedJson } from "../../schemas/json.js";
4-
import { apiClientManager } from "../apiClientManager-api.js";
5-
import { taskContext } from "../task-context-api.js";
64
import { ApiRequestOptions } from "../zodfetch.js";
75
import { RunMetadataManager } from "./types.js";
86
import { MetadataStream } from "./metadataStream.js";
7+
import { ApiClient } from "../apiClient/index.js";
98

109
export class StandardMetadataManager implements RunMetadataManager {
1110
private flushTimeoutId: NodeJS.Timeout | null = null;
@@ -14,7 +13,12 @@ export class StandardMetadataManager implements RunMetadataManager {
1413
// Add a Map to track active streams
1514
private activeStreams = new Map<string, MetadataStream<any>>();
1615

17-
constructor(private streamsBaseUrl: string) {}
16+
public runId: string | undefined;
17+
18+
constructor(
19+
private apiClient: ApiClient,
20+
private streamsBaseUrl: string
21+
) {}
1822

1923
public enterWithMetadata(metadata: Record<string, DeserializedJson>): void {
2024
this.store = metadata ?? {};
@@ -29,9 +33,7 @@ export class StandardMetadataManager implements RunMetadataManager {
2933
}
3034

3135
public setKey(key: string, value: DeserializedJson) {
32-
const runId = taskContext.ctx?.run.id;
33-
34-
if (!runId) {
36+
if (!this.runId) {
3537
return;
3638
}
3739

@@ -61,9 +63,7 @@ export class StandardMetadataManager implements RunMetadataManager {
6163
}
6264

6365
public deleteKey(key: string) {
64-
const runId = taskContext.ctx?.run.id;
65-
66-
if (!runId) {
66+
if (!this.runId) {
6767
return;
6868
}
6969

@@ -77,10 +77,81 @@ export class StandardMetadataManager implements RunMetadataManager {
7777
this.store = nextStore;
7878
}
7979

80-
public update(metadata: Record<string, DeserializedJson>): void {
81-
const runId = taskContext.ctx?.run.id;
80+
public appendKey(key: string, value: DeserializedJson) {
81+
if (!this.runId) {
82+
return;
83+
}
84+
85+
let nextStore: Record<string, DeserializedJson> | undefined = this.store
86+
? structuredClone(this.store)
87+
: {};
88+
89+
if (key.startsWith("$.")) {
90+
const path = new JSONHeroPath(key);
91+
const currentValue = path.first(nextStore);
92+
93+
if (currentValue === undefined) {
94+
// Initialize as array with single item
95+
path.set(nextStore, [value]);
96+
} else if (Array.isArray(currentValue)) {
97+
// Append to existing array
98+
path.set(nextStore, [...currentValue, value]);
99+
} else {
100+
// Convert to array if not already
101+
path.set(nextStore, [currentValue, value]);
102+
}
103+
} else {
104+
const currentValue = nextStore[key];
105+
106+
if (currentValue === undefined) {
107+
// Initialize as array with single item
108+
nextStore[key] = [value];
109+
} else if (Array.isArray(currentValue)) {
110+
// Append to existing array
111+
nextStore[key] = [...currentValue, value];
112+
} else {
113+
// Convert to array if not already
114+
nextStore[key] = [currentValue, value];
115+
}
116+
}
82117

83-
if (!runId) {
118+
if (!dequal(this.store, nextStore)) {
119+
this.hasChanges = true;
120+
}
121+
122+
this.store = nextStore;
123+
}
124+
125+
public incrementKey(key: string, increment: number = 1) {
126+
if (!this.runId) {
127+
return;
128+
}
129+
130+
let nextStore = this.store ? structuredClone(this.store) : {};
131+
let currentValue = key.startsWith("$.")
132+
? new JSONHeroPath(key).first(nextStore)
133+
: nextStore[key];
134+
135+
const newValue = (typeof currentValue === "number" ? currentValue : 0) + increment;
136+
137+
if (key.startsWith("$.")) {
138+
new JSONHeroPath(key).set(nextStore, newValue);
139+
} else {
140+
nextStore[key] = newValue;
141+
}
142+
143+
if (!dequal(this.store, nextStore)) {
144+
this.hasChanges = true;
145+
this.store = nextStore;
146+
}
147+
}
148+
149+
public decrementKey(key: string, decrement: number = 1) {
150+
this.incrementKey(key, -decrement);
151+
}
152+
153+
public update(metadata: Record<string, DeserializedJson>): void {
154+
if (!this.runId) {
84155
return;
85156
}
86157

@@ -96,9 +167,7 @@ export class StandardMetadataManager implements RunMetadataManager {
96167
value: AsyncIterable<T>,
97168
signal?: AbortSignal
98169
): Promise<AsyncIterable<T>> {
99-
const runId = taskContext.ctx?.run.id;
100-
101-
if (!runId) {
170+
if (!this.runId) {
102171
return value;
103172
}
104173

@@ -109,7 +178,7 @@ export class StandardMetadataManager implements RunMetadataManager {
109178

110179
const streamInstance = new MetadataStream({
111180
key,
112-
runId,
181+
runId: this.runId,
113182
iterator: value[Symbol.asyncIterator](),
114183
baseUrl: this.streamsBaseUrl,
115184
signal,
@@ -153,9 +222,7 @@ export class StandardMetadataManager implements RunMetadataManager {
153222
}
154223

155224
public async flush(requestOptions?: ApiRequestOptions): Promise<void> {
156-
const runId = taskContext.ctx?.run.id;
157-
158-
if (!runId) {
225+
if (!this.runId) {
159226
return;
160227
}
161228

@@ -167,11 +234,9 @@ export class StandardMetadataManager implements RunMetadataManager {
167234
return;
168235
}
169236

170-
const apiClient = apiClientManager.clientOrThrow();
171-
172237
try {
173238
this.hasChanges = false;
174-
await apiClient.updateRunMetadata(runId, { metadata: this.store }, requestOptions);
239+
await this.apiClient.updateRunMetadata(this.runId, { metadata: this.store }, requestOptions);
175240
} catch (error) {
176241
this.hasChanges = true;
177242
throw error;

packages/core/src/v3/runMetadata/metadataStream.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@ export class MetadataStream<T> {
6363
cancel: () => this.controller.abort(),
6464
});
6565

66+
console.log("Posting server stream to ", this.options.baseUrl);
67+
6668
return fetch(
6769
`${this.options.baseUrl}/realtime/v1/streams/${this.options.runId}/${this.options.key}`,
6870
{
@@ -73,9 +75,7 @@ export class MetadataStream<T> {
7375
duplex: "half",
7476
signal: this.controller.signal,
7577
}
76-
).catch((error) => {
77-
console.error("Error in stream:", error);
78-
});
78+
);
7979
}
8080

8181
public async wait(): Promise<void> {

packages/core/src/v3/runMetadata/noopManager.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,15 @@ import { ApiRequestOptions } from "../zodfetch.js";
33
import type { RunMetadataManager } from "./types.js";
44

55
export class NoopRunMetadataManager implements RunMetadataManager {
6+
appendKey(key: string, value: DeserializedJson): void {
7+
throw new Error("Method not implemented.");
8+
}
9+
incrementKey(key: string, value: number): void {
10+
throw new Error("Method not implemented.");
11+
}
12+
decrementKey(key: string, value: number): void {
13+
throw new Error("Method not implemented.");
14+
}
615
stream<T>(key: string, value: AsyncIterable<T>): Promise<AsyncIterable<T>> {
716
throw new Error("Method not implemented.");
817
}

packages/core/src/v3/runMetadata/types.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@ export interface RunMetadataManager {
88
getKey(key: string): DeserializedJson | undefined;
99
setKey(key: string, value: DeserializedJson): void;
1010
deleteKey(key: string): void;
11+
appendKey(key: string, value: DeserializedJson): void;
12+
incrementKey(key: string, value: number): void;
13+
decrementKey(key: string, value: number): void;
1114
update(metadata: Record<string, DeserializedJson>): void;
1215
flush(requestOptions?: ApiRequestOptions): Promise<void>;
1316
stream<T>(key: string, value: AsyncIterable<T>, signal?: AbortSignal): Promise<AsyncIterable<T>>;

0 commit comments

Comments
 (0)