Skip to content

Commit 8adb3f2

Browse files
authored
Implement Purgeable on SubscriptionService (#620)
1 parent 3fd27ee commit 8adb3f2

File tree

7 files changed

+65
-64
lines changed

7 files changed

+65
-64
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ All notable changes to this project will be documented in this file.
88

99
- Add workerpool address in configuration instead of reading from Scheduler `/workers/config` endpoint. (#607)
1010
- Set `0x0` as default value for Workerpool address and prevents startup if incorrectly configured. (#608)
11+
- Implement `Purgeable` on `SubscriptionService`. (#620)
1112

1213
### Bug fixes
1314

src/main/java/com/iexec/worker/pubsub/SubscriptionService.java

Lines changed: 27 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package com.iexec.worker.pubsub;
1818

19+
import com.iexec.common.lifecycle.purge.Purgeable;
1920
import com.iexec.core.notification.TaskNotification;
2021
import lombok.AllArgsConstructor;
2122
import lombok.Getter;
@@ -28,6 +29,7 @@
2829
import org.springframework.messaging.simp.stomp.StompSession.Subscription;
2930
import org.springframework.stereotype.Service;
3031

32+
import javax.annotation.PreDestroy;
3133
import java.lang.reflect.Type;
3234
import java.util.Map;
3335
import java.util.Set;
@@ -38,8 +40,7 @@
3840

3941
@Slf4j
4042
@Service
41-
// FIXME: implement `Purgeable`
42-
public class SubscriptionService {
43+
public class SubscriptionService implements Purgeable {
4344

4445
private final Map<String, Subscription> chainTaskIdToSubscription = new ConcurrentHashMap<>();
4546
private final ApplicationEventPublisher eventPublisher;
@@ -64,14 +65,14 @@ public SubscriptionService(ApplicationEventPublisher applicationEventPublisher,
6465
*
6566
* @param chainTaskId id of the task to which to subscribe
6667
*/
67-
public void subscribeToTopic(String chainTaskId) {
68-
String topic = getTaskTopicName(chainTaskId);
68+
public void subscribeToTopic(final String chainTaskId) {
69+
final String topic = getTaskTopicName(chainTaskId);
6970
if (this.chainTaskIdToSubscription.containsKey(chainTaskId)) {
7071
log.info("Already subscribed to topic [chainTaskId:{}, topic:{}]",
7172
chainTaskId, topic);
7273
return;
7374
}
74-
MessageHandler messageHandler = new MessageHandler(chainTaskId, this.workerWalletAddress);
75+
final MessageHandler messageHandler = new MessageHandler(chainTaskId, this.workerWalletAddress);
7576
stompClientService.subscribeToTopic(topic, messageHandler).ifPresentOrElse(
7677
subscription -> {
7778
this.chainTaskIdToSubscription.put(chainTaskId, subscription);
@@ -84,16 +85,27 @@ public void subscribeToTopic(String chainTaskId) {
8485
/**
8586
* Unsubscribe from topic if already subscribed.
8687
*
87-
* @param chainTaskId
88+
* @param chainTaskId id of the task to unsubscribe and purge
89+
* @return true if unsubscribed and purge topic successfully, false otherwise
8890
*/
89-
public void unsubscribeFromTopic(String chainTaskId) {
91+
@Override
92+
public boolean purgeTask(final String chainTaskId) {
93+
log.debug("purgeTask [chainTaskId:{}]", chainTaskId);
9094
if (!isSubscribedToTopic(chainTaskId)) {
9195
log.error("Already unsubscribed from topic [chainTaskId:{}]", chainTaskId);
92-
return;
96+
return true;
9397
}
9498
this.chainTaskIdToSubscription.get(chainTaskId).unsubscribe();
9599
this.chainTaskIdToSubscription.remove(chainTaskId);
96100
log.info("Unsubscribed from topic [chainTaskId:{}]", chainTaskId);
101+
return !isSubscribedToTopic(chainTaskId);
102+
}
103+
104+
@Override
105+
@PreDestroy
106+
public void purgeAllTasksData() {
107+
this.chainTaskIdToSubscription.keySet().forEach(this::purgeTask);
108+
this.chainTaskIdToSubscription.clear();
97109
}
98110

99111
/**
@@ -102,7 +114,7 @@ public void unsubscribeFromTopic(String chainTaskId) {
102114
* @param chainTaskId id of the task to check
103115
* @return true if subscribed, false otherwise
104116
*/
105-
public boolean isSubscribedToTopic(String chainTaskId) {
117+
public boolean isSubscribedToTopic(final String chainTaskId) {
106118
return this.chainTaskIdToSubscription.containsKey(chainTaskId);
107119
}
108120

@@ -113,7 +125,7 @@ public boolean isSubscribedToTopic(String chainTaskId) {
113125
@EventListener(SessionCreatedEvent.class)
114126
synchronized void reSubscribeToTopics() {
115127
log.debug("Received new SessionCreatedEvent");
116-
Set<String> chainTaskIds = this.chainTaskIdToSubscription.keySet();
128+
final Set<String> chainTaskIds = this.chainTaskIdToSubscription.keySet();
117129
if (chainTaskIds.isEmpty()) {
118130
log.info("No topic to resubscribe to");
119131
} else {
@@ -166,7 +178,7 @@ public void waitForSessionReady() throws InterruptedException {
166178
}
167179
}
168180

169-
private String getTaskTopicName(String chainTaskId) {
181+
private String getTaskTopicName(final String chainTaskId) {
170182
return "/topic/task/" + chainTaskId;
171183
}
172184

@@ -181,17 +193,17 @@ public class MessageHandler implements StompFrameHandler {
181193
private final String workerWalletAddress;
182194

183195
@Override
184-
public Type getPayloadType(StompHeaders headers) {
196+
public Type getPayloadType(final StompHeaders headers) {
185197
return TaskNotification.class;
186198
}
187199

188200
@Override
189-
public void handleFrame(StompHeaders headers, @Nullable Object payload) {
201+
public void handleFrame(final StompHeaders headers, @Nullable final Object payload) {
190202
if (payload == null) {
191203
log.error("Payload of TaskNotification is null [chainTaskId:{}]", this.chainTaskId);
192204
return;
193205
}
194-
TaskNotification taskNotification = (TaskNotification) payload;
206+
final TaskNotification taskNotification = (TaskNotification) payload;
195207
if (!isWorkerInvolved(taskNotification)) {
196208
return;
197209
}
@@ -200,7 +212,7 @@ public void handleFrame(StompHeaders headers, @Nullable Object payload) {
200212
eventPublisher.publishEvent(taskNotification);
201213
}
202214

203-
private boolean isWorkerInvolved(TaskNotification notification) {
215+
private boolean isWorkerInvolved(final TaskNotification notification) {
204216
return notification.getWorkersAddress() != null &&
205217
(notification.getWorkersAddress().isEmpty() || // for all workers
206218
notification.getWorkersAddress().contains(this.workerWalletAddress));

src/main/java/com/iexec/worker/task/TaskManagerService.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@
3535
import com.iexec.worker.compute.post.PostComputeResponse;
3636
import com.iexec.worker.compute.pre.PreComputeResponse;
3737
import com.iexec.worker.dataset.DataService;
38-
import com.iexec.worker.pubsub.SubscriptionService;
3938
import com.iexec.worker.replicate.ReplicateActionResponse;
4039
import com.iexec.worker.result.ResultService;
4140
import com.iexec.worker.sms.SmsService;
@@ -68,7 +67,6 @@ public class TaskManagerService {
6867
private final DataService dataService;
6968
private final ResultService resultService;
7069
private final SmsService smsService;
71-
private final SubscriptionService subscriptionService;
7270
private final PurgeService purgeService;
7371
private final String workerWalletAddress;
7472

@@ -81,7 +79,6 @@ public TaskManagerService(
8179
DataService dataService,
8280
ResultService resultService,
8381
SmsService smsService,
84-
SubscriptionService subscriptionService,
8582
PurgeService purgeService,
8683
String workerWalletAddress) {
8784
this.iexecHubService = iexecHubService;
@@ -92,7 +89,6 @@ public TaskManagerService(
9289
this.dataService = dataService;
9390
this.resultService = resultService;
9491
this.smsService = smsService;
95-
this.subscriptionService = subscriptionService;
9692
this.purgeService = purgeService;
9793
this.workerWalletAddress = workerWalletAddress;
9894
}
@@ -468,7 +464,6 @@ ReplicateActionResponse complete(String chainTaskId) {
468464
*/
469465
boolean abort(String chainTaskId) {
470466
log.info("Aborting task [chainTaskId:{}]", chainTaskId);
471-
subscriptionService.unsubscribeFromTopic(chainTaskId);
472467
boolean allContainersStopped = computeManagerService.abort(chainTaskId);
473468
boolean allServicesPurged = purgeService.purgeAllServices(chainTaskId);
474469
final boolean isSuccess = allContainersStopped && allServicesPurged;

src/main/java/com/iexec/worker/task/TaskNotificationService.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,6 @@ public void onTaskNotification(TaskNotification notification) {
167167
case PLEASE_COMPLETE:
168168
updateStatusAndGetNextAction(chainTaskId, COMPLETING);
169169
actionResponse = taskManagerService.complete(chainTaskId);
170-
subscriptionService.unsubscribeFromTopic(chainTaskId);
171170
nextStatus = actionResponse.isSuccess() ? COMPLETED : COMPLETE_FAILED;
172171
nextAction = updateStatusAndGetNextAction(chainTaskId, nextStatus, actionResponse.getDetails());
173172
break;

src/test/java/com/iexec/worker/pubsub/SubscriptionServiceTests.java

Lines changed: 35 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020 IEXEC BLOCKCHAIN TECH
2+
* Copyright 2020-2024 IEXEC BLOCKCHAIN TECH
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,12 +16,12 @@
1616

1717
package com.iexec.worker.pubsub;
1818

19-
import org.junit.jupiter.api.BeforeEach;
2019
import org.junit.jupiter.api.Test;
20+
import org.junit.jupiter.api.extension.ExtendWith;
2121
import org.mockito.InjectMocks;
2222
import org.mockito.Mock;
23-
import org.mockito.MockitoAnnotations;
2423
import org.mockito.Spy;
24+
import org.mockito.junit.jupiter.MockitoExtension;
2525
import org.springframework.context.ApplicationEventPublisher;
2626
import org.springframework.messaging.simp.stomp.StompSession.Subscription;
2727
import org.springframework.test.util.ReflectionTestUtils;
@@ -36,8 +36,11 @@
3636
import static org.mockito.ArgumentMatchers.anyString;
3737
import static org.mockito.Mockito.*;
3838

39+
@ExtendWith(MockitoExtension.class)
3940
class SubscriptionServiceTests {
4041

42+
@Mock
43+
private Subscription subscription;
4144
@Mock
4245
private ApplicationEventPublisher applicationEventPublisher;
4346
@Mock
@@ -47,19 +50,10 @@ class SubscriptionServiceTests {
4750
@InjectMocks
4851
SubscriptionService subscriptionService;
4952

50-
private static final String WORKER_WALLET_ADDRESS = "0x1234";
5153
private static final String CHAIN_TASK_ID = "chaintaskid";
5254
private static final String CHAIN_TASK_ID_2 = "chaintaskid2";
5355

54-
@Mock
55-
private Subscription subscription;
56-
57-
@BeforeEach
58-
void init() {
59-
MockitoAnnotations.openMocks(this);
60-
subscriptionService = spy(new SubscriptionService(applicationEventPublisher, stompClientService, WORKER_WALLET_ADDRESS));
61-
}
62-
56+
// region subscribe
6357
@Test
6458
void shouldSubscribeToTopic() {
6559
when(stompClientService.subscribeToTopic(anyString(), any())).thenReturn(Optional.of(subscription));
@@ -79,24 +73,27 @@ void shouldNotSubscribeToExistingTopic() {
7973
assertThat(subscriptionService.isSubscribedToTopic(CHAIN_TASK_ID)).isTrue();
8074
verify(stompClientService, atMost(1)).subscribeToTopic(anyString(), any());
8175
}
76+
// endregion
8277

78+
// region unsubscribe
8379
@Test
8480
void shouldUnsubscribeFromTopic() {
8581
when(stompClientService.subscribeToTopic(anyString(), any())).thenReturn(Optional.of(subscription));
8682

8783
subscriptionService.subscribeToTopic(CHAIN_TASK_ID);
8884
assertThat(subscriptionService.isSubscribedToTopic(CHAIN_TASK_ID)).isTrue();
89-
subscriptionService.unsubscribeFromTopic(CHAIN_TASK_ID);
85+
assertThat(subscriptionService.purgeTask(CHAIN_TASK_ID)).isTrue();
86+
verify(subscription).unsubscribe();
9087
assertThat(subscriptionService.isSubscribedToTopic(CHAIN_TASK_ID)).isFalse();
91-
verify(subscription, times(1)).unsubscribe();
9288
}
9389

9490
@Test
9591
void shouldNotUnsubscribeFromNonexistentTopic() {
9692
assertThat(subscriptionService.isSubscribedToTopic(CHAIN_TASK_ID)).isFalse();
97-
subscriptionService.unsubscribeFromTopic(CHAIN_TASK_ID);
93+
assertThat(subscriptionService.purgeTask(CHAIN_TASK_ID)).isTrue();
9894
verify(subscription, never()).unsubscribe();
9995
}
96+
// endregion
10097

10198
// region reSubscribeToTopics
10299
@Test
@@ -186,4 +183,26 @@ void shouldWaitForSessionReady() throws ExecutionException, InterruptedException
186183
future.get(10, TimeUnit.MILLISECONDS);
187184
}
188185
// endregion
186+
187+
// region purgeAllTasksData
188+
@Test
189+
void shouldPurgeMultipleTasks() {
190+
when(stompClientService.subscribeToTopic(anyString(), any())).thenReturn(Optional.of(subscription));
191+
192+
subscriptionService.subscribeToTopic(CHAIN_TASK_ID);
193+
subscriptionService.subscribeToTopic(CHAIN_TASK_ID_2);
194+
195+
subscriptionService.purgeAllTasksData();
196+
197+
verify(subscription, times(2)).unsubscribe();
198+
assertThat(subscriptionService.isSubscribedToTopic(CHAIN_TASK_ID)).isFalse();
199+
assertThat(subscriptionService.isSubscribedToTopic(CHAIN_TASK_ID_2)).isFalse();
200+
}
201+
202+
@Test
203+
void shouldHandleEmptyPurgeAllTasksData() {
204+
subscriptionService.purgeAllTasksData();
205+
verify(subscription, never()).unsubscribe();
206+
}
207+
// endregion
189208
}

src/test/java/com/iexec/worker/task/TaskManagerServiceTests.java

Lines changed: 2 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -37,19 +37,18 @@
3737
import com.iexec.worker.compute.post.PostComputeResponse;
3838
import com.iexec.worker.compute.pre.PreComputeResponse;
3939
import com.iexec.worker.dataset.DataService;
40-
import com.iexec.worker.pubsub.SubscriptionService;
4140
import com.iexec.worker.replicate.ReplicateActionResponse;
4241
import com.iexec.worker.result.ResultService;
4342
import com.iexec.worker.sms.SmsService;
4443
import com.iexec.worker.tee.TeeService;
4544
import com.iexec.worker.tee.TeeServicesManager;
4645
import com.iexec.worker.utils.WorkflowException;
4746
import org.assertj.core.api.Assertions;
48-
import org.junit.jupiter.api.BeforeEach;
4947
import org.junit.jupiter.api.Test;
5048
import org.junit.jupiter.api.extension.ExtendWith;
5149
import org.junit.jupiter.params.ParameterizedTest;
5250
import org.junit.jupiter.params.provider.EnumSource;
51+
import org.mockito.InjectMocks;
5352
import org.mockito.Mock;
5453
import org.mockito.junit.jupiter.MockitoExtension;
5554

@@ -75,6 +74,7 @@ class TaskManagerServiceTests {
7574
.workerWallet(WORKER_ADDRESS)
7675
.build();
7776

77+
@InjectMocks
7878
private TaskManagerService taskManagerService;
7979
@Mock
8080
private IexecHubService iexecHubService;
@@ -93,30 +93,11 @@ class TaskManagerServiceTests {
9393
@Mock
9494
private SmsService smsService;
9595
@Mock
96-
private SubscriptionService subscriptionService;
97-
@Mock
9896
private PurgeService purgeService;
9997

10098
@Mock
10199
private TeeService teeMockedService;
102100

103-
@BeforeEach
104-
void init() {
105-
taskManagerService = new TaskManagerService(
106-
iexecHubService,
107-
contributionService,
108-
revealService,
109-
computeManagerService,
110-
teeServicesManager,
111-
dataService,
112-
resultService,
113-
smsService,
114-
subscriptionService,
115-
purgeService,
116-
WORKER_ADDRESS
117-
);
118-
}
119-
120101
TaskDescription.TaskDescriptionBuilder getTaskDescriptionBuilder(boolean isTeeTask) {
121102
final DealParams dealParams = DealParams.builder()
122103
.iexecInputFiles(List.of("https://ab.cd/ef.jpeg"))
@@ -1364,7 +1345,6 @@ void shouldReturnFalseWhenRemainingContainers() {
13641345
when(computeManagerService.abort(CHAIN_TASK_ID)).thenReturn(false);
13651346
when(purgeService.purgeAllServices(CHAIN_TASK_ID)).thenReturn(true);
13661347
assertThat(taskManagerService.abort(CHAIN_TASK_ID)).isFalse();
1367-
verify(subscriptionService).unsubscribeFromTopic(CHAIN_TASK_ID);
13681348
verify(computeManagerService).abort(CHAIN_TASK_ID);
13691349
verify(purgeService).purgeAllServices(CHAIN_TASK_ID);
13701350
}
@@ -1374,7 +1354,6 @@ void shouldReturnFalseWhenRemainingService() {
13741354
when(computeManagerService.abort(CHAIN_TASK_ID)).thenReturn(true);
13751355
when(purgeService.purgeAllServices(CHAIN_TASK_ID)).thenReturn(false);
13761356
assertThat(taskManagerService.abort(CHAIN_TASK_ID)).isFalse();
1377-
verify(subscriptionService).unsubscribeFromTopic(CHAIN_TASK_ID);
13781357
verify(computeManagerService).abort(CHAIN_TASK_ID);
13791358
verify(purgeService).purgeAllServices(CHAIN_TASK_ID);
13801359
}
@@ -1384,7 +1363,6 @@ void shouldAbortTask() {
13841363
when(computeManagerService.abort(CHAIN_TASK_ID)).thenReturn(true);
13851364
when(purgeService.purgeAllServices(CHAIN_TASK_ID)).thenReturn(true);
13861365
assertThat(taskManagerService.abort(CHAIN_TASK_ID)).isTrue();
1387-
verify(subscriptionService).unsubscribeFromTopic(CHAIN_TASK_ID);
13881366
verify(computeManagerService).abort(CHAIN_TASK_ID);
13891367
verify(purgeService).purgeAllServices(CHAIN_TASK_ID);
13901368
}

0 commit comments

Comments
 (0)