Skip to content

Commit 7ecf317

Browse files
feat(iapp): add bulk processing support
1 parent 62a4a3c commit 7ecf317

File tree

4 files changed

+193
-35
lines changed

4 files changed

+193
-35
lines changed

dapp/src/executeTask.js

Lines changed: 72 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import { IExecDataProtectorDeserializer } from '@iexec/dataprotector-deserializer';
22
import { promises as fs } from 'fs';
3-
import path from 'node:path';
43
import { decryptContent, downloadEncryptedContent } from './decryptContent.js';
54
import sendTelegram from './telegramService.js';
65
import {
@@ -34,7 +33,7 @@ async function processProtectedData(
3433
let protectedData;
3534
try {
3635
const deserializerConfig = datasetFilename
37-
? { protectedDataPath: path.join(IEXEC_IN, datasetFilename) }
36+
? { protectedDataPath: `${IEXEC_IN}/${datasetFilename}` }
3837
: {};
3938

4039
const deserializer = new IExecDataProtectorDeserializer(deserializerConfig);
@@ -66,29 +65,30 @@ async function processProtectedData(
6665
senderName: requesterSecret.senderName,
6766
});
6867

69-
// Write individual result file
70-
const resultFileName = index > 0 ? `${datasetFilename}.txt` : 'result.txt';
71-
await writeTaskOutput(
72-
path.join(IEXEC_OUT, resultFileName),
73-
JSON.stringify(response, null, 2)
74-
);
68+
// Write individual result file only for single processing
69+
if (index === 0) {
70+
await writeTaskOutput(
71+
`${IEXEC_OUT}/result.txt`,
72+
JSON.stringify(response, null, 2)
73+
);
74+
}
7575

76-
return { index, response, resultFileName };
76+
return { index, response };
7777
}
7878

7979
async function start() {
8080
const {
8181
IEXEC_OUT,
82-
IEXEC_IN,
8382
IEXEC_APP_DEVELOPER_SECRET,
8483
IEXEC_REQUESTER_SECRET_1,
84+
IEXEC_IN,
8585
IEXEC_BULK_SLICE_SIZE,
8686
} = process.env;
8787

8888
// Check worker env
8989
const workerEnv = validateWorkerEnv({ IEXEC_OUT });
9090

91-
// Parse the app developer secret
91+
// Parse the app developer secret environment variable
9292
let appDeveloperSecret;
9393
try {
9494
appDeveloperSecret = JSON.parse(IEXEC_APP_DEVELOPER_SECRET);
@@ -97,7 +97,7 @@ async function start() {
9797
}
9898
appDeveloperSecret = validateAppSecret(appDeveloperSecret);
9999

100-
// Parse the requester secret
100+
// Parse the requester secret environment variable
101101
let requesterSecret;
102102
try {
103103
requesterSecret = IEXEC_REQUESTER_SECRET_1
@@ -109,23 +109,36 @@ async function start() {
109109
requesterSecret = validateRequesterSecret(requesterSecret);
110110

111111
const bulkSize = parseInt(IEXEC_BULK_SLICE_SIZE) || 0;
112-
113112
const results = [];
114113

115114
if (bulkSize > 0) {
116115
// Process multiple protected data
117116
for (let i = 1; i <= bulkSize; i++) {
118117
const datasetFilename = process.env[`IEXEC_DATASET_${i}_FILENAME`];
119118

120-
const result = await processProtectedData(i, {
121-
IEXEC_IN,
122-
IEXEC_OUT: workerEnv.IEXEC_OUT,
123-
appDeveloperSecret,
124-
requesterSecret,
125-
datasetFilename,
126-
});
127-
128-
results.push(result);
119+
try {
120+
const result = await processProtectedData(i, {
121+
IEXEC_IN,
122+
IEXEC_OUT: workerEnv.IEXEC_OUT,
123+
appDeveloperSecret,
124+
requesterSecret,
125+
datasetFilename,
126+
});
127+
128+
results.push(result);
129+
} catch (error) {
130+
// Create an error result for this dataset
131+
results.push({
132+
index: i,
133+
resultFileName: datasetFilename
134+
? `${datasetFilename}.txt`
135+
: `dataset-${i}.txt`,
136+
response: {
137+
status: 500,
138+
message: `Failed to process dataset ${i}: ${error.message}`,
139+
},
140+
});
141+
}
129142
}
130143
} else {
131144
// Process single protected data
@@ -139,20 +152,45 @@ async function start() {
139152
results.push(result);
140153
}
141154

142-
// Write computed.json with all results
143-
const computedOutput = {
144-
'deterministic-output-path': workerEnv.IEXEC_OUT,
145-
'bulk-results': results.map((r) => ({
146-
index: r.index,
147-
file: r.resultFileName,
148-
status: r.response.status === 200 ? 'success' : 'error',
149-
})),
150-
'total-processed': results.length,
151-
};
155+
// Generate computed.json - same format for both single and bulk
156+
157+
// Create result.txt for bulk processing (similar to single processing)
158+
if (bulkSize > 0) {
159+
const successCount = results.filter(
160+
(r) => r.response.status === 200
161+
).length;
162+
const errorCount = results.filter((r) => r.response.status !== 200).length;
163+
164+
const bulkResult = {
165+
message: `Bulk processing completed: ${successCount} successful, ${errorCount} failed`,
166+
status: 200,
167+
'total-processed': results.length,
168+
'success-count': successCount,
169+
'error-count': errorCount,
170+
'dataset-results': results.map((r) => ({
171+
index: r.index,
172+
dataset:
173+
process.env[`IEXEC_DATASET_${r.index}_FILENAME`] ||
174+
`dataset-${r.index}`,
175+
response: r.response,
176+
})),
177+
};
178+
179+
await writeTaskOutput(
180+
`${workerEnv.IEXEC_OUT}/result.txt`,
181+
JSON.stringify(bulkResult, null, 2)
182+
);
183+
}
152184

153185
await writeTaskOutput(
154-
path.join(workerEnv.IEXEC_OUT, 'computed.json'),
155-
JSON.stringify(computedOutput, null, 2)
186+
`${workerEnv.IEXEC_OUT}/computed.json`,
187+
JSON.stringify(
188+
{
189+
'deterministic-output-path': `${workerEnv.IEXEC_OUT}/result.txt`,
190+
},
191+
null,
192+
2
193+
)
156194
);
157195
}
158196

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
invalid content

dapp/tests/e2e/app.test.js

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,4 +191,119 @@ describe('sendTelegram', () => {
191191
'deterministic-output-path': `${IEXEC_OUT}/result.txt`,
192192
});
193193
});
194+
195+
describe('Bulk Processing', () => {
196+
beforeEach(() => {
197+
// Setup bulk processing environment
198+
process.env.IEXEC_BULK_SLICE_SIZE = '2';
199+
process.env.IEXEC_DATASET_1_FILENAME = 'data-chatId.zip';
200+
process.env.IEXEC_DATASET_2_FILENAME = 'data-chatId.zip';
201+
});
202+
203+
it('should process multiple datasets successfully', async () => {
204+
await expect(start()).resolves.toBeUndefined();
205+
206+
const { IEXEC_OUT } = process.env;
207+
208+
// Check individual result files are NOT created for bulk processing
209+
// Only result.txt and computed.json should exist
210+
211+
// Check result.txt (main output file)
212+
const resultTxt = await fsPromises.readFile(
213+
path.join(IEXEC_OUT, 'result.txt'),
214+
'utf-8'
215+
);
216+
217+
const result = JSON.parse(resultTxt);
218+
expect(result).toStrictEqual({
219+
message: 'Bulk processing completed: 2 successful, 0 failed',
220+
status: 200,
221+
'total-processed': 2,
222+
'success-count': 2,
223+
'error-count': 0,
224+
'dataset-results': [
225+
{
226+
index: 1,
227+
dataset: 'data-chatId.zip',
228+
response: {
229+
message: 'Your telegram message has been sent successfully.',
230+
status: 200,
231+
},
232+
},
233+
{
234+
index: 2,
235+
dataset: 'data-chatId.zip',
236+
response: {
237+
message: 'Your telegram message has been sent successfully.',
238+
status: 200,
239+
},
240+
},
241+
],
242+
});
243+
244+
// Check computed.json
245+
const computedJson = await fsPromises.readFile(
246+
path.join(IEXEC_OUT, 'computed.json'),
247+
'utf-8'
248+
);
249+
250+
const computed = JSON.parse(computedJson);
251+
expect(computed).toStrictEqual({
252+
'deterministic-output-path': `${IEXEC_OUT}/result.txt`,
253+
});
254+
});
255+
256+
it('should handle bulk processing with mixed results', async () => {
257+
process.env.IEXEC_DATASET_1_FILENAME = 'data-chatId.zip'; // Valid dataset
258+
process.env.IEXEC_DATASET_2_FILENAME = 'invalid-data.zip'; // Invalid dataset
259+
260+
await expect(start()).resolves.toBeUndefined();
261+
262+
const { IEXEC_OUT } = process.env;
263+
264+
// Check result.txt (main output file)
265+
const resultTxt = await fsPromises.readFile(
266+
path.join(IEXEC_OUT, 'result.txt'),
267+
'utf-8'
268+
);
269+
270+
const result = JSON.parse(resultTxt);
271+
expect(result).toStrictEqual({
272+
message: 'Bulk processing completed: 1 successful, 1 failed',
273+
status: 200,
274+
'total-processed': 2,
275+
'success-count': 1,
276+
'error-count': 1,
277+
'dataset-results': [
278+
{
279+
index: 1,
280+
dataset: 'data-chatId.zip',
281+
response: {
282+
message: 'Your telegram message has been sent successfully.',
283+
status: 200,
284+
},
285+
},
286+
{
287+
index: 2,
288+
dataset: 'invalid-data.zip',
289+
response: {
290+
status: 500,
291+
message: expect.stringContaining('Failed to process dataset 2'),
292+
},
293+
},
294+
],
295+
});
296+
297+
// Check computed.json
298+
const computedJson = await fsPromises.readFile(
299+
path.join(IEXEC_OUT, 'computed.json'),
300+
'utf-8'
301+
);
302+
303+
const computed = JSON.parse(computedJson);
304+
expect(computed).toStrictEqual({
305+
'deterministic-output-path': `${IEXEC_OUT}/result.txt`,
306+
});
307+
});
308+
});
194309
});

dapp/tests/unit/sendTelegram.test.js

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,11 @@ describe('start function', () => {
105105
);
106106
expect(fs.writeFile).toHaveBeenCalledWith(
107107
'/mock/output/computed.json',
108-
JSON.stringify({ 'deterministic-output-path': '/mock/output/result.txt' })
108+
JSON.stringify(
109+
{ 'deterministic-output-path': '/mock/output/result.txt' },
110+
null,
111+
2
112+
)
109113
);
110114
});
111115

0 commit comments

Comments
 (0)