Skip to content

Commit be52571

Browse files
committed
Add new replicate to ReplicatesList without querying it twice
1 parent 7ff08c7 commit be52571

File tree

5 files changed

+40
-38
lines changed

5 files changed

+40
-38
lines changed

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ All notable changes to this project will be documented in this file.
66

77
### Bug Fixes
88

9-
- Check if Worker can still accept more work right before giving it new replicate. (#644)
9+
- Check if Worker can still accept more work right before giving it a new replicate. (#644)
1010

1111
## [[8.2.2]](https://github.com/iExecBlockchainComputing/iexec-core/releases/tag/v8.2.2) 2023-12-13
1212

src/main/java/com/iexec/core/replicate/ReplicateSupplyService.java

Lines changed: 19 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -184,22 +184,6 @@ private boolean acceptOrRejectTask(Task task, String walletAddress) {
184184
}
185185

186186
final String chainTaskId = task.getChainTaskId();
187-
final Optional<ReplicatesList> oReplicatesList = replicatesService.getReplicatesList(chainTaskId);
188-
// Check is only here to prevent
189-
// "`Optional.get()` without `isPresent()` warning".
190-
// This case should not happen.
191-
if (oReplicatesList.isEmpty()) {
192-
return false;
193-
}
194-
195-
final ReplicatesList replicatesList = oReplicatesList.get();
196-
197-
final boolean hasWorkerAlreadyParticipated =
198-
replicatesList.hasWorkerAlreadyParticipated(walletAddress);
199-
if (hasWorkerAlreadyParticipated) {
200-
return false;
201-
}
202-
203187
final Lock lock = taskAccessForNewReplicateLocks
204188
.computeIfAbsent(chainTaskId, k -> new ReentrantLock());
205189
if (!lock.tryLock()) {
@@ -209,6 +193,22 @@ private boolean acceptOrRejectTask(Task task, String walletAddress) {
209193
}
210194

211195
try {
196+
final Optional<ReplicatesList> oReplicatesList = replicatesService.getReplicatesList(chainTaskId);
197+
// Check is only here to prevent
198+
// "`Optional.get()` without `isPresent()` warning".
199+
// This case should not happen.
200+
if (oReplicatesList.isEmpty()) {
201+
return false;
202+
}
203+
204+
final ReplicatesList replicatesList = oReplicatesList.get();
205+
206+
final boolean hasWorkerAlreadyParticipated =
207+
replicatesList.hasWorkerAlreadyParticipated(walletAddress);
208+
if (hasWorkerAlreadyParticipated) {
209+
return false;
210+
}
211+
212212
final boolean taskNeedsMoreContributions = ConsensusHelper.doesTaskNeedMoreContributionsForConsensus(
213213
chainTaskId,
214214
replicatesList.getReplicates(),
@@ -220,16 +220,15 @@ private boolean acceptOrRejectTask(Task task, String walletAddress) {
220220
return false;
221221
}
222222

223-
workerService.addChainTaskIdToWorker(chainTaskId, walletAddress)
224-
.ifPresent(worker -> replicatesService.addNewReplicate(chainTaskId, walletAddress));
223+
return workerService.addChainTaskIdToWorker(chainTaskId, walletAddress)
224+
.map(worker -> replicatesService.addNewReplicate(replicatesList, walletAddress))
225+
.orElse(false);
225226
} finally {
226227
// We should always unlock the task
227228
// so that it could be taken by another replicate
228229
// if there's any issue.
229230
lock.unlock();
230231
}
231-
232-
return true;
233232
}
234233

235234
/**

src/main/java/com/iexec/core/replicate/ReplicatesService.java

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -71,14 +71,8 @@ public ReplicatesService(ReplicatesRepository replicatesRepository,
7171
this.taskLogsService = taskLogsService;
7272
}
7373

74-
public void addNewReplicate(String chainTaskId, String walletAddress) {
75-
final Optional<ReplicatesList> oReplicatesList = getReplicatesList(chainTaskId);
76-
if (oReplicatesList.isEmpty()) {
77-
log.warn("Can't add replicate to unknown ReplicatesList [chainTaskId:{}, workerName:{}]", chainTaskId, walletAddress);
78-
return;
79-
}
80-
81-
final ReplicatesList replicatesList = oReplicatesList.get();
74+
public boolean addNewReplicate(ReplicatesList replicatesList, String walletAddress) {
75+
final String chainTaskId = replicatesList.getChainTaskId();
8276
if (replicatesList.getReplicateOfWorker(walletAddress).isEmpty()) {
8377
Replicate replicate = new Replicate(walletAddress, chainTaskId);
8478
replicate.setWorkerWeight(iexecHubService.getWorkerWeight(walletAddress));// workerWeight value for pendingWeight estimate
@@ -90,6 +84,7 @@ public void addNewReplicate(String chainTaskId, String walletAddress) {
9084
log.error("Replicate already saved [chainTaskId:{}, walletAddress:{}]", chainTaskId, walletAddress);
9185
}
9286

87+
return true;
9388
}
9489

9590
public synchronized void createEmptyReplicateList(String chainTaskId) {

src/test/java/com/iexec/core/replicate/ReplicateServiceTests.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ void shouldCreateNewReplicate() {
8383
ReplicatesList replicatesList = new ReplicatesList(CHAIN_TASK_ID, list);
8484
when(replicatesRepository.findByChainTaskId(CHAIN_TASK_ID)).thenReturn(Optional.of(replicatesList));
8585
when(replicatesRepository.save(any())).thenReturn(replicatesList);
86-
replicatesService.addNewReplicate(CHAIN_TASK_ID, WALLET_WORKER_3);
86+
replicatesService.addNewReplicate(replicatesList, WALLET_WORKER_3);
8787
Mockito.verify(replicatesRepository, Mockito.times(1))
8888
.save(any());
8989
}
@@ -103,11 +103,11 @@ void shouldNotCreateNewReplicate() {
103103
when(replicatesRepository.findByChainTaskId(CHAIN_TASK_ID)).thenReturn(Optional.of(replicatesList));
104104
when(replicatesRepository.save(any())).thenReturn(replicatesList);
105105

106-
replicatesService.addNewReplicate(CHAIN_TASK_ID, WALLET_WORKER_1);
106+
replicatesService.addNewReplicate(replicatesList, WALLET_WORKER_1);
107107
Mockito.verify(replicatesRepository, Mockito.times(0))
108108
.save(any());
109109

110-
replicatesService.addNewReplicate(CHAIN_TASK_ID, WALLET_WORKER_2);
110+
replicatesService.addNewReplicate(replicatesList, WALLET_WORKER_2);
111111
Mockito.verify(replicatesRepository, Mockito.times(0))
112112
.save(any());
113113
}
@@ -1505,4 +1505,4 @@ void computeUpdateReplicateStatusArgsResultUploadFailed() {
15051505
.build());
15061506
}
15071507

1508-
}
1508+
}

src/test/java/com/iexec/core/replicate/ReplicateSupplyServiceTests.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -338,7 +338,7 @@ void shouldNotGetReplicateSinceEnclaveChallengeNeededButNotGenerated() {
338338

339339
assertThat(replicateTaskSummary).isEmpty();
340340

341-
Mockito.verify(replicatesService, Mockito.never()).addNewReplicate(CHAIN_TASK_ID, WALLET_WORKER_1);
341+
Mockito.verify(replicatesService, Mockito.never()).addNewReplicate(replicatesList, WALLET_WORKER_1);
342342
Mockito.verify(workerService, Mockito.never()).addChainTaskIdToWorker(CHAIN_TASK_ID, WALLET_WORKER_1);
343343
Mockito.verifyNoInteractions(signatureService);
344344
assertTaskAccessForNewReplicateLockNeverUsed(CHAIN_TASK_ID);
@@ -383,13 +383,15 @@ void shouldGetOnlyOneReplicateSinceOtherOneReachedConsensusDeadline() {
383383
.thenReturn(WorkerpoolAuthorization.builder().chainTaskId(CHAIN_TASK_ID).build());
384384
when(workerService.addChainTaskIdToWorker(CHAIN_TASK_ID, WALLET_WORKER_1))
385385
.thenReturn(Optional.of(existingWorker));
386+
when(replicatesService.addNewReplicate(replicatesList, WALLET_WORKER_1))
387+
.thenReturn(true);
386388

387389
final Optional<ReplicateTaskSummary> replicateTaskSummary = replicateSupplyService.getAvailableReplicateTaskSummary(workerLastBlock, WALLET_WORKER_1);
388390

389391
assertThat(replicateTaskSummary).isPresent();
390392
assertThat(replicateTaskSummary.get().getWorkerpoolAuthorization().getChainTaskId()).isEqualTo(CHAIN_TASK_ID);
391393

392-
Mockito.verify(replicatesService).addNewReplicate(CHAIN_TASK_ID, WALLET_WORKER_1);
394+
Mockito.verify(replicatesService).addNewReplicate(replicatesList, WALLET_WORKER_1);
393395
Mockito.verify(workerService).addChainTaskIdToWorker(CHAIN_TASK_ID, WALLET_WORKER_1);
394396
Mockito.verify(signatureService, times(0)).createAuthorization(any(), eq(CHAIN_TASK_ID_2), any());
395397
assertTaskAccessForNewReplicateNotDeadLocking(CHAIN_TASK_ID);
@@ -459,12 +461,14 @@ void shouldGetReplicateWithNoTee() {
459461
when(replicatesList.hasWorkerAlreadyParticipated(WALLET_WORKER_1)).thenReturn(false);
460462
when(workerService.addChainTaskIdToWorker(CHAIN_TASK_ID, WALLET_WORKER_1))
461463
.thenReturn(Optional.of(existingWorker));
464+
when(replicatesService.addNewReplicate(replicatesList, WALLET_WORKER_1))
465+
.thenReturn(true);
462466

463467
Optional<ReplicateTaskSummary> replicateTaskSummary =
464468
replicateSupplyService.getAvailableReplicateTaskSummary(workerLastBlock, WALLET_WORKER_1);
465469
assertThat(replicateTaskSummary).isPresent();
466470

467-
Mockito.verify(replicatesService).addNewReplicate(CHAIN_TASK_ID, WALLET_WORKER_1);
471+
Mockito.verify(replicatesService).addNewReplicate(replicatesList, WALLET_WORKER_1);
468472
Mockito.verify(workerService).addChainTaskIdToWorker(CHAIN_TASK_ID, WALLET_WORKER_1);
469473
Mockito.verify(signatureService).createAuthorization(WALLET_WORKER_1, CHAIN_TASK_ID, BytesUtils.EMPTY_ADDRESS);
470474
assertTaskAccessForNewReplicateNotDeadLocking(CHAIN_TASK_ID);
@@ -501,14 +505,16 @@ void shouldGetReplicateWithTee() {
501505
.thenReturn(new WorkerpoolAuthorization());
502506
when(workerService.addChainTaskIdToWorker(CHAIN_TASK_ID, WALLET_WORKER_1))
503507
.thenReturn(Optional.of(existingWorker));
508+
when(replicatesService.addNewReplicate(replicatesList, WALLET_WORKER_1))
509+
.thenReturn(true);
504510

505511
when(replicatesList.hasWorkerAlreadyParticipated(WALLET_WORKER_1)).thenReturn(false);
506512
Optional<ReplicateTaskSummary> replicateTaskSummary =
507513
replicateSupplyService.getAvailableReplicateTaskSummary(workerLastBlock, WALLET_WORKER_1);
508514

509515
assertThat(replicateTaskSummary).isPresent();
510516

511-
Mockito.verify(replicatesService).addNewReplicate(CHAIN_TASK_ID, WALLET_WORKER_1);
517+
Mockito.verify(replicatesService).addNewReplicate(replicatesList, WALLET_WORKER_1);
512518
Mockito.verify(workerService).addChainTaskIdToWorker(CHAIN_TASK_ID, WALLET_WORKER_1);
513519
assertTaskAccessForNewReplicateNotDeadLocking(CHAIN_TASK_ID);
514520
}
@@ -573,14 +579,16 @@ void shouldTeeNeededTaskBeGivenToTeeEnabledWorker() {
573579
.thenReturn(new WorkerpoolAuthorization());
574580
when(workerService.addChainTaskIdToWorker(CHAIN_TASK_ID, WALLET_WORKER_1))
575581
.thenReturn(Optional.of(existingWorker));
582+
when(replicatesService.addNewReplicate(replicatesList, WALLET_WORKER_1))
583+
.thenReturn(true);
576584

577585
when(replicatesList.hasWorkerAlreadyParticipated(WALLET_WORKER_1)).thenReturn(false);
578586
Optional<ReplicateTaskSummary> replicateTaskSummary =
579587
replicateSupplyService.getAvailableReplicateTaskSummary(workerLastBlock, WALLET_WORKER_1);
580588

581589
assertThat(replicateTaskSummary).isPresent();
582590

583-
Mockito.verify(replicatesService).addNewReplicate(CHAIN_TASK_ID, WALLET_WORKER_1);
591+
Mockito.verify(replicatesService).addNewReplicate(replicatesList, WALLET_WORKER_1);
584592
Mockito.verify(workerService).addChainTaskIdToWorker(CHAIN_TASK_ID, WALLET_WORKER_1);
585593
assertTaskAccessForNewReplicateNotDeadLocking(CHAIN_TASK_ID);
586594
}

0 commit comments

Comments
 (0)