Skip to content

Commit aca4feb

Browse files
Merge pull request #106 from iExecBlockchainComputing/dontUnsubscribeWhenSessionClosed
Dont unsubscribe when stomp session is closed
2 parents 1a1f18c + 988e95a commit aca4feb

File tree

1 file changed

+19
-7
lines changed

1 file changed

+19
-7
lines changed

src/main/java/com/iexec/worker/pubsub/SubscriptionService.java

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import lombok.extern.slf4j.Slf4j;
1616
import org.springframework.lang.Nullable;
1717
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
18+
import org.springframework.messaging.simp.SimpMessageType;
1819
import org.springframework.messaging.simp.stomp.*;
1920
import org.springframework.scheduling.concurrent.ConcurrentTaskScheduler;
2021
import org.springframework.stereotype.Service;
@@ -94,7 +95,7 @@ private void reSubscribeToTopics() {
9495
List<String> chainTaskIds = new ArrayList<>(chainTaskIdToSubscription.keySet());
9596
log.info("ReSubscribing to topics [chainTaskIds: {}]", chainTaskIds.toString());
9697
for (String chainTaskId : chainTaskIds) {
97-
unsubscribeFromTopic(chainTaskId);
98+
chainTaskIdToSubscription.remove(chainTaskId);
9899
subscribeToTopic(chainTaskId);
99100
}
100101
log.info("ReSubscribed to topics [chainTaskIds: {}]", chainTaskIds.toString());
@@ -110,14 +111,21 @@ public void afterConnected(StompSession session, StompHeaders connectedHeaders)
110111
@Override
111112
public void handleException(StompSession session, @Nullable StompCommand command,
112113
StompHeaders headers, byte[] payload, Throwable exception) {
113-
log.error("Received handleException [session: {}, isConnected: {}, Exception: {}]",
114-
session.getSessionId(), session.isConnected(), exception.getMessage());
114+
SimpMessageType messageType = null;
115+
if (command != null) {
116+
messageType = command.getMessageType();
117+
}
118+
log.error("Received handleException [session: {}, isConnected: {}, command: {}, exception: {}]",
119+
session.getSessionId(), session.isConnected(), messageType, exception.getMessage());
120+
exception.printStackTrace();
115121
this.restartStomp();
116122
}
117123

118124
@Override
119125
public void handleTransportError(StompSession session, Throwable exception) {
120-
log.info("Received handleTransportError [session: {}, isConnected: {}]", session.getSessionId(), session.isConnected());
126+
log.info("Received handleTransportError [session: {}, isConnected: {}, exception: {}]",
127+
session.getSessionId(), session.isConnected(), exception.getMessage());
128+
exception.printStackTrace();
121129
this.restartStomp();
122130
}
123131

@@ -140,9 +148,13 @@ public Type getPayloadType(StompHeaders headers) {
140148
}
141149

142150
@Override
143-
public void handleFrame(StompHeaders headers, Object payload) {
144-
TaskNotification taskNotification = (TaskNotification) payload;
145-
handleTaskNotification(taskNotification);
151+
public void handleFrame(StompHeaders headers, @Nullable Object payload) {
152+
if (payload != null) {
153+
TaskNotification taskNotification = (TaskNotification) payload;
154+
handleTaskNotification(taskNotification);
155+
} else {
156+
log.info("Payload of TaskNotification is null [chainTaskId:{}]", chainTaskId);
157+
}
146158
}
147159
});
148160
chainTaskIdToSubscription.put(chainTaskId, subscription);

0 commit comments

Comments
 (0)