Skip to content

Commit 7ff08c7

Browse files
committed
Check if Worker can still accept more work right before giving it new replicate
1 parent e9ab24a commit 7ff08c7

File tree

8 files changed

+153
-95
lines changed

8 files changed

+153
-95
lines changed

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,12 @@
22

33
All notable changes to this project will be documented in this file.
44

5+
## [[8.2.3]](https://github.com/iExecBlockchainComputing/iexec-core/releases/tag/v8.2.3) 2023-12-13
6+
7+
### Bug Fixes
8+
9+
- Check if Worker can still accept more work right before giving it new replicate. (#644)
10+
511
## [[8.2.2]](https://github.com/iExecBlockchainComputing/iexec-core/releases/tag/v8.2.2) 2023-12-13
612

713
### Bug Fixes

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
version=8.2.2
1+
version=8.2.3
22
iexecCommonVersion=8.3.0
33
iexecCommonsPocoVersion=3.1.0
44
iexecBlockchainAdapterVersion=8.2.0

src/main/java/com/iexec/core/replicate/ReplicateSupplyService.java

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -86,14 +86,9 @@ public ReplicateSupplyService(ReplicatesService replicatesService,
8686
*/
8787
@Retryable(value = {OptimisticLockingFailureException.class}, maxAttempts = 5)
8888
Optional<ReplicateTaskSummary> getAvailableReplicateTaskSummary(long workerLastBlock, String walletAddress) {
89-
// return empty if max computing task is reached or if the worker is not found
90-
if (!workerService.canAcceptMoreWorks(walletAddress)) {
91-
return Optional.empty();
92-
}
93-
9489
// return empty if the worker is not sync
9590
//TODO Check if worker node is sync
96-
boolean isWorkerLastBlockAvailable = workerLastBlock > 0;
91+
final boolean isWorkerLastBlockAvailable = workerLastBlock > 0;
9792
if (!isWorkerLastBlockAvailable) {
9893
return Optional.empty();
9994
}
@@ -104,11 +99,16 @@ Optional<ReplicateTaskSummary> getAvailableReplicateTaskSummary(long workerLastB
10499

105100
// TODO : Remove this, the optional can never be empty
106101
// This is covered in workerService.canAcceptMoreWorks
107-
Optional<Worker> optional = workerService.getWorker(walletAddress);
102+
final Optional<Worker> optional = workerService.getWorker(walletAddress);
108103
if (optional.isEmpty()) {
109104
return Optional.empty();
110105
}
111-
Worker worker = optional.get();
106+
final Worker worker = optional.get();
107+
108+
// return empty if max computing task is reached or if the worker is not found
109+
if (!workerService.canAcceptMoreWorks(worker)) {
110+
return Optional.empty();
111+
}
112112

113113
return getReplicateTaskSummaryForAnyAvailableTask(
114114
walletAddress,
@@ -161,8 +161,8 @@ private Optional<ReplicateTaskSummary> getReplicateTaskSummary(Task task, String
161161
chainTaskId,
162162
task.getEnclaveChallenge());
163163
ReplicateTaskSummaryBuilder replicateTaskSummary = ReplicateTaskSummary.builder()
164-
.workerpoolAuthorization(authorization);
165-
if(task.isTeeTask()){
164+
.workerpoolAuthorization(authorization);
165+
if (task.isTeeTask()) {
166166
replicateTaskSummary.smsUrl(task.getSmsUrl());
167167
}
168168
return Optional.of(replicateTaskSummary.build());
@@ -173,7 +173,7 @@ private Optional<ReplicateTaskSummary> getReplicateTaskSummary(Task task, String
173173
* tries to accept the task - i.e. create a new {@link Replicate}
174174
* for that task on that worker.
175175
*
176-
* @param task {@link Task} needing at least one new {@link Replicate}.
176+
* @param task {@link Task} needing at least one new {@link Replicate}.
177177
* @param walletAddress Wallet address of a worker looking for new {@link Task}.
178178
* @return {@literal true} if the task has been accepted,
179179
* {@literal false} otherwise.
@@ -220,8 +220,8 @@ private boolean acceptOrRejectTask(Task task, String walletAddress) {
220220
return false;
221221
}
222222

223-
replicatesService.addNewReplicate(chainTaskId, walletAddress);
224-
workerService.addChainTaskIdToWorker(chainTaskId, walletAddress);
223+
workerService.addChainTaskIdToWorker(chainTaskId, walletAddress)
224+
.ifPresent(worker -> replicatesService.addNewReplicate(chainTaskId, walletAddress));
225225
} finally {
226226
// We should always unlock the task
227227
// so that it could be taken by another replicate
@@ -234,8 +234,8 @@ private boolean acceptOrRejectTask(Task task, String walletAddress) {
234234

235235
/**
236236
* Get notifications missed by the worker during the time it was absent.
237-
*
238-
* @param blockNumber last seen blocknumber by the worker
237+
*
238+
* @param blockNumber last seen blocknumber by the worker
239239
* @param walletAddress of the worker
240240
* @return list of missed notifications. Can be empty if no notification is found
241241
*/
@@ -264,7 +264,7 @@ public List<TaskNotification> getMissedTaskNotifications(long blockNumber, Strin
264264
continue;
265265
}
266266
TaskNotificationExtra taskNotificationExtra =
267-
getTaskNotificationExtra(task, taskNotificationType.get(), walletAddress, enclaveChallenge);
267+
getTaskNotificationExtra(task, taskNotificationType.get(), walletAddress, enclaveChallenge);
268268

269269
TaskNotification taskNotification = TaskNotification.builder()
270270
.chainTaskId(chainTaskId)
@@ -286,7 +286,7 @@ public List<TaskNotification> getMissedTaskNotifications(long blockNumber, Strin
286286
private TaskNotificationExtra getTaskNotificationExtra(Task task, TaskNotificationType taskNotificationType, String walletAddress, String enclaveChallenge) {
287287
TaskNotificationExtra taskNotificationExtra = TaskNotificationExtra.builder().build();
288288

289-
switch (taskNotificationType){
289+
switch (taskNotificationType) {
290290
case PLEASE_CONTRIBUTE:
291291
WorkerpoolAuthorization authorization = signatureService.createAuthorization(
292292
walletAddress, task.getChainTaskId(), enclaveChallenge);
@@ -312,7 +312,7 @@ public Optional<TaskNotificationType> getTaskNotificationType(Task task, Replica
312312
// CONTRIBUTION_TIMEOUT or CONSENSUS_REACHED without contribution
313313
if (task.getCurrentStatus().equals(TaskStatus.CONTRIBUTION_TIMEOUT)
314314
|| (task.getCurrentStatus().equals(TaskStatus.CONSENSUS_REACHED)
315-
&& !replicate.containsContributedStatus())) {
315+
&& !replicate.containsContributedStatus())) {
316316
return Optional.of(TaskNotificationType.PLEASE_ABORT);
317317
}
318318

src/main/java/com/iexec/core/replicate/ReplicatesService.java

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -72,17 +72,20 @@ public ReplicatesService(ReplicatesRepository replicatesRepository,
7272
}
7373

7474
public void addNewReplicate(String chainTaskId, String walletAddress) {
75-
if (getReplicate(chainTaskId, walletAddress).isEmpty()) {
76-
Optional<ReplicatesList> optional = getReplicatesList(chainTaskId);
77-
if (optional.isPresent()) {
78-
ReplicatesList replicatesList = optional.get();
79-
Replicate replicate = new Replicate(walletAddress, chainTaskId);
80-
replicate.setWorkerWeight(iexecHubService.getWorkerWeight(walletAddress));// workerWeight value for pendingWeight estimate
81-
replicatesList.getReplicates().add(replicate);
82-
83-
replicatesRepository.save(replicatesList);
84-
log.info("New replicate saved [chainTaskId:{}, walletAddress:{}]", chainTaskId, walletAddress);
85-
}
75+
final Optional<ReplicatesList> oReplicatesList = getReplicatesList(chainTaskId);
76+
if (oReplicatesList.isEmpty()) {
77+
log.warn("Can't add replicate to unknown ReplicatesList [chainTaskId:{}, workerName:{}]", chainTaskId, walletAddress);
78+
return;
79+
}
80+
81+
final ReplicatesList replicatesList = oReplicatesList.get();
82+
if (replicatesList.getReplicateOfWorker(walletAddress).isEmpty()) {
83+
Replicate replicate = new Replicate(walletAddress, chainTaskId);
84+
replicate.setWorkerWeight(iexecHubService.getWorkerWeight(walletAddress));// workerWeight value for pendingWeight estimate
85+
replicatesList.getReplicates().add(replicate);
86+
87+
replicatesRepository.save(replicatesList);
88+
log.info("New replicate saved [chainTaskId:{}, walletAddress:{}]", chainTaskId, walletAddress);
8689
} else {
8790
log.error("Replicate already saved [chainTaskId:{}, walletAddress:{}]", chainTaskId, walletAddress);
8891
}
@@ -635,4 +638,4 @@ public void setRevealTimeoutStatusIfNeeded(String chainTaskId, Replicate replica
635638
updateReplicateStatus(chainTaskId, replicate.getWalletAddress(), statusUpdate);
636639
}
637640
}
638-
}
641+
}

src/main/java/com/iexec/core/worker/WorkerService.java

Lines changed: 24 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -69,10 +69,10 @@ public Optional<Worker> getWorker(String walletAddress) {
6969
return workerRepository.findByWalletAddress(walletAddress);
7070
}
7171

72-
public boolean isAllowedToJoin(String workerAddress){
72+
public boolean isAllowedToJoin(String workerAddress) {
7373
List<String> whitelist = workerConfiguration.getWhitelist();
7474
// if the whitelist is empty, there is no restriction on the workers
75-
if (whitelist.isEmpty()){
75+
if (whitelist.isEmpty()) {
7676
return true;
7777
}
7878
return whitelist.contains(workerAddress);
@@ -135,17 +135,17 @@ public List<Worker> getAliveWorkers() {
135135

136136
public boolean canAcceptMoreWorks(String walletAddress) {
137137
Optional<Worker> optionalWorker = getWorker(walletAddress);
138-
if (optionalWorker.isEmpty()){
139-
return false;
140-
}
138+
return optionalWorker.filter(this::canAcceptMoreWorks).isPresent();
139+
140+
}
141141

142-
Worker worker = optionalWorker.get();
142+
public boolean canAcceptMoreWorks(Worker worker) {
143143
int workerMaxNbTasks = worker.getMaxNbTasks();
144144
int runningReplicateNb = worker.getComputingChainTaskIds().size();
145145

146146
if (runningReplicateNb >= workerMaxNbTasks) {
147147
log.debug("Worker asking for too many replicates [walletAddress:{}, runningReplicateNb:{}, workerMaxNbTasks:{}]",
148-
walletAddress, runningReplicateNb, workerMaxNbTasks);
148+
worker.getWalletAddress(), runningReplicateNb, workerMaxNbTasks);
149149
return false;
150150
}
151151

@@ -154,44 +154,44 @@ public boolean canAcceptMoreWorks(String walletAddress) {
154154

155155
public int getAliveAvailableCpu() {
156156
int availableCpus = 0;
157-
for (Worker worker: getAliveWorkers()) {
157+
for (Worker worker : getAliveWorkers()) {
158158
if (worker.isGpuEnabled()) {
159159
continue;
160160
}
161161

162162
int workerCpuNb = worker.getCpuNb();
163163
int computingReplicateNb = worker.getComputingChainTaskIds().size();
164164
int availableCpu = workerCpuNb - computingReplicateNb;
165-
availableCpus+= availableCpu;
165+
availableCpus += availableCpu;
166166
}
167167
return availableCpus;
168168
}
169169

170170
public int getAliveTotalCpu() {
171171
int totalCpus = 0;
172-
for (Worker worker: getAliveWorkers()){
173-
if(worker.isGpuEnabled()) {
172+
for (Worker worker : getAliveWorkers()) {
173+
if (worker.isGpuEnabled()) {
174174
continue;
175175
}
176-
totalCpus+= worker.getCpuNb();
176+
totalCpus += worker.getCpuNb();
177177
}
178178
return totalCpus;
179179
}
180180

181181
// We suppose for now that 1 Gpu enabled worker has only one GPU
182182
public int getAliveTotalGpu() {
183183
int totalGpus = 0;
184-
for(Worker worker: getAliveWorkers()) {
184+
for (Worker worker : getAliveWorkers()) {
185185
if (worker.isGpuEnabled()) {
186186
totalGpus++;
187187
}
188188
}
189189
return totalGpus;
190190
}
191191

192-
public int getAliveAvailableGpu () {
192+
public int getAliveAvailableGpu() {
193193
int availableGpus = getAliveTotalGpu();
194-
for (Worker worker: getAliveWorkers()) {
194+
for (Worker worker : getAliveWorkers()) {
195195
if (worker.isGpuEnabled()) {
196196
boolean isWorking = !worker.getComputingChainTaskIds().isEmpty();
197197
if (isWorking) {
@@ -246,13 +246,20 @@ public Optional<Worker> addChainTaskIdToWorker(String chainTaskId, String wallet
246246
}
247247

248248
private Optional<Worker> addChainTaskIdToWorkerWithoutThreadSafety(String chainTaskId, String walletAddress) {
249-
Optional<Worker> optional = workerRepository.findByWalletAddress(walletAddress);
249+
final Optional<Worker> optional = workerRepository.findByWalletAddress(walletAddress);
250250
if (optional.isPresent()) {
251-
Worker worker = optional.get();
251+
final Worker worker = optional.get();
252+
if (!canAcceptMoreWorks(worker)) {
253+
log.warn("Can't add chainTaskId to worker when already full [chainTaskId:{}, workerName:{}]",
254+
chainTaskId, walletAddress);
255+
return Optional.empty();
256+
}
252257
worker.addChainTaskId(chainTaskId);
253258
log.info("Added chainTaskId to worker [chainTaskId:{}, workerName:{}]", chainTaskId, walletAddress);
254259
return Optional.of(workerRepository.save(worker));
255260
}
261+
log.warn("Can't add chainTaskId to worker when unknown worker [chainTaskId:{}, workerName:{}]",
262+
chainTaskId, walletAddress);
256263
return Optional.empty();
257264
}
258265

0 commit comments

Comments
 (0)