1+ package com .iexec .worker .pubsub ;
2+
3+ import java .util .Arrays ;
4+ import java .util .List ;
5+ import java .util .concurrent .ArrayBlockingQueue ;
6+ import java .util .concurrent .BlockingQueue ;
7+ import java .util .concurrent .TimeUnit ;
8+
9+ import javax .annotation .PostConstruct ;
10+
11+ import com .iexec .worker .config .CoreConfigurationService ;
12+
13+ import org .springframework .context .ApplicationEventPublisher ;
14+ import org .springframework .lang .Nullable ;
15+ import org .springframework .messaging .converter .MappingJackson2MessageConverter ;
16+ import org .springframework .messaging .simp .SimpMessageType ;
17+ import org .springframework .messaging .simp .stomp .StompCommand ;
18+ import org .springframework .messaging .simp .stomp .StompFrameHandler ;
19+ import org .springframework .messaging .simp .stomp .StompHeaders ;
20+ import org .springframework .messaging .simp .stomp .StompSession ;
21+ import org .springframework .messaging .simp .stomp .StompSession .Subscription ;
22+ import org .springframework .messaging .simp .stomp .StompSessionHandlerAdapter ;
23+ import org .springframework .scheduling .annotation .Scheduled ;
24+ import org .springframework .scheduling .concurrent .ConcurrentTaskScheduler ;
25+ import org .springframework .stereotype .Component ;
26+ import org .springframework .web .client .RestTemplate ;
27+ import org .springframework .web .socket .client .WebSocketClient ;
28+ import org .springframework .web .socket .client .standard .StandardWebSocketClient ;
29+ import org .springframework .web .socket .messaging .WebSocketStompClient ;
30+ import org .springframework .web .socket .sockjs .client .RestTemplateXhrTransport ;
31+ import org .springframework .web .socket .sockjs .client .SockJsClient ;
32+ import org .springframework .web .socket .sockjs .client .Transport ;
33+ import org .springframework .web .socket .sockjs .client .WebSocketTransport ;
34+
35+ import lombok .NoArgsConstructor ;
36+ import lombok .extern .slf4j .Slf4j ;
37+
38+ @ Slf4j
39+ @ Component
40+ public class StompClient {
41+
42+ private static final int SESSION_REFRESH_DELAY = 5 ;
43+ private final BlockingQueue <SessionRequestEvent > sessionRequestQueue = new ArrayBlockingQueue <>(1 );
44+ private final ApplicationEventPublisher eventPublisher ;
45+ private final String webSocketServerUrl ;
46+ private final WebSocketStompClient stompClient ;
47+ private StompSession session ;
48+
49+ public StompClient (ApplicationEventPublisher applicationEventPublisher ,
50+ CoreConfigurationService coreConfigService , RestTemplate restTemplate ) {
51+ this .eventPublisher = applicationEventPublisher ;
52+ this .webSocketServerUrl = coreConfigService .getUrl () + "/connect" ;
53+ log .info ("Creating STOMP client" );
54+ WebSocketClient webSocketClient = new StandardWebSocketClient ();
55+ List <Transport > webSocketTransports = Arrays .asList (
56+ new WebSocketTransport (webSocketClient ),
57+ new RestTemplateXhrTransport (restTemplate )
58+ );
59+ SockJsClient sockJsClient = new SockJsClient (webSocketTransports );
60+ // without SockJS: new WebSocketStompClient(webSocketClient);
61+ this .stompClient = new WebSocketStompClient (sockJsClient );
62+ this .stompClient .setAutoStartup (true );
63+ this .stompClient .setMessageConverter (new MappingJackson2MessageConverter ());
64+ this .stompClient .setTaskScheduler (new ConcurrentTaskScheduler ());
65+ log .info ("Created STOMP client" );
66+ }
67+
68+ /**
69+ * Subscribe to a topic and provide a {@link StompFrameHandler}
70+ * to handle received messages.
71+ *
72+ * @param topic
73+ * @param messageHandler an implementation of
74+ * @return
75+ */
76+ Subscription subscribeToTopic (String topic , StompFrameHandler messageHandler ) {
77+ return this .session .subscribe (topic , messageHandler );
78+ }
79+
80+ @ PostConstruct
81+ private void init () {
82+ requestNewSession ();
83+ }
84+
85+ /**
86+ * Add new SessionRequestEvent to the queue. A queue listener
87+ * will consume this event and create a new STOMP session.
88+ * This does not raise an error if the queue is full.
89+ */
90+ private void requestNewSession () {
91+ this .sessionRequestQueue .offer (new SessionRequestEvent ());
92+ }
93+
94+ /**
95+ * Listen to new session request events and refresh the websocket
96+ * connection by establishing a new STOMP session. Only one of
97+ * the received requests in a fixed time interval
98+ * (SESSION_REFRESH_DELAY) will be processed. We use @Scheduled
99+ * to start the watcher asynchronously with an initial delay and
100+ * restart it in case a problem occurs.
101+ *
102+ * @throws InterruptedException
103+ */
104+ @ Scheduled (fixedDelay = 1000 )
105+ private void listenToSessionRequestEventsInTheQueue () throws InterruptedException {
106+ while (true ) {
107+ // get the first request event or wait until available
108+ this .sessionRequestQueue .take ();
109+ // wait some time for the wave of request events coming
110+ // from possibly different threads to finish
111+ TimeUnit .SECONDS .sleep (SESSION_REFRESH_DELAY );
112+ log .info ("Creating new STOMP session" );
113+ // purge redundant request events
114+ this .sessionRequestQueue .clear ();
115+ // Only one attempt should pass through
116+ log .debug ("Sending new STOMP connection request" );
117+ this .stompClient .connect (webSocketServerUrl , new SessionHandler ());
118+ }
119+ }
120+
121+ /**
122+ * Provide callbacks to handle STOMP session establishment or
123+ * failure.
124+ */
125+ private class SessionHandler extends StompSessionHandlerAdapter {
126+
127+ @ Override
128+ public void afterConnected (StompSession session , StompHeaders connectedHeaders ) {
129+ log .info ("Connected to STOMP session [session: {}, isConnected: {}]" ,
130+ session .getSessionId (), session .isConnected ());
131+ StompClient .this .session = session ;
132+ // notify subscribers
133+ eventPublisher .publishEvent (new SessionCreatedEvent ());
134+ }
135+
136+ /**
137+ * Handle any exception arising while processing a STOMP frame such as a
138+ * failure to convert the payload or an unhandled exception in the
139+ * application {@code StompFrameHandler}.
140+ *
141+ * @param session the client STOMP session
142+ * @param command the STOMP command of the frame
143+ * @param headers the headers
144+ * @param payload the raw payload
145+ * @param exception the exception
146+ */
147+ @ Override
148+ public void handleException (StompSession session , @ Nullable StompCommand command ,
149+ StompHeaders headers , byte [] payload , Throwable exception ) {
150+ SimpMessageType messageType = command != null ? command .getMessageType () : null ;
151+ log .error ("STOMP frame processing error [session: {}, isConnected: {}, command: {}, exception: {}]" ,
152+ session .getSessionId (), session .isConnected (), messageType , exception .getMessage ());
153+ }
154+
155+ /**
156+ * Handle a low level transport error which could be an I/O error or a
157+ * failure to encode or decode a STOMP message.
158+ * <p>Note that
159+ * {@link org.springframework.messaging.simp.stomp.ConnectionLostException
160+ * ConnectionLostException} will be passed into this method when the
161+ * connection is lost rather than closed normally via
162+ * {@link StompSession#disconnect()}.
163+ *
164+ * @param session the client STOMP session
165+ * @param exception the exception that occurred
166+ */
167+ @ Override
168+ public void handleTransportError (StompSession session , Throwable exception ) {
169+ log .error ("STOMP transport error [session: {}, isConnected: {}, exception: {}]" ,
170+ session .getSessionId (), session .isConnected (), exception .getMessage ());
171+ requestNewSession ();
172+ }
173+ }
174+
175+ /**
176+ * Request a new STOMP session by adding
177+ * this event to the queue.
178+ */
179+ @ NoArgsConstructor
180+ private class SessionRequestEvent {
181+
182+ }
183+ }
0 commit comments