Skip to content

Commit 78185bb

Browse files
author
Jérémy James Toussaint
committed
Keep computing task list consistent
1 parent 7ffdd87 commit 78185bb

File tree

4 files changed

+82
-62
lines changed

4 files changed

+82
-62
lines changed

src/main/java/com/iexec/core/replicate/ReplicateSupplyService.java

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -86,12 +86,10 @@ public ReplicateSupplyService(ReplicatesService replicatesService,
8686
*/
8787
@Retryable(value = {OptimisticLockingFailureException.class}, maxAttempts = 5)
8888
Optional<WorkerpoolAuthorization> getAuthOfAvailableReplicate(long workerLastBlock, String walletAddress) {
89-
// return empty if the worker is not registered
90-
Optional<Worker> optional = workerService.getWorker(walletAddress);
91-
if (!optional.isPresent()) {
89+
// return empty if max computing task is reached or if the worker is not found
90+
if (!workerService.canAcceptMoreWorks(walletAddress)) {
9291
return Optional.empty();
9392
}
94-
Worker worker = optional.get();
9593

9694
// return empty if the worker is not sync
9795
//TODO Check if worker node is sync
@@ -100,11 +98,6 @@ Optional<WorkerpoolAuthorization> getAuthOfAvailableReplicate(long workerLastBlo
10098
return Optional.empty();
10199
}
102100

103-
// return empty if the worker already has enough running tasks
104-
if (!workerService.canAcceptMoreWorks(walletAddress)) {
105-
return Optional.empty();
106-
}
107-
108101
if (!web3jService.hasEnoughGas(walletAddress)) {
109102
return Optional.empty();
110103
}
@@ -127,6 +120,12 @@ Optional<WorkerpoolAuthorization> getAuthOfAvailableReplicate(long workerLastBlo
127120
})
128121
.collect(Collectors.toCollection(ArrayList::new));
129122

123+
Optional<Worker> optional = workerService.getWorker(walletAddress);
124+
if (optional.isEmpty()) {
125+
return Optional.empty();
126+
}
127+
Worker worker = optional.get();
128+
130129
for (Task task : validTasks) {
131130
String chainTaskId = task.getChainTaskId();
132131

src/main/java/com/iexec/core/replicate/ReplicatesController.java

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,10 @@
3737
@RestController
3838
public class ReplicatesController {
3939

40-
private ReplicatesService replicatesService;
41-
private ReplicateSupplyService replicateSupplyService;
42-
private JwtTokenProvider jwtTokenProvider;
43-
private WorkerService workerService;
40+
private final ReplicatesService replicatesService;
41+
private final ReplicateSupplyService replicateSupplyService;
42+
private final JwtTokenProvider jwtTokenProvider;
43+
private final WorkerService workerService;
4444

4545
public ReplicatesController(ReplicatesService replicatesService,
4646
ReplicateSupplyService replicateSupplyService,
@@ -55,8 +55,7 @@ public ReplicatesController(ReplicatesService replicatesService,
5555
@GetMapping("/replicates/available")
5656
public ResponseEntity<WorkerpoolAuthorization> getAvailableReplicate(
5757
@RequestParam(name = "blockNumber") long blockNumber,
58-
@RequestHeader("Authorization") String bearerToken
59-
) {
58+
@RequestHeader("Authorization") String bearerToken) {
6059
String workerWalletAddress = jwtTokenProvider.getWalletAddressFromBearerToken(bearerToken);
6160
if (workerWalletAddress.isEmpty()) {
6261
return ResponseEntity.status(HttpStatus.UNAUTHORIZED.value()).build();
@@ -67,11 +66,8 @@ public ResponseEntity<WorkerpoolAuthorization> getAvailableReplicate(
6766
}
6867
workerService.updateLastReplicateDemandDate(workerWalletAddress);
6968

70-
// get WorkerpoolAuthorization if a replicate is available
71-
Optional<WorkerpoolAuthorization> oAuthorization = replicateSupplyService
72-
.getAuthOfAvailableReplicate(blockNumber, workerWalletAddress);
73-
74-
return oAuthorization
69+
return replicateSupplyService
70+
.getAuthOfAvailableReplicate(blockNumber, workerWalletAddress)
7571
.<ResponseEntity<WorkerpoolAuthorization>>map(ResponseEntity::ok)
7672
.orElseGet(() -> status(HttpStatus.NO_CONTENT).build());
7773
}

src/main/java/com/iexec/core/task/listener/ReplicateListeners.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,9 +59,14 @@ public void onReplicateUpdatedEvent(ReplicateUpdatedEvent event) {
5959
taskService.updateTask(event.getChainTaskId());
6060

6161
/*
62-
* Should release 1 CPU of given worker for this replicate if status is COMPUTED
62+
* Should release 1 CPU of given worker for this replicate if status is
63+
* "COMPUTED" or "*_FAILED" before COMPUTED
6364
* */
64-
if (newStatus.equals(ReplicateStatus.COMPUTED)) {
65+
if (newStatus.equals(ReplicateStatus.START_FAILED)
66+
|| newStatus.equals(ReplicateStatus.APP_DOWNLOAD_FAILED)
67+
|| newStatus.equals(ReplicateStatus.DATA_DOWNLOAD_FAILED)
68+
|| newStatus.equals(ReplicateStatus.COMPUTED)
69+
|| newStatus.equals(ReplicateStatus.COMPUTE_FAILED)) {
6570
workerService.removeComputedChainTaskIdFromWorker(event.getChainTaskId(), event.getWalletAddress());
6671
}
6772

src/test/java/com/iexec/core/replicate/ReplicateListenersTests.java

Lines changed: 60 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -59,22 +59,50 @@ public void init() {
5959
}
6060

6161
@Test
62-
public void shoulUpdateTaskOnReplicateUpdate() {
62+
public void shouldUpdateTaskOnReplicateUpdate() {
6363
List<ReplicateStatus> someStatuses = ReplicateStatus.getSuccessStatuses(); //not exhaustive
6464

6565
for (ReplicateStatus randomStatus: someStatuses){
66-
ReplicateUpdatedEvent replicateUpdatedEvent = ReplicateUpdatedEvent.builder()
67-
.chainTaskId(CHAIN_TASK_ID)
68-
.walletAddress(WORKER_WALLET)
69-
.replicateStatusUpdate(new ReplicateStatusUpdate(randomStatus))
70-
.build();
66+
ReplicateUpdatedEvent replicateUpdatedEvent = getMockReplicate(randomStatus);
7167

7268
replicateListeners.onReplicateUpdatedEvent(replicateUpdatedEvent);
7369
}
7470

7571
Mockito.verify(taskService, Mockito.times(someStatuses.size())).updateTask(any());
7672
}
7773

74+
@Test
75+
public void shouldRemoveFromComputedTasksSinceStartFailed() {
76+
assertIsRemovedFromComputedTasks(START_FAILED);
77+
}
78+
79+
@Test
80+
public void shouldRemoveFromComputedTasksSinceAppDownloadFailed() {
81+
assertIsRemovedFromComputedTasks(APP_DOWNLOAD_FAILED);
82+
}
83+
84+
@Test
85+
public void shouldRemoveFromComputedTasksSinceDataDownloadFailed() {
86+
assertIsRemovedFromComputedTasks(DATA_DOWNLOAD_FAILED);
87+
}
88+
89+
@Test
90+
public void shouldRemoveFromComputedTasksSinceComputedFailed() {
91+
assertIsRemovedFromComputedTasks(COMPUTED);
92+
}
93+
94+
@Test
95+
public void shouldRemoveFromComputedTasksSinceComputeFailed() {
96+
assertIsRemovedFromComputedTasks(COMPUTE_FAILED);
97+
}
98+
99+
private void assertIsRemovedFromComputedTasks(ReplicateStatus computed) {
100+
ReplicateUpdatedEvent replicateUpdatedEvent = getMockReplicate(computed);
101+
replicateListeners.onReplicateUpdatedEvent(replicateUpdatedEvent);
102+
Mockito.verify(workerService, Mockito.times(1))
103+
.removeComputedChainTaskIdFromWorker(CHAIN_TASK_ID, WORKER_WALLET);
104+
}
105+
78106
@Test
79107
public void shouldTriggerDetectOnchainContributedSinceTaskNotActive() {
80108
ReplicateUpdatedEvent replicateUpdatedEvent = ReplicateUpdatedEvent.builder()
@@ -94,29 +122,33 @@ public void shouldNotTriggerDetectOnchain() {
94122
someStatuses.remove(CONTRIBUTING);
95123

96124
for (ReplicateStatus randomStatus: someStatuses){
97-
ReplicateUpdatedEvent replicateUpdatedEvent = ReplicateUpdatedEvent.builder()
98-
.chainTaskId(CHAIN_TASK_ID)
99-
.walletAddress(WORKER_WALLET)
100-
.replicateStatusUpdate(new ReplicateStatusUpdate(randomStatus))
101-
.build();
125+
ReplicateUpdatedEvent replicateUpdatedEvent = getMockReplicate(randomStatus);
102126

103127
replicateListeners.onReplicateUpdatedEvent(replicateUpdatedEvent);
104128
}
105129

106130
Mockito.verify(contributionUnnotifiedDetector, Mockito.times(0)).detectOnchainContributed();
107131
}
108132

133+
@Test
134+
public void shouldNotTriggerDetectOnchainContributedSinceCauseIsNull() {
135+
ReplicateUpdatedEvent replicateUpdatedEvent = ReplicateUpdatedEvent.builder()
136+
.chainTaskId(CHAIN_TASK_ID)
137+
.walletAddress(WORKER_WALLET)
138+
.replicateStatusUpdate(new ReplicateStatusUpdate(CONTRIBUTING))
139+
.build();
140+
141+
replicateListeners.onReplicateUpdatedEvent(replicateUpdatedEvent);
142+
143+
Mockito.verify(contributionUnnotifiedDetector, Mockito.times(0)).detectOnchainContributed();
144+
}
145+
109146
@Test
110147
public void shouldAddFailedStatusSinceUncompletableReplicateStatus() {
111148
List<ReplicateStatus> uncompletableStatuses = ReplicateStatus.getUncompletableStatuses();
112149

113150
for (ReplicateStatus uncompletableStatus: uncompletableStatuses){
114-
ReplicateUpdatedEvent replicateUpdatedEvent = ReplicateUpdatedEvent.builder()
115-
.chainTaskId(CHAIN_TASK_ID)
116-
.walletAddress(WORKER_WALLET)
117-
// CANT_CONTRIBUTE_SINCE_*, ...
118-
.replicateStatusUpdate(new ReplicateStatusUpdate(uncompletableStatus))
119-
.build();
151+
ReplicateUpdatedEvent replicateUpdatedEvent = getMockReplicate(uncompletableStatus);
120152

121153
replicateListeners.onReplicateUpdatedEvent(replicateUpdatedEvent);
122154
}
@@ -130,12 +162,7 @@ public void shouldNotAddFailedStatusSinceCompletableReplicateStatus() {
130162
List<ReplicateStatus> completableStatuses = ReplicateStatus.getCompletableStatuses();
131163

132164
for (ReplicateStatus completableStatus: completableStatuses){
133-
ReplicateUpdatedEvent replicateUpdatedEvent = ReplicateUpdatedEvent.builder()
134-
.chainTaskId(CHAIN_TASK_ID)
135-
.walletAddress(WORKER_WALLET)
136-
// CREATED, ...
137-
.replicateStatusUpdate(new ReplicateStatusUpdate(completableStatus))
138-
.build();
165+
ReplicateUpdatedEvent replicateUpdatedEvent = getMockReplicate(completableStatus);
139166

140167
replicateListeners.onReplicateUpdatedEvent(replicateUpdatedEvent);
141168
}
@@ -146,11 +173,7 @@ public void shouldNotAddFailedStatusSinceCompletableReplicateStatus() {
146173

147174
@Test
148175
public void shouldRemoveChainTaskIdFromWorkerSinceCompleted() {
149-
ReplicateUpdatedEvent replicateUpdatedEvent = ReplicateUpdatedEvent.builder()
150-
.chainTaskId(CHAIN_TASK_ID)
151-
.walletAddress(WORKER_WALLET)
152-
.replicateStatusUpdate(new ReplicateStatusUpdate(COMPLETED))
153-
.build();
176+
ReplicateUpdatedEvent replicateUpdatedEvent = getMockReplicate(COMPLETED);
154177

155178
replicateListeners.onReplicateUpdatedEvent(replicateUpdatedEvent);
156179

@@ -160,11 +183,7 @@ public void shouldRemoveChainTaskIdFromWorkerSinceCompleted() {
160183

161184
@Test
162185
public void shouldRemoveChainTaskIdFromWorkerSinceFailed() {
163-
ReplicateUpdatedEvent replicateUpdatedEvent = ReplicateUpdatedEvent.builder()
164-
.chainTaskId(CHAIN_TASK_ID)
165-
.walletAddress(WORKER_WALLET)
166-
.replicateStatusUpdate(new ReplicateStatusUpdate(FAILED))
167-
.build();
186+
ReplicateUpdatedEvent replicateUpdatedEvent = getMockReplicate(FAILED);
168187

169188
replicateListeners.onReplicateUpdatedEvent(replicateUpdatedEvent);
170189

@@ -179,12 +198,7 @@ public void shouldNotRemoveChainTaskIdFromWorker() {
179198
someStatuses.remove(FAILED);
180199

181200
for (ReplicateStatus randomStatus: someStatuses){
182-
ReplicateUpdatedEvent replicateUpdatedEvent = ReplicateUpdatedEvent.builder()
183-
.chainTaskId(CHAIN_TASK_ID)
184-
.walletAddress(WORKER_WALLET)
185-
// CREATED, ...
186-
.replicateStatusUpdate(new ReplicateStatusUpdate(randomStatus))
187-
.build();
201+
ReplicateUpdatedEvent replicateUpdatedEvent = getMockReplicate(randomStatus);
188202

189203
replicateListeners.onReplicateUpdatedEvent(replicateUpdatedEvent);
190204
}
@@ -193,5 +207,11 @@ public void shouldNotRemoveChainTaskIdFromWorker() {
193207
.removeChainTaskIdFromWorker(CHAIN_TASK_ID, WORKER_WALLET);
194208
}
195209

196-
210+
private ReplicateUpdatedEvent getMockReplicate(ReplicateStatus computed) {
211+
return ReplicateUpdatedEvent.builder()
212+
.chainTaskId(CHAIN_TASK_ID)
213+
.walletAddress(WORKER_WALLET)
214+
.replicateStatusUpdate(new ReplicateStatusUpdate(computed))
215+
.build();
216+
}
197217
}

0 commit comments

Comments
 (0)