Commit a0dbc48

Merge pull request #6826 from pokerfaceSad/pytorchjobs_interpreter
Add resource interpreter for PytorchJob
2 parents: a66cfdc + b7c8fad

5 files changed (+471 -0 lines)

Lines changed: 269 additions & 0 deletions
@@ -0,0 +1,269 @@
apiVersion: config.karmada.io/v1alpha1
kind: ResourceInterpreterCustomization
metadata:
  name: declarative-configuration-pytorchjob
spec:
  target:
    apiVersion: kubeflow.org/v1
    kind: PyTorchJob
  customizations:
    componentResource:
      luaScript: |
        local kube = require("kube")

        -- Safe fetch of deeply nested table fields.
        local function get(obj, path)
          local cur = obj
          for i = 1, #path do
            if cur == nil then
              return nil
            end
            cur = cur[path[i]]
          end
          return cur
        end

        -- Normalize possibly-string numbers with a default.
        local function to_num(v, default)
          if v == nil or v == '' then
            return default
          end
          local n = tonumber(v)
          if n ~= nil then
            return n
          end
          return default
        end

        function GetComponents(observedObj)
          local components = {}

          -- Master Component
          local master_spec = get(observedObj, {"spec", "pytorchReplicaSpecs", "Master"})
          if master_spec ~= nil then
            local master_replicas = to_num(master_spec.replicas, 1)
            local master_template = master_spec.template

            local master_requires = {}
            if master_template ~= nil then
              master_requires = kube.accuratePodRequirements(master_template)
            end

            local masterComponent = {
              name = "master",
              replicas = master_replicas,
              replicaRequirements = master_requires
            }
            table.insert(components, masterComponent)
          end

          -- Worker Component
          local worker_spec = get(observedObj, {"spec", "pytorchReplicaSpecs", "Worker"})
          if worker_spec ~= nil then
            local worker_replicas = to_num(worker_spec.replicas, 1)
            local worker_template = worker_spec.template

            local worker_requires = {}
            if worker_template ~= nil then
              worker_requires = kube.accuratePodRequirements(worker_template)
            end

            local workerComponent = {
              name = "worker",
              replicas = worker_replicas,
              replicaRequirements = worker_requires
            }
            table.insert(components, workerComponent)
          end

          return components
        end
    statusAggregation:
      luaScript: >
        local function omitEmpty(t)
          if t == nil then return nil end
          local out = {}
          for k, v in pairs(t) do
            if type(v) == "table" then
              local inner = omitEmpty(v)
              if inner ~= nil and next(inner) ~= nil then
                out[k] = inner
              end
            elseif v ~= nil and not (v == 0 or v == "" or v == "0s") then
              out[k] = v
            end
          end
          if next(out) ~= nil then
            return out
          else
            return nil
          end
        end
        function AggregateStatus(desiredObj, statusItems)
          if desiredObj.status == nil then
            desiredObj.status = {}
          end

          -- If there is no member cluster status, initialize a default status.
          if statusItems == nil then
            desiredObj.status.startTime = nil
            desiredObj.status.completionTime = nil
            desiredObj.status.replicaStatuses = {}
            desiredObj.status.conditions = {}
            return desiredObj
          end

          local startTime = nil
          local completionTime = nil
          local lastReconcileTime = nil
          local replicaStatuses = {}
          local aggregatedConditions = {}
          local successfulClustersNum = 0
          local failedClusters = {}

          -- Initialize Master and Worker status.
          replicaStatuses.Master = { active = 0, failed = 0, succeeded = 0 }
          replicaStatuses.Worker = { active = 0, failed = 0, succeeded = 0 }

          for i = 1, #statusItems do
            if statusItems[i].status ~= nil then
              -- Aggregate time fields (earliest start time, latest completion and reconcile times).
              if statusItems[i].status.startTime ~= nil then
                if startTime == nil or statusItems[i].status.startTime < startTime then
                  startTime = statusItems[i].status.startTime
                end
              end

              if statusItems[i].status.completionTime ~= nil then
                if completionTime == nil or statusItems[i].status.completionTime > completionTime then
                  completionTime = statusItems[i].status.completionTime
                end
              end

              if statusItems[i].status.lastReconcileTime ~= nil then
                if lastReconcileTime == nil or statusItems[i].status.lastReconcileTime > lastReconcileTime then
                  lastReconcileTime = statusItems[i].status.lastReconcileTime
                end
              end

              -- Aggregate replica status.
              if statusItems[i].status.replicaStatuses ~= nil then
                if statusItems[i].status.replicaStatuses.Master ~= nil then
                  replicaStatuses.Master.active = replicaStatuses.Master.active + (statusItems[i].status.replicaStatuses.Master.active or 0)
                  replicaStatuses.Master.failed = replicaStatuses.Master.failed + (statusItems[i].status.replicaStatuses.Master.failed or 0)
                  replicaStatuses.Master.succeeded = replicaStatuses.Master.succeeded + (statusItems[i].status.replicaStatuses.Master.succeeded or 0)
                end

                if statusItems[i].status.replicaStatuses.Worker ~= nil then
                  replicaStatuses.Worker.active = replicaStatuses.Worker.active + (statusItems[i].status.replicaStatuses.Worker.active or 0)
                  replicaStatuses.Worker.failed = replicaStatuses.Worker.failed + (statusItems[i].status.replicaStatuses.Worker.failed or 0)
                  replicaStatuses.Worker.succeeded = replicaStatuses.Worker.succeeded + (statusItems[i].status.replicaStatuses.Worker.succeeded or 0)
                end
              end

              -- Aggregate condition status across member clusters.
              local isFinished = false
              local finishedType = ""
              if statusItems[i].status.conditions ~= nil then
                for _, c in ipairs(statusItems[i].status.conditions) do
                  -- Like the Kubernetes native Job, we do not merge conditions from member clusters,
                  -- but generate a new condition from the PyTorchJob finish state.
                  -- table.insert(aggregatedConditions, c)
                  if (c.type == "Succeeded" or c.type == "Failed") and c.status == "True" then
                    isFinished = true
                    finishedType = c.type
                  end
                end
              end

              if isFinished then
                if finishedType == "Succeeded" then
                  successfulClustersNum = successfulClustersNum + 1
                elseif finishedType == "Failed" then
                  table.insert(failedClusters, statusItems[i].clusterName)
                end
              end

            end
          end

          if #failedClusters > 0 then
            table.insert(aggregatedConditions, {
              type = "Failed",
              status = "True",
              lastProbeTime = os.date("!%Y-%m-%dT%H:%M:%SZ"),
              lastTransitionTime = os.date("!%Y-%m-%dT%H:%M:%SZ"),
              reason = "PyTorchJobFailed",
              message = "PyTorchJob failed in member clusters: " .. table.concat(failedClusters, ", ")
            })
          end

          if successfulClustersNum == #statusItems and successfulClustersNum > 0 then
            table.insert(aggregatedConditions, {
              type = "Succeeded",
              status = "True",
              lastProbeTime = os.date("!%Y-%m-%dT%H:%M:%SZ"),
              lastTransitionTime = os.date("!%Y-%m-%dT%H:%M:%SZ"),
              reason = "Completed",
              message = "PyTorchJob completed successfully"
            })
            desiredObj.status.completionTime = completionTime
          end

          -- Set the aggregated status.
          desiredObj.status.startTime = startTime
          desiredObj.status.lastReconcileTime = lastReconcileTime
          desiredObj.status.replicaStatuses = replicaStatuses
          desiredObj.status.conditions = aggregatedConditions
          local tmp = desiredObj.status
          desiredObj.status = omitEmpty(tmp)

          return desiredObj
        end
    healthInterpretation:
      luaScript: >
        function InterpretHealth(observedObj)
          if observedObj == nil or
             observedObj.status == nil or
             observedObj.status.conditions == nil then
            return false
          end

          -- Determine health based on the PyTorchJob conditions.
          for i = 1, #observedObj.status.conditions do
            local condition = observedObj.status.conditions[i]
            if condition.type == "Failed" and condition.status == "True" then
              return false
            end
          end
          return true
        end
    dependencyInterpretation:
      luaScript: >
        local kube = require("kube")
        function GetDependencies(desiredObj)
          local refs = {}

          if desiredObj.spec == nil or desiredObj.spec.pytorchReplicaSpecs == nil then
            return refs
          end

          -- Iterate over the PyTorchJob replica types.
          local replicaTypes = {"Master", "Worker"}
          for _, replicaType in ipairs(replicaTypes) do
            local spec = desiredObj.spec.pytorchReplicaSpecs[replicaType]
            if spec ~= nil and spec.template ~= nil then
              local deps = kube.getPodDependencies(spec.template, desiredObj.metadata.namespace)
              if deps ~= nil then
                for _, dep in ipairs(deps) do
                  table.insert(refs, dep)
                end
              end
            end
          end

          return refs
        end
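
For orientation, here is a rough sketch of the component list the GetComponents hook above is expected to build for a PyTorchJob like the sample further down in this diff (2 Master and 2 Worker replicas, each limited to cpu: 1 / memory: 512Mi). It is illustrative only: the exact layout of replicaRequirements is whatever kube.accuratePodRequirements returns, and the resourceRequest key shown here is an assumption, not taken from this commit.

    -- Illustrative only: approximate GetComponents output for 2 Master and 2 Worker replicas.
    -- The replicaRequirements shape (the "resourceRequest" key) is assumed, not from this diff.
    local components = {
      {
        name = "master",
        replicas = 2,
        replicaRequirements = { resourceRequest = { cpu = "1", memory = "512Mi" } }
      },
      {
        name = "worker",
        replicas = 2,
        replicaRequirements = { resourceRequest = { cpu = "1", memory = "512Mi" } }
      }
    }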
Lines changed: 10 additions & 0 deletions
@@ -0,0 +1,10 @@
tests:
  - desiredInputPath: testdata/desired-pytorchjob.yaml
    statusInputPath: testdata/status-file.yaml
    operation: AggregateStatus
  - observedInputPath: testdata/observed-pytorchjob.yaml
    operation: InterpretHealth
  - observedInputPath: testdata/observed-pytorchjob.yaml
    operation: InterpretComponent
  - desiredInputPath: testdata/desired-pytorchjob.yaml
    operation: InterpretDependency
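
The AggregateStatus test feeds testdata/status-file.yaml (one of the fixture files not shown in this excerpt) as the statusItems argument. As a purely hypothetical illustration of that argument's shape, based only on the fields the Lua script above reads (clusterName, status.startTime, status.replicaStatuses, status.conditions) and not on the actual fixture:

    -- Hypothetical statusItems value; field names mirror what AggregateStatus reads,
    -- not the contents of the (unshown) testdata/status-file.yaml.
    local statusItems = {
      {
        clusterName = "member1",
        status = {
          startTime = "2024-01-01T00:00:00Z",
          completionTime = "2024-01-01T00:10:00Z",
          replicaStatuses = {
            Master = { active = 0, failed = 0, succeeded = 1 },
            Worker = { active = 0, failed = 0, succeeded = 2 }
          },
          conditions = {
            { type = "Succeeded", status = "True" }
          }
        }
      }
    }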
Lines changed: 49 additions & 0 deletions
@@ -0,0 +1,49 @@
apiVersion: kubeflow.org/v1
kind: PyTorchJob
metadata:
  name: pytorch-simple
  namespace: kubeflow
spec:
  pytorchReplicaSpecs:
    Master:
      replicas: 2
      restartPolicy: OnFailure
      template:
        spec:
          containers:
            - name: pytorch
              image: docker.io/kubeflowkatib/pytorch-mnist:v1beta1-45c5727
              imagePullPolicy: Always
              command:
                - "python3"
                - "/opt/pytorch-mnist/mnist.py"
                - "--epochs=1"
              resources:
                limits:
                  cpu: 1
                  memory: 512Mi
    Worker:
      replicas: 2
      restartPolicy: OnFailure
      template:
        spec:
          serviceAccountName: pytorch-service-account
          containers:
            - name: pytorch
              image: docker.io/kubeflowkatib/pytorch-mnist:v1beta1-45c5727
              imagePullPolicy: Always
              volumeMounts:
                - mountPath: "/train"
                  name: "training"
              command:
                - "python3"
                - "/opt/pytorch-mnist/mnist.py"
                - "--epochs=1"
              resources:
                limits:
                  cpu: 1
                  memory: 512Mi
          volumes:
            - name: "training"
              persistentVolumeClaim:
                claimName: "training-data"
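
Against this sample, the dependencyInterpretation script delegates to kube.getPodDependencies for each replica template, so one would expect the Worker template's ServiceAccount and PersistentVolumeClaim to be reported. A sketch of the expected result, assuming the references use apiVersion/kind/namespace/name keys (an assumption; the resolved output is not part of this diff):

    -- Illustrative only: dependency references GetDependencies should return for the sample above.
    -- Key names (apiVersion/kind/namespace/name) are assumed.
    local refs = {
      { apiVersion = "v1", kind = "ServiceAccount", namespace = "kubeflow", name = "pytorch-service-account" },
      { apiVersion = "v1", kind = "PersistentVolumeClaim", namespace = "kubeflow", name = "training-data" }
    }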
