Skip to content

Commit 62a4a3c

Browse files
feat(iapp): add bulk processing support
1 parent e73755e commit 62a4a3c

File tree

1 file changed

+104
-33
lines changed

1 file changed

+104
-33
lines changed

dapp/src/executeTask.js

Lines changed: 104 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { IExecDataProtectorDeserializer } from '@iexec/dataprotector-deserializer';
22
import { promises as fs } from 'fs';
3+
import path from 'node:path';
34
import { decryptContent, downloadEncryptedContent } from './decryptContent.js';
45
import sendTelegram from './telegramService.js';
56
import {
@@ -19,45 +20,35 @@ async function writeTaskOutput(path, message) {
1920
}
2021
}
2122

22-
async function start() {
23-
const { IEXEC_OUT, IEXEC_APP_DEVELOPER_SECRET, IEXEC_REQUESTER_SECRET_1 } =
24-
process.env;
25-
26-
// Check worker env
27-
const workerEnv = validateWorkerEnv({ IEXEC_OUT });
28-
// Parse the app developer secret environment variable
29-
let appDeveloperSecret;
30-
try {
31-
appDeveloperSecret = JSON.parse(IEXEC_APP_DEVELOPER_SECRET);
32-
} catch {
33-
throw Error('Failed to parse the developer secret');
23+
async function processProtectedData(
24+
index,
25+
{
26+
IEXEC_IN,
27+
IEXEC_OUT,
28+
appDeveloperSecret,
29+
requesterSecret,
30+
datasetFilename = null,
3431
}
35-
appDeveloperSecret = validateAppSecret(appDeveloperSecret);
36-
37-
// Parse the requester secret environment variable
38-
let requesterSecret;
32+
) {
33+
// Parse the protected data
34+
let protectedData;
3935
try {
40-
requesterSecret = IEXEC_REQUESTER_SECRET_1
41-
? JSON.parse(IEXEC_REQUESTER_SECRET_1)
36+
const deserializerConfig = datasetFilename
37+
? { protectedDataPath: path.join(IEXEC_IN, datasetFilename) }
4238
: {};
43-
} catch {
44-
throw Error('Failed to parse requester secret');
45-
}
46-
requesterSecret = validateRequesterSecret(requesterSecret);
4739

48-
// Parse the protected data and get the requester secret (chatId)
49-
let protectedData;
50-
try {
51-
const deserializer = new IExecDataProtectorDeserializer();
40+
const deserializer = new IExecDataProtectorDeserializer(deserializerConfig);
5241
protectedData = {
5342
chatId: await deserializer.getValue('telegram_chatId', 'string'),
5443
};
5544
} catch (e) {
56-
throw Error(`Failed to parse ProtectedData: ${e.message}`);
45+
throw Error(`Failed to parse ProtectedData ${index}: ${e.message}`);
5746
}
58-
// Validate the protected data (chatId)
47+
48+
// Validate the protected data
5949
validateProtectedData(protectedData);
6050

51+
// Download and decrypt content
6152
const encryptedTelegramContent = await downloadEncryptedContent(
6253
requesterSecret.telegramContentMultiAddr
6354
);
@@ -67,21 +58,101 @@ async function start() {
6758
requesterSecret.telegramContentEncryptionKey
6859
);
6960

61+
// Send telegram message
7062
const response = await sendTelegram({
7163
chatId: protectedData.chatId,
7264
message: telegramContent,
7365
botToken: appDeveloperSecret.TELEGRAM_BOT_TOKEN,
7466
senderName: requesterSecret.senderName,
7567
});
68+
69+
// Write individual result file
70+
const resultFileName = index > 0 ? `${datasetFilename}.txt` : 'result.txt';
7671
await writeTaskOutput(
77-
`${workerEnv.IEXEC_OUT}/result.txt`,
72+
path.join(IEXEC_OUT, resultFileName),
7873
JSON.stringify(response, null, 2)
7974
);
75+
76+
return { index, response, resultFileName };
77+
}
78+
79+
async function start() {
80+
const {
81+
IEXEC_OUT,
82+
IEXEC_IN,
83+
IEXEC_APP_DEVELOPER_SECRET,
84+
IEXEC_REQUESTER_SECRET_1,
85+
IEXEC_BULK_SLICE_SIZE,
86+
} = process.env;
87+
88+
// Check worker env
89+
const workerEnv = validateWorkerEnv({ IEXEC_OUT });
90+
91+
// Parse the app developer secret
92+
let appDeveloperSecret;
93+
try {
94+
appDeveloperSecret = JSON.parse(IEXEC_APP_DEVELOPER_SECRET);
95+
} catch {
96+
throw Error('Failed to parse the developer secret');
97+
}
98+
appDeveloperSecret = validateAppSecret(appDeveloperSecret);
99+
100+
// Parse the requester secret
101+
let requesterSecret;
102+
try {
103+
requesterSecret = IEXEC_REQUESTER_SECRET_1
104+
? JSON.parse(IEXEC_REQUESTER_SECRET_1)
105+
: {};
106+
} catch {
107+
throw Error('Failed to parse requester secret');
108+
}
109+
requesterSecret = validateRequesterSecret(requesterSecret);
110+
111+
const bulkSize = parseInt(IEXEC_BULK_SLICE_SIZE) || 0;
112+
113+
const results = [];
114+
115+
if (bulkSize > 0) {
116+
// Process multiple protected data
117+
for (let i = 1; i <= bulkSize; i++) {
118+
const datasetFilename = process.env[`IEXEC_DATASET_${i}_FILENAME`];
119+
120+
const result = await processProtectedData(i, {
121+
IEXEC_IN,
122+
IEXEC_OUT: workerEnv.IEXEC_OUT,
123+
appDeveloperSecret,
124+
requesterSecret,
125+
datasetFilename,
126+
});
127+
128+
results.push(result);
129+
}
130+
} else {
131+
// Process single protected data
132+
const result = await processProtectedData(0, {
133+
IEXEC_IN,
134+
IEXEC_OUT: workerEnv.IEXEC_OUT,
135+
appDeveloperSecret,
136+
requesterSecret,
137+
});
138+
139+
results.push(result);
140+
}
141+
142+
// Write computed.json with all results
143+
const computedOutput = {
144+
'deterministic-output-path': workerEnv.IEXEC_OUT,
145+
'bulk-results': results.map((r) => ({
146+
index: r.index,
147+
file: r.resultFileName,
148+
status: r.response.status === 200 ? 'success' : 'error',
149+
})),
150+
'total-processed': results.length,
151+
};
152+
80153
await writeTaskOutput(
81-
`${workerEnv.IEXEC_OUT}/computed.json`,
82-
JSON.stringify({
83-
'deterministic-output-path': `${workerEnv.IEXEC_OUT}/result.txt`,
84-
})
154+
path.join(workerEnv.IEXEC_OUT, 'computed.json'),
155+
JSON.stringify(computedOutput, null, 2)
85156
);
86157
}
87158

0 commit comments

Comments
 (0)