Skip to content

Commit fe3a379

Browse files
committed
Refactor runner status updates and streamline error handling
Expose `getRunStatus` for broader usage and improve error handling by introducing explicit checks for runner status updates. Update tests and add the `NotStarted` state to ensure accurate status transitions in different runner scenarios.
1 parent 6335136 commit fe3a379

File tree

8 files changed

+186
-22
lines changed

8 files changed

+186
-22
lines changed

api/src/integrationTest/kotlin/com/cosmotech/api/home/run/RunControllerTests.kt

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
package com.cosmotech.api.home.run
44

55
import com.cosmotech.api.events.CsmEventPublisher
6+
import com.cosmotech.api.events.UpdateRunnerStatus
67
import com.cosmotech.api.home.Constants.PLATFORM_ADMIN_EMAIL
78
import com.cosmotech.api.home.ControllerTestBase
89
import com.cosmotech.api.home.ControllerTestUtils.OrganizationUtils.constructOrganizationCreateRequest
@@ -61,6 +62,7 @@ import com.cosmotech.solution.domain.RunTemplateResourceSizing
6162
import com.ninjasquad.springmockk.SpykBean
6263
import io.mockk.every
6364
import io.mockk.impl.annotations.MockK
65+
import io.mockk.mockk
6466
import java.time.Instant
6567
import java.util.*
6668
import org.json.JSONObject
@@ -175,6 +177,11 @@ class RunControllerTests : ControllerTestBase() {
175177
@WithMockOauth2User
176178
fun list_runs() {
177179

180+
every { eventPublisher.publishEvent(any()) } answers
181+
{
182+
firstArg<UpdateRunnerStatus>().response = "Running"
183+
}
184+
178185
mvc.perform(
179186
get("/organizations/$organizationId/workspaces/$workspaceId/runners/$runnerId}/runs")
180187
.contentType(MediaType.APPLICATION_JSON)
@@ -207,6 +214,11 @@ class RunControllerTests : ControllerTestBase() {
207214
@WithMockOauth2User
208215
fun get_run() {
209216

217+
every { eventPublisher.publishEvent(any()) } answers
218+
{
219+
firstArg<UpdateRunnerStatus>().response = "Running"
220+
}
221+
210222
mvc.perform(
211223
get(
212224
"/organizations/$organizationId/workspaces/$workspaceId/runners/$runnerId}/runs/$runId")
@@ -238,6 +250,23 @@ class RunControllerTests : ControllerTestBase() {
238250
@WithMockOauth2User
239251
fun delete_run() {
240252

253+
every { workflowService.stopWorkflow(any()) } returns mockk<RunStatus>(relaxed = true)
254+
255+
every { eventPublisher.publishEvent(any()) } answers
256+
{
257+
firstArg<UpdateRunnerStatus>().response = "Successful"
258+
} andThenAnswer
259+
{
260+
firstArg<UpdateRunnerStatus>().response = "Successful"
261+
} andThenAnswer
262+
{
263+
firstArg<UpdateRunnerStatus>().response = "Successful"
264+
} andThenAnswer
265+
{
266+
firstArg<UpdateRunnerStatus>().response = "Successful"
267+
} andThenAnswer
268+
{}
269+
241270
mvc.perform(
242271
delete(
243272
"/organizations/$organizationId/workspaces/$workspaceId/runners/$runnerId}/runs/$runId")
@@ -255,6 +284,11 @@ class RunControllerTests : ControllerTestBase() {
255284
@WithMockOauth2User
256285
fun send_data_run() {
257286

287+
every { eventPublisher.publishEvent(any()) } answers
288+
{
289+
firstArg<UpdateRunnerStatus>().response = "Running"
290+
}
291+
258292
val dataToSend =
259293
"""{
260294
"id": "my_table",
@@ -297,6 +331,11 @@ class RunControllerTests : ControllerTestBase() {
297331
@WithMockOauth2User
298332
fun query_data_run() {
299333

334+
every { eventPublisher.publishEvent(any()) } answers
335+
{
336+
firstArg<UpdateRunnerStatus>().response = "Running"
337+
}
338+
300339
val dataToSend =
301340
"""{
302341
"id": "my_table",
@@ -354,6 +393,11 @@ class RunControllerTests : ControllerTestBase() {
354393

355394
every { workflowService.getRunningLogs(any()) } returns logs
356395

396+
every { eventPublisher.publishEvent(any()) } answers
397+
{
398+
firstArg<UpdateRunnerStatus>().response = "Successful"
399+
}
400+
357401
mvc.perform(
358402
get(
359403
"/organizations/$organizationId/workspaces/$workspaceId/runners/$runnerId}/runs/$runId/logs")

api/src/integrationTest/kotlin/com/cosmotech/api/home/runner/RunnerControllerTests.kt

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ package com.cosmotech.api.home.runner
44

55
import com.cosmotech.api.events.CsmEventPublisher
66
import com.cosmotech.api.events.RunStart
7+
import com.cosmotech.api.events.UpdateRunnerStatus
78
import com.cosmotech.api.home.Constants.PLATFORM_ADMIN_EMAIL
89
import com.cosmotech.api.home.ControllerTestBase
910
import com.cosmotech.api.home.ControllerTestUtils.OrganizationUtils.constructOrganizationCreateRequest
@@ -732,8 +733,11 @@ class RunnerControllerTests : ControllerTestBase() {
732733
every { eventPublisher.publishEvent(any()) } answers
733734
{
734735
firstArg<RunStart>().response = expectedRunId
735-
} andThen
736-
Unit
736+
} andThenAnswer
737+
{
738+
firstArg<UpdateRunnerStatus>().response = "Running"
739+
} andThenAnswer
740+
{}
737741

738742
mvc.perform(
739743
post("/organizations/$organizationId/workspaces/$workspaceId/runners/$runnerId/start")

run/src/main/kotlin/com/cosmotech/run/service/RunServiceImpl.kt

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -399,7 +399,7 @@ class RunServiceImpl(
399399
return run
400400
}
401401

402-
private fun getRunStatus(run: Run): RunStatus {
402+
fun getRunStatus(run: Run): RunStatus {
403403
val runStatus = this.workflowService.getRunStatus(run)
404404
return runStatus.copy(
405405
state = mapWorkflowPhaseToRunStatus(phase = runStatus.phase, runId = run.id))
@@ -614,18 +614,26 @@ class RunServiceImpl(
614614

615615
@EventListener(UpdateRunnerStatus::class)
616616
fun updateRunnerStatus(updateRunnerStatus: UpdateRunnerStatus) {
617-
val runner =
618-
runnerApiService.getRunner(
619-
updateRunnerStatus.organizationId,
620-
updateRunnerStatus.workspaceId,
621-
updateRunnerStatus.runnerId)
622-
if (runner.lastRunInfo.lastRunId != null) {
623-
val status =
624-
getRunStatus(
625-
runner.organizationId, runner.workspaceId, runner.id, runner.lastRunInfo.lastRunId!!)
617+
val organizationId = updateRunnerStatus.organizationId
618+
val workspaceId = updateRunnerStatus.workspaceId
619+
val runnerId = updateRunnerStatus.runnerId
620+
val runId = updateRunnerStatus.lastRunId
621+
if (runId.isNotEmpty()) {
622+
val run =
623+
runRepository
624+
.findBy(organizationId, workspaceId, runnerId, runId)
625+
.orElseThrow {
626+
throw IllegalArgumentException(
627+
"Run #$runId not found in #$runnerId. In #$workspaceId, #$organizationId.")
628+
}
629+
.withStateInformation()
630+
.withoutSensitiveData()
631+
632+
val status = getRunStatus(run).state
626633
updateRunnerStatus.response = status.toString()
634+
return
627635
}
628-
throw IllegalStateException("LastRunId for runner ${runner.id} cannot be null!")
636+
throw IllegalStateException("LastRunId for runner $runnerId cannot be null!")
629637
}
630638

631639
@EventListener(RunStop::class)

run/src/main/kotlin/com/cosmotech/run/utils/RunExtensions.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@ internal fun RunState.isTerminal() =
1414
RunState.Failed,
1515
RunState.Successful -> true
1616
RunState.Unknown,
17-
RunState.Running -> false
17+
RunState.Running,
18+
RunState.NotStarted -> false
1819
}
1920

2021
internal fun RunContainer.getNodeLabelSize(): Map<String, String> {

run/src/main/openapi/run.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -442,6 +442,7 @@ components:
442442
- Successful
443443
- Failed
444444
- Unknown
445+
- NotStarted
445446
RunStatusNode:
446447
type: object
447448
description: status of a Run Node

runner/src/integrationTest/kotlin/com/cosmotech/runner/service/RunnerServiceIntegrationTest.kt

Lines changed: 93 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ import io.mockk.every
5858
import io.mockk.junit5.MockKExtension
5959
import io.mockk.mockk
6060
import io.mockk.mockkStatic
61+
import java.lang.IllegalStateException
6162
import java.time.Instant
6263
import java.util.*
6364
import kotlin.test.assertEquals
@@ -107,7 +108,6 @@ class RunnerServiceIntegrationTest : CsmRedisTestBase() {
107108
@Autowired lateinit var solutionApiService: SolutionApiService
108109
@Autowired lateinit var workspaceApiService: WorkspaceApiService
109110
@Autowired lateinit var runnerApiService: RunnerApiServiceInterface
110-
@SpykBean @Autowired lateinit var runnerService: RunnerService
111111
@Autowired lateinit var csmPlatformProperties: CsmPlatformProperties
112112

113113
private var containerRegistryService: ContainerRegistryService = mockk(relaxed = true)
@@ -1020,7 +1020,8 @@ class RunnerServiceIntegrationTest : CsmRedisTestBase() {
10201020
val run = runnerApiService.startRun(organizationSaved.id, workspaceSaved.id, runnerSaved.id)
10211021
assertEquals(expectedRunId, run.id)
10221022

1023-
// This line points the fact that the run status is not final so the getRunner trigger an update
1023+
// This line points to the fact that the run status is not final, so the getRunner trigger an
1024+
// update
10241025
// for lastRunInfo
10251026
every { eventPublisher.publishEvent(any<UpdateRunnerStatus>()) } answers
10261027
{
@@ -1034,6 +1035,96 @@ class RunnerServiceIntegrationTest : CsmRedisTestBase() {
10341035
assertEquals(LastRunInfo.LastRunStatus.Running, runnerStarted.lastRunInfo.lastRunStatus)
10351036
}
10361037

1038+
@Test
1039+
fun `test getRunner when runner has been stopped`() {
1040+
1041+
val expectedRunId = "run-genid12345"
1042+
1043+
every { eventPublisher.publishEvent(any()) } answers
1044+
{
1045+
firstArg<RunStart>().response = expectedRunId
1046+
} andThenAnswer
1047+
{
1048+
// This line points to the fact that the run status is not final, so the getRunner trigger
1049+
// an update
1050+
// for lastRunInfo
1051+
firstArg<UpdateRunnerStatus>().response = "Running"
1052+
} andThenAnswer
1053+
{
1054+
// Mock the RunStop event
1055+
} andThenAnswer
1056+
{
1057+
// Simulate the workflow's stop and the runner.lastInfo update after workflow's stop
1058+
// For the getRunner Called at the end on the test
1059+
firstArg<UpdateRunnerStatus>().response = "Failed"
1060+
}
1061+
1062+
val run = runnerApiService.startRun(organizationSaved.id, workspaceSaved.id, runnerSaved.id)
1063+
assertEquals(expectedRunId, run.id)
1064+
1065+
runnerApiService.stopRun(organizationSaved.id, workspaceSaved.id, runnerSaved.id)
1066+
1067+
val runnerStarted =
1068+
runnerApiService.getRunner(organizationSaved.id, workspaceSaved.id, runnerSaved.id)
1069+
1070+
assertEquals(expectedRunId, runnerStarted.lastRunInfo.lastRunId)
1071+
assertEquals(LastRunInfo.LastRunStatus.Failed, runnerStarted.lastRunInfo.lastRunStatus)
1072+
}
1073+
1074+
@Test
1075+
fun `test to stop a runner when is already finished but lastRunInfo is not updated to Successful`() {
1076+
1077+
val expectedRunId = "run-genid12345"
1078+
1079+
every { eventPublisher.publishEvent(any()) } answers
1080+
{
1081+
firstArg<RunStart>().response = expectedRunId
1082+
} andThenAnswer
1083+
{
1084+
// This line points to the fact that the run status is not final, so the getRunner trigger
1085+
// an update
1086+
// for lastRunInfo
1087+
firstArg<UpdateRunnerStatus>().response = "Successful"
1088+
}
1089+
1090+
val run = runnerApiService.startRun(organizationSaved.id, workspaceSaved.id, runnerSaved.id)
1091+
assertEquals(expectedRunId, run.id)
1092+
1093+
val exception =
1094+
assertThrows<IllegalStateException> {
1095+
runnerApiService.stopRun(organizationSaved.id, workspaceSaved.id, runnerSaved.id)
1096+
}
1097+
1098+
assertEquals("Run $expectedRunId can not be stopped as its already finished", exception.message)
1099+
}
1100+
1101+
@Test
1102+
fun `test to stop a runner when is already finished but lastRunInfo is not updated to Failed`() {
1103+
1104+
val expectedRunId = "run-genid12345"
1105+
1106+
every { eventPublisher.publishEvent(any()) } answers
1107+
{
1108+
firstArg<RunStart>().response = expectedRunId
1109+
} andThenAnswer
1110+
{
1111+
// This line points the fact that the run status is not final so the getRunner trigger an
1112+
// update
1113+
// for lastRunInfo
1114+
firstArg<UpdateRunnerStatus>().response = "Failed"
1115+
}
1116+
1117+
val run = runnerApiService.startRun(organizationSaved.id, workspaceSaved.id, runnerSaved.id)
1118+
assertEquals(expectedRunId, run.id)
1119+
1120+
val exception =
1121+
assertThrows<IllegalStateException> {
1122+
runnerApiService.stopRun(organizationSaved.id, workspaceSaved.id, runnerSaved.id)
1123+
}
1124+
1125+
assertEquals("Run $expectedRunId can not be stopped as its already finished", exception.message)
1126+
}
1127+
10371128
@Test
10381129
fun `As a viewer, I can only see my information in security property for getRunner`() {
10391130
every { getCurrentAccountIdentifier(any()) } returns defaultName

runner/src/main/kotlin/com/cosmotech/runner/service/RunnerApiServiceImpl.kt

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -104,10 +104,20 @@ internal class RunnerApiServiceImpl(
104104
override fun stopRun(organizationId: String, workspaceId: String, runnerId: String) {
105105
val runnerService = getRunnerService().inOrganization(organizationId).inWorkspace(workspaceId)
106106

107-
val runnerInstance =
108-
runnerService.getInstance(runnerId, false).userHasPermission(PERMISSION_WRITE)
107+
val runnerInstance = runnerService.getInstance(runnerId).userHasPermission(PERMISSION_WRITE)
109108

110-
runnerService.stopLastRunOf(runnerInstance)
109+
val lastRunInfo = runnerInstance.getRunnerDataObjet().lastRunInfo
110+
111+
if (lastRunInfo.lastRunStatus in
112+
listOf(
113+
LastRunInfo.LastRunStatus.Running,
114+
LastRunInfo.LastRunStatus.NotStarted,
115+
LastRunInfo.LastRunStatus.Unknown)) {
116+
runnerService.stopLastRunOf(runnerInstance)
117+
return
118+
}
119+
throw IllegalStateException(
120+
"Run ${lastRunInfo.lastRunId} can not be stopped as its already finished")
111121
}
112122

113123
override fun createRunnerAccessControl(

runner/src/main/kotlin/com/cosmotech/runner/service/RunnerService.kt

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -175,13 +175,13 @@ class RunnerService(
175175
return runnerInstance.initialize()
176176
}
177177

178-
fun getInstance(runnerId: String, withStatusUpdate: Boolean = true): RunnerInstance {
178+
fun getInstance(runnerId: String): RunnerInstance {
179179
var runner =
180180
runnerRepository.findBy(organization!!.id, workspace!!.id, runnerId).orElseThrow {
181181
CsmResourceNotFoundException(
182182
"Runner $runnerId not found in workspace ${workspace!!.id} and organization ${organization!!.id}")
183183
}
184-
if (withStatusUpdate && runner.lastRunInfo.lastRunId != null) {
184+
if (runner.lastRunInfo.lastRunId != null) {
185185
if (runner.lastRunInfo.lastRunStatus != LastRunStatus.Failed ||
186186
runner.lastRunInfo.lastRunStatus != LastRunStatus.Successful) {
187187
runner = updateRunnerStatus(runner)
@@ -193,7 +193,12 @@ class RunnerService(
193193

194194
fun updateRunnerStatus(runner: Runner): Runner {
195195
val updateRunnerStatusEvent =
196-
UpdateRunnerStatus(this, runner.organizationId, runner.workspaceId, runner.id)
196+
UpdateRunnerStatus(
197+
this,
198+
runner.organizationId,
199+
runner.workspaceId,
200+
runner.id,
201+
runner.lastRunInfo.lastRunId ?: "")
197202
eventPublisher.publishEvent(updateRunnerStatusEvent)
198203
val runStatus = LastRunStatus.forValue(updateRunnerStatusEvent.response!!)
199204
return runnerRepository.save(runner.apply { lastRunInfo.lastRunStatus = runStatus })

0 commit comments

Comments
 (0)