Skip to content

Commit c0118ef

Browse files
authored
Merge pull request #5023 from mszacillo/flinkdeployment-crd
Adding FlinkDeployment CRD to supported third party resource customizations
2 parents c0c5569 + 4c8e4de commit c0118ef

File tree

6 files changed

+283
-0
lines changed

6 files changed

+283
-0
lines changed

pkg/resourceinterpreter/customized/declarative/luavm/kube.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ limitations under the License.
1717
package luavm
1818

1919
import (
20+
"math"
21+
2022
lua "github.com/yuin/gopher-lua"
2123
corev1 "k8s.io/api/core/v1"
2224
"k8s.io/apimachinery/pkg/api/resource"
@@ -57,6 +59,7 @@ var kubeFuncs = map[string]lua.LGFunction{
5759
"resourceAdd": resourceAdd,
5860
"accuratePodRequirements": accuratePodRequirements,
5961
"getPodDependencies": getPodDependencies,
62+
"getResourceQuantity": getResourceQuantity,
6063
}
6164

6265
func resourceAdd(ls *lua.LState) int {
@@ -127,6 +130,30 @@ func getPodDependencies(ls *lua.LState) int {
127130
return 1
128131
}
129132

133+
func getResourceQuantity(ls *lua.LState) int {
134+
n := ls.GetTop()
135+
if n != 1 {
136+
ls.RaiseError("getResourceQuantity only accepts one argument")
137+
return 0
138+
}
139+
140+
q := checkResourceQuantity(ls, n)
141+
num := q.AsApproximateFloat64()
142+
143+
if num < 0 {
144+
ls.RaiseError("int approximation unexpectedly returned a negative value: %#v,", q)
145+
return 0
146+
}
147+
148+
if math.IsInf(num, 1) {
149+
ls.RaiseError("int approximation unexpectedly returned an infinite value: %#v,", q)
150+
return 0
151+
}
152+
153+
ls.Push(lua.LNumber(num))
154+
return 1
155+
}
156+
130157
func checkResourceQuantity(ls *lua.LState, n int) resource.Quantity {
131158
v := ls.Get(n)
132159
switch typ := v.Type(); typ {
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
apiVersion: config.karmada.io/v1alpha1
2+
kind: ResourceInterpreterCustomization
3+
metadata:
4+
name: declarative-configuration-flinkdeployment
5+
spec:
6+
target:
7+
apiVersion: flink.apache.org/v1beta1
8+
kind: FlinkDeployment
9+
customizations:
10+
healthInterpretation:
11+
luaScript: >
12+
function InterpretHealth(observedObj)
  -- Returns the health verdict for a FlinkDeployment:
  --   * false when status or status.jobStatus is missing;
  --   * true when the job state is neither 'CREATED' nor 'RECONCILING';
  --   * otherwise, whether jobManagerDeploymentStatus equals 'ERROR'.
  local status = observedObj.status
  if status == nil or status.jobStatus == nil then
    return false
  end

  local state = status.jobStatus.state
  if state == 'CREATED' or state == 'RECONCILING' then
    return status.jobManagerDeploymentStatus == 'ERROR'
  end

  return true
end
22+
replicaResource:
23+
luaScript: >
24+
local kube = require("kube")
25+
26+
local function isempty(s)
  -- True when s is nil or the empty string; every other value
  -- (including 0 and false) counts as set.
  if s == nil then
    return true
  end
  return s == ''
end
29+
30+
function GetReplicas(observedObj)
  -- Computes the replica count and per-replica requirements of a FlinkDeployment.
  -- Returns two values:
  --   replica:  JobManager replicas + TaskManager replicas
  --   requires: table with resourceRequest (cpu, memory) and nodeClaim
  --             (nodeSelector, tolerations)
  -- All working variables are local so nothing leaks into the Lua global table.
  local spec = observedObj.spec
  local requires = {
    resourceRequest = {},
    nodeClaim = {},
  }

  local jm_replicas = spec.jobManager.replicas
  if isempty(jm_replicas) then
    jm_replicas = 1
  end

  -- TaskManager replica setting takes precedence over parallelism setting

  local tm_replicas = spec.taskManager.replicas
  if isempty(tm_replicas) then
    -- spec.job and spec.flinkConfiguration may be absent; treat their fields as unset
    -- instead of failing with an "index a nil value" error.
    local parallelism = nil
    if spec.job ~= nil then
      parallelism = spec.job.parallelism
    end
    local task_slots = nil
    if spec.flinkConfiguration ~= nil then
      task_slots = spec.flinkConfiguration['taskmanager.numberOfTaskSlots']
    end

    -- The slot count is stored as a string (e.g. "2"); convert explicitly.
    -- tonumber('') is nil, so empty strings are still treated as unset.
    local parallelism_num = tonumber(parallelism)
    local slots_num = tonumber(task_slots)
    if parallelism_num == nil or slots_num == nil or slots_num <= 0 then
      -- Unset, non-numeric or non-positive values fall back to a single TaskManager
      -- rather than producing an infinite or undefined replica count.
      tm_replicas = 1
    else
      tm_replicas = math.ceil(parallelism_num / slots_num)
    end
  end

  local replica = jm_replicas + tm_replicas

  -- Until multiple podTemplates are supported in replicaRequirements, take max of cpu + memory values as requirement

  requires.resourceRequest.cpu = math.max(spec.taskManager.resource.cpu, spec.jobManager.resource.cpu)
  local jm_memory_value = kube.getResourceQuantity(spec.jobManager.resource.memory)
  local tm_memory_value = kube.getResourceQuantity(spec.taskManager.resource.memory)
  -- Compare the numeric values but keep the original quantity string in the result.
  if jm_memory_value > tm_memory_value then
    requires.resourceRequest.memory = spec.jobManager.resource.memory
  else
    requires.resourceRequest.memory = spec.taskManager.resource.memory
  end

  -- Until multiple podTemplates are supported, interpreter will only take affinity and toleration input to common podTemplate

  if spec.podTemplate ~= nil and spec.podTemplate.spec ~= nil then
    requires.nodeClaim.nodeSelector = spec.podTemplate.spec.nodeSelector
    requires.nodeClaim.tolerations = spec.podTemplate.spec.tolerations
  end

  return replica, requires
end
76+
statusAggregation:
77+
luaScript: >
78+
function AggregateStatus(desiredObj, statusItems)
  -- Copies the Flink status fields reported by member clusters onto desiredObj.status
  -- and returns desiredObj. When several items carry a status, the last one wins.
  -- If statusItems is nil, desiredObj is returned untouched.
  -- All working variables are local so nothing leaks into the Lua global table.
  if statusItems == nil then
    return desiredObj
  end
  if desiredObj.status == nil then
    desiredObj.status = {}
  end

  -- Defaults used when no item carries a status.
  local clusterInfo = {}
  local jobManagerDeploymentStatus = ''
  local jobStatus = {}
  local lifecycleState = ''
  local observedGeneration = 0
  local reconciliationStatus = {}
  local taskManager = {}

  for i = 1, #statusItems do
    local currentStatus = statusItems[i].status
    if currentStatus ~= nil then
      clusterInfo = currentStatus.clusterInfo
      jobManagerDeploymentStatus = currentStatus.jobManagerDeploymentStatus
      jobStatus = currentStatus.jobStatus
      observedGeneration = currentStatus.observedGeneration
      lifecycleState = currentStatus.lifecycleState
      reconciliationStatus = currentStatus.reconciliationStatus
      taskManager = currentStatus.taskManager
    end
  end

  desiredObj.status.clusterInfo = clusterInfo
  desiredObj.status.jobManagerDeploymentStatus = jobManagerDeploymentStatus
  desiredObj.status.jobStatus = jobStatus
  desiredObj.status.lifecycleState = lifecycleState
  desiredObj.status.observedGeneration = observedGeneration
  desiredObj.status.reconciliationStatus = reconciliationStatus
  desiredObj.status.taskManager = taskManager
  return desiredObj
end
115+
statusReflection:
116+
luaScript: >
117+
function ReflectStatus(observedObj)
  -- Returns a new table holding the subset of observedObj.status fields that are
  -- reflected back: clusterInfo, jobManagerDeploymentStatus, jobStatus,
  -- observedGeneration, lifecycleState, reconciliationStatus and taskManager.
  -- Returns an empty table when observedObj or its status is nil.
  -- The result is local so it does not leak into the Lua global table.
  local status = {}
  if observedObj == nil or observedObj.status == nil then
    return status
  end

  local observed = observedObj.status
  status.clusterInfo = observed.clusterInfo
  status.jobManagerDeploymentStatus = observed.jobManagerDeploymentStatus
  status.jobStatus = observed.jobStatus
  status.observedGeneration = observed.observedGeneration
  status.lifecycleState = observed.lifecycleState
  status.reconciliationStatus = observed.reconciliationStatus
  status.taskManager = observed.taskManager
  return status
end
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
tests:
2+
- desiredInputPath: testdata/desired-flinkdeployment.yaml
3+
statusInputPath: testdata/status-file.yaml
4+
operation: AggregateStatus
5+
- observedInputPath: testdata/observed-flinkdeployment.yaml
6+
operation: InterpretReplica
7+
- observedInputPath: testdata/observed-flinkdeployment.yaml
8+
operation: InterpretHealth
9+
- observedInputPath: testdata/observed-flinkdeployment.yaml
10+
operation: InterpretStatus
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
apiVersion: flink.apache.org/v1beta1
2+
kind: FlinkDeployment
3+
metadata:
4+
name: basic-example
5+
namespace: test-namespace
6+
spec:
7+
flinkConfiguration:
8+
taskmanager.numberOfTaskSlots: "2"
9+
flinkVersion: v1_17
10+
image: flink:1.17
11+
job:
12+
jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
13+
parallelism: 2
14+
upgradeMode: stateless
15+
jobManager:
16+
replicas: 1
17+
resource:
18+
cpu: 1
19+
memory: 2048m
20+
mode: native
21+
serviceAccount: flink
22+
taskManager:
23+
resource:
24+
cpu: 1
25+
memory: 2048m
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
apiVersion: flink.apache.org/v1beta1
2+
kind: FlinkDeployment
3+
metadata:
4+
creationTimestamp: "2024-06-05T14:52:28Z"
5+
finalizers:
6+
- flinkdeployments.flink.apache.org/finalizer
7+
generation: 1
8+
name: basic-example
9+
namespace: test-namespace
10+
resourceVersion: "5053661"
11+
uid: 87ef77ca-7bf0-4998-b275-06f459872e03
12+
spec:
13+
flinkConfiguration:
14+
taskmanager.numberOfTaskSlots: "2"
15+
flinkVersion: v1_17
16+
image: flink:1.17
17+
job:
18+
args: []
19+
jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
20+
parallelism: 2
21+
state: running
22+
upgradeMode: stateless
23+
jobManager:
24+
replicas: 1
25+
resource:
26+
cpu: 1
27+
memory: 2048m
28+
serviceAccount: flink
29+
taskManager:
30+
resource:
31+
cpu: 1
32+
memory: 2048m
33+
status:
34+
clusterInfo:
35+
flink-revision: 2750d5c @ 2023-05-19T10:45:46+02:00
36+
flink-version: 1.17.1
37+
total-cpu: "2.0"
38+
total-memory: "4294967296"
39+
jobManagerDeploymentStatus: READY
40+
jobStatus:
41+
checkpointInfo:
42+
lastPeriodicCheckpointTimestamp: 0
43+
jobId: 44cc5573945d1d4925732d915c70b9ac
44+
jobName: Minimal Spec Example
45+
savepointInfo:
46+
lastPeriodicSavepointTimestamp: 0
47+
savepointHistory: []
48+
startTime: "1717599166365"
49+
state: RUNNING
50+
updateTime: "1717599182544"
51+
lifecycleState: STABLE
52+
observedGeneration: 1
53+
reconciliationStatus:
54+
lastReconciledSpec: '{"spec":{"job":{"jarURI":"local:///opt/flink/examples/streaming/StateMachineExample.jar","parallelism":2,"entryClass":null,"args":[],"state":"running","savepointTriggerNonce":null,"initialSavepointPath":null,"checkpointTriggerNonce":null,"upgradeMode":"stateless","allowNonRestoredState":null,"savepointRedeployNonce":null},"restartNonce":null,"flinkConfiguration":{"taskmanager.numberOfTaskSlots":"2"},"image":"flink:1.17","imagePullPolicy":null,"serviceAccount":"flink","flinkVersion":"v1_17","ingress":null,"podTemplate":null,"jobManager":{"resource":{"cpu":1.0,"memory":"2048m","ephemeralStorage":null},"replicas":1,"podTemplate":null},"taskManager":{"resource":{"cpu":1.0,"memory":"2048m","ephemeralStorage":null},"replicas":null,"podTemplate":null},"logConfiguration":null,"mode":null},"resource_metadata":{"apiVersion":"flink.apache.org/v1beta1","metadata":{"generation":2},"firstDeployment":true}}'
55+
lastStableSpec: '{"spec":{"job":{"jarURI":"local:///opt/flink/examples/streaming/StateMachineExample.jar","parallelism":2,"entryClass":null,"args":[],"state":"running","savepointTriggerNonce":null,"initialSavepointPath":null,"checkpointTriggerNonce":null,"upgradeMode":"stateless","allowNonRestoredState":null,"savepointRedeployNonce":null},"restartNonce":null,"flinkConfiguration":{"taskmanager.numberOfTaskSlots":"2"},"image":"flink:1.17","imagePullPolicy":null,"serviceAccount":"flink","flinkVersion":"v1_17","ingress":null,"podTemplate":null,"jobManager":{"resource":{"cpu":1.0,"memory":"2048m","ephemeralStorage":null},"replicas":1,"podTemplate":null},"taskManager":{"resource":{"cpu":1.0,"memory":"2048m","ephemeralStorage":null},"replicas":null,"podTemplate":null},"logConfiguration":null,"mode":null},"resource_metadata":{"apiVersion":"flink.apache.org/v1beta1","metadata":{"generation":2},"firstDeployment":true}}'
56+
reconciliationTimestamp: 1717599148930
57+
state: DEPLOYED
58+
taskManager:
59+
labelSelector: component=taskmanager,app=basic-example
60+
replicas: 1
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
applied: true
2+
clusterName: member1
3+
health: Healthy
4+
status:
5+
clusterInfo:
6+
flink-revision: 2750d5c @ 2023-05-19T10:45:46+02:00
7+
flink-version: 1.17.1
8+
total-cpu: "2.0"
9+
total-memory: "4294967296"
10+
jobManagerDeploymentStatus: READY
11+
jobStatus:
12+
checkpointInfo:
13+
lastPeriodicCheckpointTimestamp: 0
14+
jobId: 44cc5573945d1d4925732d915c70b9ac
15+
jobName: Minimal Spec Example
16+
savepointInfo:
17+
lastPeriodicSavepointTimestamp: 0
18+
savepointHistory: []
19+
startTime: "1717599166365"
20+
state: RUNNING
21+
updateTime: "1717599182544"
22+
lifecycleState: STABLE
23+
observedGeneration: 1
24+
reconciliationStatus:
25+
lastReconciledSpec: '{"spec":{"job":{"jarURI":"local:///opt/flink/examples/streaming/StateMachineExample.jar","parallelism":2,"entryClass":null,"args":[],"state":"running","savepointTriggerNonce":null,"initialSavepointPath":null,"checkpointTriggerNonce":null,"upgradeMode":"stateless","allowNonRestoredState":null,"savepointRedeployNonce":null},"restartNonce":null,"flinkConfiguration":{"taskmanager.numberOfTaskSlots":"2"},"image":"flink:1.17","imagePullPolicy":null,"serviceAccount":"flink","flinkVersion":"v1_17","ingress":null,"podTemplate":null,"jobManager":{"resource":{"cpu":1.0,"memory":"2048m","ephemeralStorage":null},"replicas":1,"podTemplate":null},"taskManager":{"resource":{"cpu":1.0,"memory":"2048m","ephemeralStorage":null},"replicas":null,"podTemplate":null},"logConfiguration":null,"mode":null},"resource_metadata":{"apiVersion":"flink.apache.org/v1beta1","metadata":{"generation":2},"firstDeployment":true}}'
26+
lastStableSpec: '{"spec":{"job":{"jarURI":"local:///opt/flink/examples/streaming/StateMachineExample.jar","parallelism":2,"entryClass":null,"args":[],"state":"running","savepointTriggerNonce":null,"initialSavepointPath":null,"checkpointTriggerNonce":null,"upgradeMode":"stateless","allowNonRestoredState":null,"savepointRedeployNonce":null},"restartNonce":null,"flinkConfiguration":{"taskmanager.numberOfTaskSlots":"2"},"image":"flink:1.17","imagePullPolicy":null,"serviceAccount":"flink","flinkVersion":"v1_17","ingress":null,"podTemplate":null,"jobManager":{"resource":{"cpu":1.0,"memory":"2048m","ephemeralStorage":null},"replicas":1,"podTemplate":null},"taskManager":{"resource":{"cpu":1.0,"memory":"2048m","ephemeralStorage":null},"replicas":null,"podTemplate":null},"logConfiguration":null,"mode":null},"resource_metadata":{"apiVersion":"flink.apache.org/v1beta1","metadata":{"generation":2},"firstDeployment":true}}'
27+
reconciliationTimestamp: 1717599148930
28+
state: DEPLOYED
29+
taskManager:
30+
labelSelector: component=taskmanager,app=basic-example
31+
replicas: 1

0 commit comments

Comments
 (0)