Skip to content

Commit 6c9584b

Browse files
committed
feat: allow variadic tasks/nodes
Currently we do not make it easy to grow/shrink because the Flux Operator job command is static. If we define nodes and tasks as variables, we can instead allow jobs to grow/shrink and adapt appropriately. Signed-off-by: vsoch <[email protected]>
1 parent 448655e commit 6c9584b

File tree

10 files changed

+42
-10
lines changed

10 files changed

+42
-10
lines changed

api/v1alpha1/statemachine_types.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,10 @@ type JobConfig struct {
165165
// +optional
166166
Nodes int32 `json:"nodes,omitempty"`
167167

168+
// Number of tasks
169+
// +optional
170+
Tasks int32 `json:"tasks,omitempty"`
171+
168172
// Cores per task per job
169173
// 6 frontier / 3 summit / 5 on lassen (vsoch: this used to be 6 default)
170174
// +kubebuilder:default=3

config/crd/bases/state-machine.converged-computing.org_statemachines.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,10 @@ spec:
101101
because we assume the initial data is bad.
102102
RetryFailure
103103
type: boolean
104+
tasks:
105+
description: Number of tasks
106+
format: int32
107+
type: integer
104108
walltime:
105109
description: Walltime (in string format) for the job
106110
type: string

examples/dist/state-machine-operator-dev.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,10 @@ spec:
109109
because we assume the initial data is bad.
110110
RetryFailure
111111
type: boolean
112+
tasks:
113+
description: Number of tasks
114+
format: int32
115+
type: integer
112116
walltime:
113117
description: Walltime (in string format) for the job
114118
type: string

examples/test/lammps-metrics.yaml

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,23 +5,22 @@ metadata:
55
spec:
66
manager:
77
pullPolicy: Never
8-
interactive: true
98
workflow:
109
completed: 10
1110

1211
events:
1312
# Two studies:
14-
# If lammps runtime is > 3:00 add a node (do with autoscaler)
13+
# If lammps runtime is > 30 add a node (do with autoscaler)
1514
# what is the base case
1615
- metric: mean.lammps.duration
17-
when: ">= 360"
16+
when: ">= 30"
1817
action: grow
19-
repetitions: 3
18+
repetitions: 2
2019
# Require checks between before doing again
2120
# TODO check if this is working correctly.
2221
# also check the .get()
2322
backoff: 3
24-
maxSize: 10
23+
maxSize: 5
2524

2625
cluster:
2726
maxSize: 2
@@ -34,16 +33,14 @@ spec:
3433
image: rockylinux:9
3534
script: echo This is a setup for lammps
3635

37-
# Note that this step always fails and we never make it to C
38-
# This should end the workflow early
3936
- name: lammps
4037
properties:
4138
minicluster: "yes"
4239
config:
4340
nodes: 4
4441
image: ghcr.io/converged-computing/metric-lammps-cpu:zen4
4542
workdir: /opt/lammps/examples/reaxff/HNS/
46-
script: lmp -v x 4 -v y 4 -v z 4 -in ./in.reaxff.hns -nocite
43+
script: flux run -N $nodes lmp -v x 4 -v y 4 -v z 4 -in ./in.reaxff.hns -nocite
4744

4845
- name: job_c
4946
config:

internal/controller/manager/jobs/jobs.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@ cd -
2626
jobid="{{ jobid }}"
2727
outpath="{{ workdir }}"
2828
registry="{{ registry }}"
29+
{% if cores_per_task %}cores_per_task={{ cores_per_task }}{% endif %}
30+
{% if nodes %}nodes={{ nodes }}{% endif %}
31+
{% if tasks %}tasks={{ tasks }}{% endif %}
2932
{% if pull %}pull_tag={{ pull }}{% endif %}
3033
{% if push %}push_tag={{ push }}{% endif %}
3134

internal/controller/manager/jobs/templates/components.sh

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ config:
33
nnodes: {{ if .Job.Config.Nodes }}{{ .Job.Config.Nodes }}{{ else }}1{{ end }}
44
cores_per_task: {{ if .Job.Config.CoresPerTask }}{{ .Job.Config.CoresPerTask }}{{ else }}6{{ end }}
55
ngpus: {{ .Job.Config.Gpus }}
6+
{{ if .Job.Config.Tasks }}tasks: {{ .Job.Config.Tasks }}{{ end }}
67
{{ if .Job.Config.Walltime }}walltime: '{{ .Job.Config.Walltime }}'{{ end }}
78
# Kubernetes specific settings
89
{{ if .Job.Config.GPULabel }}gpulabel: {{ .Job.Config.GPULabel }}{{ end }}

python/state_machine_operator/manager/manager.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -472,11 +472,14 @@ def check_metrics(self, job):
472472

473473
def trigger_grow(self, trigger, step_name, value):
474474
"""
475-
Trigger the job to grow
475+
Trigger the job to grow.
476+
477+
Note that this is more of a static grow - subsequent jobs will be given more
478+
nodes. It doesn't give currently running jobs more.
476479
"""
477480
previous = self.workflow.jobs[step_name]["config"]["nnodes"]
478481
max_size = trigger.action.max_size
479-
if max_size >= previous + 1:
482+
if previous + 1 >= max_size:
480483
LOGGER.info(
481484
f"Grow triggered: {trigger.action.metric} {trigger.when} ({value}), already >= max size {max_size}"
482485
)
@@ -523,9 +526,13 @@ def trigger_workflow_action(self, trigger, step_name, value):
523526
)
524527
self.complete_workflow()
525528

