Skip to content

Commit 15d0936

Browse files
feat(iapp)!: support bulk processing (#82)
BREAKING CHANGE: result file renamed to `result.json`; single protectedData result file now contains `{"success": <boolean>, "protectedData"?: <address>, "error"?: <string> }`; upon error, the iapp will now exit 0 and output `"success": false` and `"error": <string>` rather than falling the task. Co-authored-by: Pierre Jeanjacquot <[email protected]>
1 parent 5a74743 commit 15d0936

File tree

14 files changed

+7127
-11309
lines changed

14 files changed

+7127
-11309
lines changed

dapp/README.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,11 @@ fill in the environment variables:
1212

1313
- **IEXEC_IN**: The path to the input directory on your local machine where the unencrypted data .zip file will be stored. This file contains the telegram chat ID address to which the message will be sent.
1414
- **IEXEC_OUT**: The path on your local machine where the result of the Dapp execution will be written.
15-
- **IEXEC_DATASET_FILENAME**: The name of the data file that you place in the **IEXEC_IN** directory.
15+
- data to process
16+
- either **IEXEC_DATASET_FILENAME**: The name of the data file that you place in the **IEXEC_IN** directory.
17+
- or multiple datasets using:
18+
- **IEXEC_BULK_SLICE_SIZE**
19+
- **IEXEC_DATASET\_\<index\>\_FILENAME**: The name of the data file for dataset at index <index> (starting from 1) that you place in the **IEXEC_IN** directory.
1620
- **IEXEC_APP_DEVELOPER_SECRET**: A JSON string with the following keys:
1721
- **TELEGRAM_BOT_TOKEN**: The API key of the telegram bot used to send the message.
1822
- **IEXEC_REQUESTER_SECRET_1**: A JSON string with the following keys:

dapp/package-lock.json

Lines changed: 6565 additions & 11000 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

dapp/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
"eslint-config-prettier": "^9.0.0",
3434
"eslint-plugin-import": "^2.28.1",
3535
"eslint-plugin-sonarjs": "^0.21.0",
36-
"iexec": "^8.2.1",
36+
"iexec": "^8.22.0",
3737
"jest": "^29.7.0",
3838
"prettier": "^2.8.8"
3939
}

dapp/src/decryptContent.js

Lines changed: 28 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -16,40 +16,43 @@ export const downloadEncryptedContent = async (
1616
}
1717
const arrayBuffer = await res.arrayBuffer();
1818
return new Uint8Array(arrayBuffer);
19-
} catch (error) {
20-
console.error('Error downloading encrypted content:', error);
21-
throw error;
19+
} catch {
20+
throw new Error('Failed to download encrypted content');
2221
}
2322
};
2423

2524
export const decryptContent = (encryptedContent, encryptionKey) => {
26-
const ivBytes = encryptedContent.slice(0, 16);
27-
let ciphertextBytes = encryptedContent.slice(16);
28-
const key = forge.util.createBuffer(Buffer.from(encryptionKey, 'base64'));
29-
const decipher = forge.cipher.createDecipher('AES-CBC', key);
30-
31-
decipher.start({ iv: forge.util.createBuffer(ivBytes) });
25+
try {
26+
const ivBytes = encryptedContent.slice(0, 16);
27+
let ciphertextBytes = encryptedContent.slice(16);
28+
const key = forge.util.createBuffer(Buffer.from(encryptionKey, 'base64'));
29+
const decipher = forge.cipher.createDecipher('AES-CBC', key);
30+
31+
decipher.start({ iv: forge.util.createBuffer(ivBytes) });
32+
33+
const CHUNK_SIZE = 10 * 1000 * 1000;
34+
let decryptedBuffer = Buffer.from([]);
35+
36+
while (ciphertextBytes.length > 0) {
37+
// flush the decipher buffer
38+
decryptedBuffer = Buffer.concat([
39+
decryptedBuffer,
40+
Buffer.from(decipher.output.getBytes(), 'binary'),
41+
]);
42+
const chunk = ciphertextBytes.slice(0, CHUNK_SIZE);
43+
ciphertextBytes = ciphertextBytes.slice(CHUNK_SIZE);
44+
decipher.update(forge.util.createBuffer(chunk));
45+
}
3246

33-
const CHUNK_SIZE = 10 * 1000 * 1000;
34-
let decryptedBuffer = Buffer.from([]);
47+
decipher.finish();
3548

36-
while (ciphertextBytes.length > 0) {
37-
// flush the decipher buffer
3849
decryptedBuffer = Buffer.concat([
3950
decryptedBuffer,
4051
Buffer.from(decipher.output.getBytes(), 'binary'),
4152
]);
42-
const chunk = ciphertextBytes.slice(0, CHUNK_SIZE);
43-
ciphertextBytes = ciphertextBytes.slice(CHUNK_SIZE);
44-
decipher.update(forge.util.createBuffer(chunk));
45-
}
4653

47-
decipher.finish();
48-
49-
decryptedBuffer = Buffer.concat([
50-
decryptedBuffer,
51-
Buffer.from(decipher.output.getBytes(), 'binary'),
52-
]);
53-
54-
return decryptedBuffer.toString();
54+
return decryptedBuffer.toString();
55+
} catch {
56+
throw new Error('Failed to decrypt content');
57+
}
5558
};

