Skip to content

Commit 1f9a123

Browse files
Merge pull request #201 from iExecBlockchainComputing/notification-type-for-recovery
Notification type for recovery
2 parents c5cf832 + f0165a7 commit 1f9a123

File tree

13 files changed

+283
-545
lines changed

13 files changed

+283
-545
lines changed
Lines changed: 32 additions & 115 deletions
Original file line number | Diff line number | Diff line change
@@ -1,27 +1,23 @@
11
package com.iexec.worker.amnesia;
22

3-
import java.util.ArrayList;
4-
import java.util.Collections;
5-
import java.util.List;
6-
import java.util.Optional;
7-
83
import com.iexec.common.chain.ContributionAuthorization;
9-
import com.iexec.common.replicate.AvailableReplicateModel;
10-
import com.iexec.common.disconnection.InterruptedReplicateModel;
11-
import com.iexec.common.disconnection.RecoveryAction;
4+
import com.iexec.common.notification.TaskNotification;
5+
import com.iexec.common.notification.TaskNotificationType;
6+
import com.iexec.common.task.TaskDescription;
127
import com.iexec.worker.chain.IexecHubService;
13-
import com.iexec.worker.executor.TaskExecutorService;
148
import com.iexec.worker.feign.CustomFeignClient;
159
import com.iexec.worker.pubsub.SubscriptionService;
16-
import com.iexec.worker.replicate.ReplicateService;
1710
import com.iexec.worker.result.ResultService;
18-
11+
import lombok.extern.slf4j.Slf4j;
1912
import org.springframework.stereotype.Service;
2013

21-
import lombok.extern.slf4j.Slf4j;
14+
import java.util.ArrayList;
15+
import java.util.Collections;
16+
import java.util.List;
17+
import java.util.Optional;
2218

2319

