Skip to content

Commit 9dbe02e

Browse files
author
Jérémy James Toussaint
committed
Notification type for recovery
1 parent c5cf832 commit 9dbe02e

File tree

2 files changed

+55
-44
lines changed

2 files changed

+55
-44
lines changed

src/main/java/com/iexec/worker/amnesia/AmnesiaRecoveryService.java

Lines changed: 53 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,26 @@
11
package com.iexec.worker.amnesia;
22

3-
import java.util.ArrayList;
4-
import java.util.Collections;
5-
import java.util.List;
6-
import java.util.Optional;
7-
83
import com.iexec.common.chain.ContributionAuthorization;
9-
import com.iexec.common.replicate.AvailableReplicateModel;
104
import com.iexec.common.disconnection.InterruptedReplicateModel;
11-
import com.iexec.common.disconnection.RecoveryAction;
5+
import com.iexec.common.notification.TaskNotification;
6+
import com.iexec.common.notification.TaskNotificationType;
7+
import com.iexec.common.replicate.AvailableReplicateModel;
128
import com.iexec.worker.chain.IexecHubService;
139
import com.iexec.worker.executor.TaskExecutorService;
1410
import com.iexec.worker.feign.CustomFeignClient;
1511
import com.iexec.worker.pubsub.SubscriptionService;
1612
import com.iexec.worker.replicate.ReplicateService;
1713
import com.iexec.worker.result.ResultService;
18-
14+
import lombok.extern.slf4j.Slf4j;
1915
import org.springframework.stereotype.Service;
2016

21-
import lombok.extern.slf4j.Slf4j;
17+
import java.util.ArrayList;
18+
import java.util.Collections;
19+
import java.util.List;
20+
import java.util.Optional;
2221

2322

