Skip to content

Commit e7b2d59

Browse files
feat: add waitForResult option to processBulkRequest
1 parent e3787aa commit e7b2d59

File tree

3 files changed

+134
-24
lines changed

3 files changed

+134
-24
lines changed

packages/sdk/src/lib/dataProtectorCore/IExecDataProtectorCore.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -100,9 +100,9 @@ class IExecDataProtectorCore extends IExecDataProtectorModule {
100100
});
101101
}
102102

103-
async processBulkRequest(
104-
args: ProcessBulkRequestParams
105-
): Promise<ProcessBulkRequestResponse> {
103+
async processBulkRequest<Params extends ProcessBulkRequestParams>(
104+
args: Params
105+
): Promise<ProcessBulkRequestResponse<Params>> {
106106
await this.init();
107107
await isValidProvider(this.iexec);
108108
return processBulkRequest({

packages/sdk/src/lib/dataProtectorCore/processBulkRequest.ts

Lines changed: 95 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,13 @@ import {
2525
OnStatusUpdateFn,
2626
ProcessBulkRequestParams,
2727
ProcessBulkRequestResponse,
28+
ProcessBulkRequestResponseBase,
29+
ProcessBulkRequestResponseWithResult,
2830
ProcessBulkRequestStatuses,
2931
} from '../types/index.js';
3032
import { IExecConsumer, VoucherInfo } from '../types/internalTypes.js';
33+
import { getResultFromCompletedTask } from './getResultFromCompletedTask.js';
34+
import { waitForTaskCompletion } from './waitForTaskCompletion.js';
3135

3236
export type ProcessBulkRequest = typeof processBulkRequest;
3337

@@ -38,18 +42,22 @@ const waitForRetry = (ms: number): Promise<void> => {
3842
});
3943
};
4044

41-
export const processBulkRequest = async ({
45+
export const processBulkRequest = async <
46+
Params extends ProcessBulkRequestParams
47+
>({
4248
iexec = throwIfMissing(),
4349
defaultWorkerpool,
4450
bulkRequest,
4551
workerpool,
4652
useVoucher = false,
4753
voucherOwner,
54+
path,
4855
pemPrivateKey,
56+
waitForResult = false,
4957
onStatusUpdate = () => {},
50-
}: IExecConsumer &
51-
DefaultWorkerpoolConsumer &
52-
ProcessBulkRequestParams): Promise<ProcessBulkRequestResponse> => {
58+
}: IExecConsumer & DefaultWorkerpoolConsumer & Params): Promise<
59+
ProcessBulkRequestResponse<Params>
60+
> => {
5361
const vRequestorder = bulkRequestSchema()
5462
.label('bulkRequest')
5563
.required()
@@ -64,6 +72,10 @@ export const processBulkRequest = async ({
6472
const vVoucherOwner = addressOrEnsSchema()
6573
.label('voucherOwner')
6674
.validateSync(voucherOwner);
75+
const vWaitForResult = booleanSchema()
76+
.label('waitForResult')
77+
.validateSync(waitForResult);
78+
const vPath = stringSchema().label('path').validateSync(path);
6779
const vPemPrivateKey = stringSchema()
6880
.label('pemPrivateKey')
6981
.validateSync(pemPrivateKey);
@@ -72,17 +84,22 @@ export const processBulkRequest = async ({
7284
OnStatusUpdateFn<ProcessBulkRequestStatuses>
7385
>(onStatusUpdate);
7486

75-
// Validate that pemPrivateKey is provided if iexec_result_encryption is true
76-
if (
87+
const iexecResultEncryption =
7788
// JSON parse safe thanks to bulkRequestSchema validation
78-
JSON.parse(vRequestorder.params)?.iexec_result_encryption === true &&
79-
!vPemPrivateKey
80-
) {
89+
JSON.parse(vRequestorder.params)?.iexec_result_encryption === true;
90+
// Validate that pemPrivateKey is provided if iexec_result_encryption is true
91+
if (vWaitForResult && iexecResultEncryption && !vPemPrivateKey) {
8192
throw new ValidationError(
8293
'Missing pemPrivateKey required for result decryption'
8394
);
8495
}
8596

97+
if (vWaitForResult && !iexecResultEncryption && vPemPrivateKey) {
98+
throw new ValidationError(
99+
'pemPrivateKey is passed but result encryption is not enabled in bulkRequest this is likely an error when preparing the bulk request'
100+
);
101+
}
102+
86103
try {
87104
let userVoucher: VoucherInfo | undefined;
88105
if (vUseVoucher) {
@@ -241,11 +258,7 @@ export const processBulkRequest = async ({
241258
{ pageSize: Math.max(volume, 10) } // Fetch all deals (min page size 10)
242259
);
243260

244-
const tasks: Array<{
245-
taskId: string;
246-
dealId: string;
247-
bulkIndex: number;
248-
}> = [];
261+
const tasks: ProcessBulkRequestResponseBase['tasks'] = [];
249262

250263
for (const deal of deals) {
251264
const dealTasks = await Promise.all(
@@ -259,6 +272,7 @@ export const processBulkRequest = async ({
259272
);
260273
tasks.push(...dealTasks);
261274
}
275+
tasks.sort((a, b) => a.bulkIndex - b.bulkIndex);
262276

263277
vOnStatusUpdate({
264278
title: 'MATCH_ORDERS_LOOP',
@@ -269,8 +283,74 @@ export const processBulkRequest = async ({
269283
},
270284
});
271285

286+
if (!vWaitForResult) {
287+
return {
288+
tasks,
289+
} as ProcessBulkRequestResponse<Params>;
290+
}
291+
292+
const tasksWithResults =
293+
tasks as ProcessBulkRequestResponseWithResult['tasks'];
294+
295+
await Promise.all(
296+
tasksWithResults.map(async (task) => {
297+
try {
298+
vOnStatusUpdate({
299+
title: 'PROCESS_BULK_SLICE',
300+
isDone: false,
301+
payload: task,
302+
});
303+
vOnStatusUpdate({
304+
title: 'CONSUME_TASK',
305+
isDone: false,
306+
payload: task,
307+
});
308+
const { status, success } = await waitForTaskCompletion({
309+
iexec,
310+
taskId: task.taskId,
311+
dealId: task.dealId,
312+
});
313+
task.status = status;
314+
task.success = success;
315+
vOnStatusUpdate({
316+
title: 'CONSUME_TASK',
317+
isDone: true,
318+
payload: task,
319+
});
320+
if (!success) {
321+
throw new Error(`Task ended with status: ${status}`);
322+
}
323+
const { result } = await getResultFromCompletedTask({
324+
iexec,
325+
taskId: task.taskId,
326+
path: vPath,
327+
pemPrivateKey: vPemPrivateKey,
328+
onStatusUpdate: (update) => {
329+
vOnStatusUpdate({
330+
...update,
331+
payload: { ...update.payload, task },
332+
});
333+
},
334+
});
335+
task.result = result;
336+
vOnStatusUpdate({
337+
title: 'PROCESS_BULK_SLICE',
338+
isDone: true,
339+
payload: task,
340+
});
341+
} catch (error) {
342+
task.error = error as Error;
343+
vOnStatusUpdate({
344+
title: 'PROCESS_BULK_SLICE',
345+
isDone: true,
346+
payload: task,
347+
});
348+
}
349+
})
350+
);
351+
272352
return {
273-
tasks: tasks.sort((a, b) => a.bulkIndex - b.bulkIndex),
353+
tasks: tasksWithResults,
274354
};
275355
} catch (error) {
276356
console.error('[processBulkRequest] ERROR', error);

packages/sdk/src/lib/types/coreTypes.ts

Lines changed: 36 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -233,8 +233,10 @@ export type WaitForTaskCompletionParams = {
233233
onStatusUpdate?: OnStatusUpdateFn<WaitForTaskCompletionStatuses>;
234234
};
235235

236+
export type TaskStatus = 'COMPLETED' | 'FAILED' | 'TIMEOUT';
237+
236238
export type WaitForTaskCompletionResponse = {
237-
status: 'COMPLETED' | 'FAILED' | 'TIMEOUT';
239+
status: TaskStatus;
238240
success: boolean;
239241
};
240242

@@ -354,6 +356,8 @@ export type ProcessProtectedDataParams = {
354356

355357
/**
356358
* The file name of the desired file in the returned ZIP file.
359+
*
360+
* Ignored if `waitForResult` is `false`
357361
*/
358362
path?: string;
359363

@@ -390,7 +394,7 @@ export type ProcessProtectedDataParams = {
390394

391395
/**
392396
* Enable result encryption for the processed data.
393-
* @default = false
397+
* @default false
394398
*/
395399
encryptResult?: boolean;
396400

@@ -402,7 +406,7 @@ export type ProcessProtectedDataParams = {
402406

403407
/**
404408
* Whether to wait for the result of the processing or not.
405-
* @default = true
409+
* @default true
406410
*/
407411
waitForResult?: boolean;
408412

@@ -535,7 +539,8 @@ export type ProcessBulkRequestStatuses =
535539
| 'REQUEST_TO_PROCESS_BULK_DATA'
536540
| 'CONSUME_TASK'
537541
| 'CONSUME_RESULT_DOWNLOAD'
538-
| 'CONSUME_RESULT_DECRYPT';
542+
| 'CONSUME_RESULT_DECRYPT'
543+
| 'PROCESS_BULK_SLICE';
539544

540545
export type ProcessBulkRequestParams = {
541546
/**
@@ -545,6 +550,8 @@ export type ProcessBulkRequestParams = {
545550

546551
/**
547552
* Path to the result file in the app's output
553+
*
554+
* Ignored if `waitForResult` is `false`
548555
*/
549556
path?: string;
550557

@@ -565,20 +572,43 @@ export type ProcessBulkRequestParams = {
565572

566573
/**
567574
* Private key in PEM format for result decryption.
568-
* required if bulkRequest use results encryption.
575+
*
576+
* Required if `bulkRequest` use results encryption and `waitForResult` is `true`.
569577
*/
570578
pemPrivateKey?: string;
571579

580+
/**
581+
* Whether to wait for the result of the bulk request.
582+
* @default false
583+
*/
584+
waitForResult?: boolean;
585+
572586
/**
573587
* Callback function that will get called at each step of the process
574588
*/
575589
onStatusUpdate?: OnStatusUpdateFn<ProcessBulkRequestStatuses>;
576590
};
577591

578-
export type ProcessBulkRequestResponse = {
592+
export type ProcessBulkRequestResponse<T> = T extends { waitForResult: true }
593+
? ProcessBulkRequestResponseWithResult
594+
: ProcessBulkRequestResponseBase;
595+
596+
export type ProcessBulkRequestResponseBase = {
597+
tasks: Array<{
598+
taskId: string;
599+
dealId: string;
600+
bulkIndex: number;
601+
}>;
602+
};
603+
604+
export type ProcessBulkRequestResponseWithResult = {
579605
tasks: Array<{
580606
taskId: string;
581607
dealId: string;
582608
bulkIndex: number;
609+
success: boolean;
610+
status: TaskStatus;
611+
result?: ArrayBuffer;
612+
error?: Error;
583613
}>;
584614
};

0 commit comments

Comments
 (0)