Skip to content

Commit a31b6c2

Browse files
author
Ugo Plouviez
committed
Use TaskDescription instead of AvailableReplicateModel
1 parent f1be9ff commit a31b6c2

File tree

9 files changed

+60
-193
lines changed

9 files changed

+60
-193
lines changed

src/main/java/com/iexec/worker/amnesia/AmnesiaRecoveryService.java

Lines changed: 8 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,10 @@
33
import com.iexec.common.chain.ContributionAuthorization;
44
import com.iexec.common.notification.TaskNotification;
55
import com.iexec.common.notification.TaskNotificationType;
6-
import com.iexec.common.replicate.AvailableReplicateModel;
6+
import com.iexec.common.task.TaskDescription;
77
import com.iexec.worker.chain.IexecHubService;
8-
import com.iexec.worker.executor.TaskExecutorService;
98
import com.iexec.worker.feign.CustomFeignClient;
109
import com.iexec.worker.pubsub.SubscriptionService;
11-
import com.iexec.worker.replicate.ReplicateService;
1210
import com.iexec.worker.result.ResultService;
1311
import lombok.extern.slf4j.Slf4j;
1412
import org.springframework.stereotype.Service;
@@ -29,22 +27,16 @@ public class AmnesiaRecoveryService {
2927

3028
private CustomFeignClient customFeignClient;
3129
private SubscriptionService subscriptionService;
32-
private ReplicateService replicateService;
3330
private ResultService resultService;
34-
private TaskExecutorService taskExecutorService;
3531
private IexecHubService iexecHubService;
3632

3733
public AmnesiaRecoveryService(CustomFeignClient customFeignClient,
3834
SubscriptionService subscriptionService,
39-
ReplicateService replicateService,
4035
ResultService resultService,
41-
TaskExecutorService taskExecutorService,
4236
IexecHubService iexecHubService) {
4337
this.customFeignClient = customFeignClient;
4438
this.subscriptionService = subscriptionService;
45-
this.replicateService = replicateService;
4639
this.resultService = resultService;
47-
this.taskExecutorService = taskExecutorService;
4840
this.iexecHubService = iexecHubService;
4941
}
5042

@@ -75,29 +67,28 @@ public List<String> recoverInterruptedReplicates() {
7567
continue;
7668
}
7769

78-
Optional<AvailableReplicateModel> oReplicateModel =
79-
replicateService.retrieveAvailableReplicateModelFromContribAuth(contributionAuth);
70+
Optional<TaskDescription> optionalTaskDescription = iexecHubService.getTaskDescriptionFromChain(chainTaskId);
8071

81-
if (!oReplicateModel.isPresent()) {
72+
if (!optionalTaskDescription.isPresent()) {
8273
log.error("Could not recover task, no replicateModel retrieved [chainTaskId:{}, RecoveryAction:{}]",
8374
chainTaskId, taskNotificationType);
8475
continue;
8576
}
8677

87-
AvailableReplicateModel replicateModel = oReplicateModel.get();
88-
recoverReplicate(missedTaskNotification, replicateModel);
78+
TaskDescription taskDescription = optionalTaskDescription.get();
79+
recoverTask(missedTaskNotification, taskDescription);
8980
recoveredChainTaskIds.add(chainTaskId);
9081
}
9182

9283
return recoveredChainTaskIds;
9384
}
9485

