Skip to content

Commit a1aeecb

Browse files
Merge branch 'master' into access-token-and-feign-retry
2 parents 4ff6187 + 48f5028 commit a1aeecb

File tree

2 files changed

+94
-34
lines changed

2 files changed

+94
-34
lines changed

src/main/java/com/iexec/worker/chain/ContributionService.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,4 +176,13 @@ private boolean isSignatureValid(byte[] message, Signature sign, String signerAd
176176
public boolean hasEnoughGas() {
177177
return iexecHubService.hasEnoughGas();
178178
}
179+
180+
public boolean isContributionDeadlineReached(String chainTaskId) {
181+
Optional<ChainTask> oTask = iexecHubService.getChainTask(chainTaskId);
182+
if (!oTask.isPresent()) {
183+
return true;
184+
}
185+
ChainTask task = oTask.get();
186+
return isBeforeContributionDeadlineToContribute(task);
187+
}
179188
}

src/main/java/com/iexec/worker/executor/TaskExecutorService.java

Lines changed: 85 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ public void tryToContribute(ContributionAuthorization contributionAuth) {
140140
@Async
141141
private String compute(ContributionAuthorization contributionAuth) {
142142
String chainTaskId = contributionAuth.getChainTaskId();
143-
String stdout = "";
143+
String message = "";
144144

145145
if (!contributionService.isChainTaskInitialized(chainTaskId)) {
146146
log.error("Task not initialized onchain yet [ChainTaskId:{}]", chainTaskId);
@@ -150,9 +150,9 @@ private String compute(ContributionAuthorization contributionAuth) {
150150
Optional<TaskDescription> taskDescriptionFromChain = iexecHubService.getTaskDescriptionFromChain(chainTaskId);
151151

152152
if (!taskDescriptionFromChain.isPresent()) {
153-
stdout = "AvailableReplicateModel not found";
154-
log.error(stdout + " [chainTaskId:{}]", chainTaskId);
155-
return stdout;
153+
message = "AvailableReplicateModel not found";
154+
log.error(message + " [chainTaskId:{}]", chainTaskId);
155+
return message;
156156
}
157157

158158
TaskDescription taskDescription = taskDescriptionFromChain.get();
@@ -166,33 +166,23 @@ private String compute(ContributionAuthorization contributionAuth) {
166166
// check app type
167167
customFeignClient.updateReplicateStatus(chainTaskId, RUNNING);
168168
if (!taskDescription.getAppType().equals(DappType.DOCKER)) {
169-
stdout = "Application is not of type Docker";
170-
log.error(stdout + " [chainTaskId:{}]", chainTaskId);
171-
return stdout;
169+
message = "Application is not of type Docker";
170+
log.error(message + " [chainTaskId:{}]", chainTaskId);
171+
return message;
172172
}
173173

174-
// pull app
175-
customFeignClient.updateReplicateStatus(chainTaskId, APP_DOWNLOADING);
176-
boolean isAppDownloaded = dockerComputationService.dockerPull(chainTaskId, taskDescription.getAppUri());
177-
if (!isAppDownloaded) {
178-
customFeignClient.updateReplicateStatus(chainTaskId, APP_DOWNLOAD_FAILED);
179-
stdout = "Failed to pull application image, URI:" + taskDescription.getAppUri();
180-
log.error(stdout + " [chainTaskId:{}]", chainTaskId);
181-
return stdout;
174+
// Try to downloadApp
175+
String errorDwnlApp = tryToDownloadApp(taskDescription);
176+
if (!errorDwnlApp.isEmpty()) {
177+
return errorDwnlApp;
182178
}
183-
184179
customFeignClient.updateReplicateStatus(chainTaskId, APP_DOWNLOADED);
185180

186-
// pull data
187-
customFeignClient.updateReplicateStatus(chainTaskId, DATA_DOWNLOADING);
188-
boolean isDatasetDownloaded = datasetService.downloadDataset(chainTaskId, taskDescription.getDatasetUri());
189-
if (!isDatasetDownloaded) {
190-
customFeignClient.updateReplicateStatus(chainTaskId, DATA_DOWNLOAD_FAILED);
191-
stdout = "Failed to pull dataset, URI:" + taskDescription.getDatasetUri();
192-
log.error(stdout + " [chainTaskId:{}]", chainTaskId);
193-
return stdout;
181+
// Try to downloadData
182+
String errorDwnlData = tryToDownloadData(taskDescription);
183+
if (!errorDwnlData.isEmpty()) {
184+
return errorDwnlData;
194185
}
195-
196186
customFeignClient.updateReplicateStatus(chainTaskId, DATA_DOWNLOADED);
197187

198188
boolean isFetched = smsService.fetchTaskSecrets(contributionAuth);
@@ -212,24 +202,85 @@ private String compute(ContributionAuthorization contributionAuth) {
212202

213203
if (isDatasetDecryptionNeeded && !isDatasetDecrypted) {
214204
customFeignClient.updateReplicateStatus(chainTaskId, COMPUTE_FAILED);
215-
stdout = "Failed to decrypt dataset, URI:" + taskDescription.getDatasetUri();
216-
log.error(stdout + " [chainTaskId:{}]", chainTaskId);
217-
return stdout;
205+
message = "Failed to decrypt dataset, URI:" + taskDescription.getDatasetUri();
206+
log.error(message + " [chainTaskId:{}]", chainTaskId);
207+
return message;
208+
}
209+
210+
String error = checkContributionAbility(chainTaskId);
211+
if (!error.isEmpty()) {
212+
return error;
218213
}
219214

220215
// compute
221216
String datasetFilename = datasetService.getDatasetFilename(taskDescription.getDatasetUri());
222-
stdout = dockerComputationService.dockerRunAndGetLogs(taskDescription, datasetFilename);
217+
message = dockerComputationService.dockerRunAndGetLogs(taskDescription, datasetFilename);
223218

224-
if (stdout.isEmpty()) {
219+
if (message.isEmpty()) {
225220
customFeignClient.updateReplicateStatus(chainTaskId, COMPUTE_FAILED);
226-
stdout = "Failed to start computation";
227-
log.error(stdout + " [chainTaskId:{}]", chainTaskId);
228-
return stdout;
221+
message = "Failed to start computation";
222+
log.error(message + " [chainTaskId:{}]", chainTaskId);
223+
return message;
229224
}
230225

231226
customFeignClient.updateReplicateStatus(chainTaskId, COMPUTED);
232-
return stdout;
227+
return message;
228+
}
229+
230+
private String tryToDownloadApp(TaskDescription taskDescription) {
231+
String chainTaskId = taskDescription.getChainTaskId();
232+
233+
String error = checkContributionAbility(chainTaskId);
234+
if (!error.isEmpty()) {
235+
return error;
236+
}
237+
238+
// pull app
239+
customFeignClient.updateReplicateStatus(chainTaskId, APP_DOWNLOADING);
240+
boolean isAppDownloaded = dockerComputationService.dockerPull(chainTaskId, taskDescription.getAppUri());
241+
if (!isAppDownloaded) {
242+
customFeignClient.updateReplicateStatus(chainTaskId, APP_DOWNLOAD_FAILED);
243+
String errorMessage = "Failed to pull application image, URI:" + taskDescription.getAppUri();
244+
log.error(errorMessage + " [chainTaskId:{}]", chainTaskId);
245+
return errorMessage;
246+
}
247+
248+
return "";
249+
}
250+
251+
private String tryToDownloadData(TaskDescription taskDescription) {
252+
String chainTaskId = taskDescription.getChainTaskId();
253+
254+
String error = checkContributionAbility(chainTaskId);
255+
if (!error.isEmpty()) {
256+
return error;
257+
}
258+
259+
// pull data
260+
customFeignClient.updateReplicateStatus(chainTaskId, DATA_DOWNLOADING);
261+
boolean isDatasetDownloaded = datasetService.downloadDataset(chainTaskId, taskDescription.getDatasetUri());
262+
if (!isDatasetDownloaded) {
263+
customFeignClient.updateReplicateStatus(chainTaskId, DATA_DOWNLOAD_FAILED);
264+
String errorMessage = "Failed to pull dataset, URI:" + taskDescription.getDatasetUri();
265+
log.error(errorMessage + " [chainTaskId:{}]", chainTaskId);
266+
return errorMessage;
267+
}
268+
269+
return "";
270+
}
271+
272+
private String checkContributionAbility(String chainTaskId) {
273+
String errorMessage = "";
274+
275+
Optional<ReplicateStatus> contributionStatus = contributionService.getCanContributeStatus(chainTaskId);
276+
if(contributionStatus.isPresent() && !contributionStatus.get().equals(CAN_CONTRIBUTE)) {
277+
errorMessage = "The worker cannot contribute";
278+
log.error(errorMessage + " [chainTaskId:{}, replicateStatus:{}]", chainTaskId, contributionStatus.get());
279+
customFeignClient.updateReplicateStatus(chainTaskId, contributionStatus.get());
280+
return errorMessage;
281+
}
282+
283+
return errorMessage;
233284
}
234285

235286
@Async

0 commit comments

Comments
 (0)