24-
/*
23+
/*
2524
* This service is used to remind the worker of possible interrupted works
2625
* after a restart and how to deal with each interruption
2726
*/
@@ -64,16 +63,16 @@ public List<String> recoverInterruptedReplicates() {
6463
for (InterruptedReplicateModel interruptedReplicate : interruptedReplicates) {
6564

6665
ContributionAuthorization contributionAuth = interruptedReplicate.getContributionAuthorization();
67-
RecoveryAction recoveryAction = interruptedReplicate.getRecoveryAction();
66+
TaskNotificationType taskNotificationType = interruptedReplicate.getTaskNotificationType();
6867
String chainTaskId = contributionAuth.getChainTaskId();
6968
boolean isResultAvailable = isResultAvailable(chainTaskId);
7069

71-
log.info("Recovering interrupted task [chainTaskId:{}, recoveryAction:{}]",
72-
chainTaskId, recoveryAction);
70+
log.info("Recovering interrupted task [chainTaskId:{}, taskNotificationType:{}]",
71+
chainTaskId, taskNotificationType);
7372

74-
if (!isResultAvailable && recoveryAction != RecoveryAction.CONTRIBUTE) {
73+
if (!isResultAvailable && taskNotificationType != TaskNotificationType.PLEASE_CONTRIBUTE) {
7574
log.error("Could not recover task, result not found [chainTaskId:{}, RecoveryAction:{}]",
76-
chainTaskId, recoveryAction);
75+
chainTaskId, taskNotificationType);
7776
continue;
7877
}
7978

@@ -82,7 +81,7 @@ public List<String> recoverInterruptedReplicates() {
8281

8382
if (!oReplicateModel.isPresent()) {
8483
log.error("Could not recover task, no replicateModel retrieved [chainTaskId:{}, RecoveryAction:{}]",
85-
chainTaskId, recoveryAction);
84+
chainTaskId, taskNotificationType);
8685
continue;
8786
}
8887

@@ -100,44 +99,56 @@ public void recoverReplicate(InterruptedReplicateModel interruptedReplicate,
10099
ContributionAuthorization contributionAuth = interruptedReplicate.getContributionAuthorization();
101100
String chainTaskId = contributionAuth.getChainTaskId();
102101

103-
switch (interruptedReplicate.getRecoveryAction()) {
104-
case WAIT:
105-
subscriptionService.subscribeToTopic(chainTaskId);
106-
resultService.saveResultInfo(chainTaskId, replicateModel);
107-
break;
102+
subscriptionService.subscribeToTopic(chainTaskId);
103+
resultService.saveResultInfo(chainTaskId, replicateModel);
104+
105+
TaskNotification taskNotification = null;
108106

109-
case CONTRIBUTE:
110-
subscriptionService.subscribeToTopic(chainTaskId);
107+
switch (interruptedReplicate.getTaskNotificationType()) {
108+
case PLEASE_CONTRIBUTE:
111109
recoverReplicateByContributing(contributionAuth, replicateModel);
112110
break;
113-
114-
case ABORT_CONSENSUS_REACHED:
115-
taskExecutorService.abortConsensusReached(chainTaskId);
111+
case PLEASE_ABORT_CONSENSUS_REACHED:
112+
taskNotification = TaskNotification.builder()
113+
.chainTaskId(chainTaskId)
114+
.taskNotificationType(TaskNotificationType.PLEASE_ABORT_CONSENSUS_REACHED)
115+
.build();
116116
break;
117-
118-
case ABORT_CONTRIBUTION_TIMEOUT:
119-
taskExecutorService.abortContributionTimeout(chainTaskId);
117+
case PLEASE_ABORT_CONTRIBUTION_TIMEOUT:
118+
taskNotification = TaskNotification.builder()
119+
.chainTaskId(chainTaskId)
120+
.taskNotificationType(TaskNotificationType.PLEASE_ABORT_CONTRIBUTION_TIMEOUT)
121+
.build();
120122
break;
121123

122-
case REVEAL:
123-
subscriptionService.subscribeToTopic(chainTaskId);
124-
resultService.saveResultInfo(chainTaskId, replicateModel);
125-
taskExecutorService.reveal(chainTaskId, iexecHubService.getLatestBlockNumber());
124+
case PLEASE_REVEAL:
125+
taskNotification = TaskNotification.builder()
126+
.chainTaskId(chainTaskId)
127+
.taskNotificationType(TaskNotificationType.PLEASE_REVEAL)
128+
.blockNumber(iexecHubService.getLatestBlockNumber())
129+
.build();
126130
break;
127-
128-
case UPLOAD_RESULT:
129-
subscriptionService.subscribeToTopic(chainTaskId);
130-
resultService.saveResultInfo(chainTaskId, replicateModel);
131-
taskExecutorService.uploadResult(chainTaskId);
131+
case PLEASE_UPLOAD:
132+
taskNotification = TaskNotification.builder()
133+
.chainTaskId(chainTaskId)
134+
.taskNotificationType(TaskNotificationType.PLEASE_UPLOAD)
135+
.build();
132136
break;
133-
134-
case COMPLETE:
135-
taskExecutorService.completeTask(chainTaskId);
137+
case PLEASE_COMPLETE:
138+
taskNotification = TaskNotification.builder()
139+
.chainTaskId(chainTaskId)
140+
.taskNotificationType(TaskNotificationType.PLEASE_COMPLETE)
141+
.build();
136142
break;
137-
138143
default:
139144
break;
140145
}
146+
147+
if (taskNotification != null) {
148+
subscriptionService.handleTaskNotification(taskNotification);
149+
}
150+
151+
141152
}
142153

143154
private boolean isResultAvailable(String chainTaskId) {

src/main/java/com/iexec/worker/pubsub/SubscriptionService.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ public void unsubscribeFromTopic(String chainTaskId) {
171171
}
172172
}
173173

174-
private void handleTaskNotification(TaskNotification notif) {
174+
public void handleTaskNotification(TaskNotification notif) {
175175
if (notif.getWorkersAddress().contains(workerWalletAddress)
176176
|| notif.getWorkersAddress().isEmpty()) {
177177
log.info("Received notification [notification:{}]", notif);
@@ -198,7 +198,7 @@ private void handleTaskNotification(TaskNotification notif) {
198198
taskExecutorService.uploadResult(chainTaskId);
199199
break;
200200

201-
case COMPLETED:
201+
case PLEASE_COMPLETE:
202202
unsubscribeFromTopic(chainTaskId);
203203
taskExecutorService.completeTask(chainTaskId);
204204
break;

0 commit comments

Comments
 (0)