Skip to content

Commit 82e2c77

Browse files
committed
Add resource interpreter for TFJob.
Signed-off-by: Xinyuan Lyu <[email protected]>
1 parent 9f3ca5c commit 82e2c77

File tree

5 files changed

+436
-0
lines changed

5 files changed

+436
-0
lines changed
Lines changed: 261 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,261 @@
1+
apiVersion: config.karmada.io/v1alpha1
2+
kind: ResourceInterpreterCustomization
3+
metadata:
4+
name: declarative-configuration-tfjob
5+
spec:
6+
target:
7+
apiVersion: kubeflow.org/v1
8+
kind: TFJob
9+
customizations:
10+
componentResource:
11+
luaScript: |
12+
local kube = require("kube")
13+
14+
-- Safe fetch of deeply nested table fields.
15+
-- Nil-safe deep lookup: walk a chain of keys through nested tables,
-- returning nil as soon as any intermediate value is missing.
local function get(obj, path)
  local node = obj
  for _, key in ipairs(path) do
    if node == nil then
      return nil
    end
    node = node[key]
  end
  return node
end
25+
26+
-- Normalize possibly-string numbers with a default.
27+
-- Coerce a possibly-string numeric value to a number, falling back to
-- `default` for nil, the empty string, or anything tonumber rejects.
local function to_num(v, default)
  if v == nil or v == '' then
    return default
  end
  -- tonumber never yields false, so `or` is a safe fallback here.
  return tonumber(v) or default
end
37+
38+
-- Build the component list (name, replica count, per-replica resource
-- requirements) for every replica type declared in the TFJob spec.
function GetComponents(observedObj)
  -- Every replica role a TFJob may declare, with its component name.
  local roles = {
    {key = "PS", name = "ps"},
    {key = "Worker", name = "worker"},
    {key = "Chief", name = "chief"},
    {key = "Master", name = "master"},
    {key = "Evaluator", name = "evaluator"},
  }

  local components = {}
  for _, role in ipairs(roles) do
    local roleSpec = get(observedObj, {"spec", "tfReplicaSpecs", role.key})
    if roleSpec ~= nil then
      local requirements = {}
      if roleSpec.template ~= nil then
        -- Resource requirements derived from the pod template via
        -- Karmada's kube library.
        requirements = kube.accuratePodRequirements(roleSpec.template)
      end
      components[#components + 1] = {
        name = role.name,
        -- replicas may arrive as a string; default to 1 when absent.
        replicas = to_num(roleSpec.replicas, 1),
        replicaRequirements = requirements,
      }
    end
  end

  return components
end
73+
74+
statusAggregation:
75+
luaScript: >
76+
-- Recursively strip "empty" values — nil, 0, "", "0s", and tables that
-- become empty after stripping — mirroring Go's omitempty marshalling.
-- Returns nil (rather than an empty table) when nothing survives, so
-- callers can drop the whole field.
local function omitEmpty(t)
  if t == nil then
    return nil
  end
  local kept = {}
  local any = false
  for key, value in pairs(t) do
    if type(value) == "table" then
      local pruned = omitEmpty(value)
      -- omitEmpty never returns an empty table, so non-nil means keep.
      if pruned ~= nil then
        kept[key] = pruned
        any = true
      end
    elseif value ~= nil and value ~= 0 and value ~= "" and value ~= "0s" then
      kept[key] = value
      any = true
    end
  end
  if any then
    return kept
  end
  return nil
