Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import io.github.resilience4j.retry.annotation.Retry
import net.leanix.githubagent.handler.BrokerStompSessionHandler
import net.leanix.githubagent.services.LeanIXAuthService
import net.leanix.githubagent.shared.GitHubAgentProperties.GITHUB_AGENT_VERSION
import org.slf4j.LoggerFactory
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.messaging.converter.MappingJackson2MessageConverter
Expand All @@ -17,59 +16,28 @@ import org.springframework.web.socket.client.standard.StandardWebSocketClient
import org.springframework.web.socket.messaging.WebSocketStompClient
import org.springframework.web.socket.sockjs.client.SockJsClient
import org.springframework.web.socket.sockjs.client.WebSocketTransport
import java.time.Duration
import java.util.concurrent.ScheduledFuture

@Configuration
class WebSocketClientConfig(
private val brokerStompSessionHandler: BrokerStompSessionHandler,
private val objectMapper: ObjectMapper,
private val leanIXAuthService: LeanIXAuthService,
private val leanIXProperties: LeanIXProperties,
private val gitHubEnterpriseProperties: GitHubEnterpriseProperties,
private val gitHubEnterpriseProperties: GitHubEnterpriseProperties
) {

private var heartbeatTask: ScheduledFuture<*>? = null
private val logger = LoggerFactory.getLogger(WebSocketClientConfig::class.java)

@Retry(name = "ws_init_session")
fun initSession(): StompSession {
val headers = WebSocketHttpHeaders()
val stompHeaders = StompHeaders()
stompHeaders["Authorization"] = "Bearer ${leanIXAuthService.getBearerToken()}"
stompHeaders["GitHub-Enterprise-URL"] = gitHubEnterpriseProperties.baseUrl
stompHeaders["GitHub-Agent-Version"] = GITHUB_AGENT_VERSION
val session = stompClient().connectAsync(
return stompClient().connectAsync(
leanIXProperties.wsBaseUrl,
headers,
stompHeaders,
brokerStompSessionHandler,
).get()

sendHeartbeat(session)
return session
}

fun sendHeartbeat(session: StompSession) {
val scheduler = ThreadPoolTaskScheduler()
scheduler.initialize()
heartbeatTask = scheduler.scheduleAtFixedRate({
kotlin.runCatching {
if (session.isConnected) {
session.send("/app/ghe/heartbeat", "")
logger.debug("Heartbeat sent to /app/heartbeat")
} else {
logger.warn("Session is not connected, stopping heartbeat")
stopHeartbeat()
}
}.onFailure {
logger.error("Failed to send heartbeat: ${it.message}")
}
}, Duration.ofSeconds(60))
}

fun stopHeartbeat() {
heartbeatTask?.cancel(true)
}

@Bean
Expand All @@ -82,6 +50,7 @@ class WebSocketClientConfig(
val sockJsClient = SockJsClient(transports)
val stompClient = WebSocketStompClient(sockJsClient)
stompClient.messageConverter = jsonConverter
stompClient.defaultHeartbeat = longArrayOf(30000, 0)
val scheduler = ThreadPoolTaskScheduler()
scheduler.initialize()
stompClient.taskScheduler = scheduler
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@ package net.leanix.githubagent.config

import com.fasterxml.jackson.databind.ObjectMapper
import io.mockk.coEvery
import io.mockk.every
import io.mockk.mockk
import io.mockk.verify
import kotlinx.coroutines.runBlocking
import net.leanix.githubagent.handler.BrokerStompSessionHandler
import net.leanix.githubagent.services.LeanIXAuthService
Expand All @@ -16,7 +14,6 @@ import org.springframework.messaging.simp.stomp.StompHeaders
import org.springframework.messaging.simp.stomp.StompSession
import org.springframework.web.socket.WebSocketHttpHeaders
import org.springframework.web.socket.messaging.WebSocketStompClient
import java.util.concurrent.ScheduledFuture

class WebSocketClientConfigTests {
private lateinit var webSocketClientConfig: WebSocketClientConfig
Expand All @@ -26,7 +23,6 @@ class WebSocketClientConfigTests {
private lateinit var leanIXProperties: LeanIXProperties
private lateinit var gitHubEnterpriseProperties: GitHubEnterpriseProperties
private lateinit var leanIXAuthService: LeanIXAuthService
private lateinit var scheduledFuture: ScheduledFuture<*>

@BeforeEach
fun setUp() {
Expand All @@ -38,8 +34,6 @@ class WebSocketClientConfigTests {
stompSession = mockk()
authService = mockk()
leanIXAuthService = mockk()
scheduledFuture = mockk()

webSocketClientConfig = WebSocketClientConfig(
brokerStompSessionHandler,
objectMapper,
Expand Down Expand Up @@ -69,15 +63,4 @@ class WebSocketClientConfigTests {

assertEquals(null, session)
}

@Test
fun `should send heartbeat when session is connected`() {
val receiptable = mockk<StompSession.Receiptable>()
every { stompSession.isConnected } returns true
every { stompSession.send(any<String>(), any()) } returns receiptable

webSocketClientConfig.sendHeartbeat(stompSession)

verify { stompSession.send("/app/ghe/heartbeat", "") }
}
}
Loading