24-
/*
20+
/*
2521
* This service is used to remind the worker of possible interrupted works
2622
* after a restart and how to deal with each interruption
2723
*/
@@ -31,140 +27,61 @@ public class AmnesiaRecoveryService {
3127

3228
private CustomFeignClient customFeignClient;
3329
private SubscriptionService subscriptionService;
34-
private ReplicateService replicateService;
3530
private ResultService resultService;
36-
private TaskExecutorService taskExecutorService;
3731
private IexecHubService iexecHubService;
3832

3933
public AmnesiaRecoveryService(CustomFeignClient customFeignClient,
4034
SubscriptionService subscriptionService,
41-
ReplicateService replicateService,
4235
ResultService resultService,
43-
TaskExecutorService taskExecutorService,
4436
IexecHubService iexecHubService) {
4537
this.customFeignClient = customFeignClient;
4638
this.subscriptionService = subscriptionService;
47-
this.replicateService = replicateService;
4839
this.resultService = resultService;
49-
this.taskExecutorService = taskExecutorService;
5040
this.iexecHubService = iexecHubService;
5141
}
5242

5343
public List<String> recoverInterruptedReplicates() {
54-
long lasAvailableBlockNumber = iexecHubService.getLatestBlockNumber();
55-
List<InterruptedReplicateModel> interruptedReplicates = customFeignClient.getInterruptedReplicates(
56-
lasAvailableBlockNumber);
44+
long latestAvailableBlockNumber = iexecHubService.getLatestBlockNumber();
45+
List<TaskNotification> missedTaskNotifications = customFeignClient.getMissedTaskNotifications(
46+
latestAvailableBlockNumber);
5747
List<String> recoveredChainTaskIds = new ArrayList<>();
5848

59-
if (interruptedReplicates == null || interruptedReplicates.isEmpty()) {
49+
if (missedTaskNotifications == null || missedTaskNotifications.isEmpty()) {
6050
log.info("No interrupted tasks to recover");
6151
return Collections.emptyList();
6252
}
6353

64-
for (InterruptedReplicateModel interruptedReplicate : interruptedReplicates) {
54+
for (TaskNotification missedTaskNotification : missedTaskNotifications) {
55+
TaskNotificationType taskNotificationType = missedTaskNotification.getTaskNotificationType();
56+
String chainTaskId = missedTaskNotification.getChainTaskId();
57+
boolean isResultAvailable = resultService.isResultAvailable(chainTaskId);
6558

66-
ContributionAuthorization contributionAuth = interruptedReplicate.getContributionAuthorization();
67-
RecoveryAction recoveryAction = interruptedReplicate.getRecoveryAction();
68-
String chainTaskId = contributionAuth.getChainTaskId();
69-
boolean isResultAvailable = isResultAvailable(chainTaskId);
59+
log.info("Recovering interrupted task [chainTaskId:{}, taskNotificationType:{}]",
60+
chainTaskId, taskNotificationType);
7061

71-
log.info("Recovering interrupted task [chainTaskId:{}, recoveryAction:{}]",
72-
chainTaskId, recoveryAction);
73-
74-
if (!isResultAvailable && recoveryAction != RecoveryAction.CONTRIBUTE) {
75-
log.error("Could not recover task, result not found [chainTaskId:{}, RecoveryAction:{}]",
76-
chainTaskId, recoveryAction);
62+
if (!isResultAvailable && taskNotificationType != TaskNotificationType.PLEASE_CONTRIBUTE) {
63+
log.error("Could not recover task, result not found [chainTaskId:{}, taskNotificationType:{}]",
64+
chainTaskId, taskNotificationType);
7765
continue;
7866
}
7967

80-
Optional<AvailableReplicateModel> oReplicateModel =
81-
replicateService.retrieveAvailableReplicateModelFromContribAuth(contributionAuth);
68+
Optional<TaskDescription> optionalTaskDescription = iexecHubService.getTaskDescriptionFromChain(chainTaskId);
8269

83-
if (!oReplicateModel.isPresent()) {
84-
log.error("Could not recover task, no replicateModel retrieved [chainTaskId:{}, RecoveryAction:{}]",
85-
chainTaskId, recoveryAction);
70+
if (!optionalTaskDescription.isPresent()) {
71+
log.error("Could not recover task, no TaskDescription retrieved [chainTaskId:{}, taskNotificationType:{}]",
72+
chainTaskId, taskNotificationType);
8673
continue;
8774
}
8875

89-
AvailableReplicateModel replicateModel = oReplicateModel.get();
90-
recoverReplicate(interruptedReplicate, replicateModel);
91-
recoveredChainTaskIds.add(chainTaskId);
92-
}
76+
TaskDescription taskDescription = optionalTaskDescription.get();
9377

94-
return recoveredChainTaskIds;
95-
}
96-
97-
public void recoverReplicate(InterruptedReplicateModel interruptedReplicate,
98-
AvailableReplicateModel replicateModel) {
99-
100-
ContributionAuthorization contributionAuth = interruptedReplicate.getContributionAuthorization();
101-
String chainTaskId = contributionAuth.getChainTaskId();
102-
103-
switch (interruptedReplicate.getRecoveryAction()) {
104-
case WAIT:
105-
subscriptionService.subscribeToTopic(chainTaskId);
106-
resultService.saveResultInfo(chainTaskId, replicateModel);
107-
break;
108-
109-
case CONTRIBUTE:
110-
subscriptionService.subscribeToTopic(chainTaskId);
111-
recoverReplicateByContributing(contributionAuth, replicateModel);
112-
break;
113-
114-
case ABORT_CONSENSUS_REACHED:
115-
taskExecutorService.abortConsensusReached(chainTaskId);
116-
break;
117-
118-
case ABORT_CONTRIBUTION_TIMEOUT:
119-
taskExecutorService.abortContributionTimeout(chainTaskId);
120-
break;
121-
122-
case REVEAL:
123-
subscriptionService.subscribeToTopic(chainTaskId);
124-
resultService.saveResultInfo(chainTaskId, replicateModel);
125-
taskExecutorService.reveal(chainTaskId, iexecHubService.getLatestBlockNumber());
126-
break;
127-
128-
case UPLOAD_RESULT:
129-
subscriptionService.subscribeToTopic(chainTaskId);
130-
resultService.saveResultInfo(chainTaskId, replicateModel);
131-
taskExecutorService.uploadResult(chainTaskId);
132-
break;
133-
134-
case COMPLETE:
135-
taskExecutorService.completeTask(chainTaskId);
136-
break;
137-
138-
default:
139-
break;
140-
}
141-
}
142-
143-
private boolean isResultAvailable(String chainTaskId) {
144-
boolean isResultZipFound = resultService.isResultZipFound(chainTaskId);
145-
boolean isResultFolderFound = resultService.isResultFolderFound(chainTaskId);
146-
147-
if (!isResultZipFound && !isResultFolderFound) return false;
78+
subscriptionService.subscribeToTopic(chainTaskId);
79+
resultService.saveResultInfo(chainTaskId, taskDescription);
80+
subscriptionService.handleTaskNotification(missedTaskNotification);
14881

149-
if (!isResultZipFound) resultService.zipResultFolder(chainTaskId);
150-
151-
return true;
152-
}
153-
154-
public void recoverReplicateByContributing(ContributionAuthorization contributionAuth,
155-
AvailableReplicateModel replicateModel) {
156-
157-
String chainTaskId = contributionAuth.getChainTaskId();
158-
boolean isResultAvailable = isResultAvailable(chainTaskId);
159-
160-
if (!isResultAvailable) {
161-
log.info("Result not found, re-running computation to recover task " +
162-
"[chainTaskId:{}, recoveryAction:CONTRIBUTE]", chainTaskId);
163-
taskExecutorService.addReplicate(replicateModel);
164-
return;
82+
recoveredChainTaskIds.add(chainTaskId);
16583
}
16684

167-
resultService.saveResultInfo(chainTaskId, replicateModel);
168-
taskExecutorService.contribute(contributionAuth);
85+
return recoveredChainTaskIds;
16986
}
17087
}

src/main/java/com/iexec/worker/docker/DockerComputationService.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package com.iexec.worker.docker;
22

3-
import com.iexec.common.replicate.AvailableReplicateModel;
3+
import com.iexec.common.task.TaskDescription;
44
import com.iexec.worker.config.WorkerConfigurationService;
55
import com.spotify.docker.client.messages.ContainerConfig;
66
import lombok.extern.slf4j.Slf4j;
@@ -28,9 +28,9 @@ public DockerComputationService(CustomDockerClient dockerClient,
2828
this.configurationService = configurationService;
2929
}
3030

31-
public String dockerRunAndGetLogs(AvailableReplicateModel replicateModel, String datasetFilename) {
32-
String chainTaskId = replicateModel.getContributionAuthorization().getChainTaskId();
33-
String image = replicateModel.getAppUri();
31+
public String dockerRunAndGetLogs(TaskDescription taskDescription, String datasetFilename) {
32+
String chainTaskId = taskDescription.getChainTaskId();
33+
String image = taskDescription.getAppUri();
3434
//TODO: check image equals image:tag
3535
String stdout = "";
3636

@@ -41,17 +41,17 @@ public String dockerRunAndGetLogs(AvailableReplicateModel replicateModel, String
4141
String hostBaseVolume = configurationService.getTaskBaseDir(chainTaskId);
4242
ContainerConfig containerConfig;
4343

44-
if (replicateModel.isTrustedExecution()) {
45-
containerConfig = getContainerConfig(image, replicateModel.getCmd(), hostBaseVolume,
44+
if (taskDescription.isTrustedExecution()) {
45+
containerConfig = getContainerConfig(image, taskDescription.getCmd(), hostBaseVolume,
4646
TEE_DOCKER_ENV_CHAIN_TASKID + "=" + chainTaskId,
4747
TEE_DOCKER_ENV_WORKER_ADDRESS + "=" + configurationService.getWorkerWalletAddress(),
4848
DATASET_FILENAME + "=" + datasetFilename);
4949
} else {
50-
containerConfig = getContainerConfig(image, replicateModel.getCmd(), hostBaseVolume,
50+
containerConfig = getContainerConfig(image, taskDescription.getCmd(), hostBaseVolume,
5151
DATASET_FILENAME + "=" + datasetFilename);
5252
}
5353

54-
stdout = startComputationAndGetLogs(chainTaskId, containerConfig, replicateModel.getMaxExecutionTime());
54+
stdout = startComputationAndGetLogs(chainTaskId, containerConfig, taskDescription.getMaxExecutionTime());
5555

5656
return stdout;
5757
}

0 commit comments

Comments
 (0)