Skip to content

Commit f246c27

Browse files
authored
Add synchronized keyword on abort method to avoid concurrency issues on file system (#643)
1 parent 144c54a commit f246c27

File tree

8 files changed

+93
-68
lines changed

8 files changed

+93
-68
lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,18 @@ All notable changes to this project will be documented in this file.
1010
- Validate authorization proof for pre/post-compute requests. (#635)
1111
- Add `WebSocketBlockchainListener` to fetch latest block without polling the blockchain network. (#639)
1212

13+
### Bug Fixes
14+
15+
- Add synchronized keyword on abort method to avoid concurrency issues on file system. (#643)
16+
1317
### Quality
1418

1519
- Refactor `RestTemplateConfig` to use `HttpClient 5` and improve proxy handling. (#626)
1620
- Replace deprecated `connect` with `connectAsync` in `StompClientService`. (#627)
1721
- Remove redundant blockchain calls to diminish pressure on Ethereum JSON-RPC API. (#632)
1822
- Stop using `TestUtils` in `ContributionServiceTests`. (#640)
1923
- Fix several issues raised by SonarQube Cloud. (#642)
24+
- Improve JavaDoc comments in `ComputeManagerService` and `DockerService`. (#643)
2025

2126
### Breaking API changes
2227

src/main/java/com/iexec/worker/compute/ComputeManagerService.java

Lines changed: 64 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020-2024 IEXEC BLOCKCHAIN TECH
2+
* Copyright 2020-2025 IEXEC BLOCKCHAIN TECH
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -41,7 +41,6 @@
4141
import java.time.temporal.ChronoUnit;
4242
import java.util.HashMap;
4343
import java.util.Map;
44-
import java.util.function.Predicate;
4544

4645
@Slf4j
4746
@Service
@@ -77,6 +76,15 @@ public ComputeManagerService(
7776
this.resultService = resultService;
7877
}
7978

79+
/**
80+
* Download OCI image of the application to execute.
81+
* <p>
82+
* The download fails for a bad task description or if a timeout is reached.
83+
* The timeout is computed by calling {@link #computeImagePullTimeout(TaskDescription)}.
84+
*
85+
* @param taskDescription Task description containing application type and download URI
86+
* @return true if download succeeded, false otherwise
87+
*/
8088
public boolean downloadApp(TaskDescription taskDescription) {
8189
if (taskDescription == null || taskDescription.getAppType() == null) {
8290
return false;
@@ -88,8 +96,9 @@ public boolean downloadApp(TaskDescription taskDescription) {
8896
}
8997

9098
final long pullTimeout = computeImagePullTimeout(taskDescription);
91-
return dockerService.getClient(taskDescription.getAppUri())
99+
dockerService.getClient(taskDescription.getAppUri())
92100
.pullImage(taskDescription.getAppUri(), Duration.of(pullTimeout, ChronoUnit.MINUTES));
101+
return dockerService.getClient(taskDescription.getAppUri()).isImagePresent(taskDescription.getAppUri());
93102
}
94103

95104
/**
@@ -139,35 +148,45 @@ public boolean isAppDownloaded(String imageUri) {
139148
}
140149

141150
/**
142-
* Standard tasks: download secrets && decrypt dataset (TODO: rewritte or remove)
143-
* <p>
151+
* Execute pre-compute stage for standard and TEE tasks.
152+
* <ul>
153+
* <li>Standard tasks: Nothing is executed, an empty result is returned
154+
* <li>TEE tasks: Call {@link PreComputeService#runTeePreCompute(TaskDescription, WorkerpoolAuthorization)}
155+
* </ul>
144156
* TEE tasks: download pre-compute and post-compute images,
145157
* create SCONE secure session, and run pre-compute container.
146158
*
147-
* @param taskDescription
148-
* @param workerpoolAuth
149-
* @return
159+
* @param taskDescription Description of the task
160+
* @param workerpoolAuth Authorization to contribute delivered by the scheduler for the given task
161+
* @return {@code PreComputeResponse} instance
162+
* @see PreComputeService#runTeePreCompute(TaskDescription, WorkerpoolAuthorization)
150163
*/
151-
public PreComputeResponse runPreCompute(TaskDescription taskDescription,
152-
WorkerpoolAuthorization workerpoolAuth) {
164+
public PreComputeResponse runPreCompute(final TaskDescription taskDescription,
165+
final WorkerpoolAuthorization workerpoolAuth) {
153166
log.info("Running pre-compute [chainTaskId:{}, isTee:{}]",
154-
taskDescription.getChainTaskId(),
155-
taskDescription.isTeeTask());
167+
taskDescription.getChainTaskId(), taskDescription.isTeeTask());
156168

157169
if (taskDescription.isTeeTask()) {
158-
return preComputeService.runTeePreCompute(taskDescription,
159-
workerpoolAuth);
170+
return preComputeService.runTeePreCompute(taskDescription, workerpoolAuth);
160171
}
161172
return PreComputeResponse.builder().build();
162173
}
163174

164-
public AppComputeResponse runCompute(TaskDescription taskDescription,
165-
TeeSessionGenerationResponse secureSession) {
166-
String chainTaskId = taskDescription.getChainTaskId();
167-
log.info("Running compute [chainTaskId:{}, isTee:{}]", chainTaskId,
168-
taskDescription.isTeeTask());
175+
/**
176+
* Execute application stage for standard and TEE tasks.
177+
*
178+
* @param taskDescription Description of the task
179+
* @param secureSession Session ID and session storage URL for TEE tasks
180+
* @return {@code AppComputeResponse} instance
181+
* @see AppComputeService#runCompute(TaskDescription, TeeSessionGenerationResponse)
182+
*/
183+
public AppComputeResponse runCompute(final TaskDescription taskDescription,
184+
final TeeSessionGenerationResponse secureSession) {
185+
final String chainTaskId = taskDescription.getChainTaskId();
186+
log.info("Running compute [chainTaskId:{}, isTee:{}]",
187+
chainTaskId, taskDescription.isTeeTask());
169188

170-
AppComputeResponse appComputeResponse =
189+
final AppComputeResponse appComputeResponse =
171190
appComputeService.runCompute(taskDescription, secureSession);
172191

173192
if (appComputeResponse.isSuccessful()) {
@@ -179,55 +198,58 @@ public AppComputeResponse runCompute(TaskDescription taskDescription,
179198

180199
private void writeLogs(String chainTaskId, String filename, String logs) {
181200
if (!logs.isEmpty()) {
182-
String filePath = workerConfigService.getTaskIexecOutDir(chainTaskId) + File.separator + filename;
183-
File file = FileHelper.createFileWithContent(filePath, logs);
184-
log.info("Saved logs file [path:{}]",
185-
file.getAbsolutePath());
201+
final String filePath = workerConfigService.getTaskIexecOutDir(chainTaskId) + File.separator + filename;
202+
final File file = FileHelper.createFileWithContent(filePath, logs);
203+
log.info("Saved logs file [path:{}]", file.getAbsolutePath());
186204
//TODO Make sure file is properly written
187205
}
188206
}
189207

190-
/*
191-
* - Copy computed.json file produced by the compute stage to /output
192-
* - Zip iexec_out folder
193-
* For TEE tasks, worker-tee-post-compute will do those two steps since
194-
* all files in are protected.
208+
/**
209+
* Execute post-compute stage for standard and TEE tasks.
210+
* <p>
211+
* This method calls methods from {@code PostComputeService} depending on the Task type.
195212
*
196-
* - Save stdout file
213+
* @param taskDescription Description of the task
214+
* @param secureSession Session ID and session storage URL for TEE tasks
215+
* @return {@code PostComputeResponse} instance
216+
* @see PostComputeService#runStandardPostCompute(TaskDescription)
217+
* @see PostComputeService#runTeePostCompute(TaskDescription, TeeSessionGenerationResponse)
197218
*/
198-
public PostComputeResponse runPostCompute(TaskDescription taskDescription,
199-
TeeSessionGenerationResponse secureSession) {
200-
String chainTaskId = taskDescription.getChainTaskId();
219+
public PostComputeResponse runPostCompute(final TaskDescription taskDescription,
220+
final TeeSessionGenerationResponse secureSession) {
221+
final String chainTaskId = taskDescription.getChainTaskId();
201222
log.info("Running post-compute [chainTaskId:{}, isTee:{}]",
202223
chainTaskId, taskDescription.isTeeTask());
203-
PostComputeResponse postComputeResponse = PostComputeResponse.builder()
204-
.exitCause(ReplicateStatusCause.POST_COMPUTE_FAILED_UNKNOWN_ISSUE)
205-
.build();
206224

225+
final PostComputeResponse postComputeResponse;
207226
if (!taskDescription.isTeeTask()) {
208227
postComputeResponse = postComputeService.runStandardPostCompute(taskDescription);
209228
} else if (secureSession != null) {
210229
postComputeResponse = postComputeService.runTeePostCompute(taskDescription, secureSession);
230+
} else {
231+
postComputeResponse = PostComputeResponse.builder()
232+
.exitCause(ReplicateStatusCause.POST_COMPUTE_FAILED_UNKNOWN_ISSUE)
233+
.build();
211234
}
212235
if (!postComputeResponse.isSuccessful()) {
213236
return postComputeResponse;
214237
}
215-
ComputedFile computedFile = resultService.readComputedFile(chainTaskId);
238+
final ComputedFile computedFile = resultService.readComputedFile(chainTaskId);
216239
if (computedFile == null) {
217240
postComputeResponse.setExitCause(ReplicateStatusCause.POST_COMPUTE_COMPUTED_FILE_NOT_FOUND);
218241
return postComputeResponse;
219242
}
220-
String resultDigest = resultService.computeResultDigest(computedFile);
243+
final String resultDigest = resultService.computeResultDigest(computedFile);
221244
if (resultDigest.isEmpty()) {
222245
postComputeResponse.setExitCause(ReplicateStatusCause.POST_COMPUTE_RESULT_DIGEST_COMPUTATION_FAILED);
223246
}
224-
resultService.saveResultInfo(chainTaskId, taskDescription, computedFile);
247+
resultService.saveResultInfo(taskDescription, computedFile);
225248
return postComputeResponse;
226249
}
227250

228-
public boolean abort(String chainTaskId) {
229-
Predicate<String> containsChainTaskId = name -> name.contains(chainTaskId);
230-
long remaining = dockerService.stopRunningContainersWithNamePredicate(containsChainTaskId);
251+
public boolean abort(final String chainTaskId) {
252+
final long remaining = dockerService.stopRunningContainersWithNameContaining(chainTaskId);
231253
log.info("Stopped task containers [chainTaskId:{}, remaining:{}]", chainTaskId, remaining);
232254
return remaining == 0L;
233255
}

src/main/java/com/iexec/worker/docker/DockerService.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020-2024 IEXEC BLOCKCHAIN TECH
2+
* Copyright 2020-2025 IEXEC BLOCKCHAIN TECH
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -202,18 +202,18 @@ public void stopAllRunningContainers() {
202202
}
203203

204204
/**
205-
* Stop running containers with names that match the provided predicate and
205+
* Stop running containers with names that contain the provided pattern and
206206
* remove them from the running containers record. This is typically used when
207207
* the worker aborts a task and needs to stop its pre-compute, compute, or
208208
* post-compute containers. The container itself is not removed here as it is
209209
* removed by its watcher thread.
210210
*
211-
* @param containerNamePredicate predicate that contains a condition on the
212-
* container name.
211+
* @param pattern containers whose name contains this pattern will be stopped.
213212
* @return The remaining count of containers whose name contains the provided pattern.
214213
*/
215-
public long stopRunningContainersWithNamePredicate(Predicate<String> containerNamePredicate) {
216-
log.info("Stopping containers with names matching the provided predicate");
214+
public long stopRunningContainersWithNameContaining(final String pattern) {
215+
log.info("Stopping containers with names containing the following pattern [pattern:{}]", pattern);
216+
final Predicate<String> containerNamePredicate = name -> name.contains(pattern);
217217
List.copyOf(runningContainersRecord).stream()
218218
.filter(containerNamePredicate)
219219
.forEach(this::stopRunningContainer);

src/main/java/com/iexec/worker/replicate/ReplicateRecoveryService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ boolean canReplicateBeRecovered(TaskNotification missedTaskNotification) {
9696
}
9797

9898
final ComputedFile computedFile = resultService.getComputedFile(chainTaskId);
99-
resultService.saveResultInfo(chainTaskId, taskDescription, computedFile);
99+
resultService.saveResultInfo(taskDescription, computedFile);
100100
subscriptionService.subscribeToTopic(chainTaskId);
101101
applicationEventPublisher.publishEvent(missedTaskNotification);
102102

src/main/java/com/iexec/worker/result/ResultService.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ public boolean writeErrorToIexecOut(final String chainTaskId, final ReplicateSta
116116
+ IexecFileHelper.COMPUTED_JSON, computedFileJsonAsString.getBytes());
117117
}
118118

119-
public void saveResultInfo(final String chainTaskId, final TaskDescription taskDescription,
119+
public void saveResultInfo(final TaskDescription taskDescription,
120120
final ComputedFile computedFile) {
121121
final ResultInfo resultInfo = ResultInfo.builder()
122122
.image(taskDescription.getAppUri())
@@ -125,7 +125,7 @@ public void saveResultInfo(final String chainTaskId, final TaskDescription taskD
125125
.datasetUri(taskDescription.getDatasetUri())
126126
.build();
127127

128-
resultInfoMap.put(chainTaskId, resultInfo);
128+
resultInfoMap.put(taskDescription.getChainTaskId(), resultInfo);
129129
}
130130

131131
public ResultModel getResultModelWithZip(final String chainTaskId) {
@@ -409,12 +409,11 @@ public boolean purgeTask(final String chainTaskId) {
409409
final boolean deletedInMap = !resultInfoMap.containsKey(chainTaskId);
410410
final boolean deletedTaskFolder = !new File(taskBaseDir).exists();
411411

412-
boolean deleted = deletedInMap && deletedTaskFolder;
413-
if (deletedTaskFolder) {
412+
final boolean deleted = deletedInMap && deletedTaskFolder;
413+
if (deleted) {
414414
log.info("The result of the chainTaskId has been deleted [chainTaskId:{}]", chainTaskId);
415415
} else {
416-
log.warn("The result of the chainTaskId couldn't be deleted [chainTaskId:{}, deletedInMap:{}, " +
417-
"deletedTaskFolder:{}]",
416+
log.warn("The result of the chainTaskId couldn't be deleted [chainTaskId:{}, deletedInMap:{}, deletedTaskFolder:{}]",
418417
chainTaskId, deletedInMap, deletedTaskFolder);
419418
}
420419

src/main/java/com/iexec/worker/task/TaskManagerService.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -452,10 +452,10 @@ ReplicateActionResponse complete(String chainTaskId) {
452452
* @param chainTaskId Task ID
453453
* @return {@literal true} if all cleanup operations went well, {@literal false} otherwise
454454
*/
455-
boolean abort(String chainTaskId) {
455+
synchronized boolean abort(final String chainTaskId) {
456456
log.info("Aborting task [chainTaskId:{}]", chainTaskId);
457-
boolean allContainersStopped = computeManagerService.abort(chainTaskId);
458-
boolean allServicesPurged = purgeService.purgeAllServices(chainTaskId);
457+
final boolean allContainersStopped = computeManagerService.abort(chainTaskId);
458+
final boolean allServicesPurged = purgeService.purgeAllServices(chainTaskId);
459459
final boolean isSuccess = allContainersStopped && allServicesPurged;
460460
if (!isSuccess) {
461461
log.error("Failed to abort task [chainTaskId:{}, containers:{}, services:{}]",

src/test/java/com/iexec/worker/compute/ComputeManagerServiceTests.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020-2024 IEXEC BLOCKCHAIN TECH
2+
* Copyright 2020-2025 IEXEC BLOCKCHAIN TECH
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -54,7 +54,6 @@
5454

5555
import static org.assertj.core.api.Assertions.assertThat;
5656
import static org.mockito.ArgumentMatchers.any;
57-
import static org.mockito.ArgumentMatchers.anyString;
5857
import static org.mockito.Mockito.*;
5958

6059
@ExtendWith(MockitoExtension.class)
@@ -119,6 +118,7 @@ void shouldDownloadApp() {
119118
when(dockerRegistryConfiguration.getMaxPullTimeout()).thenReturn(Duration.of(30, ChronoUnit.MINUTES));
120119
when(dockerService.getClient(taskDescription.getAppUri())).thenReturn(dockerClient);
121120
when(dockerClient.pullImage(taskDescription.getAppUri(), Duration.of(7, ChronoUnit.MINUTES))).thenReturn(true);
121+
when(dockerClient.isImagePresent(taskDescription.getAppUri())).thenReturn(true);
122122
assertThat(computeManagerService.downloadApp(taskDescription)).isTrue();
123123
}
124124

@@ -127,6 +127,7 @@ void shouldNotDownloadAppSincePullImageFailed() {
127127
final TaskDescription taskDescription = createTaskDescriptionBuilder(true).build();
128128
when(dockerService.getClient(taskDescription.getAppUri())).thenReturn(dockerClient);
129129
when(dockerClient.pullImage(taskDescription.getAppUri(), Duration.ofMinutes(0))).thenReturn(false);
130+
when(dockerClient.isImagePresent(taskDescription.getAppUri())).thenReturn(false);
130131
assertThat(computeManagerService.downloadApp(taskDescription)).isFalse();
131132
}
132133

@@ -349,7 +350,7 @@ void shouldRunStandardPostCompute() {
349350
verify(postComputeService).runStandardPostCompute(taskDescription);
350351
verify(resultService).readComputedFile(CHAIN_TASK_ID);
351352
verify(resultService).computeResultDigest(computedFile);
352-
verify(resultService).saveResultInfo(anyString(), any(), any());
353+
verify(resultService).saveResultInfo(any(), any());
353354
}
354355

355356
@ParameterizedTest
@@ -390,7 +391,7 @@ void shouldRunTeePostCompute() {
390391
verify(postComputeService).runTeePostCompute(taskDescription, SECURE_SESSION);
391392
verify(resultService).readComputedFile(CHAIN_TASK_ID);
392393
verify(resultService).computeResultDigest(computedFile);
393-
verify(resultService).saveResultInfo(anyString(), any(), any());
394+
verify(resultService).saveResultInfo(any(), any());
394395
}
395396

396397
@Test
@@ -458,13 +459,13 @@ void computeImagePullTimeout(long maxExecutionTime,
458459
// region abort
459460
@Test
460461
void shouldNotAbortWhenContainersAreStillRunning() {
461-
when(dockerService.stopRunningContainersWithNamePredicate(any())).thenReturn(1L);
462+
when(dockerService.stopRunningContainersWithNameContaining(any())).thenReturn(1L);
462463
assertThat(computeManagerService.abort(CHAIN_TASK_ID)).isFalse();
463464
}
464465

465466
@Test
466467
void shouldAbortTask() {
467-
when(dockerService.stopRunningContainersWithNamePredicate(any())).thenReturn(0L);
468+
when(dockerService.stopRunningContainersWithNameContaining(any())).thenReturn(0L);
468469
assertThat(computeManagerService.abort(CHAIN_TASK_ID)).isTrue();
469470
}
470471
// endregion

src/test/java/com/iexec/worker/docker/DockerServiceTests.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020-2024 IEXEC BLOCKCHAIN TECH
2+
* Copyright 2020-2025 IEXEC BLOCKCHAIN TECH
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -39,7 +39,6 @@
3939

4040
import java.io.File;
4141
import java.util.Optional;
42-
import java.util.function.Predicate;
4342
import java.util.stream.Stream;
4443

4544
import static com.iexec.commons.containers.client.DockerClientInstance.DEFAULT_DOCKER_REGISTRY;
@@ -422,8 +421,7 @@ void shouldStopRunningContainersWithNamePattern() {
422421
dockerService.addToRunningContainersRecord("containerName1");
423422
dockerService.addToRunningContainersRecord("containerName2");
424423

425-
Predicate<String> containsChainTaskId = name -> name.contains(CHAIN_TASK_ID);
426-
dockerService.stopRunningContainersWithNamePredicate(containsChainTaskId);
424+
dockerService.stopRunningContainersWithNameContaining(CHAIN_TASK_ID);
427425
// Verify we removed all containers matching the predicate
428426
verify(dockerService, times(3)).stopRunningContainer(anyString());
429427
// Verify we removed only containers matching the predicate

0 commit comments

Comments
 (0)