Skip to content

Commit c59edb4

Browse files
Add a timeout to stop unresponsive workflows
1 parent 4a7a389 commit c59edb4

File tree

9 files changed

+64
-43
lines changed

9 files changed

+64
-43
lines changed

doc/Models/RunTemplate.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ Name | Type | Description | Notes
3232
**gitRepositoryUrl** | **String** | an optional URL to the git repository | [optional] [default to null]
3333
**gitBranchName** | **String** | an optional git branch name | [optional] [default to null]
3434
**runTemplateSourceDir** | **String** | an optional directory where to find the run template source | [optional] [default to null]
35+
**executionTimeout** | **Integer** | an optional duration in seconds in which a workflow is allowed to run | [optional] [default to null]
3536

3637
[[Back to Model list]](../README.md#documentation-for-models) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to README]](../README.md)
3738

openapi/plantuml/schemas.plantuml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ entity RunTemplate {
126126
gitRepositoryUrl: String
127127
gitBranchName: String
128128
runTemplateSourceDir: String
129+
executionTimeout: Integer
129130
}
130131

131132
entity RunTemplateHandlerId {

scenariorun/src/main/kotlin/com/cosmotech/scenariorun/azure/ScenarioRunServiceImpl.kt

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,7 @@ internal class ScenarioRunServiceImpl(
261261
logger.debug(startInfo.toString())
262262
scenarioDataDownloadRequest.response =
263263
workflowService
264-
.launchScenarioRun(startInfo.startContainers)
264+
.launchScenarioRun(startInfo.startContainers, null)
265265
.asMapWithAdditionalData(scenarioDataDownloadRequest.workspaceId)
266266
}
267267

@@ -293,7 +293,9 @@ internal class ScenarioRunServiceImpl(
293293
scenarioId,
294294
)
295295
logger.debug(startInfo.toString())
296-
val scenarioRunRequest = workflowService.launchScenarioRun(startInfo.startContainers)
296+
val scenarioRunRequest =
297+
workflowService.launchScenarioRun(
298+
startInfo.startContainers, startInfo.runTemplate.executionTimeout)
297299
val scenarioRun =
298300
this.dbCreateScenarioRun(
299301
scenarioRunRequest,
@@ -360,7 +362,7 @@ internal class ScenarioRunServiceImpl(
360362
organizationId: String,
361363
scenarioRunStartContainers: ScenarioRunStartContainers
362364
): ScenarioRun {
363-
val scenarioRunRequest = workflowService.launchScenarioRun(scenarioRunStartContainers)
365+
val scenarioRunRequest = workflowService.launchScenarioRun(scenarioRunStartContainers, null)
364366
return this.dbCreateScenarioRun(
365367
scenarioRunRequest,
366368
organizationId,

scenariorun/src/main/kotlin/com/cosmotech/scenariorun/workflow/WorkflowService.kt

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,13 @@ internal interface WorkflowService : HealthIndicator {
1313
/**
1414
* Launch a new Scenario run, using the request specified
1515
* @param scenarioRunStartContainers the scenario run start request
16+
* @param executionTimeout the maximum duration, in seconds, for which the workflow is allowed to run; when null, the implementation falls back to its default timeout
1617
* @return a new ScenarioRun
1718
*/
18-
fun launchScenarioRun(scenarioRunStartContainers: ScenarioRunStartContainers): ScenarioRun
19+
fun launchScenarioRun(
20+
scenarioRunStartContainers: ScenarioRunStartContainers,
21+
executionTimeout: Int?
22+
): ScenarioRun
1923

2024
fun findWorkflowStatusAndArtifact(
2125
labelSelector: String,

scenariorun/src/main/kotlin/com/cosmotech/scenariorun/workflow/argo/ArgoWorkflowService.kt

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -180,11 +180,13 @@ internal class ArgoWorkflowService(
180180
}
181181

182182
override fun launchScenarioRun(
183-
scenarioRunStartContainers: ScenarioRunStartContainers
183+
scenarioRunStartContainers: ScenarioRunStartContainers,
184+
executionTimeout: Int?
184185
): ScenarioRun {
185186
val body =
186187
WorkflowCreateRequest()
187-
.workflow(buildWorkflow(csmPlatformProperties, scenarioRunStartContainers))
188+
.workflow(
189+
buildWorkflow(csmPlatformProperties, scenarioRunStartContainers, executionTimeout))
188190

189191
logger.trace("Workflow: {}", body.workflow)
190192

scenariorun/src/main/kotlin/com/cosmotech/scenariorun/workflow/argo/WorkflowBuilders.kt

Lines changed: 24 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ internal const val VOLUME_CLAIM_DATASETS_SUBPATH = "datasetsdir"
3434
internal const val VOLUME_CLAIM_PARAMETERS_SUBPATH = "parametersdir"
3535
private const val VOLUME_DATASETS_PATH = "/mnt/scenariorun-data"
3636
private const val VOLUME_PARAMETERS_PATH = "/mnt/scenariorun-parameters"
37+
private const val CSM_ARGO_WORKFLOWS_TIMEOUT = 28800
3738

3839
internal fun buildTemplate(scenarioRunContainer: ScenarioRunContainer): Template {
3940
var envVars: MutableList<V1EnvVar>? = null
@@ -90,7 +91,8 @@ internal fun buildTemplate(scenarioRunContainer: ScenarioRunContainer): Template
9091

9192
internal fun buildWorkflowSpec(
9293
csmPlatformProperties: CsmPlatformProperties,
93-
startContainers: ScenarioRunStartContainers
94+
startContainers: ScenarioRunStartContainers,
95+
executionTimeout: Int?
9496
): WorkflowSpec {
9597
val nodeSelector = mutableMapOf("kubernetes.io/os" to "linux")
9698
if (startContainers.nodeLabel != null) {
@@ -101,31 +103,37 @@ internal fun buildWorkflowSpec(
101103
val entrypointTemplate = buildEntrypointTemplate(startContainers)
102104
templates.add(entrypointTemplate)
103105

104-
return WorkflowSpec()
105-
.imagePullSecrets(
106-
csmPlatformProperties
107-
.argo
108-
.imagePullSecrets
109-
?.filterNot(String::isBlank)
110-
?.map(V1LocalObjectReference()::name)
111-
?.ifEmpty { null })
112-
.nodeSelector(nodeSelector)
113-
.serviceAccountName(csmPlatformProperties.argo.workflows.serviceAccountName)
114-
.entrypoint(CSM_DAG_ENTRYPOINT)
115-
.templates(templates)
116-
.volumeClaimTemplates(buildVolumeClaims(csmPlatformProperties))
106+
var workflowSpec =
107+
WorkflowSpec()
108+
.imagePullSecrets(
109+
csmPlatformProperties
110+
.argo
111+
.imagePullSecrets
112+
?.filterNot(String::isBlank)
113+
?.map(V1LocalObjectReference()::name)
114+
?.ifEmpty { null })
115+
.nodeSelector(nodeSelector)
116+
.serviceAccountName(csmPlatformProperties.argo.workflows.serviceAccountName)
117+
.entrypoint(CSM_DAG_ENTRYPOINT)
118+
.templates(templates)
119+
.volumeClaimTemplates(buildVolumeClaims(csmPlatformProperties))
120+
121+
workflowSpec.activeDeadlineSeconds = executionTimeout ?: CSM_ARGO_WORKFLOWS_TIMEOUT
122+
123+
return workflowSpec
117124
}
118125

119126
internal fun buildWorkflow(
120127
csmPlatformProperties: CsmPlatformProperties,
121-
startContainers: ScenarioRunStartContainers
128+
startContainers: ScenarioRunStartContainers,
129+
executionTimeout: Int?
122130
) =
123131
Workflow()
124132
.metadata(
125133
V1ObjectMeta()
126134
.generateName(startContainers.generateName ?: CSM_DEFAULT_WORKFLOW_NAME)
127135
.labels(startContainers.labels))
128-
.spec(buildWorkflowSpec(csmPlatformProperties, startContainers))
136+
.spec(buildWorkflowSpec(csmPlatformProperties, startContainers, executionTimeout))
129137

130138
private fun buildEntrypointTemplate(startContainers: ScenarioRunStartContainers): Template {
131139
val dagTemplate = Template().name(CSM_DAG_ENTRYPOINT)

scenariorun/src/test/kotlin/com/cosmotech/scenariorun/azure/ScenarioRunServiceImplTests.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -335,7 +335,7 @@ class ScenarioRunServiceImplTests {
335335
workflowId = "my-workflow-id",
336336
workflowName = "my-workflow-name",
337337
containers = containers)
338-
every { workflowService.launchScenarioRun(any()) } returns myScenarioRun
338+
every { workflowService.launchScenarioRun(any(), null) } returns myScenarioRun
339339
every { idGenerator.generate("scenariorun", "sr-") } returns myScenarioRun.id!!
340340

341341
val scenarioRun =
@@ -392,7 +392,7 @@ class ScenarioRunServiceImplTests {
392392
name = "my-container1",
393393
envVars = mapOf("MY_SECRET_ENV_VAR" to "value"),
394394
image = "my-image:latest")))
395-
every { workflowService.launchScenarioRun(any()) } returns myScenarioRun
395+
every { workflowService.launchScenarioRun(any(), any()) } returns myScenarioRun
396396
every { idGenerator.generate("scenariorun", "sr-") } returns myScenarioRun.id!!
397397

398398
val scenarioRunById =

scenariorun/src/test/kotlin/com/cosmotech/scenariorun/workflow/argo/WorkflowBuildersTests.kt

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ class WorkflowBuildersTests {
123123
@Test
124124
fun `Create Workflow Spec with StartContainers not null`() {
125125
val sc = getStartContainersRun()
126-
val workflowSpec = buildWorkflowSpec(csmPlatformProperties, sc)
126+
val workflowSpec = buildWorkflowSpec(csmPlatformProperties, sc, null)
127127
assertNotNull(workflowSpec)
128128
}
129129

@@ -136,7 +136,7 @@ class WorkflowBuildersTests {
136136
every { csmPlatformProperties.argo } returns argo
137137

138138
val sc = getStartContainersRun()
139-
val workflowSpec = buildWorkflowSpec(csmPlatformProperties, sc)
139+
val workflowSpec = buildWorkflowSpec(csmPlatformProperties, sc, null)
140140
val expected = mapOf("kubernetes.io/os" to "linux", "agentpool" to "highcpupool")
141141

142142
assertEquals(expected, workflowSpec.nodeSelector)
@@ -151,7 +151,7 @@ class WorkflowBuildersTests {
151151
every { csmPlatformProperties.argo } returns argo
152152

153153
val sc = getStartContainersRunDefaultPool()
154-
val workflowSpec = buildWorkflowSpec(csmPlatformProperties, sc)
154+
val workflowSpec = buildWorkflowSpec(csmPlatformProperties, sc, null)
155155
val expected = mapOf("kubernetes.io/os" to "linux", "agentpool" to "basicpool")
156156

157157
assertEquals(expected, workflowSpec.nodeSelector)
@@ -160,7 +160,7 @@ class WorkflowBuildersTests {
160160
@Test
161161
fun `Create Workflow Spec with StartContainers no pool`() {
162162
val sc = getStartContainersRunNoPool()
163-
val workflowSpec = buildWorkflowSpec(csmPlatformProperties, sc)
163+
val workflowSpec = buildWorkflowSpec(csmPlatformProperties, sc, null)
164164
val expected = mapOf("kubernetes.io/os" to "linux")
165165

166166
assertEquals(expected, workflowSpec.nodeSelector)
@@ -175,31 +175,31 @@ class WorkflowBuildersTests {
175175
every { csmPlatformProperties.argo } returns argo
176176

177177
val sc = getStartContainersRun()
178-
val workflowSpec = buildWorkflowSpec(csmPlatformProperties, sc)
178+
val workflowSpec = buildWorkflowSpec(csmPlatformProperties, sc, null)
179179

180180
assertEquals("workflow", workflowSpec.serviceAccountName)
181181
}
182182

183183
@Test
184184
fun `Create Workflow Spec with StartContainers Run name`() {
185185
val sc = getStartContainersRun()
186-
val workflowSpec = buildWorkflowSpec(csmPlatformProperties, sc)
186+
val workflowSpec = buildWorkflowSpec(csmPlatformProperties, sc, null)
187187

188188
assertEquals("runContainer", workflowSpec.templates?.getOrNull(0)?.name)
189189
}
190190

191191
@Test
192192
fun `Create Workflow Spec with StartContainers Entrypoint FetchDataset`() {
193193
val sc = getStartContainers()
194-
val workflowSpec = buildWorkflowSpec(csmPlatformProperties, sc)
194+
val workflowSpec = buildWorkflowSpec(csmPlatformProperties, sc, null)
195195

196196
assertEquals("entrypoint", workflowSpec.entrypoint)
197197
}
198198

199199
@Test
200200
fun `Create Workflow Spec with StartContainers entrypoint template not null`() {
201201
val sc = getStartContainers()
202-
val workflowSpec = buildWorkflowSpec(csmPlatformProperties, sc)
202+
val workflowSpec = buildWorkflowSpec(csmPlatformProperties, sc, null)
203203

204204
val entrypointTemplate =
205205
workflowSpec.templates?.find { template -> template.name.equals("entrypoint") }
@@ -209,7 +209,7 @@ class WorkflowBuildersTests {
209209
@Test
210210
fun `Create Workflow Spec with StartContainers entrypoint dag not null`() {
211211
val sc = getStartContainers()
212-
val workflowSpec = buildWorkflowSpec(csmPlatformProperties, sc)
212+
val workflowSpec = buildWorkflowSpec(csmPlatformProperties, sc, null)
213213

214214
val entrypointTemplate =
215215
workflowSpec.templates?.find { template -> template.name.equals("entrypoint") }
@@ -220,7 +220,7 @@ class WorkflowBuildersTests {
220220
@Test
221221
fun `Create Workflow Spec with StartContainers entrypoint with dependencies dag valid`() {
222222
val sc = getStartContainersWithDependencies()
223-
val workflowSpec = buildWorkflowSpec(csmPlatformProperties, sc)
223+
val workflowSpec = buildWorkflowSpec(csmPlatformProperties, sc, null)
224224

225225
val entrypointTemplate =
226226
workflowSpec.templates?.find { template -> template.name.equals("entrypoint") }
@@ -263,7 +263,7 @@ class WorkflowBuildersTests {
263263
@Test
264264
fun `Create Workflow Spec with StartContainers entrypoint dag valid`() {
265265
val sc = getStartContainers()
266-
val workflowSpec = buildWorkflowSpec(csmPlatformProperties, sc)
266+
val workflowSpec = buildWorkflowSpec(csmPlatformProperties, sc, null)
267267

268268
val entrypointTemplate =
269269
workflowSpec.templates?.find { template -> template.name.equals("entrypoint") }
@@ -306,7 +306,7 @@ class WorkflowBuildersTests {
306306
@Test
307307
fun `Create Workflow Spec with StartContainers entrypoint dag dependencies valid`() {
308308
val sc = getStartContainers()
309-
val workflowSpec = buildWorkflowSpec(csmPlatformProperties, sc)
309+
val workflowSpec = buildWorkflowSpec(csmPlatformProperties, sc, null)
310310

311311
val entrypointTemplate =
312312
workflowSpec.templates?.find { template -> template.name.equals("entrypoint") }
@@ -331,7 +331,7 @@ class WorkflowBuildersTests {
331331
@Test
332332
fun `Create Workflow Spec with StartContainers diamond`() {
333333
val sc = getStartContainersDiamond()
334-
val workflowSpec = buildWorkflowSpec(csmPlatformProperties, sc)
334+
val workflowSpec = buildWorkflowSpec(csmPlatformProperties, sc, null)
335335

336336
val entrypointTemplate =
337337
workflowSpec.templates?.find { template -> template.name.equals("entrypoint") }
@@ -352,30 +352,30 @@ class WorkflowBuildersTests {
352352
@Test
353353
fun `Create Workflow with StartContainers not null`() {
354354
val sc = getStartContainers()
355-
val workflow = buildWorkflow(csmPlatformProperties, sc)
355+
val workflow = buildWorkflow(csmPlatformProperties, sc, null)
356356
assertNotNull(workflow)
357357
}
358358

359359
@Test
360360
fun `Create Workflow with StartContainers generate name default`() {
361361
val sc = getStartContainers()
362-
val workflow = buildWorkflow(csmPlatformProperties, sc)
362+
val workflow = buildWorkflow(csmPlatformProperties, sc, null)
363363
val expected = V1ObjectMeta().generateName("default-workflow-")
364364
assertEquals(expected, workflow.metadata)
365365
}
366366

367367
@Test
368368
fun `Create Workflow with StartContainers generate name Scenario`() {
369369
val sc = getStartContainersNamed()
370-
val workflow = buildWorkflow(csmPlatformProperties, sc)
370+
val workflow = buildWorkflow(csmPlatformProperties, sc, null)
371371
val expected = V1ObjectMeta().generateName("Scenario-1-")
372372
assertEquals(expected, workflow.metadata)
373373
}
374374

375375
@Test
376376
fun `Create Workflow spec with StartContainers volume claim default values`() {
377377
val sc = getStartContainersDiamond()
378-
val workflowSpec = buildWorkflowSpec(csmPlatformProperties, sc)
378+
val workflowSpec = buildWorkflowSpec(csmPlatformProperties, sc, null)
379379
val dataDir =
380380
V1PersistentVolumeClaim()
381381
.metadata(V1ObjectMeta().name(VOLUME_CLAIM))
@@ -394,7 +394,7 @@ class WorkflowBuildersTests {
394394
every { csmPlatformProperties.argo.workflows.storageClass } returns "cosmotech-api-test-phoenix"
395395
every { csmPlatformProperties.argo.workflows.accessModes } returns listOf("ReadWriteMany")
396396
every { csmPlatformProperties.argo.workflows.requests } returns mapOf("storage" to "300Gi")
397-
val workflowSpec = buildWorkflowSpec(csmPlatformProperties, sc)
397+
val workflowSpec = buildWorkflowSpec(csmPlatformProperties, sc, null)
398398
val dataDir =
399399
V1PersistentVolumeClaim()
400400
.metadata(V1ObjectMeta().name(VOLUME_CLAIM))
@@ -429,7 +429,7 @@ class WorkflowBuildersTests {
429429
@Test
430430
fun `Create Workflow with metadata labels`() {
431431
val sc = getStartContainersWithLabels()
432-
val workflow = buildWorkflowSpec(csmPlatformProperties, sc)
432+
val workflow = buildWorkflowSpec(csmPlatformProperties, sc, null)
433433
val labeledTemplate =
434434
workflow.templates?.find { template -> template.name.equals("fetchDatasetContainer-1") }
435435

solution/src/main/openapi/solution.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -629,6 +629,9 @@ components:
629629
runTemplateSourceDir:
630630
type: string
631631
description: an optional directory where to find the run template source
632+
executionTimeout:
633+
type: integer
634+
description: an optional maximum duration, in seconds, for which a workflow is allowed to run
632635
required:
633636
- id
634637
- name

0 commit comments

Comments
 (0)