Skip to content

Commit 4f16de2

Browse files
committed
Taking max replica requirement from jm and tms, updating health interpreter, adding getResourceQuantity function to kube library
Signed-off-by: mszacillo <[email protected]>
1 parent f822072 commit 4f16de2

File tree

2 files changed

+49
-14
lines changed
  • pkg/resourceinterpreter
    • customized/declarative/luavm
    • default/thirdparty/resourcecustomizations/flink.apache.org/v1beta1/FlinkDeployment

2 files changed

+49
-14
lines changed

pkg/resourceinterpreter/customized/declarative/luavm/kube.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ limitations under the License.
1717
package luavm
1818

1919
import (
20+
"math"
21+
2022
lua "github.com/yuin/gopher-lua"
2123
corev1 "k8s.io/api/core/v1"
2224
"k8s.io/apimachinery/pkg/api/resource"
@@ -57,6 +59,7 @@ var kubeFuncs = map[string]lua.LGFunction{
5759
"resourceAdd": resourceAdd,
5860
"accuratePodRequirements": accuratePodRequirements,
5961
"getPodDependencies": getPodDependencies,
62+
"getResourceQuantity": getResourceQuantity,
6063
}
6164

6265
func resourceAdd(ls *lua.LState) int {
@@ -127,6 +130,30 @@ func getPodDependencies(ls *lua.LState) int {
127130
return 1
128131
}
129132

133+
func getResourceQuantity(ls *lua.LState) int {
134+
n := ls.GetTop()
135+
if n != 1 {
136+
ls.RaiseError("getResourceQuantity only accepts one argument")
137+
return 0
138+
}
139+
140+
q := checkResourceQuantity(ls, n)
141+
num := q.AsApproximateFloat64()
142+
143+
if num < 0 {
144+
ls.RaiseError("int approximation unexpectedly returned a negative value: %#v,", q)
145+
return 0
146+
}
147+
148+
if math.IsInf(num, 1) {
149+
ls.RaiseError("int approximation unexpectedly returned an infinite value: %#v,", q)
150+
return 0
151+
}
152+
153+
ls.Push(lua.LNumber(num))
154+
return 1
155+
}
156+
130157
func checkResourceQuantity(ls *lua.LState, n int) resource.Quantity {
131158
v := ls.Get(n)
132159
switch typ := v.Type(); typ {

pkg/resourceinterpreter/default/thirdparty/resourcecustomizations/flink.apache.org/v1beta1/FlinkDeployment/customizations.yaml

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,11 @@ spec:
1111
luaScript: >
1212
function InterpretHealth(observedObj)
  -- Returns true once the job state is neither 'CREATED' nor 'RECONCILING'.
  -- While the job is still in one of those two states, returns true only if
  -- jobManagerDeploymentStatus is 'ERROR'.
  -- Returns false when status or status.jobStatus is missing.
  local status = observedObj.status
  if status == nil or status.jobStatus == nil then
    return false
  end

  local state = status.jobStatus.state
  if state == 'CREATED' or state == 'RECONCILING' then
    return status.jobManagerDeploymentStatus == 'ERROR'
  end
  return true
end
@@ -24,33 +28,37 @@ spec:
2428
end
2529
2630
function GetReplicas(observedObj)
  -- Computes the replica count and per-replica requirements for the object.
  -- Returns two values:
  --   replica:  jobManager replicas plus the number of taskManagers derived
  --             from the job parallelism.
  --   requires: table with resourceRequest (cpu, memory) and nodeClaim
  --             (nodeSelector, tolerations).
  -- All working variables are local so nothing is left behind in the Lua
  -- global environment after the call.
  local requires = {
    resourceRequest = {},
    nodeClaim = {},
  }

  local jm_replicas = observedObj.spec.jobManager.replicas
  if isempty(jm_replicas) then
    jm_replicas = 1
  end

  -- Number of taskManagers: parallelism divided by the configured
  -- 'taskmanager.numberOfTaskSlots', rounded up.
  local parallelism = observedObj.spec.job.parallelism
  local tm_replicas = math.ceil(parallelism / observedObj.spec.flinkConfiguration['taskmanager.numberOfTaskSlots'])

  local replica = jm_replicas + tm_replicas

  -- Until multiple podTemplates are supported in replicaRequirements, take max of cpu + memory values as requirement
  requires.resourceRequest.cpu = math.max(observedObj.spec.taskManager.resource.cpu, observedObj.spec.jobManager.resource.cpu)

  -- Memory values are quantity strings, so compare their numeric
  -- approximations and keep the original string of the larger one.
  local jm_memory_value = kube.getResourceQuantity(observedObj.spec.jobManager.resource.memory)
  local tm_memory_value = kube.getResourceQuantity(observedObj.spec.taskManager.resource.memory)
  if jm_memory_value > tm_memory_value then
    requires.resourceRequest.memory = observedObj.spec.jobManager.resource.memory
  else
    requires.resourceRequest.memory = observedObj.spec.taskManager.resource.memory
  end

  -- Until multiple podTemplates are supported, interpreter will only take affinity and toleration input to common podTemplate
  -- Guard the lookup: indexing a missing podTemplate (or podTemplate.spec)
  -- would raise a Lua error. NOTE(review): assumes spec.podTemplate is
  -- optional in the FlinkDeployment CRD; confirm.
  local pod_template = observedObj.spec.podTemplate
  if pod_template ~= nil and pod_template.spec ~= nil then
    requires.nodeClaim.nodeSelector = pod_template.spec.nodeSelector
    requires.nodeClaim.tolerations = pod_template.spec.tolerations
  end

  return replica, requires
end
5664
statusAggregation:

0 commit comments

Comments
 (0)