Skip to content

Commit cff3d2c

Browse files
committed
fix processing flows
1 parent bf2fff4 commit cff3d2c

File tree

2 files changed

+38
-39
lines changed

2 files changed

+38
-39
lines changed

apps/app/src/lib/api/processing.ts

Lines changed: 12 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -16,20 +16,6 @@ export function useProcessingJobs(
1616
feedId: string | null,
1717
options?: { enabled?: boolean },
1818
) {
19-
const query = useApiQuery<ProcessingJobsResponse, Error, ProcessingJob[]>(
20-
["processingJobs", submissionId, feedId],
21-
`/processing/jobs/${submissionId}?feedId=${feedId}`,
22-
{
23-
enabled:
24-
options?.enabled !== undefined
25-
? options.enabled && !!submissionId && !!feedId
26-
: !!submissionId && !!feedId,
27-
select: (data) => data.data.jobs,
28-
},
29-
);
30-
31-
const isProcessing = query.data?.some((job) => job.status === "processing");
32-
3319
return useApiQuery<ProcessingJobsResponse, Error, ProcessingJob[]>(
3420
["processingJobs", submissionId, feedId],
3521
`/processing/jobs/${submissionId}?feedId=${feedId}`,
@@ -39,7 +25,12 @@ export function useProcessingJobs(
3925
? options.enabled && !!submissionId && !!feedId
4026
: !!submissionId && !!feedId,
4127
select: (data) => data.data.jobs,
42-
refetchInterval: isProcessing ? 5000 : false,
28+
refetchInterval: (query) =>
29+
query.state.data?.data.jobs.some(
30+
(job: ProcessingJob) => job.status === "processing",
31+
)
32+
? 5000
33+
: false,
4334
},
4435
);
4536
}
@@ -51,26 +42,19 @@ export function useProcessingSteps(
5142
jobId: string | null,
5243
options?: { enabled?: boolean },
5344
) {
54-
const query = useApiQuery<ProcessingStepsResponse, Error, ProcessingStep[]>(
55-
["processingSteps", jobId],
56-
`/processing/jobs/${jobId}/steps`,
57-
{
58-
enabled:
59-
options?.enabled !== undefined ? options.enabled && !!jobId : !!jobId,
60-
select: (data) => data.data.steps,
61-
},
62-
);
63-
64-
const isProcessing = query.data?.some((step) => step.status === "processing");
65-
6645
return useApiQuery<ProcessingStepsResponse, Error, ProcessingStep[]>(
6746
["processingSteps", jobId],
6847
`/processing/jobs/${jobId}/steps`,
6948
{
7049
enabled:
7150
options?.enabled !== undefined ? options.enabled && !!jobId : !!jobId,
7251
select: (data) => data.data.steps,
73-
refetchInterval: isProcessing ? 5000 : false,
52+
refetchInterval: (query) =>
53+
query.state.data?.data.steps.some(
54+
(step: ProcessingStep) => step.status === "processing",
55+
)
56+
? 5000
57+
: false,
7458
},
7559
);
7660
}

packages/core-services/src/services/processing.service.ts

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -104,18 +104,29 @@ export class ProcessingService implements IBaseService {
104104
}
105105

106106
for (const planStep of globalStepsWithOrder) {
107-
const result = await this.executeTransform(
108-
job.id,
109-
globalOutput,
110-
planStep,
111-
planStep.order,
112-
);
113-
executedSteps.push(result.step);
114-
globalOutput = result.output;
107+
if (planStep.type === "transformation") {
108+
const result = await this.executeTransform(
109+
job.id,
110+
globalOutput,
111+
planStep,
112+
planStep.order,
113+
);
114+
executedSteps.push(result.step);
115+
globalOutput = result.output;
116+
} else if (planStep.type === "distribution") {
117+
const step = await this.executeDistribution(
118+
job.id,
119+
globalOutput,
120+
planStep,
121+
planStep.order,
122+
);
123+
executedSteps.push(step);
124+
}
115125
}
116126

117127
const branchPromises = Array.from(branchesWithOrder.values()).map(
118128
async (branch) => {
129+
const branchExecutedSteps: SelectProcessingStep[] = [];
119130
let branchContent = globalOutput;
120131
for (const planStep of branch) {
121132
if (planStep.type === "transformation") {
@@ -125,7 +136,7 @@ export class ProcessingService implements IBaseService {
125136
planStep,
126137
planStep.order,
127138
);
128-
executedSteps.push(result.step);
139+
branchExecutedSteps.push(result.step);
129140
branchContent = result.output;
130141
} else if (planStep.type === "distribution") {
131142
const step = await this.executeDistribution(
@@ -134,13 +145,17 @@ export class ProcessingService implements IBaseService {
134145
planStep,
135146
planStep.order,
136147
);
137-
executedSteps.push(step);
148+
branchExecutedSteps.push(step);
138149
}
139150
}
151+
return branchExecutedSteps;
140152
},
141153
);
142154

143-
await Promise.all(branchPromises);
155+
const branchesExecutedSteps = await Promise.all(branchPromises);
156+
branchesExecutedSteps.forEach((branchSteps) => {
157+
executedSteps.push(...branchSteps);
158+
});
144159

145160
const hasFailures = executedSteps.some((s) => s.status === "failed");
146161
const finalStatus = hasFailures ? "completed_with_errors" : "completed";

0 commit comments

Comments
 (0)