Skip to content

Commit 8b37bca

Browse files
authored
feat: manipulate list of ReplicateStatusCause in TaskManagerService (#659)
1 parent 38c0457 commit 8b37bca

File tree

10 files changed

+316
-194
lines changed

10 files changed

+316
-194
lines changed

src/main/java/com/iexec/worker/chain/ContributionService.java

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
import org.web3j.protocol.core.methods.response.Log;
2828

2929
import java.math.BigInteger;
30+
import java.util.ArrayList;
31+
import java.util.List;
3032
import java.util.Optional;
3133

3234
import static com.iexec.common.replicate.ReplicateStatusCause.*;
@@ -54,57 +56,63 @@ public boolean isChainTaskInitialized(String chainTaskId) {
5456
return iexecHubService.getTaskDescription(chainTaskId) != null;
5557
}
5658

57-
public Optional<ReplicateStatusCause> getCannotContributeStatusCause(final String chainTaskId) {
59+
public List<ReplicateStatusCause> getCannotContributeStatusCause(final String chainTaskId) {
60+
final List<ReplicateStatusCause> causes = new ArrayList<>();
61+
5862
if (!isWorkerpoolAuthorizationPresent(chainTaskId)) {
59-
return Optional.of(WORKERPOOL_AUTHORIZATION_NOT_FOUND);
63+
causes.add(WORKERPOOL_AUTHORIZATION_NOT_FOUND);
6064
}
6165

6266
final TaskDescription taskDescription = iexecHubService.getTaskDescription(chainTaskId);
6367

6468
final ChainTask chainTask = iexecHubService.getChainTask(chainTaskId).orElse(null);
6569
if (chainTask == null) {
66-
return Optional.of(CHAIN_UNREACHABLE);
70+
causes.add(CHAIN_UNREACHABLE);
71+
return causes;
6772
}
6873

6974
// No staking in contributeAndFinalize
7075
if (taskDescription != null && !taskDescription.isEligibleToContributeAndFinalize()
7176
&& !hasEnoughStakeToContribute(chainTask)) {
72-
return Optional.of(STAKE_TOO_LOW);
77+
causes.add(STAKE_TOO_LOW);
7378
}
7479

7580
if (chainTask.getStatus() != ChainTaskStatus.ACTIVE) {
76-
return Optional.of(TASK_NOT_ACTIVE);
81+
causes.add(TASK_NOT_ACTIVE);
7782
}
7883

7984
if (chainTask.isContributionDeadlineReached()) {
80-
return Optional.of(CONTRIBUTION_TIMEOUT);
85+
causes.add(CONTRIBUTION_TIMEOUT);
8186
}
8287

8388
if (chainTask.hasContributionFrom(workerWalletAddress)) {
84-
return Optional.of(CONTRIBUTION_ALREADY_SET);
89+
causes.add(CONTRIBUTION_ALREADY_SET);
8590
}
8691

87-
return Optional.empty();
92+
return causes;
8893
}
8994

90-
public Optional<ReplicateStatusCause> getCannotContributeAndFinalizeStatusCause(final String chainTaskId) {
95+
public List<ReplicateStatusCause> getCannotContributeAndFinalizeStatusCause(final String chainTaskId) {
96+
final List<ReplicateStatusCause> causes = new ArrayList<>();
97+
9198
// check TRUST is 1
9299
final TaskDescription taskDescription = iexecHubService.getTaskDescription(chainTaskId);
93100
if (taskDescription == null || !BigInteger.ONE.equals(taskDescription.getTrust())) {
94-
return Optional.of(TRUST_NOT_1);
101+
causes.add(TRUST_NOT_1);
95102
}
96103

97104
final ChainTask chainTask = iexecHubService.getChainTask(chainTaskId).orElse(null);
98105
if (chainTask == null) {
99-
return Optional.of(CHAIN_UNREACHABLE);
106+
causes.add(CHAIN_UNREACHABLE);
107+
return causes;
100108
}
101109

102110
// check TASK_ALREADY_CONTRIBUTED
103111
if (chainTask.hasContributions()) {
104-
return Optional.of(TASK_ALREADY_CONTRIBUTED);
112+
causes.add(TASK_ALREADY_CONTRIBUTED);
105113
}
106114

107-
return Optional.empty();
115+
return causes;
108116
}
109117

110118
private boolean isWorkerpoolAuthorizationPresent(String chainTaskId) {

src/main/java/com/iexec/worker/result/ResultService.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -93,9 +93,13 @@ public boolean isResultZipFound(final String chainTaskId) {
9393
}
9494

9595
public boolean writeErrorToIexecOut(final String chainTaskId, final ReplicateStatus errorStatus,
96-
final ReplicateStatusCause errorCause) {
97-
final String errorContent = String.format("[IEXEC] Error occurred while computing"
98-
+ " the task [error:%s, cause:%s]", errorStatus, errorCause);
96+
final List<ReplicateStatusCause> causes) {
97+
if (causes == null || causes.isEmpty()) {
98+
log.error("No error causes provided [chainTaskId:{}]", chainTaskId);
99+
return false;
100+
}
101+
final String errorContent = String.format("[IEXEC] Errors occurred while computing the task [error:%s, causes:%s]",
102+
errorStatus, causes);
99103
final ComputedFile computedFile = ComputedFile.builder()
100104
.deterministicOutputPath(IexecFileHelper.SLASH_IEXEC_OUT +
101105
File.separator + ERROR_FILENAME)

src/main/java/com/iexec/worker/task/TaskManagerService.java

Lines changed: 54 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import lombok.extern.slf4j.Slf4j;
4646
import org.springframework.stereotype.Service;
4747

48+
import java.util.List;
4849
import java.util.Optional;
4950

5051
import static com.iexec.common.replicate.ReplicateStatus.APP_DOWNLOAD_FAILED;
@@ -95,17 +96,16 @@ public TaskManagerService(
9596

9697
ReplicateActionResponse start(TaskDescription taskDescription) {
9798
final String chainTaskId = taskDescription.getChainTaskId();
98-
Optional<ReplicateStatusCause> oErrorStatus =
99+
List<ReplicateStatusCause> causes =
99100
contributionService.getCannotContributeStatusCause(chainTaskId);
100101
String context = "start";
101-
if (oErrorStatus.isPresent()) {
102-
return getFailureResponseAndPrintError(oErrorStatus.get(),
103-
context, chainTaskId);
102+
if (!causes.isEmpty()) {
103+
return getFailureResponseAndPrintErrors(causes, context, chainTaskId);
104104
}
105105

106106
// result encryption is not supported for standard tasks
107107
if (!taskDescription.isTeeTask() && taskDescription.getDealParams().isIexecResultEncryption()) {
108-
return getFailureResponseAndPrintError(TASK_DESCRIPTION_INVALID,
108+
return getFailureResponseAndPrintErrors(List.of(TASK_DESCRIPTION_INVALID),
109109
context, chainTaskId);
110110
}
111111

@@ -114,10 +114,10 @@ ReplicateActionResponse start(TaskDescription taskDescription) {
114114
// then we won't be able to run the task.
115115
// So it should be aborted right now.
116116
final TeeService teeService = teeServicesManager.getTeeService(taskDescription.getTeeFramework());
117-
final Optional<ReplicateStatusCause> teePrerequisitesIssue = teeService.areTeePrerequisitesMetForTask(chainTaskId);
118-
if (teePrerequisitesIssue.isPresent()) {
119-
log.error("TEE prerequisites are not met [chainTaskId: {}, issue: {}]", chainTaskId, teePrerequisitesIssue.get());
120-
return getFailureResponseAndPrintError(teePrerequisitesIssue.get(), context, chainTaskId);
117+
final List<ReplicateStatusCause> teePrerequisitesIssues = teeService.areTeePrerequisitesMetForTask(chainTaskId);
118+
if (!teePrerequisitesIssues.isEmpty()) {
119+
log.error("TEE prerequisites are not met [chainTaskId: {}, issues: {}]", chainTaskId, teePrerequisitesIssues);
120+
return getFailureResponseAndPrintErrors(teePrerequisitesIssues, context, chainTaskId);
121121
}
122122

123123
final WorkerpoolAuthorization workerpoolAuthorization = contributionService.getWorkerpoolAuthorization(chainTaskId);
@@ -131,19 +131,18 @@ ReplicateActionResponse start(TaskDescription taskDescription) {
131131

132132
ReplicateActionResponse downloadApp(TaskDescription taskDescription) {
133133
final String chainTaskId = taskDescription.getChainTaskId();
134-
Optional<ReplicateStatusCause> oErrorStatus =
134+
List<ReplicateStatusCause> causes =
135135
contributionService.getCannotContributeStatusCause(chainTaskId);
136136
String context = "download app";
137-
if (oErrorStatus.isPresent()) {
138-
return getFailureResponseAndPrintError(oErrorStatus.get(),
139-
context, chainTaskId);
137+
if (!causes.isEmpty()) {
138+
return getFailureResponseAndPrintErrors(causes, context, chainTaskId);
140139
}
141140

142141
if (computeManagerService.downloadApp(taskDescription)) {
143142
return ReplicateActionResponse.success();
144143
}
145144
return triggerPostComputeHookOnError(chainTaskId, context, taskDescription,
146-
APP_DOWNLOAD_FAILED, APP_IMAGE_DOWNLOAD_FAILED);
145+
APP_DOWNLOAD_FAILED, List.of(APP_IMAGE_DOWNLOAD_FAILED));
147146
}
148147

149148
/*
@@ -174,12 +173,11 @@ ReplicateActionResponse downloadData(final TaskDescription taskDescription) {
174173
log.info("Dataset and input files will be downloaded by the pre-compute enclave [chainTaskId:{}]", chainTaskId);
175174
return ReplicateActionResponse.success();
176175
}
177-
Optional<ReplicateStatusCause> errorStatus =
176+
List<ReplicateStatusCause> causes =
178177
contributionService.getCannotContributeStatusCause(chainTaskId);
179178
String context = "download data";
180-
if (errorStatus.isPresent()) {
181-
return getFailureResponseAndPrintError(errorStatus.get(),
182-
context, chainTaskId);
179+
if (!causes.isEmpty()) {
180+
return getFailureResponseAndPrintErrors(causes, context, chainTaskId);
183181
}
184182
try {
185183
// download dataset for standard task
@@ -197,7 +195,7 @@ ReplicateActionResponse downloadData(final TaskDescription taskDescription) {
197195
}
198196
} catch (WorkflowException e) {
199197
return triggerPostComputeHookOnError(chainTaskId, context, taskDescription,
200-
DATA_DOWNLOAD_FAILED, e.getReplicateStatusCause());
198+
DATA_DOWNLOAD_FAILED, List.of(e.getReplicateStatusCause()));
201199
}
202200
return ReplicateActionResponse.success();
203201
}
@@ -206,14 +204,14 @@ private ReplicateActionResponse triggerPostComputeHookOnError(String chainTaskId
206204
String context,
207205
TaskDescription taskDescription,
208206
ReplicateStatus errorStatus,
209-
ReplicateStatusCause errorCause) {
210-
// log original error
211-
logError(errorCause, context, chainTaskId);
212-
boolean isOk = resultService.writeErrorToIexecOut(chainTaskId, errorStatus, errorCause);
207+
List<ReplicateStatusCause> causes) {
208+
// log original errors
209+
causes.forEach(cause -> logError(cause, context, chainTaskId));
210+
boolean isOk = resultService.writeErrorToIexecOut(chainTaskId, errorStatus, causes);
213211
// try to run post-compute
214212
if (isOk && computeManagerService.runPostCompute(taskDescription, null).isSuccessful()) {
215213
//Graceful error, worker will be prompt to contribute
216-
return ReplicateActionResponse.failure(errorCause);
214+
return ReplicateActionResponse.failure(causes.get(0));
217215
}
218216
//Download failed hard, worker cannot contribute
219217
logError(POST_COMPUTE_FAILED_UNKNOWN_ISSUE, context, chainTaskId);
@@ -222,22 +220,21 @@ private ReplicateActionResponse triggerPostComputeHookOnError(String chainTaskId
222220

223221
ReplicateActionResponse compute(TaskDescription taskDescription) {
224222
final String chainTaskId = taskDescription.getChainTaskId();
225-
final ReplicateStatusCause errorStatus = contributionService.getCannotContributeStatusCause(chainTaskId)
226-
.orElse(null);
223+
final List<ReplicateStatusCause> causes = contributionService.getCannotContributeStatusCause(chainTaskId);
227224
final String context = "compute";
228-
if (errorStatus != null) {
229-
return getFailureResponseAndPrintError(errorStatus, context, chainTaskId);
225+
if (!causes.isEmpty()) {
226+
return getFailureResponseAndPrintErrors(causes, context, chainTaskId);
230227
}
231228

232229
if (!computeManagerService.isAppDownloaded(taskDescription.getAppUri())) {
233-
return getFailureResponseAndPrintError(APP_NOT_FOUND_LOCALLY,
230+
return getFailureResponseAndPrintErrors(List.of(APP_NOT_FOUND_LOCALLY),
234231
context, chainTaskId);
235232
}
236233

237234
if (taskDescription.isTeeTask()) {
238235
TeeService teeService = teeServicesManager.getTeeService(taskDescription.getTeeFramework());
239236
if (!teeService.prepareTeeForTask(chainTaskId)) {
240-
return getFailureResponseAndPrintError(TEE_PREPARATION_FAILED,
237+
return getFailureResponseAndPrintErrors(List.of(TEE_PREPARATION_FAILED),
241238
context, chainTaskId);
242239
}
243240
}
@@ -249,8 +246,8 @@ ReplicateActionResponse compute(TaskDescription taskDescription) {
249246
computeManagerService.runPreCompute(taskDescription,
250247
workerpoolAuthorization);
251248
if (!preResponse.isSuccessful()) {
252-
return getFailureResponseAndPrintError(
253-
preResponse.getExitCauses().get(0), //TODO: Handle list of causes
249+
return getFailureResponseAndPrintErrors(
250+
preResponse.getExitCauses(),
254251
context,
255252
chainTaskId
256253
);
@@ -260,11 +257,11 @@ ReplicateActionResponse compute(TaskDescription taskDescription) {
260257
computeManagerService.runCompute(taskDescription,
261258
preResponse.getSecureSession());
262259
if (!appResponse.isSuccessful()) {
263-
final ReplicateStatusCause cause = appResponse.getExitCauses().get(0); //TODO: Handle list of causes
264-
logError(cause, context, chainTaskId);
260+
final List<ReplicateStatusCause> appErrorCauses = appResponse.getExitCauses();
261+
appErrorCauses.forEach(cause -> logError(cause, context, chainTaskId));
265262
return ReplicateActionResponse.failureWithDetails(
266263
ReplicateStatusDetails.builder()
267-
.cause(cause)
264+
.cause(appErrorCauses.get(0)) //TODO: Handle list of causes
268265
.exitCode(appResponse.getExitCode())
269266
.computeLogs(
270267
ComputeLogs.builder()
@@ -279,10 +276,9 @@ ReplicateActionResponse compute(TaskDescription taskDescription) {
279276
computeManagerService.runPostCompute(taskDescription,
280277
preResponse.getSecureSession());
281278
if (!postResponse.isSuccessful()) {
282-
ReplicateStatusCause causes = postResponse.getExitCauses().get(0); //TODO: Handle list of causes
283-
logError(causes, context, chainTaskId);
284-
return ReplicateActionResponse.failureWithStdout(causes,
285-
postResponse.getStdout());
279+
List<ReplicateStatusCause> postComputeErrorCauses = postResponse.getExitCauses();
280+
postComputeErrorCauses.forEach(cause -> logError(cause, context, chainTaskId));
281+
return ReplicateActionResponse.failureWithStdout(postComputeErrorCauses.get(0), postResponse.getStdout()); // TODO: Handle list of causes
286282
}
287283
return ReplicateActionResponse.successWithLogs(
288284
ComputeLogs.builder()
@@ -303,14 +299,13 @@ ReplicateActionResponse compute(TaskDescription taskDescription) {
303299
* @return The response of the 'contribute' or 'contributeAndFinalize' action
304300
*/
305301
private ReplicateActionResponse contributeOrContributeAndFinalize(String chainTaskId, String context) {
306-
Optional<ReplicateStatusCause> oErrorStatus = contributionService.getCannotContributeStatusCause(chainTaskId);
307-
if (oErrorStatus.isPresent()) {
308-
return getFailureResponseAndPrintError(oErrorStatus.get(),
309-
context, chainTaskId);
302+
List<ReplicateStatusCause> causes = contributionService.getCannotContributeStatusCause(chainTaskId);
303+
if (!causes.isEmpty()) {
304+
return getFailureResponseAndPrintErrors(causes, context, chainTaskId);
310305
}
311306

312307
if (!hasEnoughGas()) {
313-
return getFailureResponseAndPrintError(OUT_OF_GAS, context, chainTaskId);
308+
return getFailureResponseAndPrintErrors(List.of(OUT_OF_GAS), context, chainTaskId);
314309
}
315310

316311
ComputedFile computedFile = resultService.getComputedFile(chainTaskId);
@@ -334,10 +329,9 @@ private ReplicateActionResponse contributeOrContributeAndFinalize(String chainTa
334329
response = ReplicateActionResponse.success(chainReceipt);
335330
}
336331
} else if (context.equals(CONTRIBUTE_AND_FINALIZE)) {
337-
oErrorStatus = contributionService.getCannotContributeAndFinalizeStatusCause(chainTaskId);
338-
if (oErrorStatus.isPresent()) {
339-
return getFailureResponseAndPrintError(oErrorStatus.get(),
340-
context, chainTaskId);
332+
causes = contributionService.getCannotContributeAndFinalizeStatusCause(chainTaskId);
333+
if (!causes.isEmpty()) {
334+
return getFailureResponseAndPrintErrors(causes, context, chainTaskId);
341335
}
342336

343337
final WorkerpoolAuthorization workerpoolAuthorization = contributionService.getWorkerpoolAuthorization(chainTaskId);
@@ -370,7 +364,7 @@ ReplicateActionResponse reveal(String chainTaskId,
370364
TaskNotificationExtra extra) {
371365
String context = "reveal";
372366
if (extra == null || extra.getBlockNumber() == 0) {
373-
return getFailureResponseAndPrintError(CONSENSUS_BLOCK_MISSING, context, chainTaskId);
367+
return getFailureResponseAndPrintErrors(List.of(CONSENSUS_BLOCK_MISSING), context, chainTaskId);
374368
}
375369
long consensusBlock = extra.getBlockNumber();
376370

@@ -384,12 +378,12 @@ ReplicateActionResponse reveal(String chainTaskId,
384378
}
385379

386380
if (!revealService.isConsensusBlockReached(chainTaskId, consensusBlock)) {
387-
return getFailureResponseAndPrintError(BLOCK_NOT_REACHED, context, chainTaskId
381+
return getFailureResponseAndPrintErrors(List.of(BLOCK_NOT_REACHED), context, chainTaskId
388382
);
389383
}
390384

391385
if (!revealService.repeatCanReveal(chainTaskId, resultDigest)) {
392-
return getFailureResponseAndPrintError(CANNOT_REVEAL, context, chainTaskId);
386+
return getFailureResponseAndPrintErrors(List.of(CANNOT_REVEAL), context, chainTaskId);
393387
}
394388

395389
if (!hasEnoughGas()) {
@@ -402,7 +396,7 @@ ReplicateActionResponse reveal(String chainTaskId,
402396
revealService.reveal(chainTaskId, resultDigest);
403397
if (oChainReceipt.isEmpty() ||
404398
!isValidChainReceipt(chainTaskId, oChainReceipt.get())) {
405-
return getFailureResponseAndPrintError(CHAIN_RECEIPT_NOT_VALID,
399+
return getFailureResponseAndPrintErrors(List.of(CHAIN_RECEIPT_NOT_VALID),
406400
context, chainTaskId
407401
);
408402
}
@@ -415,7 +409,7 @@ ReplicateActionResponse uploadResult(String chainTaskId) {
415409
String resultLink = resultService.uploadResultAndGetLink(workerpoolAuthorization);
416410
String context = "upload result";
417411
if (resultLink.isEmpty()) {
418-
return getFailureResponseAndPrintError(RESULT_LINK_MISSING,
412+
return getFailureResponseAndPrintErrors(List.of(RESULT_LINK_MISSING),
419413
context, chainTaskId
420414
);
421415
}
@@ -494,9 +488,13 @@ boolean isValidChainReceipt(String chainTaskId,
494488
return true;
495489
}
496490

497-
private ReplicateActionResponse getFailureResponseAndPrintError(ReplicateStatusCause cause, String context, String chainTaskId) {
498-
logError(cause, context, chainTaskId);
499-
return ReplicateActionResponse.failure(cause);
491+
private ReplicateActionResponse getFailureResponseAndPrintErrors(List<ReplicateStatusCause> causes, String context, String chainTaskId) {
492+
if (causes == null || causes.isEmpty()) {
493+
log.error("Failed to {} [chainTaskId:'{}', cause:'UNKNOWN']", context, chainTaskId);
494+
return ReplicateActionResponse.failure();
495+
}
496+
causes.forEach(cause -> logError(cause, context, chainTaskId));
497+
return ReplicateActionResponse.failure(causes.get(0));
500498
}
501499

502500
/**

0 commit comments

Comments
 (0)