@@ -5,7 +5,7 @@ import io.github.resilience4j.retry.annotation.Retry
55import net.leanix.githubagent.handler.BrokerStompSessionHandler
66import net.leanix.githubagent.services.LeanIXAuthService
77import net.leanix.githubagent.shared.GitHubAgentProperties.GITHUB_AGENT_VERSION
8- import org.springframework.beans.factory.annotation.Value
8+ import org.slf4j.LoggerFactory
99import org.springframework.context.annotation.Bean
1010import org.springframework.context.annotation.Configuration
1111import org.springframework.messaging.converter.MappingJackson2MessageConverter
@@ -17,6 +17,7 @@ import org.springframework.web.socket.client.standard.StandardWebSocketClient
1717import org.springframework.web.socket.messaging.WebSocketStompClient
1818import org.springframework.web.socket.sockjs.client.SockJsClient
1919import org.springframework.web.socket.sockjs.client.WebSocketTransport
20+ import java.util.concurrent.ScheduledFuture
2021
2122@Configuration
2223class WebSocketClientConfig (
@@ -25,21 +26,49 @@ class WebSocketClientConfig(
2526 private val leanIXAuthService : LeanIXAuthService ,
2627 private val leanIXProperties : LeanIXProperties ,
2728 private val gitHubEnterpriseProperties : GitHubEnterpriseProperties ,
28- @Value(" \$ {websocket.heartbeat-interval}" ) private val heartbeatInterval : Long
2929) {
30+
31+ private var heartbeatTask: ScheduledFuture <* >? = null
32+ private val logger = LoggerFactory .getLogger(WebSocketClientConfig ::class .java)
33+
3034 @Retry(name = " ws_init_session" )
3135 fun initSession (): StompSession {
3236 val headers = WebSocketHttpHeaders ()
3337 val stompHeaders = StompHeaders ()
3438 stompHeaders[" Authorization" ] = " Bearer ${leanIXAuthService.getBearerToken()} "
3539 stompHeaders[" GitHub-Enterprise-URL" ] = gitHubEnterpriseProperties.baseUrl
3640 stompHeaders[" GitHub-Agent-Version" ] = GITHUB_AGENT_VERSION
37- return stompClient().connectAsync(
41+ val session = stompClient().connectAsync(
3842 leanIXProperties.wsBaseUrl,
3943 headers,
4044 stompHeaders,
4145 brokerStompSessionHandler,
4246 ).get()
47+
48+ sendHeartbeat(session)
49+ return session
50+ }
51+
52+ fun sendHeartbeat (session : StompSession ) {
53+ val scheduler = ThreadPoolTaskScheduler ()
54+ scheduler.initialize()
55+ heartbeatTask = scheduler.scheduleAtFixedRate({
56+ kotlin.runCatching {
57+ if (session.isConnected) {
58+ session.send(" /app/ghe/heartbeat" , " " )
59+ logger.debug(" Heartbeat sent to /app/heartbeat" )
60+ } else {
61+ logger.warn(" Session is not connected, stopping heartbeat" )
62+ stopHeartbeat()
63+ }
64+ }.onFailure {
65+ logger.error(" Failed to send heartbeat: ${it.message} " )
66+ }
67+ }, java.time.Duration .ofSeconds(30 ))
68+ }
69+
70+ fun stopHeartbeat () {
71+ heartbeatTask?.cancel(true )
4372 }
4473
4574 @Bean
@@ -52,7 +81,6 @@ class WebSocketClientConfig(
5281 val sockJsClient = SockJsClient (transports)
5382 val stompClient = WebSocketStompClient (sockJsClient)
5483 stompClient.messageConverter = jsonConverter
55- stompClient.defaultHeartbeat = longArrayOf(heartbeatInterval, 0 )
5684 val scheduler = ThreadPoolTaskScheduler ()
5785 scheduler.initialize()
5886 stompClient.taskScheduler = scheduler
0 commit comments