Skip to content

Commit d880bc8

Browse files
committed
Make serialization functions async with Encryptor interface
1 parent 3e3830d commit d880bc8

File tree

42 files changed

+1126
-492
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+1126
-492
lines changed

packages/cli/src/lib/inspect/output.ts

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -534,7 +534,9 @@ export const listRuns = async (world: World, opts: InspectCLIOptions = {}) => {
534534
},
535535
resolveData,
536536
});
537-
const runsWithHydratedIO = runs.data.map(hydrateResourceIO);
537+
const runsWithHydratedIO = await Promise.all(
538+
runs.data.map(async (run) => hydrateResourceIO(run, world))
539+
);
538540
showJson({ ...runs, data: runsWithHydratedIO });
539541
return;
540542
} catch (error) {
@@ -572,7 +574,9 @@ export const listRuns = async (world: World, opts: InspectCLIOptions = {}) => {
572574
}
573575
},
574576
displayPage: async (runs) => {
575-
const runsWithHydratedIO = runs.map(hydrateResourceIO);
577+
const runsWithHydratedIO = await Promise.all(
578+
runs.map(async (run) => hydrateResourceIO(run, world))
579+
);
576580
logger.log(showTable(runsWithHydratedIO, props, opts));
577581
},
578582
});
@@ -588,7 +592,9 @@ export const getRecentRun = async (
588592
pagination: { limit: 1, sortOrder: opts.sort || 'desc' },
589593
resolveData: 'none', // Don't need data for just getting the ID
590594
});
591-
runs.data = runs.data.map(hydrateResourceIO);
595+
runs.data = await Promise.all(
596+
runs.data.map(async (run) => hydrateResourceIO(run, world))
597+
);
592598
return runs.data[0];
593599
} catch (error) {
594600
if (handleApiError(error, opts.backend)) {
@@ -608,7 +614,7 @@ export const showRun = async (
608614
}
609615
try {
610616
const run = await world.runs.get(runId, { resolveData: 'all' });
611-
const runWithHydratedIO = hydrateResourceIO(run);
617+
const runWithHydratedIO = await hydrateResourceIO(run, world);
612618
if (opts.json) {
613619
showJson(runWithHydratedIO);
614620
return;
@@ -711,7 +717,9 @@ export const listSteps = async (
711717
}
712718
},
713719
displayPage: async (steps) => {
714-
const stepsWithHydratedIO = steps.map(hydrateResourceIO);
720+
const stepsWithHydratedIO = await Promise.all(
721+
steps.map(async (step) => hydrateResourceIO(step, world))
722+
);
715723
logger.log(showTable(stepsWithHydratedIO, props, opts));
716724
showInspectInfoBox('step');
717725
},
@@ -735,7 +743,7 @@ export const showStep = async (
735743
const step = await world.steps.get(opts.runId, stepId, {
736744
resolveData: 'all',
737745
});
738-
const stepWithHydratedIO = hydrateResourceIO(step);
746+
const stepWithHydratedIO = await hydrateResourceIO(step, world);
739747
if (opts.json) {
740748
showJson(stepWithHydratedIO);
741749
return;
@@ -950,7 +958,9 @@ export const listHooks = async (world: World, opts: InspectCLIOptions = {}) => {
950958
},
951959
resolveData,
952960
});
953-
const hydratedHooks = hooks.data.map(hydrateResourceIO);
961+
const hydratedHooks = await Promise.all(
962+
hooks.data.map(async (hook) => hydrateResourceIO(hook, world))
963+
);
954964
showJson({ ...hooks, data: hydratedHooks });
955965
return;
956966
} catch (error) {
@@ -994,7 +1004,9 @@ export const listHooks = async (world: World, opts: InspectCLIOptions = {}) => {
9941004
}
9951005
},
9961006
displayPage: async (hooks) => {
997-
const hydratedHooks = hooks.map(hydrateResourceIO);
1007+
const hydratedHooks = await Promise.all(
1008+
hooks.map(async (hook) => hydrateResourceIO(hook, world))
1009+
);
9981010
logger.log(showTable(hydratedHooks, HOOK_LISTED_PROPS, opts));
9991011
showInspectInfoBox('hook');
10001012
},
@@ -1013,7 +1025,7 @@ export const showHook = async (
10131025
const hook = await world.hooks.get(hookId, {
10141026
resolveData: 'all',
10151027
});
1016-
const hydratedHook = hydrateResourceIO(hook);
1028+
const hydratedHook = await hydrateResourceIO(hook, world);
10171029
if (opts.json) {
10181030
showJson(hydratedHook);
10191031
return;

packages/core/e2e/bench.bench.ts

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import { withResolvers } from '@workflow/utils';
21
import fs from 'fs';
32
import path from 'path';
43
import { bench, describe } from 'vitest';
@@ -45,10 +44,7 @@ async function triggerWorkflow(
4544
url.searchParams.set('workflowFile', workflowFile);
4645
url.searchParams.set('workflowFn', workflowFn);
4746

48-
const ops: Promise<void>[] = [];
49-
const { promise: runIdPromise, resolve: resolveRunId } =
50-
withResolvers<string>();
51-
const dehydratedArgs = dehydrateWorkflowArguments(args, ops, runIdPromise);
47+
const dehydratedArgs = await dehydrateWorkflowArguments(args, '', {});
5248

5349
const res = await fetch(url, {
5450
method: 'POST',
@@ -66,10 +62,6 @@ async function triggerWorkflow(
6662
);
6763
}
6864
const run = await res.json();
69-
resolveRunId(run.runId);
70-
71-
// Resolve and wait for any stream operations
72-
await Promise.all(ops);
7365

7466
return run;
7567
}

packages/core/e2e/e2e.test.ts

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import { withResolvers } from '@workflow/utils';
21
import fs from 'fs';
32
import path from 'path';
43
import { afterAll, assert, describe, expect, test } from 'vitest';
@@ -66,10 +65,7 @@ async function triggerWorkflow(
6665
url.searchParams.set('workflowFile', workflowFile);
6766
url.searchParams.set('workflowFn', workflowFn);
6867

69-
const ops: Promise<void>[] = [];
70-
const { promise: runIdPromise, resolve: resolveRunId } =
71-
withResolvers<string>();
72-
const dehydratedArgs = dehydrateWorkflowArguments(args, ops, runIdPromise);
68+
const dehydratedArgs = await dehydrateWorkflowArguments(args, '', {});
7369

7470
const res = await fetch(url, {
7571
method: 'POST',
@@ -87,8 +83,6 @@ async function triggerWorkflow(
8783
);
8884
}
8985
const run = await res.json();
90-
resolveRunId(run.runId);
91-
9286
// Collect runId for observability links (Vercel world only)
9387
if (process.env.WORKFLOW_VERCEL_ENV) {
9488
const testName = expect.getState().currentTestName || workflowFn;

packages/core/src/observability.test.ts

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { inspect } from 'node:util';
22
import { WORKFLOW_DESERIALIZE, WORKFLOW_SERIALIZE } from '@workflow/serde';
3+
import type { World } from '@workflow/world';
34
import { describe, expect, it } from 'vitest';
45
import { registerSerializationClass } from './class-serialization.js';
56
import {
@@ -15,6 +16,8 @@ import {
1516
} from './observability.js';
1617
import { dehydrateStepReturnValue } from './serialization.js';
1718

19+
const mockWorld = {} as World;
20+
1821
describe('ClassInstanceRef', () => {
1922
describe('constructor and properties', () => {
2023
it('should create instance with correct properties', () => {
@@ -332,10 +335,15 @@ describe('hydrateResourceIO with custom class instances', () => {
332335
(TestPoint as any).classId = 'test//TestPoint';
333336
registerSerializationClass('test//TestPoint', TestPoint);
334337

335-
it('should convert Instance type to ClassInstanceRef in step output', () => {
338+
it('should convert Instance type to ClassInstanceRef in step output', async () => {
336339
// Simulate serialized step data with a custom class instance
337340
const point = new TestPoint(3, 4);
338-
const serialized = dehydrateStepReturnValue(point, [], 'wrun_test');
341+
const serialized = await dehydrateStepReturnValue(
342+
point,
343+
'wrun_test',
344+
{},
345+
[]
346+
);
339347

340348
// Create a step resource with serialized output
341349
const step = {
@@ -346,7 +354,7 @@ describe('hydrateResourceIO with custom class instances', () => {
346354

347355
// Hydrate the step - this should convert Instance to ClassInstanceRef
348356
// because the class is not registered in the o11y context (streamPrintRevivers)
349-
const hydrated = hydrateResourceIO(step);
357+
const hydrated = await hydrateResourceIO(step, mockWorld);
350358

351359
// The output should be a ClassInstanceRef
352360
expect(isClassInstanceRef(hydrated.output)).toBe(true);

packages/core/src/observability.ts

Lines changed: 45 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
import { inspect } from 'node:util';
77
import { parseClassName } from '@workflow/utils/parse-name';
8+
import type { World } from '@workflow/world';
89
import { unflatten } from 'devalue';
910
import { runtimeLogger } from './logger.js';
1011
import {
@@ -254,20 +255,22 @@ const hydrateLegacyData = (data: any[]): unknown => {
254255
return unflatten(data, getObservabilityRevivers());
255256
};
256257

257-
const hydrateStepIO = <
258+
const hydrateStepIO = async <
258259
T extends { stepId?: string; input?: any; output?: any; runId?: string },
259260
>(
260-
step: T
261-
): T => {
261+
step: T,
262+
world: World
263+
): Promise<T> => {
262264
let hydratedInput = step.input;
263265
let hydratedOutput = step.output;
264266

265267
// Hydrate input - handle both binary (specVersion 2) and legacy (specVersion 1) formats
266268
if (isBinaryFormat(step.input) && step.input.byteLength > 0) {
267-
hydratedInput = hydrateStepArguments(
269+
hydratedInput = await hydrateStepArguments(
268270
step.input,
269-
[],
270271
step.runId as string,
272+
world,
273+
[],
271274
globalThis,
272275
streamPrintRevivers
273276
);
@@ -277,8 +280,10 @@ const hydrateStepIO = <
277280

278281
// Hydrate output - handle both binary (specVersion 2) and legacy (specVersion 1) formats
279282
if (isBinaryFormat(step.output)) {
280-
hydratedOutput = hydrateStepReturnValue(
283+
hydratedOutput = await hydrateStepReturnValue(
281284
step.output,
285+
step.runId as string,
286+
world,
282287
globalThis,
283288
streamPrintRevivers
284289
);
@@ -293,18 +298,21 @@ const hydrateStepIO = <
293298
};
294299
};
295300

296-
const hydrateWorkflowIO = <
301+
const hydrateWorkflowIO = async <
297302
T extends { runId?: string; input?: any; output?: any },
298303
>(
299-
workflow: T
300-
): T => {
304+
workflow: T,
305+
world: World
306+
): Promise<T> => {
301307
let hydratedInput = workflow.input;
302308
let hydratedOutput = workflow.output;
303309

304310
// Hydrate input - handle both binary (specVersion 2) and legacy (specVersion 1) formats
305311
if (isBinaryFormat(workflow.input) && workflow.input.byteLength > 0) {
306-
hydratedInput = hydrateWorkflowArguments(
312+
hydratedInput = await hydrateWorkflowArguments(
307313
workflow.input,
314+
workflow.runId as string,
315+
world,
308316
globalThis,
309317
streamPrintRevivers
310318
);
@@ -314,10 +322,11 @@ const hydrateWorkflowIO = <
314322

315323
// Hydrate output - handle both binary (specVersion 2) and legacy (specVersion 1) formats
316324
if (isBinaryFormat(workflow.output)) {
317-
hydratedOutput = hydrateWorkflowReturnValue(
325+
hydratedOutput = await hydrateWorkflowReturnValue(
318326
workflow.output,
319-
[],
320327
workflow.runId as string,
328+
world,
329+
[],
321330
globalThis,
322331
streamPrintRevivers
323332
);
@@ -332,11 +341,12 @@ const hydrateWorkflowIO = <
332341
};
333342
};
334343

335-
const hydrateEventData = <
344+
const hydrateEventData = async <
336345
T extends { eventId?: string; eventData?: any; runId?: string },
337346
>(
338-
event: T
339-
): T => {
347+
event: T,
348+
world: World
349+
): Promise<T> => {
340350
if (!event.eventData) {
341351
return event;
342352
}
@@ -348,8 +358,10 @@ const hydrateEventData = <
348358
if ('result' in eventData && typeof eventData.result === 'object') {
349359
// Handle both binary (specVersion 2) and legacy (specVersion 1) formats
350360
if (isBinaryFormat(eventData.result)) {
351-
eventData.result = hydrateStepReturnValue(
361+
eventData.result = await hydrateStepReturnValue(
352362
eventData.result,
363+
event.runId as string,
364+
world,
353365
globalThis,
354366
streamPrintRevivers
355367
);
@@ -369,18 +381,22 @@ const hydrateEventData = <
369381
};
370382
};
371383

372-
const hydrateHookMetadata = <T extends { hookId?: string; metadata?: any }>(
373-
hook: T
374-
): T => {
384+
const hydrateHookMetadata = async <
385+
T extends { hookId?: string; metadata?: any },
386+
>(
387+
hook: T,
388+
world: World
389+
): Promise<T> => {
375390
let hydratedMetadata = hook.metadata;
376391

377392
if (hook.metadata && 'runId' in hook) {
378393
// Handle both binary (specVersion 2) and legacy (specVersion 1) formats
379394
if (isBinaryFormat(hook.metadata)) {
380-
hydratedMetadata = hydrateStepArguments(
395+
hydratedMetadata = await hydrateStepArguments(
381396
hook.metadata,
382-
[],
383397
hook.runId as string,
398+
world,
399+
[],
384400
globalThis,
385401
streamPrintRevivers
386402
);
@@ -395,7 +411,7 @@ const hydrateHookMetadata = <T extends { hookId?: string; metadata?: any }>(
395411
};
396412
};
397413

398-
export const hydrateResourceIO = <
414+
export const hydrateResourceIO = async <
399415
T extends {
400416
stepId?: string;
401417
hookId?: string;
@@ -407,20 +423,21 @@ export const hydrateResourceIO = <
407423
executionContext?: any;
408424
},
409425
>(
410-
resource: T
411-
): T => {
426+
resource: T,
427+
world: World
428+
): Promise<T> => {
412429
if (!resource) {
413430
return resource;
414431
}
415432
let hydrated: T;
416433
if ('stepId' in resource) {
417-
hydrated = hydrateStepIO(resource);
434+
hydrated = await hydrateStepIO(resource, world);
418435
} else if ('hookId' in resource) {
419-
hydrated = hydrateHookMetadata(resource);
436+
hydrated = await hydrateHookMetadata(resource, world);
420437
} else if ('eventId' in resource) {
421-
hydrated = hydrateEventData(resource);
438+
hydrated = await hydrateEventData(resource, world);
422439
} else {
423-
hydrated = hydrateWorkflowIO(resource);
440+
hydrated = await hydrateWorkflowIO(resource, world);
424441
}
425442
if ('executionContext' in hydrated) {
426443
const { executionContext, ...rest } = hydrated;

packages/core/src/private.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
* Utils used by the bundler when transforming code
33
*/
44

5+
import type { World } from '@workflow/world';
56
import type { EventsConsumer } from './events-consumer.js';
67
import type { QueueItem } from './global.js';
78
import type { Serializable } from './schemas.js';
@@ -46,4 +47,8 @@ export interface WorkflowOrchestratorContext {
4647
onWorkflowError: (error: Error) => void;
4748
generateUlid: () => string;
4849
generateNanoid: () => string;
50+
/** The workflow run ID */
51+
runId: string;
52+
/** The world instance (used as Encryptor for serialization) */
53+
world: World;
4954
}

packages/core/src/runtime.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,8 @@ export function workflowEntrypoint(
199199
const result = await runWorkflow(
200200
workflowCode,
201201
workflowRun,
202-
events
202+
events,
203+
world
203204
);
204205

205206
// Complete the workflow run via event (event-sourced architecture)

0 commit comments

Comments
 (0)