Skip to content

Commit f1be9ff

Browse files
author
Jérémy James Toussaint
committed
Compute with ContributionAuthorization
1 parent 57bc15c commit f1be9ff

File tree

2 files changed

+38
-24
lines changed

2 files changed

+38
-24
lines changed

src/main/java/com/iexec/worker/executor/TaskExecutorService.java

Lines changed: 35 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import com.iexec.worker.dataset.DatasetService;
1818
import com.iexec.worker.docker.DockerComputationService;
1919
import com.iexec.worker.feign.CustomFeignClient;
20+
import com.iexec.worker.replicate.ReplicateService;
2021
import com.iexec.worker.result.ResultService;
2122
import com.iexec.worker.sms.SmsService;
2223
import com.iexec.worker.utils.LoggingUtils;
@@ -53,6 +54,7 @@ public class TaskExecutorService {
5354
private IexecHubService iexecHubService;
5455
private SmsService smsService;
5556
private Web3jService web3jService;
57+
private ReplicateService replicateService;
5658

5759
// internal variables
5860
private int maxNbExecutions;
@@ -67,7 +69,8 @@ public TaskExecutorService(DatasetService datasetService,
6769
WorkerConfigurationService workerConfigurationService,
6870
IexecHubService iexecHubService,
6971
SmsService smsService,
70-
Web3jService web3jService) {
72+
Web3jService web3jService,
73+
ReplicateService replicateService) {
7174
this.datasetService = datasetService;
7275
this.dockerComputationService = dockerComputationService;
7376
this.resultService = resultService;
@@ -78,6 +81,7 @@ public TaskExecutorService(DatasetService datasetService,
7881
this.iexecHubService = iexecHubService;
7982
this.smsService = smsService;
8083
this.web3jService = web3jService;
84+
this.replicateService = replicateService;
8185

8286
maxNbExecutions = Runtime.getRuntime().availableProcessors() - 1;
8387
executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(maxNbExecutions);
@@ -87,12 +91,15 @@ public boolean canAcceptMoreReplicates() {
8791
return executor.getActiveCount() < maxNbExecutions;
8892
}
8993

90-
public CompletableFuture<Void> addReplicate(AvailableReplicateModel replicateModel) {
91-
ContributionAuthorization contributionAuth = replicateModel.getContributionAuthorization();
94+
public CompletableFuture<Void> addReplicate(ContributionAuthorization contributionAuth) {
95+
9296
String chainTaskId = contributionAuth.getChainTaskId();
9397

94-
return CompletableFuture.supplyAsync(() -> compute(replicateModel), executor)
95-
.thenApply(stdout -> resultService.saveResult(chainTaskId, replicateModel, stdout))
98+
Optional<AvailableReplicateModel> replicateModel =
99+
replicateService.contributionAuthToReplicate(contributionAuth);
100+
101+
return CompletableFuture.supplyAsync(() -> compute(contributionAuth), executor)
102+
.thenApply(stdout -> resultService.saveResult(chainTaskId, replicateModel.get(), stdout))
96103
.thenAccept(isSaved -> {
97104
if (isSaved) contribute(contributionAuth);
98105
})
@@ -105,24 +112,22 @@ public CompletableFuture<Void> addReplicate(AvailableReplicateModel replicateMod
105112
}
106113

107114

108-
public void tryToContribute(ContributionAuthorization contributionAuth,
109-
AvailableReplicateModel replicateModel) {
115+
public void tryToContribute(ContributionAuthorization contributionAuth) {
110116

111117
String chainTaskId = contributionAuth.getChainTaskId();
112118
boolean isResultAvailable = resultService.isResultAvailable(chainTaskId);
113119

114120
if (!isResultAvailable) {
115121
log.info("Result not found, will restart task from RUNNING [chainTaskId:{}]", chainTaskId);
116-
addReplicate(replicateModel);
122+
addReplicate(contributionAuth);
117123
} else {
118124
log.info("Result found, will restart task from CONTRIBUTING [chainTaskId:{}]", chainTaskId);
119125
contribute(contributionAuth);
120126
}
121127
}
122128

123129
@Async
124-
private String compute(AvailableReplicateModel replicateModel) {
125-
ContributionAuthorization contributionAuth = replicateModel.getContributionAuthorization();
130+
private String compute(ContributionAuthorization contributionAuth) {
126131
String chainTaskId = contributionAuth.getChainTaskId();
127132
String stdout = "";
128133

@@ -137,20 +142,31 @@ private String compute(AvailableReplicateModel replicateModel) {
137142
throw new UnsupportedOperationException("Task needs TEE, I don't support it");
138143
}
139144

145+
Optional<AvailableReplicateModel> optionalAvailableReplicateModel =
146+
replicateService.contributionAuthToReplicate(contributionAuth);
147+
148+
if (!optionalAvailableReplicateModel.isPresent()){
149+
stdout = "AvailableReplicateModel not found";
150+
log.error(stdout + " [chainTaskId:{}]", chainTaskId);
151+
return stdout;
152+
}
153+
154+
AvailableReplicateModel availableReplicateModel = optionalAvailableReplicateModel.get();
155+
140156
// check app type
141157
customFeignClient.updateReplicateStatus(chainTaskId, RUNNING);
142-
if (!replicateModel.getAppType().equals(DappType.DOCKER)) {
158+
if (!availableReplicateModel.getAppType().equals(DappType.DOCKER)) {
143159
stdout = "Application is not of type Docker";
144160
log.error(stdout + " [chainTaskId:{}]", chainTaskId);
145161
return stdout;
146162
}
147163

148164
// pull app
149165
customFeignClient.updateReplicateStatus(chainTaskId, APP_DOWNLOADING);
150-
boolean isAppDownloaded = dockerComputationService.dockerPull(chainTaskId, replicateModel.getAppUri());
166+
boolean isAppDownloaded = dockerComputationService.dockerPull(chainTaskId, availableReplicateModel.getAppUri());
151167
if (!isAppDownloaded) {
152168
customFeignClient.updateReplicateStatus(chainTaskId, APP_DOWNLOAD_FAILED);
153-
stdout = "Failed to pull application image, URI:" + replicateModel.getAppUri();
169+
stdout = "Failed to pull application image, URI:" + availableReplicateModel.getAppUri();
154170
log.error(stdout + " [chainTaskId:{}]", chainTaskId);
155171
return stdout;
156172
}
@@ -159,10 +175,10 @@ private String compute(AvailableReplicateModel replicateModel) {
159175

160176
// pull data
161177
customFeignClient.updateReplicateStatus(chainTaskId, DATA_DOWNLOADING);
162-
boolean isDatasetDownloaded = datasetService.downloadDataset(chainTaskId, replicateModel.getDatasetUri());
178+
boolean isDatasetDownloaded = datasetService.downloadDataset(chainTaskId, availableReplicateModel.getDatasetUri());
163179
if (!isDatasetDownloaded) {
164180
customFeignClient.updateReplicateStatus(chainTaskId, DATA_DOWNLOAD_FAILED);
165-
stdout = "Failed to pull dataset, URI:" + replicateModel.getDatasetUri();
181+
stdout = "Failed to pull dataset, URI:" + availableReplicateModel.getDatasetUri();
166182
log.error(stdout + " [chainTaskId:{}]", chainTaskId);
167183
return stdout;
168184
}
@@ -181,19 +197,19 @@ private String compute(AvailableReplicateModel replicateModel) {
181197
boolean isDatasetDecrypted = false;
182198

183199
if (isDatasetDecryptionNeeded) {
184-
isDatasetDecrypted = datasetService.decryptDataset(chainTaskId, replicateModel.getDatasetUri());
200+
isDatasetDecrypted = datasetService.decryptDataset(chainTaskId, availableReplicateModel.getDatasetUri());
185201
}
186202

187203
if (isDatasetDecryptionNeeded && !isDatasetDecrypted) {
188204
customFeignClient.updateReplicateStatus(chainTaskId, COMPUTE_FAILED);
189-
stdout = "Failed to decrypt dataset, URI:" + replicateModel.getDatasetUri();
205+
stdout = "Failed to decrypt dataset, URI:" + availableReplicateModel.getDatasetUri();
190206
log.error(stdout + " [chainTaskId:{}]", chainTaskId);
191207
return stdout;
192208
}
193209

194210
// compute
195-
String datasetFilename = datasetService.getDatasetFilename(replicateModel.getDatasetUri());
196-
stdout = dockerComputationService.dockerRunAndGetLogs(replicateModel, datasetFilename);
211+
String datasetFilename = datasetService.getDatasetFilename(availableReplicateModel.getDatasetUri());
212+
stdout = dockerComputationService.dockerRunAndGetLogs(availableReplicateModel, datasetFilename);
197213

198214
if (stdout.isEmpty()) {
199215
customFeignClient.updateReplicateStatus(chainTaskId, COMPUTE_FAILED);

src/main/java/com/iexec/worker/pubsub/SubscriptionService.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -185,12 +185,10 @@ public void handleTaskNotification(TaskNotification notif) {
185185
switch (type) {
186186
case PLEASE_CONTRIBUTE:
187187
ContributionAuthorization contribAuth = notif.getTaskNotificationExtra().getContributionAuthorization();
188-
Optional<AvailableReplicateModel> replicateModel =
189-
replicateService.contributionAuthToReplicate(contribAuth);
190-
if (replicateModel.isPresent()){
191-
taskExecutorService.tryToContribute(contribAuth, replicateModel.get());
188+
if (contribAuth != null){
189+
taskExecutorService.tryToContribute(contribAuth);
192190
} else {
193-
log.error("Empty AvailableReplicateModel for PLEASE_CONTRIBUTE[chainTaskId:{}]", chainTaskId);
191+
log.error("Empty contribAuth for PLEASE_CONTRIBUTE [chainTaskId:{}]", chainTaskId);
194192
}
195193
break;
196194
case PLEASE_ABORT_CONTRIBUTION_TIMEOUT:

0 commit comments

Comments
 (0)