@@ -5,7 +5,7 @@ import io.github.resilience4j.retry.annotation.Retry
55import net.leanix.githubagent.handler.BrokerStompSessionHandler
66import net.leanix.githubagent.services.LeanIXAuthService
77import net.leanix.githubagent.shared.GitHubAgentProperties.GITHUB_AGENT_VERSION
8- import org.springframework.beans.factory.annotation.Value
8+ import org.slf4j.LoggerFactory
99import org.springframework.context.annotation.Bean
1010import org.springframework.context.annotation.Configuration
1111import org.springframework.messaging.converter.MappingJackson2MessageConverter
@@ -17,6 +17,8 @@ import org.springframework.web.socket.client.standard.StandardWebSocketClient
1717import org.springframework.web.socket.messaging.WebSocketStompClient
1818import org.springframework.web.socket.sockjs.client.SockJsClient
1919import org.springframework.web.socket.sockjs.client.WebSocketTransport
20+ import java.time.Duration
21+ import java.util.concurrent.ScheduledFuture
2022
2123@Configuration
2224class WebSocketClientConfig (
@@ -25,21 +27,49 @@ class WebSocketClientConfig(
2527 private val leanIXAuthService : LeanIXAuthService ,
2628 private val leanIXProperties : LeanIXProperties ,
2729 private val gitHubEnterpriseProperties : GitHubEnterpriseProperties ,
28- @Value(" \$ {websocket.heartbeat-interval}" ) private val heartbeatInterval : Long
2930) {
31+
32+ private var heartbeatTask: ScheduledFuture <* >? = null
33+ private val logger = LoggerFactory .getLogger(WebSocketClientConfig ::class .java)
34+
3035 @Retry(name = " ws_init_session" )
3136 fun initSession (): StompSession {
3237 val headers = WebSocketHttpHeaders ()
3338 val stompHeaders = StompHeaders ()
3439 stompHeaders[" Authorization" ] = " Bearer ${leanIXAuthService.getBearerToken()} "
3540 stompHeaders[" GitHub-Enterprise-URL" ] = gitHubEnterpriseProperties.baseUrl
3641 stompHeaders[" GitHub-Agent-Version" ] = GITHUB_AGENT_VERSION
37- return stompClient().connectAsync(
42+ val session = stompClient().connectAsync(
3843 leanIXProperties.wsBaseUrl,
3944 headers,
4045 stompHeaders,
4146 brokerStompSessionHandler,
4247 ).get()
48+
49+ sendHeartbeat(session)
50+ return session
51+ }
52+
53+ fun sendHeartbeat (session : StompSession ) {
54+ val scheduler = ThreadPoolTaskScheduler ()
55+ scheduler.initialize()
56+ heartbeatTask = scheduler.scheduleAtFixedRate({
57+ kotlin.runCatching {
58+ if (session.isConnected) {
59+ session.send(" /app/ghe/heartbeat" , " " )
60+ logger.debug(" Heartbeat sent to /app/heartbeat" )
61+ } else {
62+ logger.warn(" Session is not connected, stopping heartbeat" )
63+ stopHeartbeat()
64+ }
65+ }.onFailure {
66+ logger.error(" Failed to send heartbeat: ${it.message} " )
67+ }
68+ }, Duration .ofSeconds(60 ))
69+ }
70+
71+ fun stopHeartbeat () {
72+ heartbeatTask?.cancel(true )
4373 }
4474
4575 @Bean
@@ -52,7 +82,6 @@ class WebSocketClientConfig(
5282 val sockJsClient = SockJsClient (transports)
5383 val stompClient = WebSocketStompClient (sockJsClient)
5484 stompClient.messageConverter = jsonConverter
55- stompClient.defaultHeartbeat = longArrayOf(heartbeatInterval, 0 )
5685 val scheduler = ThreadPoolTaskScheduler ()
5786 scheduler.initialize()
5887 stompClient.taskScheduler = scheduler
0 commit comments