Skip to content

Commit 14048b6

Browse files
author
Ugo Plouviez
committed
Add unit tests for Subscription service
1 parent 912ea8a commit 14048b6

File tree

3 files changed

+136
-14
lines changed

3 files changed

+136
-14
lines changed

src/main/java/com/iexec/worker/pubsub/SubscriptionService.java

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -36,12 +36,11 @@
3636
@Service
3737
public class SubscriptionService extends StompSessionHandlerAdapter {
3838

39-
private final String coreHost;
40-
private final int corePort;
41-
private final String workerWalletAddress;
4239
private RestTemplate restTemplate;
4340
// external services
4441
private TaskExecutorService taskExecutorService;
42+
private CoreConfigurationService coreConfigurationService;
43+
private WorkerConfigurationService workerConfigurationService;
4544

4645
// internal components
4746
private StompSession session;
@@ -54,22 +53,23 @@ public SubscriptionService(CoreConfigurationService coreConfigurationService,
5453
TaskExecutorService taskExecutorService,
5554
RestTemplate restTemplate) {
5655
this.taskExecutorService = taskExecutorService;
57-
58-
this.coreHost = coreConfigurationService.getHost();
59-
this.corePort = coreConfigurationService.getPort();
60-
this.workerWalletAddress = workerConfigurationService.getWorkerWalletAddress();
6156
this.restTemplate = restTemplate;
62-
57+
this.coreConfigurationService = coreConfigurationService;
58+
this.workerConfigurationService = workerConfigurationService;
6359
chainTaskIdToSubscription = new ConcurrentHashMap<>();
64-
url = "http://" + coreHost + ":" + corePort + "/connect";
60+
6561
}
6662

6763
@PostConstruct
68-
private void run() {
64+
void init() {
65+
String coreHost = coreConfigurationService.getHost();
66+
int corePort = coreConfigurationService.getPort();
67+
this.url = "http://" + coreHost + ":" + corePort + "/connect";
68+
6969
this.restartStomp();
7070
}
7171

72-
private void restartStomp() {
72+
void restartStomp() {
7373
log.info("Starting STOMP");
7474
if (isConnectEndpointUp()) {
7575
WebSocketClient webSocketClient = new StandardWebSocketClient();
@@ -90,7 +90,7 @@ private boolean isConnectEndpointUp() {
9090
if (checkConnectionEntity.getStatusCode().is2xxSuccessful()) {
9191
return true;
9292
}
93-
log.error("isConnectEndpointUp failed (will retry) [url:{}, status:]", url, checkConnectionEntity.getStatusCode());
93+
log.error("isConnectEndpointUp failed (will retry) [url:{}, status:{}]", url, checkConnectionEntity.getStatusCode());
9494
try {
9595
Thread.sleep(1000);
9696
} catch (InterruptedException e) {
@@ -173,7 +173,7 @@ public void unsubscribeFromTopic(String chainTaskId) {
173173
}
174174

175175
public void handleTaskNotification(TaskNotification notif) {
176-
if (notif.getWorkersAddress().contains(workerWalletAddress)
176+
if (notif.getWorkersAddress().contains(workerConfigurationService.getWorkerWalletAddress())
177177
|| notif.getWorkersAddress().isEmpty()) {
178178
log.info("Received notification [notification:{}]", notif);
179179

src/test/java/com/iexec/worker/amnesia/AmnesiaRecoveryServiceTests.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
import com.iexec.common.notification.TaskNotificationType;
77
import com.iexec.common.task.TaskDescription;
88
import com.iexec.worker.chain.IexecHubService;
9-
import com.iexec.worker.executor.TaskExecutorService;
109
import com.iexec.worker.feign.CustomFeignClient;
1110
import com.iexec.worker.pubsub.SubscriptionService;
1211
import com.iexec.worker.result.ResultService;
Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
package com.iexec.worker.pubsub;
2+
3+
import com.iexec.common.chain.ContributionAuthorization;
4+
import com.iexec.common.notification.TaskNotification;
5+
import com.iexec.common.notification.TaskNotificationExtra;
6+
import com.iexec.common.notification.TaskNotificationType;
7+
import com.iexec.worker.config.WorkerConfigurationService;
8+
import com.iexec.worker.executor.TaskExecutorService;
9+
import org.junit.Before;
10+
import org.junit.Test;
11+
import org.mockito.InjectMocks;
12+
import org.mockito.Mock;
13+
import org.mockito.Mockito;
14+
import org.mockito.MockitoAnnotations;
15+
16+
import java.util.Collections;
17+
18+
import static org.mockito.ArgumentMatchers.*;
19+
import static org.mockito.Mockito.when;
20+
21+
public class SubscriptionServiceTests {
22+
23+
@Mock
24+
private WorkerConfigurationService workerConfigurationService;
25+
@Mock
26+
private TaskExecutorService taskExecutorService;
27+
28+
@InjectMocks
29+
SubscriptionService subscriptionService;
30+
31+
private static final String WORKER_WALLET_ADDRESS = "0x1234";
32+
private static final String CHAIN_TASK_ID = "chaintaskid";
33+
34+
private TaskNotification notifTemplate = TaskNotification.builder()
35+
.workersAddress(Collections.singletonList(WORKER_WALLET_ADDRESS))
36+
.taskNotificationExtra(TaskNotificationExtra.builder()
37+
.build())
38+
.chainTaskId(CHAIN_TASK_ID)
39+
.build();
40+
41+
@Before
42+
public void init() {
43+
MockitoAnnotations.initMocks(this);
44+
45+
when(workerConfigurationService.getWorkerWalletAddress()).thenReturn(WORKER_WALLET_ADDRESS);
46+
47+
48+
}
49+
50+
51+
@Test
52+
public void shouldNotHandleNotificationSinceWorkerWalletNotInNotif() {
53+
54+
TaskNotification notif = TaskNotification.builder()
55+
.workersAddress(Collections.singletonList("0xabcd"))
56+
.chainTaskId(CHAIN_TASK_ID)
57+
.build();
58+
59+
subscriptionService.handleTaskNotification(notif);
60+
61+
Mockito.verifyZeroInteractions(taskExecutorService);
62+
}
63+
64+
@Test
65+
public void shouldNotContributeSinceNoContributionAuthorization() {
66+
notifTemplate.setTaskNotificationType(TaskNotificationType.PLEASE_CONTRIBUTE);
67+
subscriptionService.handleTaskNotification(notifTemplate);
68+
69+
Mockito.verifyZeroInteractions(taskExecutorService);
70+
}
71+
72+
@Test
73+
public void shouldTryToContribute() {
74+
notifTemplate.setTaskNotificationType(TaskNotificationType.PLEASE_CONTRIBUTE);
75+
notifTemplate.setTaskNotificationExtra(TaskNotificationExtra.builder()
76+
.contributionAuthorization(new ContributionAuthorization())
77+
.build());
78+
79+
subscriptionService.handleTaskNotification(notifTemplate);
80+
81+
Mockito.verify(taskExecutorService, Mockito.times(1)).tryToContribute(any());
82+
}
83+
84+
@Test
85+
public void shouldAbortOnContributionTimeout() {
86+
notifTemplate.setTaskNotificationType(TaskNotificationType.PLEASE_ABORT_CONTRIBUTION_TIMEOUT);
87+
subscriptionService.handleTaskNotification(notifTemplate);
88+
89+
Mockito.verify(taskExecutorService, Mockito.times(1)).abortContributionTimeout(CHAIN_TASK_ID);
90+
}
91+
92+
@Test
93+
public void shouldAbortOnConsensusReached() {
94+
notifTemplate.setTaskNotificationType(TaskNotificationType.PLEASE_ABORT_CONSENSUS_REACHED);
95+
subscriptionService.handleTaskNotification(notifTemplate);
96+
97+
Mockito.verify(taskExecutorService, Mockito.times(1)).abortConsensusReached(CHAIN_TASK_ID);
98+
}
99+
100+
@Test
101+
public void shouldReveal() {
102+
notifTemplate.setTaskNotificationType(TaskNotificationType.PLEASE_REVEAL);
103+
subscriptionService.handleTaskNotification(notifTemplate);
104+
105+
Mockito.verify(taskExecutorService, Mockito.times(1)).reveal(eq(CHAIN_TASK_ID), anyLong());
106+
}
107+
108+
@Test
109+
public void shouldUpload() {
110+
notifTemplate.setTaskNotificationType(TaskNotificationType.PLEASE_UPLOAD);
111+
subscriptionService.handleTaskNotification(notifTemplate);
112+
113+
Mockito.verify(taskExecutorService, Mockito.times(1)).uploadResult(CHAIN_TASK_ID);
114+
}
115+
116+
@Test
117+
public void shouldComplete() {
118+
notifTemplate.setTaskNotificationType(TaskNotificationType.PLEASE_COMPLETE);
119+
subscriptionService.handleTaskNotification(notifTemplate);
120+
121+
Mockito.verify(taskExecutorService, Mockito.times(1)).completeTask(CHAIN_TASK_ID);
122+
}
123+
}

0 commit comments

Comments
 (0)