95-
public void recoverReplicate(TaskNotification taskNotification,
96-
AvailableReplicateModel replicateModel) {
86+
public void recoverTask(TaskNotification taskNotification,
87+
TaskDescription taskDescription) {
9788
String chainTaskId = taskNotification.getChainTaskId();
9889

9990
subscriptionService.subscribeToTopic(chainTaskId);
100-
resultService.saveResultInfo(chainTaskId, replicateModel);
91+
resultService.saveResultInfo(chainTaskId, taskDescription);
10192
subscriptionService.handleTaskNotification(taskNotification);
10293
}
10394

src/main/java/com/iexec/worker/docker/DockerComputationService.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package com.iexec.worker.docker;
22

3-
import com.iexec.common.replicate.AvailableReplicateModel;
3+
import com.iexec.common.task.TaskDescription;
44
import com.iexec.worker.config.WorkerConfigurationService;
55
import com.spotify.docker.client.messages.ContainerConfig;
66
import lombok.extern.slf4j.Slf4j;
@@ -28,9 +28,9 @@ public DockerComputationService(CustomDockerClient dockerClient,
2828
this.configurationService = configurationService;
2929
}
3030

31-
public String dockerRunAndGetLogs(AvailableReplicateModel replicateModel, String datasetFilename) {
32-
String chainTaskId = replicateModel.getContributionAuthorization().getChainTaskId();
33-
String image = replicateModel.getAppUri();
31+
public String dockerRunAndGetLogs(TaskDescription taskDescription, String datasetFilename) {
32+
String chainTaskId = taskDescription.getChainTaskId();
33+
String image = taskDescription.getAppUri();
3434
//TODO: check image equals image:tag
3535
String stdout = "";
3636

@@ -41,17 +41,17 @@ public String dockerRunAndGetLogs(AvailableReplicateModel replicateModel, String
4141
String hostBaseVolume = configurationService.getTaskBaseDir(chainTaskId);
4242
ContainerConfig containerConfig;
4343

44-
if (replicateModel.isTrustedExecution()) {
45-
containerConfig = getContainerConfig(image, replicateModel.getCmd(), hostBaseVolume,
44+
if (taskDescription.isTrustedExecution()) {
45+
containerConfig = getContainerConfig(image, taskDescription.getCmd(), hostBaseVolume,
4646
TEE_DOCKER_ENV_CHAIN_TASKID + "=" + chainTaskId,
4747
TEE_DOCKER_ENV_WORKER_ADDRESS + "=" + configurationService.getWorkerWalletAddress(),
4848
DATASET_FILENAME + "=" + datasetFilename);
4949
} else {
50-
containerConfig = getContainerConfig(image, replicateModel.getCmd(), hostBaseVolume,
50+
containerConfig = getContainerConfig(image, taskDescription.getCmd(), hostBaseVolume,
5151
DATASET_FILENAME + "=" + datasetFilename);
5252
}
5353

54-
stdout = startComputationAndGetLogs(chainTaskId, containerConfig, replicateModel.getMaxExecutionTime());
54+
stdout = startComputationAndGetLogs(chainTaskId, containerConfig, taskDescription.getMaxExecutionTime());
5555

5656
return stdout;
5757
}

src/main/java/com/iexec/worker/executor/TaskExecutorService.java

Lines changed: 25 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,10 @@
33
import com.iexec.common.chain.ChainReceipt;
44
import com.iexec.common.chain.ContributionAuthorization;
55
import com.iexec.common.dapp.DappType;
6-
import com.iexec.common.replicate.AvailableReplicateModel;
76
import com.iexec.common.replicate.ReplicateDetails;
87
import com.iexec.common.replicate.ReplicateStatus;
98
import com.iexec.common.security.Signature;
9+
import com.iexec.common.task.TaskDescription;
1010
import com.iexec.common.utils.BytesUtils;
1111
import com.iexec.common.utils.SignatureUtils;
1212
import com.iexec.worker.chain.ContributionService;
@@ -17,13 +17,10 @@
1717
import com.iexec.worker.dataset.DatasetService;
1818
import com.iexec.worker.docker.DockerComputationService;
1919
import com.iexec.worker.feign.CustomFeignClient;
20-
import com.iexec.worker.replicate.ReplicateService;
2120
import com.iexec.worker.result.ResultService;
2221
import com.iexec.worker.sms.SmsService;
2322
import com.iexec.worker.utils.LoggingUtils;
24-
2523
import lombok.extern.slf4j.Slf4j;
26-
2724
import org.springframework.scheduling.annotation.Async;
2825
import org.springframework.stereotype.Service;
2926

@@ -54,11 +51,11 @@ public class TaskExecutorService {
5451
private IexecHubService iexecHubService;
5552
private SmsService smsService;
5653
private Web3jService web3jService;
57-
private ReplicateService replicateService;
5854

5955
// internal variables
6056
private int maxNbExecutions;
6157
private ThreadPoolExecutor executor;
58+
private String corePublicAddress;
6259

6360
public TaskExecutorService(DatasetService datasetService,
6461
DockerComputationService dockerComputationService,
@@ -69,8 +66,7 @@ public TaskExecutorService(DatasetService datasetService,
6966
WorkerConfigurationService workerConfigurationService,
7067
IexecHubService iexecHubService,
7168
SmsService smsService,
72-
Web3jService web3jService,
73-
ReplicateService replicateService) {
69+
Web3jService web3jService) {
7470
this.datasetService = datasetService;
7571
this.dockerComputationService = dockerComputationService;
7672
this.resultService = resultService;
@@ -81,10 +77,10 @@ public TaskExecutorService(DatasetService datasetService,
8177
this.iexecHubService = iexecHubService;
8278
this.smsService = smsService;
8379
this.web3jService = web3jService;
84-
this.replicateService = replicateService;
8580

8681
maxNbExecutions = Runtime.getRuntime().availableProcessors() - 1;
8782
executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(maxNbExecutions);
83+
corePublicAddress = customFeignClient.getPublicConfiguration().getSchedulerPublicAddress();
8884
}
8985

9086
public boolean canAcceptMoreReplicates() {
@@ -95,11 +91,10 @@ public CompletableFuture<Void> addReplicate(ContributionAuthorization contributi
9591

9692
String chainTaskId = contributionAuth.getChainTaskId();
9793

98-
Optional<AvailableReplicateModel> replicateModel =
99-
replicateService.contributionAuthToReplicate(contributionAuth);
94+
Optional<TaskDescription> taskDescriptionFromChain = iexecHubService.getTaskDescriptionFromChain(chainTaskId);
10095

10196
return CompletableFuture.supplyAsync(() -> compute(contributionAuth), executor)
102-
.thenApply(stdout -> resultService.saveResult(chainTaskId, replicateModel.get(), stdout))
97+
.thenApply(stdout -> resultService.saveResult(chainTaskId, taskDescriptionFromChain.get(), stdout))
10398
.thenAccept(isSaved -> {
10499
if (isSaved) contribute(contributionAuth);
105100
})
@@ -115,6 +110,13 @@ public CompletableFuture<Void> addReplicate(ContributionAuthorization contributi
115110
public void tryToContribute(ContributionAuthorization contributionAuth) {
116111

117112
String chainTaskId = contributionAuth.getChainTaskId();
113+
114+
if (!contributionService.isContributionAuthorizationValid(contributionAuth, corePublicAddress)) {
115+
log.error("The contribution contribAuth is NOT valid, the task will not be performed"
116+
+ " [chainTaskId:{}, contribAuth:{}]", chainTaskId, contributionAuth);
117+
return;
118+
}
119+
118120
boolean isResultAvailable = resultService.isResultAvailable(chainTaskId);
119121

120122
if (!isResultAvailable) {
@@ -142,31 +144,30 @@ private String compute(ContributionAuthorization contributionAuth) {
142144
throw new UnsupportedOperationException("Task needs TEE, I don't support it");
143145
}
144146

145-
Optional<AvailableReplicateModel> optionalAvailableReplicateModel =
146-
replicateService.contributionAuthToReplicate(contributionAuth);
147+
Optional<TaskDescription> taskDescriptionFromChain = iexecHubService.getTaskDescriptionFromChain(chainTaskId);
147148

148-
if (!optionalAvailableReplicateModel.isPresent()){
149+
if (!taskDescriptionFromChain.isPresent()){
149150
stdout = "AvailableReplicateModel not found";
150151
log.error(stdout + " [chainTaskId:{}]", chainTaskId);
151152
return stdout;
152153
}
153154

154-
AvailableReplicateModel availableReplicateModel = optionalAvailableReplicateModel.get();
155+
TaskDescription taskDescription = taskDescriptionFromChain.get();
155156

156157
// check app type
157158
customFeignClient.updateReplicateStatus(chainTaskId, RUNNING);
158-
if (!availableReplicateModel.getAppType().equals(DappType.DOCKER)) {
159+
if (!taskDescription.getAppType().equals(DappType.DOCKER)) {
159160
stdout = "Application is not of type Docker";
160161
log.error(stdout + " [chainTaskId:{}]", chainTaskId);
161162
return stdout;
162163
}
163164

164165
// pull app
165166
customFeignClient.updateReplicateStatus(chainTaskId, APP_DOWNLOADING);
166-
boolean isAppDownloaded = dockerComputationService.dockerPull(chainTaskId, availableReplicateModel.getAppUri());
167+
boolean isAppDownloaded = dockerComputationService.dockerPull(chainTaskId, taskDescription.getAppUri());
167168
if (!isAppDownloaded) {
168169
customFeignClient.updateReplicateStatus(chainTaskId, APP_DOWNLOAD_FAILED);
169-
stdout = "Failed to pull application image, URI:" + availableReplicateModel.getAppUri();
170+
stdout = "Failed to pull application image, URI:" + taskDescription.getAppUri();
170171
log.error(stdout + " [chainTaskId:{}]", chainTaskId);
171172
return stdout;
172173
}
@@ -175,10 +176,10 @@ private String compute(ContributionAuthorization contributionAuth) {
175176

176177
// pull data
177178
customFeignClient.updateReplicateStatus(chainTaskId, DATA_DOWNLOADING);
178-
boolean isDatasetDownloaded = datasetService.downloadDataset(chainTaskId, availableReplicateModel.getDatasetUri());
179+
boolean isDatasetDownloaded = datasetService.downloadDataset(chainTaskId, taskDescription.getDatasetUri());
179180
if (!isDatasetDownloaded) {
180181
customFeignClient.updateReplicateStatus(chainTaskId, DATA_DOWNLOAD_FAILED);
181-
stdout = "Failed to pull dataset, URI:" + availableReplicateModel.getDatasetUri();
182+
stdout = "Failed to pull dataset, URI:" + taskDescription.getDatasetUri();
182183
log.error(stdout + " [chainTaskId:{}]", chainTaskId);
183184
return stdout;
184185
}
@@ -197,19 +198,19 @@ private String compute(ContributionAuthorization contributionAuth) {
197198
boolean isDatasetDecrypted = false;
198199

199200
if (isDatasetDecryptionNeeded) {
200-
isDatasetDecrypted = datasetService.decryptDataset(chainTaskId, availableReplicateModel.getDatasetUri());
201+
isDatasetDecrypted = datasetService.decryptDataset(chainTaskId, taskDescription.getDatasetUri());
201202
}
202203

203204
if (isDatasetDecryptionNeeded && !isDatasetDecrypted) {
204205
customFeignClient.updateReplicateStatus(chainTaskId, COMPUTE_FAILED);
205-
stdout = "Failed to decrypt dataset, URI:" + availableReplicateModel.getDatasetUri();
206+
stdout = "Failed to decrypt dataset, URI:" + taskDescription.getDatasetUri();
206207
log.error(stdout + " [chainTaskId:{}]", chainTaskId);
207208
return stdout;
208209
}
209210

210211
// compute
211-
String datasetFilename = datasetService.getDatasetFilename(availableReplicateModel.getDatasetUri());
212-
stdout = dockerComputationService.dockerRunAndGetLogs(availableReplicateModel, datasetFilename);
212+
String datasetFilename = datasetService.getDatasetFilename(taskDescription.getDatasetUri());
213+
stdout = dockerComputationService.dockerRunAndGetLogs(taskDescription, datasetFilename);
213214

214215
if (stdout.isEmpty()) {
215216
customFeignClient.updateReplicateStatus(chainTaskId, COMPUTE_FAILED);

src/main/java/com/iexec/worker/pubsub/SubscriptionService.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,9 @@
33
import com.iexec.common.chain.ContributionAuthorization;
44
import com.iexec.common.notification.TaskNotification;
55
import com.iexec.common.notification.TaskNotificationType;
6-
import com.iexec.common.replicate.AvailableReplicateModel;
76
import com.iexec.worker.config.CoreConfigurationService;
87
import com.iexec.worker.config.WorkerConfigurationService;
98
import com.iexec.worker.executor.TaskExecutorService;
10-
import com.iexec.worker.replicate.ReplicateService;
119
import lombok.extern.slf4j.Slf4j;
1210
import org.springframework.http.ResponseEntity;
1311
import org.springframework.lang.Nullable;
@@ -27,7 +25,10 @@
2725

2826
import javax.annotation.PostConstruct;
2927
import java.lang.reflect.Type;
30-
import java.util.*;
28+
import java.util.ArrayList;
29+
import java.util.Arrays;
30+
import java.util.List;
31+
import java.util.Map;
3132
import java.util.concurrent.ConcurrentHashMap;
3233

3334

@@ -39,7 +40,6 @@ public class SubscriptionService extends StompSessionHandlerAdapter {
3940
private final int corePort;
4041
private final String workerWalletAddress;
4142
private RestTemplate restTemplate;
42-
private ReplicateService replicateService;
4343
// external services
4444
private TaskExecutorService taskExecutorService;
4545

@@ -52,15 +52,13 @@ public class SubscriptionService extends StompSessionHandlerAdapter {
5252
public SubscriptionService(CoreConfigurationService coreConfigurationService,
5353
WorkerConfigurationService workerConfigurationService,
5454
TaskExecutorService taskExecutorService,
55-
RestTemplate restTemplate,
56-
ReplicateService replicateService) {
55+
RestTemplate restTemplate) {
5756
this.taskExecutorService = taskExecutorService;
5857

5958
this.coreHost = coreConfigurationService.getHost();
6059
this.corePort = coreConfigurationService.getPort();
6160
this.workerWalletAddress = workerConfigurationService.getWorkerWalletAddress();
6261
this.restTemplate = restTemplate;
63-
this.replicateService = replicateService;
6462

6563
chainTaskIdToSubscription = new ConcurrentHashMap<>();
6664
url = "http://" + coreHost + ":" + corePort + "/connect";

src/main/java/com/iexec/worker/replicate/ReplicateDemandService.java

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,22 @@
11
package com.iexec.worker.replicate;
22

3-
import java.util.Collections;
4-
import java.util.Optional;
5-
63
import com.iexec.common.chain.ContributionAuthorization;
74
import com.iexec.common.notification.TaskNotification;
85
import com.iexec.common.notification.TaskNotificationExtra;
96
import com.iexec.common.notification.TaskNotificationType;
10-
import com.iexec.common.replicate.AvailableReplicateModel;
117
import com.iexec.worker.chain.ContributionService;
128
import com.iexec.worker.chain.IexecHubService;
139
import com.iexec.worker.executor.TaskExecutorService;
1410
import com.iexec.worker.feign.CustomFeignClient;
1511
import com.iexec.worker.pubsub.SubscriptionService;
16-
1712
import lombok.extern.slf4j.Slf4j;
1813
import org.springframework.beans.factory.annotation.Autowired;
1914
import org.springframework.scheduling.annotation.Scheduled;
2015
import org.springframework.stereotype.Service;
2116

17+
import java.util.Collections;
18+
import java.util.Optional;
19+
2220

2321
@Slf4j
2422
@Service
@@ -28,21 +26,18 @@ public class ReplicateDemandService {
2826
private TaskExecutorService taskExecutorService;
2927
private IexecHubService iexecHubService;
3028
private SubscriptionService subscriptionService;
31-
private ReplicateService replicateService;
3229
private ContributionService contributionService;
3330

3431
@Autowired
3532
public ReplicateDemandService(TaskExecutorService taskExecutorService,
3633
IexecHubService iexecHubService,
3734
CustomFeignClient customFeignClient,
3835
SubscriptionService subscriptionService,
39-
ReplicateService replicateService,
4036
ContributionService contributionService) {
4137
this.customFeignClient = customFeignClient;
4238
this.taskExecutorService = taskExecutorService;
4339
this.iexecHubService = iexecHubService;
4440
this.subscriptionService = subscriptionService;
45-
this.replicateService = replicateService;
4641
this.contributionService = contributionService;
4742
}
4843

0 commit comments

Comments
 (0)