end
95+
-- Merge TFJob statuses reported by member clusters into the federated
-- object's status: earliest start time, latest completion/reconcile
-- time, summed replica counters per role, and a single synthesized
-- Succeeded/Failed condition (member conditions are NOT copied through,
-- matching how the native Job interpreter behaves).
function AggregateStatus(desiredObj, statusItems)
  if desiredObj.status == nil then
    desiredObj.status = {}
  end

  -- No member statuses: reset to an empty default status.
  if statusItems == nil then
    desiredObj.status.startTime = nil
    desiredObj.status.completionTime = nil
    desiredObj.status.replicaStatuses = {}
    desiredObj.status.conditions = {}
    return desiredObj
  end

  local earliestStart = nil
  local latestCompletion = nil
  local latestReconcile = nil
  local succeededClusters = 0
  local failedClusters = {}

  -- Start every counter at zero so member values can be summed directly.
  local replicaStatuses = {}
  for _, role in ipairs({"PS", "Worker", "Chief", "Master", "Evaluator"}) do
    replicaStatuses[role] = {active = 0, failed = 0, succeeded = 0}
  end

  for _, item in ipairs(statusItems) do
    local s = item.status
    if s ~= nil then
      -- Earliest start; latest completion and reconcile (RFC3339 strings
      -- compare correctly with < / > lexicographically).
      if s.startTime ~= nil and (earliestStart == nil or s.startTime < earliestStart) then
        earliestStart = s.startTime
      end
      if s.completionTime ~= nil and (latestCompletion == nil or s.completionTime > latestCompletion) then
        latestCompletion = s.completionTime
      end
      if s.lastReconcileTime ~= nil and (latestReconcile == nil or s.lastReconcileTime > latestReconcile) then
        latestReconcile = s.lastReconcileTime
      end

      -- Sum per-role replica counters across clusters.
      if s.replicaStatuses ~= nil then
        for role, totals in pairs(replicaStatuses) do
          local member = s.replicaStatuses[role]
          if member ~= nil then
            totals.active = totals.active + (member.active or 0)
            totals.failed = totals.failed + (member.failed or 0)
            totals.succeeded = totals.succeeded + (member.succeeded or 0)
          end
        end
      end

      -- Derive one terminal state per cluster from its conditions
      -- (last matching condition wins) instead of merging them raw.
      local terminal = ""
      if s.conditions ~= nil then
        for _, c in ipairs(s.conditions) do
          if (c.type == "Succeeded" or c.type == "Failed") and c.status == "True" then
            terminal = c.type
          end
        end
      end
      if terminal == "Succeeded" then
        succeededClusters = succeededClusters + 1
      elseif terminal == "Failed" then
        failedClusters[#failedClusters + 1] = item.clusterName
      end
    end
  end

  local conditions = {}
  -- NOTE(review): assumes os.date is available in the interpreter's Lua
  -- sandbox — confirm against the Karmada runtime.
  local now = os.date("!%Y-%m-%dT%H:%M:%SZ")

  if #failedClusters > 0 then
    conditions[#conditions + 1] = {
      type = "Failed",
      status = "True",
      lastProbeTime = now,
      lastTransitionTime = now,
      reason = "TFJobFailed",
      message = "TFJob executed failed in member clusters: " .. table.concat(failedClusters, ", ")
    }
  end

  -- Only report Succeeded (and a completion time) when EVERY cluster
  -- finished successfully.
  if succeededClusters == #statusItems and succeededClusters > 0 then
    conditions[#conditions + 1] = {
      type = "Succeeded",
      status = "True",
      lastProbeTime = now,
      lastTransitionTime = now,
      reason = "Completed",
      message = "TFJob completed successfully"
    }
    desiredObj.status.completionTime = latestCompletion
  end

  desiredObj.status.startTime = earliestStart
  desiredObj.status.lastReconcileTime = latestReconcile
  desiredObj.status.replicaStatuses = replicaStatuses
  desiredObj.status.conditions = conditions
  -- Drop zero/empty fields so the aggregated status marshals cleanly.
  desiredObj.status = omitEmpty(desiredObj.status)

  return desiredObj
end
216+
217+
healthInterpretation:
218+
luaScript: >
219+
-- A TFJob is healthy when it reports conditions and none of them is an
-- active Failed condition. A missing status or condition list counts as
-- unhealthy (the controller has not reported yet).
function InterpretHealth(observedObj)
  local status = observedObj and observedObj.status
  local conditions = status and status.conditions
  if conditions == nil then
    return false
  end

  for _, cond in ipairs(conditions) do
    if cond.type == "Failed" and cond.status == "True" then
      return false
    end
  end
  return true
end
235+
236+
dependencyInterpretation:
237+
luaScript: >
238+
local kube = require("kube")
239+
-- Collect every resource reference found in the pod templates of all
-- TFJob replica types (resolution is delegated to Karmada's kube
-- library — presumably ConfigMaps/Secrets/PVCs; see kube.getPodDependencies).
function GetDependencies(desiredObj)
  local dependencies = {}

  local spec = desiredObj.spec
  if spec == nil or spec.tfReplicaSpecs == nil then
    return dependencies
  end

  for _, role in ipairs({"PS", "Worker", "Chief", "Master", "Evaluator"}) do
    local roleSpec = spec.tfReplicaSpecs[role]
    if roleSpec ~= nil and roleSpec.template ~= nil then
      local found = kube.getPodDependencies(roleSpec.template, desiredObj.metadata.namespace)
      if found ~= nil then
        for _, dep in ipairs(found) do
          dependencies[#dependencies + 1] = dep
        end
      end
    end
  end

  return dependencies
end
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
tests:
2+
- desiredInputPath: testdata/desired-tfjob.yaml
3+
statusInputPath: testdata/status-file.yaml
4+
operation: AggregateStatus
5+
- observedInputPath: testdata/observed-tfjob.yaml
6+
operation: InterpretHealth
7+
- observedInputPath: testdata/observed-tfjob.yaml
8+
operation: InterpretComponent
9+
- desiredInputPath: testdata/desired-tfjob.yaml
10+
operation: InterpretDependency
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
apiVersion: kubeflow.org/v1
2+
kind: TFJob
3+
metadata:
4+
name: dist-mnist-for-e2e-test
5+
namespace: default
6+
spec:
7+
tfReplicaSpecs:
8+
PS:
9+
replicas: 2
10+
restartPolicy: Never
11+
template:
12+
spec:
13+
containers:
14+
- image: kubeflow/tf-dist-mnist-test:latest
15+
name: tensorflow
16+
resources:
17+
limits:
18+
cpu: 1
19+
memory: 512Mi
20+
Worker:
21+
replicas: 4
22+
restartPolicy: Never
23+
template:
24+
spec:
25+
containers:
26+
- image: kubeflow/tf-dist-mnist-test:latest
27+
name: tensorflow
28+
volumeMounts:
29+
- mountPath: "/train"
30+
name: "training"
31+
resources:
32+
limits:
33+
cpu: 1
34+
memory: 512Mi
35+
volumes:
36+
- name: "training"
37+
persistentVolumeClaim:
38+
claimName: "training-data"
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
apiVersion: kubeflow.org/v1
2+
kind: TFJob
3+
metadata:
4+
name: dist-mnist-for-e2e-test
5+
namespace: default
6+
spec:
7+
tfReplicaSpecs:
8+
PS:
9+
replicas: 2
10+
restartPolicy: Never
11+
template:
12+
spec:
13+
containers:
14+
- image: kubeflow/tf-dist-mnist-test:latest
15+
name: tensorflow
16+
resources:
17+
limits:
18+
cpu: 1
19+
memory: 512Mi
20+
Worker:
21+
replicas: 4
22+
restartPolicy: Never
23+
template:
24+
spec:
25+
containers:
26+
- command:
27+
- bash
28+
- -c
29+
- sleep 15s
30+
image: kubeflow/tf-dist-mnist-test:latest
31+
name: tensorflow
32+
volumeMounts:
33+
- mountPath: "/train"
34+
name: "training"
35+
resources:
36+
limits:
37+
cpu: 1
38+
memory: 512Mi
39+
volumes:
40+
- name: "training"
41+
persistentVolumeClaim:
42+
claimName: "training-data"
43+
status:
44+
completionTime: "2025-11-22T10:49:40Z"
45+
conditions:
46+
- lastTransitionTime: "2025-11-22T10:48:50Z"
47+
lastUpdateTime: "2025-11-22T10:48:50Z"
48+
message: TFJob dist-mnist-for-e2e-test is created.
49+
reason: TFJobCreated
50+
status: "True"
51+
type: Created
52+
- lastTransitionTime: "2025-11-22T10:49:24Z"
53+
lastUpdateTime: "2025-11-22T10:49:24Z"
54+
message: TFJob default/dist-mnist-for-e2e-test is running.
55+
reason: TFJobRunning
56+
status: "False"
57+
type: Running
58+
- lastTransitionTime: "2025-11-22T10:49:40Z"
59+
lastUpdateTime: "2025-11-22T10:49:40Z"
60+
message: TFJob default/dist-mnist-for-e2e-test successfully completed.
61+
reason: TFJobSucceeded
62+
status: "True"
63+
type: Succeeded
64+
replicaStatuses:
65+
PS:
66+
succeeded: 2
67+
Worker:
68+
succeeded: 4

0 commit comments

Comments
 (0)