Skip to content

Commit ed9100e

Browse files
Merge pull request #104 from iExecBlockchainComputing/restartSubscription
Restart notification subscriptions
2 parents 03c0f6e + 6818f60 commit ed9100e

File tree

2 files changed

+50
-40
lines changed

2 files changed

+50
-40
lines changed

src/main/java/com/iexec/worker/pubsub/SubscriptionService.java

Lines changed: 49 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,13 @@
33
import com.iexec.common.chain.ChainReceipt;
44
import com.iexec.common.result.TaskNotification;
55
import com.iexec.common.result.TaskNotificationType;
6+
import com.iexec.common.result.eip712.Eip712Challenge;
67
import com.iexec.worker.chain.RevealService;
78
import com.iexec.worker.config.CoreConfigurationService;
89
import com.iexec.worker.config.PublicConfigurationService;
910
import com.iexec.worker.config.WorkerConfigurationService;
1011
import com.iexec.worker.feign.CustomFeignClient;
1112
import com.iexec.worker.feign.ResultRepoClient;
12-
import com.iexec.common.result.eip712.Eip712Challenge;
1313
import com.iexec.worker.result.Eip712ChallengeService;
1414
import com.iexec.worker.result.ResultService;
1515
import lombok.extern.slf4j.Slf4j;
@@ -24,6 +24,8 @@
2424