dapp/src/executeTask.js

Lines changed: 128 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
import { IExecDataProtectorDeserializer } from '@iexec/dataprotector-deserializer';
21
import { promises as fs } from 'fs';
2+
import { IExecDataProtectorDeserializer } from '@iexec/dataprotector-deserializer';
33
import { decryptContent, downloadEncryptedContent } from './decryptContent.js';
44
import sendTelegram from './telegramService.js';
55
import {
@@ -9,79 +9,148 @@ import {
99
validateWorkerEnv,
1010
} from './validation.js';
1111

12-
async function writeTaskOutput(path, message) {
12+
async function processProtectedData({
13+
index,
14+
IEXEC_IN,
15+
appDeveloperSecret,
16+
requesterSecret,
17+
}) {
18+
const datasetFilename =
19+
index > 0
20+
? process.env[`IEXEC_DATASET_${index}_FILENAME`]
21+
: process.env.IEXEC_DATASET_FILENAME;
22+
const result = { index, protectedData: datasetFilename };
1323
try {
14-
await fs.writeFile(path, message);
15-
console.log(`File successfully written at path: ${path}`);
16-
} catch {
17-
console.error('Failed to write Task Output');
18-
process.exit(1);
24+
let protectedData;
25+
try {
26+
const deserializerConfig = datasetFilename
27+
? { protectedDataPath: `${IEXEC_IN}/${datasetFilename}` }
28+
: {};
29+
30+
const deserializer = new IExecDataProtectorDeserializer(
31+
deserializerConfig
32+
);
33+
protectedData = {
34+
chatId: await deserializer.getValue('telegram_chatId', 'string'),
35+
};
36+
} catch (e) {
37+
throw Error(`Failed to parse ProtectedData ${index}: ${e.message}`);
38+
}
39+
40+
validateProtectedData(protectedData);
41+
42+
const encryptedTelegramContent = await downloadEncryptedContent(
43+
requesterSecret.telegramContentMultiAddr
44+
);
45+
46+
const telegramContent = decryptContent(
47+
encryptedTelegramContent,
48+
requesterSecret.telegramContentEncryptionKey
49+
);
50+
51+
await sendTelegram({
52+
chatId: protectedData.chatId,
53+
message: telegramContent,
54+
botToken: appDeveloperSecret.TELEGRAM_BOT_TOKEN,
55+
senderName: requesterSecret.senderName,
56+
});
57+
result.success = true;
58+
} catch (e) {
59+
result.success = false;
60+
result.error = e.message;
1961
}
62+
console.log(`Protected data ${index} processed:`, result);
63+
return result;
2064
}
2165

2266
async function start() {
23-
const { IEXEC_OUT, IEXEC_APP_DEVELOPER_SECRET, IEXEC_REQUESTER_SECRET_1 } =
24-
process.env;
67+
const {
68+
IEXEC_OUT,
69+
IEXEC_APP_DEVELOPER_SECRET,
70+
IEXEC_REQUESTER_SECRET_1,
71+
IEXEC_IN,
72+
IEXEC_BULK_SLICE_SIZE,
73+
} = process.env;
2574

26-
// Check worker env
2775
const workerEnv = validateWorkerEnv({ IEXEC_OUT });
28-
// Parse the app developer secret environment variable
29-
let appDeveloperSecret;
30-
try {
31-
appDeveloperSecret = JSON.parse(IEXEC_APP_DEVELOPER_SECRET);
32-
} catch {
33-
throw Error('Failed to parse the developer secret');
34-
}
35-
appDeveloperSecret = validateAppSecret(appDeveloperSecret);
3676

37-
// Parse the requester secret environment variable
38-
let requesterSecret;
77+
let result; // { success: boolean, error?: string , protectedData?: string, results?: { index: number, protectedData: string, success: boolean, error?: string }[] }
3978
try {
40-
requesterSecret = IEXEC_REQUESTER_SECRET_1
41-
? JSON.parse(IEXEC_REQUESTER_SECRET_1)
42-
: {};
43-
} catch {
44-
throw Error('Failed to parse requester secret');
45-
}
46-
requesterSecret = validateRequesterSecret(requesterSecret);
79+
let appDeveloperSecret;
80+
try {
81+
appDeveloperSecret = JSON.parse(IEXEC_APP_DEVELOPER_SECRET);
82+
} catch {
83+
throw Error('Failed to parse the developer secret');
84+
}
85+
appDeveloperSecret = validateAppSecret(appDeveloperSecret);
4786

48-
// Parse the protected data and get the requester secret (chatId)
49-
let protectedData;
50-
try {
51-
const deserializer = new IExecDataProtectorDeserializer();
52-
protectedData = {
53-
chatId: await deserializer.getValue('telegram_chatId', 'string'),
54-
};
55-
} catch (e) {
56-
throw Error(`Failed to parse ProtectedData: ${e.message}`);
57-
}
58-
// Validate the protected data (chatId)
59-
validateProtectedData(protectedData);
87+
let requesterSecret;
88+
try {
89+
requesterSecret = JSON.parse(IEXEC_REQUESTER_SECRET_1);
90+
} catch {
91+
throw Error('Failed to parse requester secret');
92+
}
6093

61-
const encryptedTelegramContent = await downloadEncryptedContent(
62-
requesterSecret.telegramContentMultiAddr
63-
);
94+
requesterSecret = validateRequesterSecret(requesterSecret);
6495

65-
const telegramContent = decryptContent(
66-
encryptedTelegramContent,
67-
requesterSecret.telegramContentEncryptionKey
68-
);
96+
const bulkSize = parseInt(IEXEC_BULK_SLICE_SIZE, 10) || 0;
97+
98+
// Process multiple protected data
99+
if (bulkSize > 0) {
100+
console.log(`Processing ${bulkSize} protected data...`);
101+
const processPromises = new Array(bulkSize).fill(null).map((_, index) =>
102+
processProtectedData({
103+
index: index + 1,
104+
IEXEC_IN,
105+
appDeveloperSecret,
106+
requesterSecret,
107+
})
108+
);
109+
const results = await Promise.all(processPromises);
110+
const successCount = results.filter((r) => r.success === true).length;
111+
const errorCount = results.filter((r) => r.success !== true).length;
112+
result = {
113+
success: errorCount === 0,
114+
error: errorCount > 0 ? 'Partial failure' : undefined,
115+
totalCount: results.length,
116+
successCount,
117+
errorCount,
118+
results: results.map((r) => ({
119+
index: r.index,
120+
protectedData: r.protectedData,
121+
success: r.success,
122+
error: r.error,
123+
})),
124+
};
125+
} else {
126+
console.log('Processing single protected data...');
127+
const { protectedData, success, error } = await processProtectedData({
128+
index: 0,
129+
IEXEC_IN,
130+
appDeveloperSecret,
131+
requesterSecret,
132+
});
133+
result = { protectedData, success, error };
134+
}
135+
} catch (e) {
136+
console.error('Something went wrong:', e.message);
137+
result = { success: false, error: e.message };
138+
}
69139

70-
const response = await sendTelegram({
71-
chatId: protectedData.chatId,
72-
message: telegramContent,
73-
botToken: appDeveloperSecret.TELEGRAM_BOT_TOKEN,
74-
senderName: requesterSecret.senderName,
75-
});
76-
await writeTaskOutput(
77-
`${workerEnv.IEXEC_OUT}/result.txt`,
78-
JSON.stringify(response, null, 2)
140+
console.log('Writing results:', JSON.stringify(result));
141+
await fs.writeFile(
142+
`${workerEnv.IEXEC_OUT}/result.json`,
143+
JSON.stringify(result, null, 2)
79144
);
80-
await writeTaskOutput(
145+
await fs.writeFile(
81146
`${workerEnv.IEXEC_OUT}/computed.json`,
82-
JSON.stringify({
83-
'deterministic-output-path': `${workerEnv.IEXEC_OUT}/result.txt`,
84-
})
147+
JSON.stringify(
148+
{
149+
'deterministic-output-path': `${workerEnv.IEXEC_OUT}/result.json`,
150+
},
151+
null,
152+
2
153+
)
85154
);
86155
}
87156

dapp/src/telegramService.js

Lines changed: 19 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -4,49 +4,27 @@ async function sendTelegram({
44
botToken,
55
senderName = 'Web3Telegram Dapp',
66
}) {
7-
if (!botToken || botToken.trim() === '')
8-
throw new Error('Bot token is required');
9-
if (!chatId || chatId.trim() === '') throw new Error('Chat ID is required');
10-
if (!message || message.trim() === '')
11-
throw new Error('Message content is required');
12-
137
const messageToSend = `Message from: ${senderName}\n${message}`;
14-
15-
try {
16-
const response = await fetch(
17-
`https://api.telegram.org/bot${botToken}/sendMessage`,
18-
{
19-
method: 'POST',
20-
headers: {
21-
'Content-Type': 'application/json',
22-
},
23-
body: JSON.stringify({
24-
chat_id: chatId,
25-
text: messageToSend,
26-
parse_mode: 'HTML',
27-
}),
28-
}
29-
);
30-
31-
if (!response.ok) {
32-
console.error('Telegram API Error');
33-
return {
34-
message: 'Failed to send Telegram message.',
35-
status: response.status,
36-
};
8+
const response = await fetch(
9+
`https://api.telegram.org/bot${botToken}/sendMessage`,
10+
{
11+
method: 'POST',
12+
headers: {
13+
'Content-Type': 'application/json',
14+
},
15+
body: JSON.stringify({
16+
chat_id: chatId,
17+
text: messageToSend,
18+
parse_mode: 'HTML',
19+
}),
3720
}
38-
39-
console.log('Message successfully sent by Telegram bot.');
40-
return {
41-
message: 'Your telegram message has been sent successfully.',
42-
status: 200,
43-
};
44-
} catch (error) {
45-
console.error('Failed to send Telegram message');
46-
return {
47-
message: 'Failed to send Telegram message.',
48-
status: 500,
49-
};
21+
).catch(() => {
22+
throw new Error('Failed to reach Telegram bot API');
23+
});
24+
if (!response.ok) {
25+
throw new Error(
26+
`Failed to send Telegram message, bot API answered with status: ${response.status}`
27+
);
5028
}
5129
}
5230

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
invalid content
Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1,3 @@
1-
{"deterministic-output-path":"./tests/_test_outputs_/iexec_out/result.txt"}
1+
{
2+
"deterministic-output-path": "./tests/_test_outputs_/iexec_out/result.json"
3+
}

0 commit comments

Comments
 (0)