Skip to content

Commit 6640cd5

Browse files
authored
Adds branching (#216)
* Adds branching * branches with step order
1 parent d1e8f9f commit 6640cd5

File tree

2 files changed

+92
-25
lines changed

2 files changed

+92
-25
lines changed

packages/core-services/src/services/processing.service.ts

Lines changed: 90 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -80,34 +80,68 @@ export class ProcessingService implements IBaseService {
8080
"Processing job started.",
8181
);
8282

83-
let currentContent: unknown = content;
8483
const executedSteps: SelectProcessingStep[] = [];
85-
8684
try {
87-
for (let i = 0; i < plan.length; i++) {
88-
const planStep = plan[i];
89-
const stepOrder = i + 1;
90-
91-
if (planStep.type === "transformation") {
92-
const result = await this.executeTransform(
93-
job.id,
94-
currentContent,
95-
planStep,
96-
stepOrder,
97-
);
98-
executedSteps.push(result.step);
99-
currentContent = result.output;
100-
} else if (planStep.type === "distribution") {
101-
const step = await this.executeDistribution(
102-
job.id,
103-
currentContent,
104-
planStep,
105-
stepOrder,
106-
);
107-
executedSteps.push(step);
85+
let globalOutput: unknown = content;
86+
87+
const planWithOrder = plan.map((step, index) => ({
88+
...step,
89+
order: index + 1,
90+
}));
91+
92+
const globalStepsWithOrder: typeof planWithOrder = [];
93+
const branchesWithOrder = new Map<string, typeof planWithOrder>();
94+
95+
for (const step of planWithOrder) {
96+
if (step.branchId) {
97+
if (!branchesWithOrder.has(step.branchId)) {
98+
branchesWithOrder.set(step.branchId, []);
99+
}
100+
branchesWithOrder.get(step.branchId)!.push(step);
101+
} else {
102+
globalStepsWithOrder.push(step);
108103
}
109104
}
110105

106+
for (const planStep of globalStepsWithOrder) {
107+
const result = await this.executeTransform(
108+
job.id,
109+
globalOutput,
110+
planStep,
111+
planStep.order,
112+
);
113+
executedSteps.push(result.step);
114+
globalOutput = result.output;
115+
}
116+
117+
const branchPromises = Array.from(branchesWithOrder.values()).map(
118+
async (branch) => {
119+
let branchContent = globalOutput;
120+
for (const planStep of branch) {
121+
if (planStep.type === "transformation") {
122+
const result = await this.executeTransform(
123+
job.id,
124+
branchContent,
125+
planStep,
126+
planStep.order,
127+
);
128+
executedSteps.push(result.step);
129+
branchContent = result.output;
130+
} else if (planStep.type === "distribution") {
131+
const step = await this.executeDistribution(
132+
job.id,
133+
branchContent,
134+
planStep,
135+
planStep.order,
136+
);
137+
executedSteps.push(step);
138+
}
139+
}
140+
},
141+
);
142+
143+
await Promise.all(branchPromises);
144+
111145
const hasFailures = executedSteps.some((s) => s.status === "failed");
112146
const finalStatus = hasFailures ? "completed_with_errors" : "completed";
113147

@@ -293,7 +327,17 @@ export class ProcessingService implements IBaseService {
293327
);
294328
}
295329

296-
const partialPlan = plan.slice(stepIndex);
330+
const stepToRetryDetails = plan[stepIndex];
331+
let partialPlan: ProcessingPlan;
332+
333+
if (stepToRetryDetails.branchId) {
334+
partialPlan = plan
335+
.slice(stepIndex)
336+
.filter((step) => step.branchId === stepToRetryDetails.branchId);
337+
} else {
338+
// Fallback for old plans without branchId
339+
partialPlan = plan.slice(stepIndex);
340+
}
297341
const content = stepToRetry.input;
298342

299343
return this.process(
@@ -323,20 +367,23 @@ export class ProcessingService implements IBaseService {
323367

324368
// Add distributor transforms and distribution steps
325369
config.distribute?.forEach((d) => {
370+
const branchId = randomUUID();
326371
d.transform?.forEach((t) => {
327372
plan.push({
328373
type: "transformation",
329374
stage: "distributor",
330375
plugin: t.plugin,
331376
config: t.config,
332377
distributorPlugin: d.plugin,
378+
branchId,
333379
});
334380
});
335381
plan.push({
336382
type: "distribution",
337383
stage: "distributor",
338384
plugin: d.plugin,
339385
config: d.config,
386+
branchId,
340387
});
341388
});
342389

@@ -405,7 +452,25 @@ export class ProcessingService implements IBaseService {
405452

406453
const plan = originalJob.plan as ProcessingPlan;
407454
const stepIndex = stepToTweak.stepOrder - 1;
408-
const partialPlan = plan.slice(stepIndex);
455+
456+
if (stepIndex < 0 || stepIndex >= plan.length) {
457+
throw new ProcessorError(
458+
"invalid_step_index",
459+
"Step to tweak is out of bounds of the processing plan.",
460+
);
461+
}
462+
463+
const stepToTweakDetails = plan[stepIndex];
464+
let partialPlan: ProcessingPlan;
465+
466+
if (stepToTweakDetails.branchId) {
467+
partialPlan = plan
468+
.slice(stepIndex)
469+
.filter((step) => step.branchId === stepToTweakDetails.branchId);
470+
} else {
471+
// Fallback for old plans without branchId
472+
partialPlan = plan.slice(stepIndex);
473+
}
409474

410475
return this.process(
411476
parsedInput,

packages/shared-db/src/schema/processing.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@ export const ProcessingPlanStepSchema = z.object({
5757
config: z.record(z.unknown()).optional(),
5858
// For distributor transforms, this links back to the parent distributor plugin
5959
distributorPlugin: z.string().optional(),
60+
// A unique identifier for a branch of distributor steps
61+
branchId: z.string().optional(),
6062
});
6163
export type ProcessingPlanStep = z.infer<typeof ProcessingPlanStepSchema>;
6264

0 commit comments

Comments
 (0)