2525
import javax.annotation.PostConstruct;
2626
import java.lang.reflect.Type;
27+
import java.util.ArrayList;
28+
import java.util.List;
2729
import java.util.Map;
2830
import java.util.Optional;
2931
import java.util.concurrent.ConcurrentHashMap;
@@ -75,16 +77,26 @@ public SubscriptionService(CoreConfigurationService coreConfigurationService,
7577

7678
@PostConstruct
7779
private void run() {
78-
this.connectStomp();
80+
this.restartStomp();
7981
}
8082

81-
private void connectStomp() {
83+
private void restartStomp() {
84+
log.info("Starting STOMP");
8285
WebSocketClient webSocketClient = new StandardWebSocketClient();
83-
stompClient = new WebSocketStompClient(webSocketClient);
84-
stompClient.setMessageConverter(new MappingJackson2MessageConverter());
85-
stompClient.setTaskScheduler(new ConcurrentTaskScheduler());
86-
stompClient.connect(url, this);
87-
log.info("Connect STOMP [isRunning: {}]", stompClient.isRunning());
86+
this.stompClient = new WebSocketStompClient(webSocketClient);
87+
this.stompClient.setMessageConverter(new MappingJackson2MessageConverter());
88+
this.stompClient.setTaskScheduler(new ConcurrentTaskScheduler());
89+
this.stompClient.connect(url, this);
90+
this.reSubscribeToTopics();
91+
log.info("Started STOMP");
92+
}
93+
94+
private void reSubscribeToTopics() {
95+
List<String> chainTaskIds = new ArrayList<>(chainTaskIdToSubscription.keySet());
96+
for (String chainTaskId : chainTaskIds) {
97+
unsubscribeFromTopic(chainTaskId);
98+
subscribeToTopic(chainTaskId);
99+
}
88100
}
89101

90102
@Override
@@ -98,34 +110,44 @@ public void handleException(StompSession session, @Nullable StompCommand command
98110
StompHeaders headers, byte[] payload, Throwable exception) {
99111
log.error("Received handleException [session: {}, isConnected: {}, Exception: {}]",
100112
session.getSessionId(), session.isConnected(), exception.getMessage());
101-
this.connectStomp();
113+
this.restartStomp();
102114
}
103115

104116
@Override
105117
public void handleTransportError(StompSession session, Throwable exception) {
106118
log.info("Received handleTransportError [session: {}, isConnected: {}]", session.getSessionId(), session.isConnected());
107-
this.connectStomp();
119+
this.restartStomp();
108120
}
109121

110-
public void subscribeToTaskNotifications(String chainTaskId) {
122+
private void unsubscribeFromTopic(String chainTaskId) {
111123
if (chainTaskIdToSubscription.containsKey(chainTaskId)) {
112-
log.info("Already subscribed to TaskNotification [chainTaskId:{}]", chainTaskId);
113-
return;
124+
chainTaskIdToSubscription.get(chainTaskId).unsubscribe();
125+
chainTaskIdToSubscription.remove(chainTaskId);
126+
log.info("Unsubscribed from topic [chainTaskId:{}]", chainTaskId);
127+
} else {
128+
log.info("Already unsubscribed from topic [chainTaskId:{}]", chainTaskId);
114129
}
115-
StompSession.Subscription subscription = session.subscribe(getTaskTopicName(chainTaskId), new StompFrameHandler() {
116-
@Override
117-
public Type getPayloadType(StompHeaders headers) {
118-
return TaskNotification.class;
119-
}
130+
}
120131

121-
@Override
122-
public void handleFrame(StompHeaders headers, Object payload) {
123-
TaskNotification taskNotification = (TaskNotification) payload;
124-
handleTaskNotification(taskNotification);
125-
}
126-
});
127-
chainTaskIdToSubscription.put(chainTaskId, subscription);
128-
log.info("Subscribed to topic [chainTaskId:{}, topic:{}]", chainTaskId, getTaskTopicName(chainTaskId));
132+
public void subscribeToTopic(String chainTaskId) {
133+
if (!chainTaskIdToSubscription.containsKey(chainTaskId)) {
134+
StompSession.Subscription subscription = session.subscribe(getTaskTopicName(chainTaskId), new StompFrameHandler() {
135+
@Override
136+
public Type getPayloadType(StompHeaders headers) {
137+
return TaskNotification.class;
138+
}
139+
140+
@Override
141+
public void handleFrame(StompHeaders headers, Object payload) {
142+
TaskNotification taskNotification = (TaskNotification) payload;
143+
handleTaskNotification(taskNotification);
144+
}
145+
});
146+
chainTaskIdToSubscription.put(chainTaskId, subscription);
147+
log.info("Subscribed to topic [chainTaskId:{}, topic:{}]", chainTaskId, getTaskTopicName(chainTaskId));
148+
} else {
149+
log.info("Already subscribed to topic [chainTaskId:{}, topic:{}]", chainTaskId, getTaskTopicName(chainTaskId));
150+
}
129151
}
130152

131153
private void handleTaskNotification(TaskNotification notif) {
@@ -208,19 +230,9 @@ private void completeTask(String chainTaskId) {
208230
feignClient.updateReplicateStatus(chainTaskId, COMPLETED);
209231
}
210232

211-
private void unsubscribeFromTaskNotifications(String chainTaskId) {
212-
if (!chainTaskIdToSubscription.containsKey(chainTaskId)) {
213-
log.info("Already unsubscribed from TaskNotification [chainTaskId:{}]", chainTaskId);
214-
return;
215-
}
216-
chainTaskIdToSubscription.get(chainTaskId).unsubscribe();
217-
chainTaskIdToSubscription.remove(chainTaskId);
218-
log.info("Unsubscribed from taskNotification [chainTaskId:{}]", chainTaskId);
219-
}
220-
221233
private void cleanReplicate(String chainTaskId) {
222234
// unsubscribe from the topic and remove the associated result from the machine
223-
unsubscribeFromTaskNotifications(chainTaskId);
235+
unsubscribeFromTopic(chainTaskId);
224236
resultService.removeResult(chainTaskId);
225237
}
226238

src/main/java/com/iexec/worker/replicate/ReplicateDemandService.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,7 @@
1717
import org.springframework.beans.factory.annotation.Autowired;
1818
import org.springframework.scheduling.annotation.Scheduled;
1919
import org.springframework.stereotype.Service;
20-
import org.web3j.utils.Numeric;
2120

22-
import javax.xml.bind.DatatypeConverter;
2321
import java.util.Optional;
2422

2523

@@ -73,7 +71,7 @@ public String askForReplicate() {
7371
return "Bad signature in received replicate";
7472
} else {
7573
log.info("The contribution contribAuth is valid [chainTaskId:{}]", chainTaskId);
76-
subscriptionService.subscribeToTaskNotifications(chainTaskId);
74+
subscriptionService.subscribeToTopic(chainTaskId);
7775

7876
Optional<AvailableReplicateModel> optionalModel = retrieveAvailableReplicateModelFromContribAuth(contribAuth);
7977
if (!optionalModel.isPresent()) {

0 commit comments

Comments
 (0)