Skip to content

Commit e2a758d

Browse files
authored
feat: implement bulk processing feature to support multiple datasets in TEE session (#310)
1 parent f01bc28 commit e2a758d

File tree

12 files changed

+456
-54
lines changed

12 files changed

+456
-54
lines changed

gradle.properties

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
# x-release-please-start-version
22
version=9.1.0
33
# x-release-please-end
4-
iexecCommonVersion=9.0.0
5-
iexecCommonsPocoVersion=5.0.0
4+
iexecCommonsPocoVersion=5.2.0
5+
iexecCommonVersion=9.1.0
66

77
nexusUser
88
nexusPassword

src/main/java/com/iexec/sms/tee/TeeController.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -209,8 +209,8 @@ public ResponseEntity<ApiResponseBody<TeeSessionGenerationResponse, TeeSessionGe
209209
String taskId = workerpoolAuthorization.getChainTaskId();
210210
workerAddress = Keys.toChecksumAddress(workerAddress);
211211
String attestingEnclave = workerpoolAuthorization.getEnclaveChallenge();
212-
log.info("TEE session request [taskId:{}, workerAddress:{}]",
213-
taskId, workerAddress);
212+
log.info("TEE session request [taskId:{}, dealId:{}, taskIndex:{}, workerAddress:{}]",
213+
taskId, workerpoolAuthorization.getDealId(), workerpoolAuthorization.getTaskIndex(), workerAddress);
214214
try {
215215
TeeSessionGenerationResponse teeSessionGenerationResponse = teeSessionService
216216
.generateTeeSession(taskId, workerAddress, attestingEnclave);
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Copyright 2025 IEXEC BLOCKCHAIN TECH
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.iexec.sms.tee.bulk;
18+
19+
import com.iexec.commons.poco.order.DatasetOrder;
20+
import feign.Param;
21+
import feign.RequestLine;
22+
23+
import java.util.List;
24+
25+
public interface IpfsClient {
26+
@RequestLine("GET /ipfs/{cid}")
27+
List<String> readBulkCid(@Param("cid") final String cid);
28+
29+
@RequestLine("GET /ipfs/{cid}")
30+
List<DatasetOrder> readOrders(@Param("cid") final String cid);
31+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Copyright 2025 IEXEC BLOCKCHAIN TECH
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.iexec.sms.tee.bulk;
18+
19+
import com.iexec.common.utils.FeignBuilder;
20+
import feign.Logger;
21+
import jakarta.validation.constraints.NotEmpty;
22+
import lombok.Value;
23+
import lombok.extern.slf4j.Slf4j;
24+
import org.hibernate.validator.constraints.URL;
25+
import org.springframework.boot.context.properties.ConfigurationProperties;
26+
import org.springframework.context.annotation.Bean;
27+
import org.springframework.validation.annotation.Validated;
28+
29+
@Slf4j
30+
@Value
31+
@Validated
32+
@ConfigurationProperties(prefix = "ipfs")
33+
public class IpfsConfiguration {
34+
@URL
35+
@NotEmpty
36+
String gatewayUrl;
37+
38+
@Bean
39+
IpfsClient ipfsClient() {
40+
log.info("starting with ipfs located at {}", gatewayUrl);
41+
return FeignBuilder.createBuilder(Logger.Level.BASIC).target(IpfsClient.class, gatewayUrl);
42+
}
43+
}

src/main/java/com/iexec/sms/tee/session/base/SecretSessionBaseService.java

Lines changed: 105 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,20 @@
1818

1919
import com.iexec.common.utils.IexecEnvUtils;
2020
import com.iexec.common.utils.IexecFileHelper;
21+
import com.iexec.commons.poco.chain.ChainDataset;
2122
import com.iexec.commons.poco.chain.DealParams;
23+
import com.iexec.commons.poco.order.DatasetOrder;
24+
import com.iexec.commons.poco.security.Signature;
2225
import com.iexec.commons.poco.task.TaskDescription;
2326
import com.iexec.commons.poco.tee.TeeEnclaveConfiguration;
27+
import com.iexec.commons.poco.utils.SignatureUtils;
28+
import com.iexec.sms.chain.IexecHubService;
2429
import com.iexec.sms.secret.compute.*;
2530
import com.iexec.sms.secret.web2.Web2Secret;
2631
import com.iexec.sms.secret.web2.Web2SecretHeader;
2732
import com.iexec.sms.secret.web2.Web2SecretService;
2833
import com.iexec.sms.secret.web3.Web3SecretService;
34+
import com.iexec.sms.tee.bulk.IpfsClient;
2935
import com.iexec.sms.tee.challenge.EthereumCredentials;
3036
import com.iexec.sms.tee.challenge.TeeChallenge;
3137
import com.iexec.sms.tee.challenge.TeeChallengeService;
@@ -39,6 +45,7 @@
3945
import org.apache.commons.lang3.StringUtils;
4046
import org.springframework.stereotype.Service;
4147

48+
import java.math.BigInteger;
4249
import java.util.*;
4350

4451
import static com.iexec.common.worker.tee.TeeSessionEnvironmentVariable.*;
@@ -57,23 +64,35 @@
5764
public class SecretSessionBaseService {
5865

5966
static final String EMPTY_STRING_VALUE = "";
67+
static final BigInteger BULK_DATASET_VOLUME = BigInteger.TWO.pow(53).subtract(BigInteger.ONE);
68+
static final String BULK_DATASET_PREFIX = "BULK_DATASET_";
69+
static final String BULK_DATASET_URL_SUFFIX = "_URL";
70+
static final String BULK_DATASET_CHECKSUM_SUFFIX = "_CHECKSUM";
71+
static final String BULK_DATASET_KEY_SUFFIX = "_KEY";
72+
static final String BULK_DATASET_FILENAME_SUFFIX = "_FILENAME";
6073
static final String IEXEC_APP_DEVELOPER_SECRET_PREFIX = "IEXEC_APP_DEVELOPER_SECRET_";
6174
static final String IEXEC_REQUESTER_SECRET_PREFIX = "IEXEC_REQUESTER_SECRET_";
6275

76+
private final IpfsClient ipfsClient;
77+
private final IexecHubService iexecHubService;
6378
private final Web3SecretService web3SecretService;
6479
private final Web2SecretService web2SecretService;
6580
private final TeeChallengeService teeChallengeService;
6681
private final TeeTaskComputeSecretService teeTaskComputeSecretService;
6782

6883
public SecretSessionBaseService(
84+
final IpfsClient ipfsClient,
85+
final IexecHubService iexecHubService,
6986
final Web3SecretService web3SecretService,
7087
final Web2SecretService web2SecretService,
7188
final TeeChallengeService teeChallengeService,
7289
final TeeTaskComputeSecretService teeTaskComputeSecretService) {
90+
this.iexecHubService = iexecHubService;
7391
this.web3SecretService = web3SecretService;
7492
this.web2SecretService = web2SecretService;
7593
this.teeChallengeService = teeChallengeService;
7694
this.teeTaskComputeSecretService = teeTaskComputeSecretService;
95+
this.ipfsClient = ipfsClient;
7796
}
7897

7998
/**
@@ -99,8 +118,7 @@ public SecretSessionBase getSecretsTokens(final TeeSessionRequest request) throw
99118
final TaskDescription taskDescription = request.getTaskDescription();
100119
final Map<String, String> signTokens = getSignTokens(request);
101120
// pre-compute
102-
final boolean isPreComputeRequired = taskDescription.containsDataset() || taskDescription.containsInputFiles();
103-
if (isPreComputeRequired) {
121+
if (taskDescription.requiresPreCompute()) {
104122
sessionBase.preCompute(getPreComputeTokens(request, signTokens));
105123
}
106124
// app
@@ -151,6 +169,71 @@ Map<String, String> getSignTokens(final TeeSessionRequest request) throws TeeSes
151169

152170
// region pre-compute
153171

172+
/**
173+
* Fetch dataset orders related to a bulk processing slice from IPFS
174+
*
175+
* @param taskDescription A task part of a bulk processing deal and corresponding to one of the slices
176+
* @return The list of {@code DatasetOrder} found in the slice, or an empty list if any issue arises
177+
*/
178+
private List<DatasetOrder> fetchDatasetOrders(final TaskDescription taskDescription) {
179+
try {
180+
final String bulkCid = taskDescription.getDealParams().getBulkCid();
181+
final int bulkSliceIndex = taskDescription.getBotIndex() - taskDescription.getBotFirstIndex();
182+
log.info("Fetching dataset orders for a bulk slice [chainTaskId:{}, bulkCid:{}, bulkSliceIndex:{}]",
183+
taskDescription.getChainTaskId(), bulkCid, bulkSliceIndex);
184+
final List<String> bulkSlices = ipfsClient.readBulkCid(bulkCid);
185+
final List<DatasetOrder> tempList = ipfsClient.readOrders(bulkSlices.get(bulkSliceIndex));
186+
return List.copyOf(tempList);
187+
} catch (final Exception e) {
188+
log.error("Error during bulk computation", e);
189+
return List.of();
190+
}
191+
}
192+
193+
private Map<String, Object> getBulkDatasetTokens(final int index,
194+
final TaskDescription taskDescription,
195+
final DatasetOrder datasetOrder) {
196+
final String prefix = BULK_DATASET_PREFIX + (index + 1);
197+
final ChainDataset dataset = iexecHubService.getChainDataset(datasetOrder.getDataset()).orElse(null);
198+
if (isBulkDatasetOrderValid(taskDescription, datasetOrder) && dataset != null) {
199+
final String datasetKey = web3SecretService.getDecryptedValue(datasetOrder.getDataset()).orElse("");
200+
return Map.of(
201+
prefix + BULK_DATASET_URL_SUFFIX, dataset.getMultiaddr(),
202+
prefix + BULK_DATASET_CHECKSUM_SUFFIX, dataset.getChecksum(),
203+
prefix + BULK_DATASET_KEY_SUFFIX, datasetKey,
204+
prefix + BULK_DATASET_FILENAME_SUFFIX, datasetOrder.getDataset()
205+
);
206+
} else {
207+
return Map.of(
208+
prefix + BULK_DATASET_URL_SUFFIX, EMPTY_STRING_VALUE,
209+
prefix + BULK_DATASET_CHECKSUM_SUFFIX, EMPTY_STRING_VALUE,
210+
prefix + BULK_DATASET_KEY_SUFFIX, EMPTY_STRING_VALUE,
211+
prefix + BULK_DATASET_FILENAME_SUFFIX, datasetOrder.getDataset()
212+
);
213+
}
214+
}
215+
216+
boolean isBulkDatasetOrderValid(final TaskDescription taskDescription, final DatasetOrder datasetOrder) {
217+
try {
218+
final Signature signature = new Signature(datasetOrder.getSign());
219+
final String orderHash = datasetOrder.computeHash(iexecHubService.getOrdersDomain());
220+
final String owner = iexecHubService.getOwner(datasetOrder.getDataset());
221+
final boolean isSignedByOwner = SignatureUtils.doesSignatureMatchesAddress(
222+
signature.getR(), signature.getS(), orderHash, owner);
223+
final BigInteger consumedVolume = iexecHubService.viewConsumed(orderHash);
224+
final boolean isVolumeValid = BULK_DATASET_VOLUME.equals(datasetOrder.getVolume());
225+
final boolean isOrderNotFullyConsumed = !BULK_DATASET_VOLUME.equals(consumedVolume);
226+
final boolean isTagValid = taskDescription.getTag().equals(datasetOrder.getTag());
227+
log.info("Check bulk dataset order [chainTaskId:{}, dataset:{}, owner:{}, isSignedByOwner:{}, isVolumeValid:{}, isOrderNotFullyConsumed:{}, isTagValid:{}]",
228+
taskDescription.getChainTaskId(), datasetOrder.getDataset(), owner, isSignedByOwner, isVolumeValid, isOrderNotFullyConsumed, isTagValid);
229+
return isSignedByOwner && isVolumeValid && isOrderNotFullyConsumed && isTagValid;
230+
} catch (Exception e) {
231+
log.error("Failed to perform all checks on dataset [chainTaskId:{}, dataset:{}]",
232+
taskDescription.getChainTaskId(), datasetOrder.getDataset());
233+
return false;
234+
}
235+
}
236+
154237
/**
155238
* Get tokens to be injected in the pre-compute enclave.
156239
*
@@ -170,6 +253,15 @@ SecretEnclaveBase getPreComputeTokens(final TeeSessionRequest request, final Map
170253
// `IS_DATASET_REQUIRED` still meaningful?
171254
tokens.put(IS_DATASET_REQUIRED.name(), taskDescription.containsDataset());
172255

256+
if (taskDescription.isBulkRequest()) {
257+
final List<DatasetOrder> orders = fetchDatasetOrders(taskDescription);
258+
tokens.put(BULK_SIZE.name(), orders.size());
259+
for (int i = 0; i < orders.size(); i++) {
260+
final DatasetOrder order = orders.get(i);
261+
tokens.putAll(getBulkDatasetTokens(i, taskDescription, order));
262+
}
263+
}
264+
173265
final List<String> trustedEnv = new ArrayList<>();
174266
if (taskDescription.containsDataset()) {
175267
final String datasetKey = web3SecretService
@@ -243,6 +335,17 @@ SecretEnclaveBase getAppTokens(final TeeSessionRequest request) throws TeeSessio
243335
tokens.putAll(computeSecrets);
244336
// trusted env variables (not confidential)
245337
tokens.putAll(IexecEnvUtils.getComputeStageEnvMap(taskDescription));
338+
339+
if (taskDescription.isBulkRequest()) {
340+
final List<String> addresses = fetchDatasetOrders(taskDescription).stream()
341+
.map(DatasetOrder::getDataset)
342+
.toList();
343+
tokens.put(BULK_SIZE.name(), addresses.size());
344+
for (int i = 0; i < addresses.size(); i++) {
345+
tokens.put(BULK_DATASET_PREFIX + (i + 1) + BULK_DATASET_FILENAME_SUFFIX, addresses.get(i));
346+
}
347+
}
348+
246349
return enclaveBase
247350
.environment(tokens)
248351
.build();

src/main/resources/application.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,9 @@ chain:
4242
gas-price-multiplier: ${IEXEC_GAS_PRICE_MULTIPLIER:1.0} # txs will be sent with networkGasPrice*gasPriceMultiplier, 4.0 means superfast
4343
gas-price-cap: ${IEXEC_GAS_PRICE_CAP:22000000000} #in Wei, will be used for txs if networkGasPrice*gasPriceMultiplier > gasPriceCap
4444

45+
ipfs:
46+
gateway-url: https://ipfs.iex.ec
47+
4548
metrics:
4649
storage:
4750
refresh-interval: ${IEXEC_SMS_METRICS_STORAGE_REFRESH_INTERVAL:30} # In seconds
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Copyright 2025 IEXEC BLOCKCHAIN TECH
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.iexec.sms.tee.bulk;
18+
19+
import jakarta.validation.ConstraintViolation;
20+
import jakarta.validation.Validation;
21+
import jakarta.validation.Validator;
22+
import jakarta.validation.ValidatorFactory;
23+
import org.junit.jupiter.api.BeforeEach;
24+
import org.junit.jupiter.api.Test;
25+
import org.junit.jupiter.params.ParameterizedTest;
26+
import org.junit.jupiter.params.provider.NullSource;
27+
import org.junit.jupiter.params.provider.ValueSource;
28+
29+
import static org.assertj.core.api.Assertions.assertThat;
30+
31+
class IpsConfigurationTests {
32+
private Validator validator;
33+
34+
@BeforeEach
35+
void setUp() {
36+
try (final ValidatorFactory factory = Validation.buildDefaultValidatorFactory()) {
37+
validator = factory.getValidator();
38+
}
39+
}
40+
41+
@ParameterizedTest
42+
@NullSource
43+
@ValueSource(strings = "")
44+
void shouldNotBeValidWithEmptyValues(final String gatewayUrl) {
45+
final IpfsConfiguration config = new IpfsConfiguration(gatewayUrl);
46+
assertThat(validator.validate(config))
47+
.isNotEmpty()
48+
.extracting(ConstraintViolation::getMessage)
49+
.containsExactly("must not be empty");
50+
}
51+
52+
@ParameterizedTest
53+
@ValueSource(strings = {" ", "not-an-url"})
54+
void shouldNotBeValidWithInvalidUrls(final String gatewayUrl) {
55+
final IpfsConfiguration config = new IpfsConfiguration(gatewayUrl);
56+
assertThat(validator.validate(config))
57+
.isNotEmpty()
58+
.extracting(ConstraintViolation::getMessage)
59+
.containsExactly("must be a valid URL");
60+
}
61+
62+
@Test
63+
void shouldBeValid() {
64+
final IpfsConfiguration config = new IpfsConfiguration("http://localhost");
65+
assertThat(validator.validate(config)).isEmpty();
66+
}
67+
}

0 commit comments

Comments
 (0)