Skip to content

Commit 7d2e6bb

Browse files
authored
refactor: move ReplicateListners to com.iexec.core.replicate.listener package (#754)
1 parent a579cd2 commit 7d2e6bb

File tree

2 files changed

+65
-79
lines changed

2 files changed

+65
-79
lines changed

src/main/java/com/iexec/core/task/listener/ReplicateListeners.java renamed to src/main/java/com/iexec/core/replicate/listener/ReplicateListeners.java

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020-2024 IEXEC BLOCKCHAIN TECH
2+
* Copyright 2020-2025 IEXEC BLOCKCHAIN TECH
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -14,7 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17-
package com.iexec.core.task.listener;
17+
package com.iexec.core.replicate.listener;
1818

1919
import com.iexec.common.replicate.ReplicateStatus;
2020
import com.iexec.common.replicate.ReplicateStatusCause;
@@ -40,10 +40,10 @@ public class ReplicateListeners {
4040
private final ContributionUnnotifiedDetector contributionUnnotifiedDetector;
4141
private final ReplicatesService replicatesService;
4242

43-
public ReplicateListeners(WorkerService workerService,
44-
TaskUpdateRequestManager taskUpdateRequestManager,
45-
ContributionUnnotifiedDetector contributionUnnotifiedDetector,
46-
ReplicatesService replicatesService) {
43+
public ReplicateListeners(final WorkerService workerService,
44+
final TaskUpdateRequestManager taskUpdateRequestManager,
45+
final ContributionUnnotifiedDetector contributionUnnotifiedDetector,
46+
final ReplicatesService replicatesService) {
4747
this.workerService = workerService;
4848
this.taskUpdateRequestManager = taskUpdateRequestManager;
4949
this.contributionUnnotifiedDetector = contributionUnnotifiedDetector;
@@ -53,21 +53,21 @@ public ReplicateListeners(WorkerService workerService,
5353
@EventListener
5454
public void onReplicateUpdatedEvent(ReplicateUpdatedEvent event) {
5555
log.debug("Received ReplicateUpdatedEvent [chainTaskId:{}] ", event.getChainTaskId());
56-
ReplicateStatusUpdate statusUpdate = event.getReplicateStatusUpdate();
57-
ReplicateStatus newStatus = statusUpdate.getStatus();
58-
ReplicateStatusCause cause = statusUpdate.getDetails() != null ? statusUpdate.getDetails().getCause() : null;
56+
final ReplicateStatusUpdate statusUpdate = event.getReplicateStatusUpdate();
57+
final ReplicateStatus newStatus = statusUpdate.getStatus();
58+
final ReplicateStatusCause cause = statusUpdate.getDetails() != null ? statusUpdate.getDetails().getCause() : null;
5959

6060
taskUpdateRequestManager.publishRequest(event.getChainTaskId());
6161

6262
/*
6363
* Should release 1 CPU of given worker for this replicate if status is
6464
* "COMPUTED" or "*_FAILED" before COMPUTED
65-
* */
66-
if (newStatus.equals(ReplicateStatus.START_FAILED)
67-
|| newStatus.equals(ReplicateStatus.APP_DOWNLOAD_FAILED)
68-
|| newStatus.equals(ReplicateStatus.DATA_DOWNLOAD_FAILED)
69-
|| newStatus.equals(ReplicateStatus.COMPUTED)
70-
|| newStatus.equals(ReplicateStatus.COMPUTE_FAILED)) {
65+
*/
66+
if (newStatus == ReplicateStatus.START_FAILED
67+
|| newStatus == ReplicateStatus.APP_DOWNLOAD_FAILED
68+
|| newStatus == ReplicateStatus.DATA_DOWNLOAD_FAILED
69+
|| newStatus == ReplicateStatus.COMPUTED
70+
|| newStatus == ReplicateStatus.COMPUTE_FAILED) {
7171
workerService.removeComputedChainTaskIdFromWorker(event.getChainTaskId(), event.getWalletAddress());
7272
}
7373

@@ -78,21 +78,21 @@ public void onReplicateUpdatedEvent(ReplicateUpdatedEvent event) {
7878
* We should start a detector which will look for unnotified contributions and will upgrade
7979
* task to consensus_reached
8080
*/
81-
if (cause != null && cause.equals(TASK_NOT_ACTIVE)) {
81+
if (cause == TASK_NOT_ACTIVE) {
8282
contributionUnnotifiedDetector.detectOnchainDone();
8383
}
8484

8585
/*
8686
* Should add FAILED status if not completable
87-
* */
87+
*/
8888
if (ReplicateStatus.getUncompletableStatuses().contains(newStatus)) {
8989
replicatesService.updateReplicateStatus(event.getChainTaskId(),
9090
event.getWalletAddress(), ReplicateStatusUpdate.poolManagerRequest(FAILED));
9191
}
9292

9393
/*
9494
* Should release given worker for this replicate if status is COMPLETED or FAILED
95-
* */
95+
*/
9696
if (ReplicateStatus.getFinalStatuses().contains(newStatus)) {
9797
workerService.removeChainTaskIdFromWorker(event.getChainTaskId(), event.getWalletAddress());
9898
}

src/test/java/com/iexec/core/replicate/ReplicateListenersTests.java renamed to src/test/java/com/iexec/core/replicate/listener/ReplicateListenersTests.java

Lines changed: 47 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020-2024 IEXEC BLOCKCHAIN TECH
2+
* Copyright 2020-2025 IEXEC BLOCKCHAIN TECH
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -14,19 +14,23 @@
1414
* limitations under the License.
1515
*/
1616

17-
package com.iexec.core.replicate;
17+
package com.iexec.core.replicate.listener;
1818

1919
import com.iexec.common.replicate.ReplicateStatus;
2020
import com.iexec.common.replicate.ReplicateStatusUpdate;
2121
import com.iexec.core.detector.replicate.ContributionUnnotifiedDetector;
22-
import com.iexec.core.task.listener.ReplicateListeners;
22+
import com.iexec.core.replicate.ReplicateUpdatedEvent;
23+
import com.iexec.core.replicate.ReplicatesService;
2324
import com.iexec.core.task.update.TaskUpdateRequestManager;
2425
import com.iexec.core.worker.WorkerService;
25-
import org.junit.jupiter.api.BeforeEach;
2626
import org.junit.jupiter.api.Test;
27+
import org.junit.jupiter.api.extension.ExtendWith;
2728
import org.junit.jupiter.params.ParameterizedTest;
2829
import org.junit.jupiter.params.provider.MethodSource;
29-
import org.mockito.*;
30+
import org.mockito.ArgumentCaptor;
31+
import org.mockito.InjectMocks;
32+
import org.mockito.Mock;
33+
import org.mockito.junit.jupiter.MockitoExtension;
3034

3135
import java.util.Arrays;
3236
import java.util.List;
@@ -36,8 +40,11 @@
3640
import static com.iexec.common.replicate.ReplicateStatus.*;
3741
import static com.iexec.common.replicate.ReplicateStatusCause.TASK_NOT_ACTIVE;
3842
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
39-
import static org.mockito.ArgumentMatchers.*;
43+
import static org.mockito.ArgumentMatchers.any;
44+
import static org.mockito.ArgumentMatchers.eq;
45+
import static org.mockito.Mockito.*;
4046

47+
@ExtendWith(MockitoExtension.class)
4148
class ReplicateListenersTests {
4249

4350
private static final String CHAIN_TASK_ID = "chainTaskId";
@@ -55,22 +62,15 @@ class ReplicateListenersTests {
5562
@InjectMocks
5663
private ReplicateListeners replicateListeners;
5764

58-
@BeforeEach
59-
void init() {
60-
MockitoAnnotations.openMocks(this);
61-
}
62-
6365
@Test
6466
void shouldUpdateTaskOnReplicateUpdate() {
65-
List<ReplicateStatus> someStatuses = ReplicateStatus.getSuccessStatuses(); //not exhaustive
67+
final List<ReplicateStatus> someStatuses = ReplicateStatus.getSuccessStatuses(); //not exhaustive
6668

67-
for (ReplicateStatus randomStatus : someStatuses) {
68-
ReplicateUpdatedEvent replicateUpdatedEvent = getMockReplicate(randomStatus);
69+
someStatuses.stream()
70+
.map(this::getMockReplicate)
71+
.forEach(replicateListeners::onReplicateUpdatedEvent);
6972

70-
replicateListeners.onReplicateUpdatedEvent(replicateUpdatedEvent);
71-
}
72-
73-
Mockito.verify(taskUpdateRequestManager, Mockito.times(someStatuses.size())).publishRequest(any());
73+
verify(taskUpdateRequestManager, times(someStatuses.size())).publishRequest(any());
7474
}
7575

7676
@Test
@@ -98,51 +98,46 @@ void shouldRemoveFromComputedTasksSinceComputeFailed() {
9898
assertIsRemovedFromComputedTasks(COMPUTE_FAILED);
9999
}
100100

101-
private void assertIsRemovedFromComputedTasks(ReplicateStatus computed) {
102-
ReplicateUpdatedEvent replicateUpdatedEvent = getMockReplicate(computed);
101+
private void assertIsRemovedFromComputedTasks(final ReplicateStatus computed) {
102+
final ReplicateUpdatedEvent replicateUpdatedEvent = getMockReplicate(computed);
103103
replicateListeners.onReplicateUpdatedEvent(replicateUpdatedEvent);
104-
Mockito.verify(workerService, Mockito.times(1))
105-
.removeComputedChainTaskIdFromWorker(CHAIN_TASK_ID, WORKER_WALLET);
104+
verify(workerService).removeComputedChainTaskIdFromWorker(CHAIN_TASK_ID, WORKER_WALLET);
106105
}
107106

108107
@Test
109108
void shouldTriggerDetectOnchainContributedSinceTaskNotActive() {
110-
ReplicateUpdatedEvent replicateUpdatedEvent = ReplicateUpdatedEvent.builder()
109+
final ReplicateUpdatedEvent replicateUpdatedEvent = ReplicateUpdatedEvent.builder()
111110
.chainTaskId(CHAIN_TASK_ID)
112111
.walletAddress(WORKER_WALLET)
113112
.replicateStatusUpdate(new ReplicateStatusUpdate(CONTRIBUTING, TASK_NOT_ACTIVE))
114113
.build();
115114

116115
replicateListeners.onReplicateUpdatedEvent(replicateUpdatedEvent);
117116

118-
Mockito.verify(contributionUnnotifiedDetector, Mockito.times(1)).detectOnchainDone();
117+
verify(contributionUnnotifiedDetector).detectOnchainDone();
119118
}
120119

121120
@Test
122121
void shouldNotTriggerDetectOnchain() {
123-
List<ReplicateStatus> someStatuses = ReplicateStatus.getSuccessStatuses(); //not exhaustive
124-
someStatuses.remove(CONTRIBUTING);
125-
126-
for (ReplicateStatus randomStatus : someStatuses) {
127-
ReplicateUpdatedEvent replicateUpdatedEvent = getMockReplicate(randomStatus);
122+
ReplicateStatus.getSuccessStatuses().stream()
123+
.filter(status -> status != CONTRIBUTING)
124+
.map(this::getMockReplicate)
125+
.forEach(replicateListeners::onReplicateUpdatedEvent);
128126

129-
replicateListeners.onReplicateUpdatedEvent(replicateUpdatedEvent);
130-
}
131-
132-
Mockito.verify(contributionUnnotifiedDetector, Mockito.times(0)).detectOnchainDone();
127+
verifyNoInteractions(contributionUnnotifiedDetector);
133128
}
134129

135130
@Test
136131
void shouldNotTriggerDetectOnchainContributedSinceCauseIsNull() {
137-
ReplicateUpdatedEvent replicateUpdatedEvent = ReplicateUpdatedEvent.builder()
132+
final ReplicateUpdatedEvent replicateUpdatedEvent = ReplicateUpdatedEvent.builder()
138133
.chainTaskId(CHAIN_TASK_ID)
139134
.walletAddress(WORKER_WALLET)
140135
.replicateStatusUpdate(new ReplicateStatusUpdate(CONTRIBUTING))
141136
.build();
142137

143138
replicateListeners.onReplicateUpdatedEvent(replicateUpdatedEvent);
144139

145-
Mockito.verify(contributionUnnotifiedDetector, Mockito.times(0)).detectOnchainDone();
140+
verifyNoInteractions(contributionUnnotifiedDetector);
146141
}
147142

148143
static Stream<ReplicateStatus> getUncompletableStatuses() {
@@ -151,13 +146,12 @@ static Stream<ReplicateStatus> getUncompletableStatuses() {
151146

152147
@ParameterizedTest
153148
@MethodSource("getUncompletableStatuses")
154-
void shouldAddFailedStatusSinceUncompletableReplicateStatus(ReplicateStatus uncompletableStatus) {
155-
ReplicateUpdatedEvent replicateUpdatedEvent = getMockReplicate(uncompletableStatus);
149+
void shouldAddFailedStatusSinceUncompletableReplicateStatus(final ReplicateStatus uncompletableStatus) {
150+
final ReplicateUpdatedEvent replicateUpdatedEvent = getMockReplicate(uncompletableStatus);
156151
replicateListeners.onReplicateUpdatedEvent(replicateUpdatedEvent);
157152

158153
final ArgumentCaptor<ReplicateStatusUpdate> statusUpdate = ArgumentCaptor.forClass(ReplicateStatusUpdate.class);
159-
Mockito.verify(replicatesService, Mockito.times(1))
160-
.updateReplicateStatus(eq(CHAIN_TASK_ID), eq(WORKER_WALLET), statusUpdate.capture());
154+
verify(replicatesService).updateReplicateStatus(eq(CHAIN_TASK_ID), eq(WORKER_WALLET), statusUpdate.capture());
161155
assertThat(statusUpdate.getValue().getStatus()).isEqualTo(FAILED);
162156
}
163157

@@ -168,51 +162,43 @@ static Stream<ReplicateStatus> getCompletableStatuses() {
168162

169163
@ParameterizedTest
170164
@MethodSource("getCompletableStatuses")
171-
void shouldNotAddFailedStatusSinceCompletableReplicateStatus(ReplicateStatus completableStatus) {
172-
ReplicateUpdatedEvent replicateUpdatedEvent = getMockReplicate(completableStatus);
165+
void shouldNotAddFailedStatusSinceCompletableReplicateStatus(final ReplicateStatus completableStatus) {
166+
final ReplicateUpdatedEvent replicateUpdatedEvent = getMockReplicate(completableStatus);
173167
replicateListeners.onReplicateUpdatedEvent(replicateUpdatedEvent);
174168

175-
Mockito.verify(replicatesService, Mockito.times(0))
176-
.updateReplicateStatus(anyString(), anyString(), any(ReplicateStatusUpdate.class));
169+
verifyNoInteractions(replicatesService);
177170
}
178171

179172
@Test
180173
void shouldRemoveChainTaskIdFromWorkerSinceCompleted() {
181-
ReplicateUpdatedEvent replicateUpdatedEvent = getMockReplicate(COMPLETED);
174+
final ReplicateUpdatedEvent replicateUpdatedEvent = getMockReplicate(COMPLETED);
182175

183176
replicateListeners.onReplicateUpdatedEvent(replicateUpdatedEvent);
184177

185-
Mockito.verify(workerService, Mockito.times(1))
186-
.removeChainTaskIdFromWorker(CHAIN_TASK_ID, WORKER_WALLET);
178+
verify(workerService).removeChainTaskIdFromWorker(CHAIN_TASK_ID, WORKER_WALLET);
187179
}
188180

189181
@Test
190182
void shouldRemoveChainTaskIdFromWorkerSinceFailed() {
191-
ReplicateUpdatedEvent replicateUpdatedEvent = getMockReplicate(FAILED);
183+
final ReplicateUpdatedEvent replicateUpdatedEvent = getMockReplicate(FAILED);
192184

193185
replicateListeners.onReplicateUpdatedEvent(replicateUpdatedEvent);
194186

195-
Mockito.verify(workerService, Mockito.times(1))
196-
.removeChainTaskIdFromWorker(CHAIN_TASK_ID, WORKER_WALLET);
187+
verify(workerService).removeChainTaskIdFromWorker(CHAIN_TASK_ID, WORKER_WALLET);
197188
}
198189

199190
@Test
200191
void shouldNotRemoveChainTaskIdFromWorker() {
201-
List<ReplicateStatus> someStatuses = ReplicateStatus.getSuccessStatuses(); //not exhaustive
202-
someStatuses.remove(COMPLETED);
203-
someStatuses.remove(FAILED);
204-
205-
for (ReplicateStatus randomStatus : someStatuses) {
206-
ReplicateUpdatedEvent replicateUpdatedEvent = getMockReplicate(randomStatus);
207-
208-
replicateListeners.onReplicateUpdatedEvent(replicateUpdatedEvent);
209-
}
192+
ReplicateStatus.getSuccessStatuses().stream()
193+
.filter(status -> status != COMPLETED && status != FAILED)
194+
.map(this::getMockReplicate)
195+
.forEach(replicateListeners::onReplicateUpdatedEvent);
210196

211-
Mockito.verify(workerService, Mockito.times(0))
212-
.removeChainTaskIdFromWorker(CHAIN_TASK_ID, WORKER_WALLET);
197+
verify(workerService).removeComputedChainTaskIdFromWorker(CHAIN_TASK_ID, WORKER_WALLET);
198+
verify(workerService, never()).removeChainTaskIdFromWorker(CHAIN_TASK_ID, WORKER_WALLET);
213199
}
214200

215-
private ReplicateUpdatedEvent getMockReplicate(ReplicateStatus computed) {
201+
private ReplicateUpdatedEvent getMockReplicate(final ReplicateStatus computed) {
216202
return ReplicateUpdatedEvent.builder()
217203
.chainTaskId(CHAIN_TASK_ID)
218204
.walletAddress(WORKER_WALLET)

0 commit comments

Comments
 (0)