Skip to content

Commit de4b827

Browse files
authored
Merge pull request #189 from iExecBlockchainComputing/patch
Fix issue #188
2 parents 38476b0 + 4a179a6 commit de4b827

File tree

2 files changed

+82
-116
lines changed

2 files changed

+82
-116
lines changed

src/main/java/com/iexec/worker/amnesia/AmnesiaRecoveryService.java

Lines changed: 65 additions & 111 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ public List<String> recoverInterruptedReplicates() {
5757
List<String> recoveredChainTaskIds = new ArrayList<>();
5858

5959
if (interruptedReplicates == null || interruptedReplicates.isEmpty()) {
60-
log.info("no interrupted tasks to recover");
60+
log.info("No interrupted tasks to recover");
6161
return Collections.emptyList();
6262
}
6363

@@ -66,151 +66,105 @@ public List<String> recoverInterruptedReplicates() {
6666
ContributionAuthorization contributionAuth = interruptedReplicate.getContributionAuthorization();
6767
RecoveryAction recoveryAction = interruptedReplicate.getRecoveryAction();
6868
String chainTaskId = contributionAuth.getChainTaskId();
69+
boolean isResultAvailable = isResultAvailable(chainTaskId);
6970

70-
log.info("recovering interrupted task [chainTaskId:{}, recoveryAction:{}]",
71+
log.info("Recovering interrupted task [chainTaskId:{}, recoveryAction:{}]",
7172
chainTaskId, recoveryAction);
7273

73-
boolean shouldSubscribe = false;
74-
boolean shouldKeepResultFolder = true;
75-
76-
switch (interruptedReplicate.getRecoveryAction()) {
77-
case WAIT:
78-
shouldSubscribe = true; // just subscribe and wait
79-
break;
80-
81-
case CONTRIBUTE:
82-
shouldSubscribe = recoverReplicateByContributing(contributionAuth);
83-
shouldKeepResultFolder = shouldSubscribe;
84-
break;
85-
86-
case ABORT_CONSENSUS_REACHED:
87-
taskExecutorService.abortConsensusReached(chainTaskId);
88-
break;
89-
90-
case ABORT_CONTRIBUTION_TIMEOUT:
91-
taskExecutorService.abortContributionTimeout(chainTaskId);
92-
break;
93-
94-
case REVEAL:
95-
shouldSubscribe = recoverReplicateByRevealing(contributionAuth);
96-
shouldKeepResultFolder = shouldSubscribe;
97-
break;
98-
99-
case UPLOAD_RESULT:
100-
shouldSubscribe = recoverReplicateByUploadingResult(contributionAuth);
101-
shouldKeepResultFolder = shouldSubscribe;
102-
break;
103-
104-
case COMPLETE:
105-
taskExecutorService.completeTask(chainTaskId);
106-
break;
74+
if (!isResultAvailable && recoveryAction != RecoveryAction.CONTRIBUTE) {
75+
log.error("Could not recover task, result not found [chainTaskId:{}, RecoveryAction:{}]",
76+
chainTaskId, recoveryAction);
77+
continue;
10778
}
10879

109-
if (shouldSubscribe) {
110-
subscriptionService.subscribeToTopic(chainTaskId);
111-
}
80+
Optional<AvailableReplicateModel> oReplicateModel =
81+
replicateService.retrieveAvailableReplicateModelFromContribAuth(contributionAuth);
11282

113-
if (shouldKeepResultFolder) {
114-
recoveredChainTaskIds.add(chainTaskId);
83+
if (!oReplicateModel.isPresent()) {
84+
log.error("Could not recover task, no replicateModel retrieved [chainTaskId:{}, RecoveryAction:{}]",
85+
chainTaskId, recoveryAction);
86+
continue;
11587
}
88+
89+
AvailableReplicateModel replicateModel = oReplicateModel.get();
90+
recoverReplicate(interruptedReplicate, replicateModel);
91+
recoveredChainTaskIds.add(chainTaskId);
11692
}
11793

11894
return recoveredChainTaskIds;
11995
}
12096

121-
public boolean recoverReplicateByContributing(ContributionAuthorization contributionAuth) {
122-
String chainTaskId = contributionAuth.getChainTaskId();
123-
124-
Optional<AvailableReplicateModel> oReplicateModel =
125-
replicateService.retrieveAvailableReplicateModelFromContribAuth(contributionAuth);
97+
public void recoverReplicate(InterruptedReplicateModel interruptedReplicate,
98+
AvailableReplicateModel replicateModel) {
12699

127-
if (!oReplicateModel.isPresent()) {
128-
log.info("could not retrieve replicateModel from contributionAuth to recover task "
129-
+ "[chainTaskId:{}, RecoveryAction:CONTRIBUTE]", chainTaskId);
130-
return false;
131-
}
100+
ContributionAuthorization contributionAuth = interruptedReplicate.getContributionAuthorization();
101+
String chainTaskId = contributionAuth.getChainTaskId();
132102

133-
AvailableReplicateModel replicateModel = oReplicateModel.get();
103+
switch (interruptedReplicate.getRecoveryAction()) {
104+
case WAIT:
105+
subscriptionService.subscribeToTopic(chainTaskId);
106+
resultService.saveResultInfo(chainTaskId, replicateModel);
107+
break;
134108

135-
boolean isResultZipFound = resultService.isResultZipFound(chainTaskId);
136-
boolean isResultFolderFound = resultService.isResultFolderFound(chainTaskId);
109+
case CONTRIBUTE:
110+
subscriptionService.subscribeToTopic(chainTaskId);
111+
recoverReplicateByContributing(contributionAuth, replicateModel);
112+
break;
137113

138-
if (!isResultFolderFound && !isResultZipFound) {
139-
// re-run computation
140-
taskExecutorService.addReplicate(replicateModel);
141-
return true;
142-
}
114+
case ABORT_CONSENSUS_REACHED:
115+
taskExecutorService.abortConsensusReached(chainTaskId);
116+
break;
143117

144-
if (!isResultZipFound) {
145-
resultService.zipResultFolder(chainTaskId);
146-
}
118+
case ABORT_CONTRIBUTION_TIMEOUT:
119+
taskExecutorService.abortContributionTimeout(chainTaskId);
120+
break;
147121

148-
resultService.saveResultInfo(chainTaskId, replicateModel);
149-
taskExecutorService.contribute(contributionAuth);
150-
return true;
151-
}
122+
case REVEAL:
123+
subscriptionService.subscribeToTopic(chainTaskId);
124+
resultService.saveResultInfo(chainTaskId, replicateModel);
125+
taskExecutorService.reveal(chainTaskId);
126+
break;
152127

153-
public boolean recoverReplicateByRevealing(ContributionAuthorization contributionAuth) {
154-
String chainTaskId = contributionAuth.getChainTaskId();
128+
case UPLOAD_RESULT:
129+
subscriptionService.subscribeToTopic(chainTaskId);
130+
resultService.saveResultInfo(chainTaskId, replicateModel);
131+
taskExecutorService.uploadResult(chainTaskId);
132+
break;
155133

156-
Optional<AvailableReplicateModel> oReplicateModel =
157-
replicateService.retrieveAvailableReplicateModelFromContribAuth(contributionAuth);
134+
case COMPLETE:
135+
taskExecutorService.completeTask(chainTaskId);
136+
break;
158137

159-
if (!oReplicateModel.isPresent()) {
160-
log.info("could not retrieve replicateModel from contributionAuth to recover task "
161-
+ "[chainTaskId:{}, RecoveryAction:CONTRIBUTE]", chainTaskId);
162-
return false;
138+
default:
139+
break;
163140
}
141+
}
164142

165-
AvailableReplicateModel replicateModel = oReplicateModel.get();
166-
143+
private boolean isResultAvailable(String chainTaskId) {
167144
boolean isResultZipFound = resultService.isResultZipFound(chainTaskId);
168145
boolean isResultFolderFound = resultService.isResultFolderFound(chainTaskId);
169146

170-
if (!isResultZipFound && !isResultFolderFound) {
171-
log.error("couldn't recover task by revealing since result was not found "
172-
+ "[chainTaskId:{}]", chainTaskId);
173-
return false;
174-
}
147+
if (!isResultZipFound && !isResultFolderFound) return false;
175148

176-
if (!isResultZipFound) {
177-
resultService.zipResultFolder(chainTaskId);
178-
}
149+
if (!isResultZipFound) resultService.zipResultFolder(chainTaskId);
179150

180-
resultService.saveResultInfo(chainTaskId, replicateModel);
181-
taskExecutorService.reveal(chainTaskId);
182151
return true;
183152
}
184153

185-
public boolean recoverReplicateByUploadingResult(ContributionAuthorization contributionAuth) {
186-
String chainTaskId = contributionAuth.getChainTaskId();
187-
188-
Optional<AvailableReplicateModel> oReplicateModel =
189-
replicateService.retrieveAvailableReplicateModelFromContribAuth(contributionAuth);
154+
public void recoverReplicateByContributing(ContributionAuthorization contributionAuth,
155+
AvailableReplicateModel replicateModel) {
190156

191-
if (!oReplicateModel.isPresent()) {
192-
log.info("could not retrieve replicateModel from contributionAuth to recover task "
193-
+ "[chainTaskId:{}, RecoveryAction:CONTRIBUTE]", chainTaskId);
194-
return false;
195-
}
196-
197-
AvailableReplicateModel replicateModel = oReplicateModel.get();
198-
199-
boolean isResultZipFound = resultService.isResultZipFound(chainTaskId);
200-
boolean isResultFolderFound = resultService.isResultFolderFound(chainTaskId);
201-
202-
if (!isResultZipFound && !isResultFolderFound) {
203-
log.error("couldn't recover task by uploading since result was not found "
204-
+ "[chainTaskId:{}]", chainTaskId);
205-
return false;
206-
}
157+
String chainTaskId = contributionAuth.getChainTaskId();
158+
boolean isResultAvailable = isResultAvailable(chainTaskId);
207159

208-
if (!isResultZipFound) {
209-
resultService.zipResultFolder(chainTaskId);
160+
if (!isResultAvailable) {
161+
log.info("Result not found, re-running computation to recover task " +
162+
"[chainTaskId:{}, recoveryAction:CONTRIBUTE]", chainTaskId);
163+
taskExecutorService.addReplicate(replicateModel);
164+
return;
210165
}
211166

212167
resultService.saveResultInfo(chainTaskId, replicateModel);
213-
taskExecutorService.uploadResult(chainTaskId);
214-
return true;
168+
taskExecutorService.contribute(contributionAuth);
215169
}
216170
}

src/test/java/com/iexec/worker/amnesia/AmnesiaRecoveryServiceTests.java

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -63,9 +63,12 @@ public void shouldNotRecoverSinceNothingToRecover() {
6363
@Test
6464
public void shouldRecoverByWaiting() {
6565
when(iexecHubService.getLatestBlockNumber()).thenReturn(blockNumber);
66+
when(resultService.isResultZipFound(CHAIN_TASK_ID)).thenReturn(true);
67+
when(replicateService.retrieveAvailableReplicateModelFromContribAuth(any()))
68+
.thenReturn(getStubModel());
6669
when(customFeignClient.getInterruptedReplicates(blockNumber))
6770
.thenReturn(getStubInterruptedReplicateList(RecoveryAction.WAIT));
68-
71+
6972
List<String> recovered = amnesiaRecoveryService.recoverInterruptedReplicates();
7073

7174
assertThat(recovered).isNotEmpty();
@@ -113,6 +116,9 @@ public void shouldAbortSinceConsensusReached() {
113116
when(iexecHubService.getLatestBlockNumber()).thenReturn(blockNumber);
114117
when(customFeignClient.getInterruptedReplicates(blockNumber))
115118
.thenReturn(getStubInterruptedReplicateList(RecoveryAction.ABORT_CONSENSUS_REACHED));
119+
when(resultService.isResultZipFound(CHAIN_TASK_ID)).thenReturn(true);
120+
when(replicateService.retrieveAvailableReplicateModelFromContribAuth(any()))
121+
.thenReturn(getStubModel());
116122

117123
List<String> recovered = amnesiaRecoveryService.recoverInterruptedReplicates();
118124

@@ -127,9 +133,11 @@ public void shouldAbortSinceConsensusReached() {
127133
public void shouldAbortSinceContributionTimeout() {
128134
when(iexecHubService.getLatestBlockNumber()).thenReturn(blockNumber);
129135
when(customFeignClient.getInterruptedReplicates(blockNumber))
130-
.thenReturn(getStubInterruptedReplicateList(
131-
RecoveryAction.ABORT_CONTRIBUTION_TIMEOUT));
132-
136+
.thenReturn(getStubInterruptedReplicateList(RecoveryAction.ABORT_CONTRIBUTION_TIMEOUT));
137+
when(resultService.isResultZipFound(CHAIN_TASK_ID)).thenReturn(true);
138+
when(replicateService.retrieveAvailableReplicateModelFromContribAuth(any()))
139+
.thenReturn(getStubModel());
140+
133141
List<String> recovered = amnesiaRecoveryService.recoverInterruptedReplicates();
134142

135143
assertThat(recovered).isNotEmpty();
@@ -214,7 +222,11 @@ public void shouldCompleteTask() {
214222
when(iexecHubService.getLatestBlockNumber()).thenReturn(blockNumber);
215223
when(customFeignClient.getInterruptedReplicates(blockNumber))
216224
.thenReturn(getStubInterruptedReplicateList(RecoveryAction.COMPLETE));
217-
225+
226+
when(resultService.isResultZipFound(CHAIN_TASK_ID)).thenReturn(true);
227+
when(replicateService.retrieveAvailableReplicateModelFromContribAuth(any()))
228+
.thenReturn(getStubModel());
229+
218230
List<String> recovered = amnesiaRecoveryService.recoverInterruptedReplicates();
219231

220232
assertThat(recovered).isNotEmpty();

0 commit comments

Comments
 (0)