Skip to content

Commit c1e6ec3

Browse files
authored
feat: update models to distinguish SGX and TDX enabled workers (#764)
1 parent 14103ec commit c1e6ec3

File tree

12 files changed

+240
-197
lines changed

12 files changed

+240
-197
lines changed

iexec-core-library/src/main/java/com/iexec/core/config/WorkerModel.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,10 @@ public class WorkerModel {
3131
String cpu;
3232
int cpuNb;
3333
int memorySize;
34-
boolean teeEnabled;
3534
boolean gpuEnabled;
35+
// TODO remove or rename to sgxEnabled in the future
36+
boolean teeEnabled;
37+
boolean tdxEnabled;
3638

3739
@JsonPOJOBuilder(withPrefix = "")
3840
public static class WorkerModelBuilder {

iexec-core-library/src/test/java/com/iexec/core/config/WorkerModelTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ void shouldSerializeAndDeserialize() throws JsonProcessingException {
3131
WorkerModel model = WorkerModel.builder().build();
3232
String jsonString = mapper.writeValueAsString(model);
3333
assertThat(jsonString).isEqualTo("{\"name\":null,\"walletAddress\":null,\"os\":null,\"cpu\":null," +
34-
"\"cpuNb\":0,\"memorySize\":0,\"teeEnabled\":false,\"gpuEnabled\":false}");
34+
"\"cpuNb\":0,\"memorySize\":0,\"gpuEnabled\":false,\"teeEnabled\":false,\"tdxEnabled\":false}");
3535
WorkerModel parsedModel = mapper.readValue(jsonString, WorkerModel.class);
3636
assertThat(parsedModel).isEqualTo(model);
3737
}

src/main/java/com/iexec/core/replicate/ReplicateSupplyService.java

Lines changed: 11 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -98,52 +98,38 @@ Optional<ReplicateTaskSummary> getAvailableReplicateTaskSummary(long workerLastB
9898
return Optional.empty();
9999
}
100100

101-
final Optional<Worker> optional = workerService.getWorker(walletAddress);
102-
if (optional.isEmpty()) {
101+
final Worker worker = workerService.getWorker(walletAddress).orElse(null);
102+
// return empty if the worker is not found or if max computing task is reached
103+
if (worker == null || worker.hasNoRemainingComputingSlot()) {
103104
return Optional.empty();
104105
}
105-
final Worker worker = optional.get();
106106

107-
// return empty if max computing task is reached or if the worker is not found
108-
if (!workerService.canAcceptMoreWorks(worker)) {
109-
return Optional.empty();
110-
}
111-
112-
return getReplicateTaskSummaryForAnyAvailableTask(
113-
walletAddress,
114-
worker.isTeeEnabled()
115-
);
107+
return getReplicateTaskSummaryForAnyAvailableTask(worker);
116108
}
117109

118110
/**
119111
* Loops through available tasks
120112
* and finds the first one that needs a new {@link Replicate}.
121113
*
122-
* @param walletAddress Wallet address of the worker asking for work.
123-
* @param isTeeEnabled Whether this worker supports TEE.
114+
* @param worker scheduler model of the worker asking for work
124115
* @return An {@link Optional} containing a {@link ReplicateTaskSummary}
125116
* if any {@link Task} is available and can be handled by this worker,
126117
* {@link Optional#empty()} otherwise.
127118
*/
128-
private Optional<ReplicateTaskSummary> getReplicateTaskSummaryForAnyAvailableTask(
129-
String walletAddress,
130-
boolean isTeeEnabled) {
119+
private Optional<ReplicateTaskSummary> getReplicateTaskSummaryForAnyAvailableTask(final Worker worker) {
131120
final List<String> alreadyScannedTasks = new ArrayList<>();
121+
final List<String> excludedTags = worker.getExcludedTags();
132122

133123
Optional<ReplicateTaskSummary> replicateTaskSummary = Optional.empty();
134124
while (replicateTaskSummary.isEmpty()) {
135-
final Optional<Task> oTask = taskService.getPrioritizedInitializedOrRunningTask(
136-
!isTeeEnabled,
137-
alreadyScannedTasks
138-
);
139-
if (oTask.isEmpty()) {
125+
final Task task = taskService.getPrioritizedInitializedOrRunningTask(
126+
excludedTags, alreadyScannedTasks).orElse(null);
127+
if (task == null) {
140128
// No more tasks waiting for a new replicate.
141129
return Optional.empty();
142130
}
143-
144-
final Task task = oTask.get();
145131
alreadyScannedTasks.add(task.getChainTaskId());
146-
replicateTaskSummary = getReplicateTaskSummary(task, walletAddress);
132+
replicateTaskSummary = getReplicateTaskSummary(task, worker.getWalletAddress());
147133
}
148134
return replicateTaskSummary;
149135
}

src/main/java/com/iexec/core/task/Task.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020-2024 IEXEC BLOCKCHAIN TECH
2+
* Copyright 2020-2025 IEXEC BLOCKCHAIN TECH
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -146,7 +146,7 @@ public boolean inCompletionPhase() {
146146
}
147147

148148
public boolean isTeeTask() {
149-
return TeeUtils.isTeeTag(getTag());
149+
return TeeUtils.getTeeFramework(tag) != null;
150150
}
151151

152152
TaskModel generateModel() {

src/main/java/com/iexec/core/task/TaskService.java

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818

1919
import com.iexec.commons.poco.chain.ChainTask;
2020
import com.iexec.commons.poco.chain.ChainTaskStatus;
21-
import com.iexec.commons.poco.tee.TeeUtils;
2221
import com.iexec.core.chain.IexecHubService;
2322
import com.iexec.core.replicate.ReplicatesList;
2423
import com.iexec.core.task.event.TaskCreatedEvent;
@@ -229,21 +228,17 @@ public List<Task> findByCurrentStatus(List<TaskStatus> statusList) {
229228
* <p>
230229
* Tasks can be excluded with {@code excludedChainTaskIds}.
231230
*
232-
* @param shouldExcludeTeeTasks Whether TEE tasks should be retrieved
233-
* as well as standard tasks.
234-
* @param excludedChainTaskIds Tasks to exclude from retrieval.
231+
* @param excludedTags Whether some tags should not be eligible, it is focused on TEE tags at the moment
232+
* @param excludedChainTaskIds Tasks to exclude from retrieval.
235233
* @return The first task which is {@link TaskStatus#INITIALIZED}
236234
* or {@link TaskStatus#RUNNING},
237235
* or {@link Optional#empty()} if no task meets the requirements.
238236
*/
239237
public Optional<Task> getPrioritizedInitializedOrRunningTask(
240-
boolean shouldExcludeTeeTasks,
241-
List<String> excludedChainTaskIds) {
242-
final List<String> excludedTags = shouldExcludeTeeTasks
243-
? List.of(TeeUtils.TEE_SCONE_ONLY_TAG, TeeUtils.TEE_GRAMINE_ONLY_TAG)
244-
: null;
238+
final List<String> excludedTags,
239+
final List<String> excludedChainTaskIds) {
245240
return findPrioritizedTask(
246-
Arrays.asList(INITIALIZED, RUNNING),
241+
List.of(INITIALIZED, RUNNING),
247242
excludedTags,
248243
excludedChainTaskIds,
249244
Sort.by(Sort.Order.desc(CURRENT_STATUS_FIELD_NAME),

src/main/java/com/iexec/core/worker/Worker.java

Lines changed: 36 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020-2024 IEXEC BLOCKCHAIN TECH
2+
* Copyright 2020-2025 IEXEC BLOCKCHAIN TECH
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,22 +16,24 @@
1616

1717
package com.iexec.core.worker;
1818

19+
import com.iexec.commons.poco.tee.TeeUtils;
1920
import lombok.AllArgsConstructor;
2021
import lombok.Builder;
2122
import lombok.Data;
23+
import lombok.extern.slf4j.Slf4j;
2224
import org.springframework.data.annotation.Id;
2325
import org.springframework.data.mongodb.core.index.Indexed;
2426
import org.springframework.data.mongodb.core.mapping.Document;
2527

2628
import java.util.Date;
2729
import java.util.List;
2830

31+
@Slf4j
2932
@Document
3033
@Data
3134
@Builder
3235
@AllArgsConstructor
3336
public class Worker {
34-
3537
@Id
3638
private String id;
3739
private String name;
@@ -44,8 +46,10 @@ public class Worker {
4446
private int cpuNb;
4547
private int maxNbTasks;
4648
private int memorySize;
47-
private boolean teeEnabled;
4849
private boolean gpuEnabled;
50+
// TODO remove or rename to sgxEnabled in the future
51+
private boolean teeEnabled;
52+
private boolean tdxEnabled;
4953
@Builder.Default
5054
private List<String> participatingChainTaskIds = List.of();
5155
@Builder.Default
@@ -60,12 +64,36 @@ void addChainTaskId(String chainTaskId) {
6064
computingChainTaskIds.add(chainTaskId);
6165
}
6266

63-
void removeChainTaskId(String chainTaskId) {
64-
participatingChainTaskIds.remove(chainTaskId);
65-
computingChainTaskIds.remove(chainTaskId);
67+
/**
68+
* Returns excluded tags depending on worker configuration
69+
*
70+
* @return The list of excluded tags
71+
*/
72+
public List<String> getExcludedTags() {
73+
if (!teeEnabled && !tdxEnabled) {
74+
return List.of(TeeUtils.TEE_TDX_ONLY_TAG, TeeUtils.TEE_SCONE_ONLY_TAG, TeeUtils.TEE_GRAMINE_ONLY_TAG);
75+
} else if (!teeEnabled) {
76+
return List.of(TeeUtils.TEE_SCONE_ONLY_TAG, TeeUtils.TEE_GRAMINE_ONLY_TAG);
77+
} else if (!tdxEnabled) {
78+
return List.of(TeeUtils.TEE_TDX_ONLY_TAG);
79+
} else {
80+
// /!\ teeEnabled and tdxEnabled are both true in this branch
81+
log.warn("Worker seems to support both SGX and TDX, this should not happen [wallet:{}]", walletAddress);
82+
return List.of();
83+
}
6684
}
6785

68-
void removeComputedChainTaskId(String chainTaskId) {
69-
computingChainTaskIds.remove(chainTaskId);
86+
/**
87+
* Returns whether the worker can accept more work or not.
88+
*
89+
* @return {@literal true} when the worker is at max capacity, {@literal false} otherwise
90+
*/
91+
public boolean hasNoRemainingComputingSlot() {
92+
final boolean areAllComputingSlotsInUse = computingChainTaskIds.size() >= maxNbTasks;
93+
if (areAllComputingSlotsInUse) {
94+
log.debug("Worker is computing at max capacity [walletAddress:{}, runningReplicateNb:{}, workerMaxNbTasks:{}]",
95+
walletAddress, computingChainTaskIds.size(), maxNbTasks);
96+
}
97+
return areAllComputingSlotsInUse;
7098
}
7199
}

src/main/java/com/iexec/core/worker/WorkerController.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -102,32 +102,33 @@ public ResponseEntity<String> getToken(@RequestParam(name = "walletAddress") Str
102102
}
103103

104104
@PostMapping(path = "/workers/register")
105-
public ResponseEntity<Worker> registerWorker(@RequestHeader("Authorization") String bearerToken,
106-
@RequestBody WorkerModel model) {
107-
String workerWalletAddress = jwtTokenProvider.getWalletAddressFromBearerToken(bearerToken);
105+
public ResponseEntity<Worker> registerWorker(@RequestHeader("Authorization") final String bearerToken,
106+
@RequestBody final WorkerModel model) {
107+
final String workerWalletAddress = jwtTokenProvider.getWalletAddressFromBearerToken(bearerToken);
108108
if (workerWalletAddress.isEmpty()) {
109109
WorkerUtils.emitWarnOnUnAuthorizedAccess("");
110110
return ResponseEntity.status(HttpStatus.UNAUTHORIZED).build();
111111
}
112112

113113
// if it is a GPU worker, it can process only 1 task at a time, otherwise it can process cpuNb
114-
int maxNbTasks = model.isGpuEnabled() ? 1 : model.getCpuNb();
114+
final int maxNbTasks = model.isGpuEnabled() ? 1 : model.getCpuNb();
115115

116-
Worker worker = Worker.builder()
116+
final Worker worker = Worker.builder()
117117
.name(model.getName())
118118
.walletAddress(workerWalletAddress)
119119
.os(model.getOs())
120120
.cpu(model.getCpu())
121121
.cpuNb(model.getCpuNb())
122122
.maxNbTasks(maxNbTasks)
123123
.memorySize(model.getMemorySize())
124-
.teeEnabled(model.isTeeEnabled())
125124
.gpuEnabled(model.isGpuEnabled())
125+
.teeEnabled(model.isTeeEnabled())
126+
.tdxEnabled(model.isTdxEnabled())
126127
.participatingChainTaskIds(new ArrayList<>())
127128
.computingChainTaskIds(new ArrayList<>())
128129
.build();
129130

130-
Worker savedWorker = workerService.addWorker(worker);
131+
final Worker savedWorker = workerService.addWorker(worker);
131132
log.info("Worker ready [worker:{}]", savedWorker);
132133
return ok(savedWorker);
133134
}

src/main/java/com/iexec/core/worker/WorkerService.java

Lines changed: 13 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -191,19 +191,6 @@ public List<Worker> getAliveWorkers() {
191191
.toList();
192192
return workerRepository.findByWalletAddressIn(aliveWorkers);
193193
}
194-
195-
public boolean canAcceptMoreWorks(Worker worker) {
196-
int workerMaxNbTasks = worker.getMaxNbTasks();
197-
int runningReplicateNb = worker.getComputingChainTaskIds().size();
198-
199-
if (runningReplicateNb >= workerMaxNbTasks) {
200-
log.debug("Worker asking for too many replicates [walletAddress:{}, runningReplicateNb:{}, workerMaxNbTasks:{}]",
201-
worker.getWalletAddress(), runningReplicateNb, workerMaxNbTasks);
202-
return false;
203-
}
204-
205-
return true;
206-
}
207194
// endregion
208195

209196
// region Read-and-write methods
@@ -257,21 +244,20 @@ public Optional<Worker> addChainTaskIdToWorker(String chainTaskId, String wallet
257244
}
258245

259246
private Optional<Worker> addChainTaskIdToWorkerWithoutThreadSafety(String chainTaskId, String walletAddress) {
260-
final Optional<Worker> optional = workerRepository.findByWalletAddress(walletAddress);
261-
if (optional.isPresent()) {
262-
final Worker worker = optional.get();
263-
if (!canAcceptMoreWorks(worker)) {
264-
log.warn("Can't add chainTaskId to worker when already full [chainTaskId:{}, workerAddress:{}]",
265-
chainTaskId, walletAddress);
266-
return Optional.empty();
267-
}
268-
worker.addChainTaskId(chainTaskId);
269-
log.info("Added chainTaskId to worker [chainTaskId:{}, workerAddress:{}]", chainTaskId, walletAddress);
270-
return Optional.of(workerRepository.save(worker));
247+
final Worker worker = workerRepository.findByWalletAddress(walletAddress).orElse(null);
248+
if (worker == null) {
249+
log.warn("Can't add chainTaskId to worker when unknown worker [chainTaskId:{}, workerAddress:{}]",
250+
chainTaskId, walletAddress);
251+
return Optional.empty();
252+
}
253+
if (worker.hasNoRemainingComputingSlot()) {
254+
log.warn("Can't add chainTaskId to worker when already full [chainTaskId:{}, workerAddress:{}]",
255+
chainTaskId, walletAddress);
256+
return Optional.empty();
271257
}
272-
log.warn("Can't add chainTaskId to worker when unknown worker [chainTaskId:{}, workerAddress:{}]",
273-
chainTaskId, walletAddress);
274-
return Optional.empty();
258+
worker.addChainTaskId(chainTaskId);
259+
log.info("Added chainTaskId to worker [chainTaskId:{}, workerAddress:{}]", chainTaskId, walletAddress);
260+
return Optional.of(workerRepository.save(worker));
275261
}
276262

277263
public Optional<Worker> removeChainTaskIdFromWorker(String chainTaskId, String walletAddress) {

0 commit comments

Comments
 (0)