529+
# TODO: think about use case / mechanism for dynamic grow.
530+
# It would likely need to be requested by the application.
531+
# Static grow increases subsequent nodes for a job
526532
if trigger.action.name == "grow":
527533
self.trigger_grow(trigger, step_name, value)
528534

535+
# Static shrink decreases subsequent nodes for a job
529536
if trigger.action.name == "shrink":
530537
self.trigger_shrink(trigger, step_name, value)
531538

python/state_machine_operator/tracker/kubernetes/tracker.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -315,6 +315,7 @@ def submit_minicluster_job(self, step, jobid):
315315
"workingDir": step.workdir,
316316
"name": container_name,
317317
"pullAlways": pull_always,
318+
"launcher": True,
318319
"volumes": {
319320
step.name: {
320321
"configMapName": step.name,
@@ -570,6 +571,7 @@ def create_step(self, jobid):
570571
cores_per_task=self.ncores,
571572
gpus=self.ngpus,
572573
workdir=workdir,
574+
tasks=self.tasks,
573575
)
574576

575577
if "script" in self.job_desc:
@@ -584,6 +586,9 @@ def create_step(self, jobid):
584586
"push": self.push_to,
585587
"registry": self.registry_host,
586588
"plain_http": self.registry_plain_http,
589+
"nodes": step.nodes,
590+
"cores_per_task": self.ncores,
591+
"tasks": step.tasks,
587592
}
588593
step.script = Template(self.job_desc["script"]).render(**kwargs)
589594

python/state_machine_operator/tracker/tracker.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,12 @@ def type(self):
124124
def nnodes(self):
125125
return int(self.config.get("nnodes", 1))
126126

127+
@property
128+
def tasks(self):
129+
tasks = self.config.get("tasks")
130+
if tasks is not None:
131+
return int(tasks)
132+
127133
@property
128134
def ncores(self):
129135
return int(self.config.get("cores_per_task", 1))

python/state_machine_operator/tracker/types.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ class JobSetup:
2828
nodes: int
2929
cores_per_task: int
3030
script: str = None
31+
tasks: int = None
3132
walltime: str = None
3233
gpus: int = 0
3334
workdir: str = None

0 commit comments

Comments
 (0)