|
| 1 | +apiVersion: config.karmada.io/v1alpha1 |
| 2 | +kind: ResourceInterpreterCustomization |
| 3 | +metadata: |
| 4 | + name: declarative-configuration-tfjob |
| 5 | +spec: |
| 6 | + target: |
| 7 | + apiVersion: kubeflow.org/v1 |
| 8 | + kind: TFJob |
| 9 | + customizations: |
| 10 | + componentResource: |
| 11 | + luaScript: | |
| 12 | + local kube = require("kube") |
| 13 | + |
| 14 | + -- Safe fetch of deeply nested table fields. |
| 15 | + local function get(obj, path) |
| 16 | + local cur = obj |
| 17 | + for i = 1, #path do |
| 18 | + if cur == nil then |
| 19 | + return nil |
| 20 | + end |
| 21 | + cur = cur[path[i]] |
| 22 | + end |
| 23 | + return cur |
| 24 | + end |
| 25 | + |
| 26 | + -- Normalize possibly-string numbers with a default. |
| 27 | + local function to_num(v, default) |
| 28 | + if v == nil or v == '' then |
| 29 | + return default |
| 30 | + end |
| 31 | + local n = tonumber(v) |
| 32 | + if n ~= nil then |
| 33 | + return n |
| 34 | + end |
| 35 | + return default |
| 36 | + end |
| 37 | + |
| 38 | + function GetComponents(observedObj) |
| 39 | + local components = {} |
| 40 | + |
| 41 | + -- Define all supported TFJob replica types |
| 42 | + local replicaTypes = { |
| 43 | + {key = "PS", name = "ps"}, |
| 44 | + {key = "Worker", name = "worker"}, |
| 45 | + {key = "Chief", name = "chief"}, |
| 46 | + {key = "Master", name = "master"}, |
| 47 | + {key = "Evaluator", name = "evaluator"} |
| 48 | + } |
| 49 | + |
| 50 | + -- Iterate through all replica types |
| 51 | + for _, replicaType in ipairs(replicaTypes) do |
| 52 | + local spec = get(observedObj, {"spec", "tfReplicaSpecs", replicaType.key}) |
| 53 | + if spec ~= nil then |
| 54 | + local replicas = to_num(spec.replicas, 1) |
| 55 | + local template = spec.template |
| 56 | + |
| 57 | + local requires = {} |
| 58 | + if template ~= nil then |
| 59 | + requires = kube.accuratePodRequirements(template) |
| 60 | + end |
| 61 | + |
| 62 | + local component = { |
| 63 | + name = replicaType.name, |
| 64 | + replicas = replicas, |
| 65 | + replicaRequirements = requires |
| 66 | + } |
| 67 | + table.insert(components, component) |
| 68 | + end |
| 69 | + end |
| 70 | + |
| 71 | + return components |
| 72 | + end |
| 73 | +
|
| 74 | + statusAggregation: |
| 75 | + luaScript: > |
| 76 | + local function omitEmpty(t) |
| 77 | + if t == nil then return nil end |
| 78 | + local out = {} |
| 79 | + for k, v in pairs(t) do |
| 80 | + if type(v) == "table" then |
| 81 | + local inner = omitEmpty(v) |
| 82 | + if inner ~= nil and next(inner) ~= nil then |
| 83 | + out[k] = inner |
| 84 | + end |
| 85 | + elseif v ~= nil and not (v == 0 or v == "" or v == "0s") then |
| 86 | + out[k] = v |
| 87 | + end |
| 88 | + end |
| 89 | + if next(out) ~= nil then |
| 90 | + return out |
| 91 | + else |
| 92 | + return nil |
| 93 | + end |
| 94 | + end |
| 95 | + function AggregateStatus(desiredObj, statusItems) |
| 96 | + if desiredObj.status == nil then |
| 97 | + desiredObj.status = {} |
| 98 | + end |
| 99 | + |
| 100 | + -- If no member cluster status, initialize default status |
| 101 | + if statusItems == nil then |
| 102 | + desiredObj.status.startTime = nil |
| 103 | + desiredObj.status.completionTime = nil |
| 104 | + desiredObj.status.replicaStatuses = {} |
| 105 | + desiredObj.status.conditions = {} |
| 106 | + return desiredObj |
| 107 | + end |
| 108 | + |
| 109 | + local startTime = nil |
| 110 | + local completionTime = nil |
| 111 | + local lastReconcileTime = nil |
| 112 | + local replicaStatuses = {} |
| 113 | + local aggregatedConditions = {} |
| 114 | + local successfulClustersNum = 0 |
| 115 | + local failedClusters = {} |
| 116 | + |
| 117 | + -- Initialize all TFJob replica types status |
| 118 | + replicaStatuses.PS = { active = 0, failed = 0, succeeded = 0 } |
| 119 | + replicaStatuses.Worker = { active = 0, failed = 0, succeeded = 0 } |
| 120 | + replicaStatuses.Chief = { active = 0, failed = 0, succeeded = 0 } |
| 121 | + replicaStatuses.Master = { active = 0, failed = 0, succeeded = 0 } |
| 122 | + replicaStatuses.Evaluator = { active = 0, failed = 0, succeeded = 0 } |
| 123 | + |
| 124 | + for i = 1, #statusItems do |
| 125 | + if statusItems[i].status ~= nil then |
| 126 | + -- Aggregate time fields (earliest start time, latest completion time and reconcile time) |
| 127 | + if statusItems[i].status.startTime ~= nil then |
| 128 | + if startTime == nil or statusItems[i].status.startTime < startTime then |
| 129 | + startTime = statusItems[i].status.startTime |
| 130 | + end |
| 131 | + end |
| 132 | + |
| 133 | + if statusItems[i].status.completionTime ~= nil then |
| 134 | + if completionTime == nil or statusItems[i].status.completionTime > completionTime then |
| 135 | + completionTime = statusItems[i].status.completionTime |
| 136 | + end |
| 137 | + end |
| 138 | + |
| 139 | + if statusItems[i].status.lastReconcileTime ~= nil then |
| 140 | + if lastReconcileTime == nil or statusItems[i].status.lastReconcileTime > lastReconcileTime then |
| 141 | + lastReconcileTime = statusItems[i].status.lastReconcileTime |
| 142 | + end |
| 143 | + end |
| 144 | +
|
| 145 | + -- Aggregate replica status for all TFJob replica types |
| 146 | + if statusItems[i].status.replicaStatuses ~= nil then |
| 147 | + local replicaTypes = {"PS", "Worker", "Chief", "Master", "Evaluator"} |
| 148 | + for _, replicaType in ipairs(replicaTypes) do |
| 149 | + if statusItems[i].status.replicaStatuses[replicaType] ~= nil then |
| 150 | + replicaStatuses[replicaType].active = replicaStatuses[replicaType].active + (statusItems[i].status.replicaStatuses[replicaType].active or 0) |
| 151 | + replicaStatuses[replicaType].failed = replicaStatuses[replicaType].failed + (statusItems[i].status.replicaStatuses[replicaType].failed or 0) |
| 152 | + replicaStatuses[replicaType].succeeded = replicaStatuses[replicaType].succeeded + (statusItems[i].status.replicaStatuses[replicaType].succeeded or 0) |
| 153 | + end |
| 154 | + end |
| 155 | + end |
| 156 | + |
| 157 | + -- Aggregate condition status (merge conditions from all member clusters) |
| 158 | + local isFinished = false |
| 159 | + local finishedType = "" |
| 160 | + if statusItems[i].status.conditions ~= nil then |
| 161 | + for _, c in ipairs(statusItems[i].status.conditions) do |
| 162 | + -- Like kubernetes native Job, we do not merge conditions from member clusters, |
| 163 | + -- but generate a new condition by TFJob finish state. |
| 164 | + -- table.insert(aggregatedConditions, c) |
| 165 | + if (c.type == "Succeeded" or c.type == "Failed") and c.status == "True" then |
| 166 | + isFinished = true |
| 167 | + finishedType = c.type |
| 168 | + end |
| 169 | + end |
| 170 | + end |
| 171 | +
|
| 172 | + if isFinished then |
| 173 | + if finishedType == "Succeeded" then |
| 174 | + successfulClustersNum = successfulClustersNum + 1 |
| 175 | + elseif finishedType == "Failed" then |
| 176 | + table.insert(failedClusters, statusItems[i].clusterName) |
| 177 | + end |
| 178 | + end |
| 179 | +
|
| 180 | + end |
| 181 | + end |
| 182 | + |
| 183 | + if #failedClusters > 0 then |
| 184 | + table.insert(aggregatedConditions, { |
| 185 | + type = "Failed", |
| 186 | + status = "True", |
| 187 | + lastProbeTime = os.date("!%Y-%m-%dT%H:%M:%SZ"), |
| 188 | + lastTransitionTime = os.date("!%Y-%m-%dT%H:%M:%SZ"), |
| 189 | + reason = "TFJobFailed", |
| 190 | + message = "TFJob executed failed in member clusters: " .. table.concat(failedClusters, ", ") |
| 191 | + }) |
| 192 | + end |
| 193 | +
|
| 194 | + if successfulClustersNum == #statusItems and successfulClustersNum > 0 then |
| 195 | + table.insert(aggregatedConditions, { |
| 196 | + type = "Succeeded", |
| 197 | + status = "True", |
| 198 | + lastProbeTime = os.date("!%Y-%m-%dT%H:%M:%SZ"), |
| 199 | + lastTransitionTime = os.date("!%Y-%m-%dT%H:%M:%SZ"), |
| 200 | + reason = "Completed", |
| 201 | + message = "TFJob completed successfully" |
| 202 | + }) |
| 203 | + desiredObj.status.completionTime = completionTime |
| 204 | + end |
| 205 | + |
| 206 | + -- Set aggregated status |
| 207 | + desiredObj.status.startTime = startTime |
| 208 | + desiredObj.status.lastReconcileTime = lastReconcileTime |
| 209 | + desiredObj.status.replicaStatuses = replicaStatuses |
| 210 | + desiredObj.status.conditions = aggregatedConditions |
| 211 | + local tmp = desiredObj.status |
| 212 | + desiredObj.status = omitEmpty(tmp) |
| 213 | +
|
| 214 | + return desiredObj |
| 215 | + end |
| 216 | + |
| 217 | + healthInterpretation: |
| 218 | + luaScript: > |
| 219 | + function InterpretHealth(observedObj) |
| 220 | + if observedObj == nil or |
| 221 | + observedObj.status == nil or |
| 222 | + observedObj.status.conditions == nil then |
| 223 | + return false |
| 224 | + end |
| 225 | + |
| 226 | + -- Determine health based on TFJob status |
| 227 | + for i = 1, #observedObj.status.conditions do |
| 228 | + local condition = observedObj.status.conditions[i] |
| 229 | + if condition.type == "Failed" and condition.status == "True" then |
| 230 | + return false |
| 231 | + end |
| 232 | + end |
| 233 | + return true |
| 234 | + end |
| 235 | +
|
| 236 | + dependencyInterpretation: |
| 237 | + luaScript: > |
| 238 | + local kube = require("kube") |
| 239 | + function GetDependencies(desiredObj) |
| 240 | + local refs = {} |
| 241 | + |
| 242 | + if desiredObj.spec == nil or desiredObj.spec.tfReplicaSpecs == nil then |
| 243 | + return refs |
| 244 | + end |
| 245 | + |
| 246 | + -- Iterate all TFJob replica types |
| 247 | + local replicaTypes = {"PS", "Worker", "Chief", "Master", "Evaluator"} |
| 248 | + for _, replicaType in ipairs(replicaTypes) do |
| 249 | + local spec = desiredObj.spec.tfReplicaSpecs[replicaType] |
| 250 | + if spec ~= nil and spec.template ~= nil then |
| 251 | + local deps = kube.getPodDependencies(spec.template, desiredObj.metadata.namespace) |
| 252 | + if deps ~= nil then |
| 253 | + for _, dep in ipairs(deps) do |
| 254 | + table.insert(refs, dep) |
| 255 | + end |
| 256 | + end |
| 257 | + end |
| 258 | + end |
| 259 | + |
| 260 | + return refs |
| 261 | + end |
0 commit comments