Skip to content

Commit f822072

Browse files
committed
Adding FlinkDeployment CRD to supported third party resource customizations
Signed-off-by: mszacillo <[email protected]>
1 parent d05b921 commit f822072

File tree

5 files changed

+236
-0
lines changed

5 files changed

+236
-0
lines changed
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
apiVersion: config.karmada.io/v1alpha1
2+
kind: ResourceInterpreterCustomization
3+
metadata:
4+
name: declarative-configuration-flinkdeployment
5+
spec:
6+
target:
7+
apiVersion: flink.apache.org/v1beta1
8+
kind: FlinkDeployment
9+
customizations:
10+
healthInterpretation:
11+
luaScript: >
12+
function InterpretHealth(observedObj)
13+
if observedObj.status ~= nil and observedObj.status.jobStatus ~= nil then
14+
return observedObj.status.jobStatus.state ~= 'CREATED' and observedObj.status.jobStatus.state ~= 'RECONCILING'
15+
end
16+
return false
17+
end
18+
replicaResource:
19+
luaScript: >
20+
local kube = require("kube")
21+
22+
local function isempty(s)
23+
return s == nil or s == ''
24+
end
25+
26+
function GetReplicas(observedObj)
27+
-- FlinkDeployments presently will not be subdivided among clusters, replica should be 1
28+
replica = 1
29+
requires = {
30+
resourceRequest = {},
31+
}
32+
-- Add jobmanager resources into replica requirement
33+
34+
jm_replicas = observedObj.spec.jobManager.replicas
35+
if isempty(jm_replicas) then
36+
jm_replicas = 1
37+
end
38+
39+
for i = 1, jm_replicas do
40+
requires.resourceRequest.cpu = kube.resourceAdd(requires.resourceRequest.cpu, tostring(observedObj.spec.jobManager.resource.cpu))
41+
requires.resourceRequest.memory = kube.resourceAdd(requires.resourceRequest.memory, observedObj.spec.jobManager.resource.memory)
42+
end
43+
44+
-- Add task manager resources into replica requirement
45+
46+
parallelism = observedObj.spec.job.parallelism
47+
tms = math.ceil(parallelism / observedObj.spec.flinkConfiguration['taskmanager.numberOfTaskSlots'])
48+
49+
for i = 1, tms do
50+
requires.resourceRequest.cpu = kube.resourceAdd(requires.resourceRequest.cpu, tostring(observedObj.spec.taskManager.resource.cpu))
51+
requires.resourceRequest.memory = kube.resourceAdd(requires.resourceRequest.memory, observedObj.spec.taskManager.resource.memory)
52+
end
53+
54+
return replica, requires
55+
end
56+
statusAggregation:
57+
luaScript: >
58+
function AggregateStatus(desiredObj, statusItems)
59+
if statusItems == nil then
60+
return desiredObj
61+
end
62+
if desiredObj.status == nil then
63+
desiredObj.status = {}
64+
end
65+
clusterInfo = {}
66+
jobManagerDeploymentStatus = ''
67+
jobStatus = {}
68+
lifecycleState = ''
69+
observedGeneration = 0
70+
reconciliationStatus = {}
71+
taskManager = {}
72+
73+
for i = 1, #statusItems do
74+
currentStatus = statusItems[i].status
75+
if currentStatus ~= nil then
76+
clusterInfo = currentStatus.clusterInfo
77+
jobManagerDeploymentStatus = currentStatus.jobManagerDeploymentStatus
78+
jobStatus = currentStatus.jobStatus
79+
observedGeneration = currentStatus.observedGeneration
80+
lifecycleState = currentStatus.lifecycleState
81+
reconciliationStatus = currentStatus.reconciliationStatus
82+
taskManager = currentStatus.taskManager
83+
end
84+
end
85+
86+
desiredObj.status.clusterInfo = clusterInfo
87+
desiredObj.status.jobManagerDeploymentStatus = jobManagerDeploymentStatus
88+
desiredObj.status.jobStatus = jobStatus
89+
desiredObj.status.lifecycleState = lifecycleState
90+
desiredObj.status.observedGeneration = observedGeneration
91+
desiredObj.status.reconciliationStatus = reconciliationStatus
92+
desiredObj.status.taskManager = taskManager
93+
return desiredObj
94+
end
95+
statusReflection:
96+
luaScript: >
97+
function ReflectStatus(observedObj)
98+
status = {}
99+
if observedObj == nil or observedObj.status == nil then
100+
return status
101+
end
102+
status.clusterInfo = observedObj.status.clusterInfo
103+
status.jobManagerDeploymentStatus = observedObj.status.jobManagerDeploymentStatus
104+
status.jobStatus = observedObj.status.jobStatus
105+
status.observedGeneration = observedObj.status.observedGeneration
106+
status.lifecycleState = observedObj.status.lifecycleState
107+
status.reconciliationStatus = observedObj.status.reconciliationStatus
108+
status.taskManager = observedObj.status.taskManager
109+
return status
110+
end
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
tests:
2+
- desiredInputPath: testdata/desired-flinkdeployment.yaml
3+
statusInputPath: testdata/status-file.yaml
4+
operation: AggregateStatus
5+
- observedInputPath: testdata/observed-flinkdeployment.yaml
6+
operation: InterpretReplica
7+
- observedInputPath: testdata/observed-flinkdeployment.yaml
8+
operation: InterpretHealth
9+
- observedInputPath: testdata/observed-flinkdeployment.yaml
10+
operation: InterpretStatus
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
apiVersion: flink.apache.org/v1beta1
2+
kind: FlinkDeployment
3+
metadata:
4+
name: basic-example
5+
namespace: test-namespace
6+
spec:
7+
flinkConfiguration:
8+
taskmanager.numberOfTaskSlots: "2"
9+
flinkVersion: v1_17
10+
image: flink:1.17
11+
job:
12+
jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
13+
parallelism: 2
14+
upgradeMode: stateless
15+
jobManager:
16+
replicas: 1
17+
resource:
18+
cpu: 1
19+
memory: 2048m
20+
mode: native
21+
serviceAccount: flink
22+
taskManager:
23+
resource:
24+
cpu: 1
25+
memory: 2048m
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
apiVersion: flink.apache.org/v1beta1
2+
kind: FlinkDeployment
3+
metadata:
4+
creationTimestamp: "2024-06-05T14:52:28Z"
5+
finalizers:
6+
- flinkdeployments.flink.apache.org/finalizer
7+
generation: 1
8+
name: basic-example
9+
namespace: test-namespace
10+
resourceVersion: "5053661"
11+
uid: 87ef77ca-7bf0-4998-b275-06f459872e03
12+
spec:
13+
flinkConfiguration:
14+
taskmanager.numberOfTaskSlots: "2"
15+
flinkVersion: v1_17
16+
image: flink:1.17
17+
job:
18+
args: []
19+
jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
20+
parallelism: 2
21+
state: running
22+
upgradeMode: stateless
23+
jobManager:
24+
replicas: 1
25+
resource:
26+
cpu: 1
27+
memory: 2048m
28+
serviceAccount: flink
29+
taskManager:
30+
resource:
31+
cpu: 1
32+
memory: 2048m
33+
status:
34+
clusterInfo:
35+
flink-revision: 2750d5c @ 2023-05-19T10:45:46+02:00
36+
flink-version: 1.17.1
37+
total-cpu: "2.0"
38+
total-memory: "4294967296"
39+
jobManagerDeploymentStatus: READY
40+
jobStatus:
41+
checkpointInfo:
42+
lastPeriodicCheckpointTimestamp: 0
43+
jobId: 44cc5573945d1d4925732d915c70b9ac
44+
jobName: Minimal Spec Example
45+
savepointInfo:
46+
lastPeriodicSavepointTimestamp: 0
47+
savepointHistory: []
48+
startTime: "1717599166365"
49+
state: RUNNING
50+
updateTime: "1717599182544"
51+
lifecycleState: STABLE
52+
observedGeneration: 1
53+
reconciliationStatus:
54+
lastReconciledSpec: '{"spec":{"job":{"jarURI":"local:///opt/flink/examples/streaming/StateMachineExample.jar","parallelism":2,"entryClass":null,"args":[],"state":"running","savepointTriggerNonce":null,"initialSavepointPath":null,"checkpointTriggerNonce":null,"upgradeMode":"stateless","allowNonRestoredState":null,"savepointRedeployNonce":null},"restartNonce":null,"flinkConfiguration":{"taskmanager.numberOfTaskSlots":"2"},"image":"flink:1.17","imagePullPolicy":null,"serviceAccount":"flink","flinkVersion":"v1_17","ingress":null,"podTemplate":null,"jobManager":{"resource":{"cpu":1.0,"memory":"2048m","ephemeralStorage":null},"replicas":1,"podTemplate":null},"taskManager":{"resource":{"cpu":1.0,"memory":"2048m","ephemeralStorage":null},"replicas":null,"podTemplate":null},"logConfiguration":null,"mode":null},"resource_metadata":{"apiVersion":"flink.apache.org/v1beta1","metadata":{"generation":2},"firstDeployment":true}}'
55+
lastStableSpec: '{"spec":{"job":{"jarURI":"local:///opt/flink/examples/streaming/StateMachineExample.jar","parallelism":2,"entryClass":null,"args":[],"state":"running","savepointTriggerNonce":null,"initialSavepointPath":null,"checkpointTriggerNonce":null,"upgradeMode":"stateless","allowNonRestoredState":null,"savepointRedeployNonce":null},"restartNonce":null,"flinkConfiguration":{"taskmanager.numberOfTaskSlots":"2"},"image":"flink:1.17","imagePullPolicy":null,"serviceAccount":"flink","flinkVersion":"v1_17","ingress":null,"podTemplate":null,"jobManager":{"resource":{"cpu":1.0,"memory":"2048m","ephemeralStorage":null},"replicas":1,"podTemplate":null},"taskManager":{"resource":{"cpu":1.0,"memory":"2048m","ephemeralStorage":null},"replicas":null,"podTemplate":null},"logConfiguration":null,"mode":null},"resource_metadata":{"apiVersion":"flink.apache.org/v1beta1","metadata":{"generation":2},"firstDeployment":true}}'
56+
reconciliationTimestamp: 1717599148930
57+
state: DEPLOYED
58+
taskManager:
59+
labelSelector: component=taskmanager,app=basic-example
60+
replicas: 1
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
applied: true
2+
clusterName: member1
3+
health: Healthy
4+
status:
5+
clusterInfo:
6+
flink-revision: 2750d5c @ 2023-05-19T10:45:46+02:00
7+
flink-version: 1.17.1
8+
total-cpu: "2.0"
9+
total-memory: "4294967296"
10+
jobManagerDeploymentStatus: READY
11+
jobStatus:
12+
checkpointInfo:
13+
lastPeriodicCheckpointTimestamp: 0
14+
jobId: 44cc5573945d1d4925732d915c70b9ac
15+
jobName: Minimal Spec Example
16+
savepointInfo:
17+
lastPeriodicSavepointTimestamp: 0
18+
savepointHistory: []
19+
startTime: "1717599166365"
20+
state: RUNNING
21+
updateTime: "1717599182544"
22+
lifecycleState: STABLE
23+
observedGeneration: 1
24+
reconciliationStatus:
25+
lastReconciledSpec: '{"spec":{"job":{"jarURI":"local:///opt/flink/examples/streaming/StateMachineExample.jar","parallelism":2,"entryClass":null,"args":[],"state":"running","savepointTriggerNonce":null,"initialSavepointPath":null,"checkpointTriggerNonce":null,"upgradeMode":"stateless","allowNonRestoredState":null,"savepointRedeployNonce":null},"restartNonce":null,"flinkConfiguration":{"taskmanager.numberOfTaskSlots":"2"},"image":"flink:1.17","imagePullPolicy":null,"serviceAccount":"flink","flinkVersion":"v1_17","ingress":null,"podTemplate":null,"jobManager":{"resource":{"cpu":1.0,"memory":"2048m","ephemeralStorage":null},"replicas":1,"podTemplate":null},"taskManager":{"resource":{"cpu":1.0,"memory":"2048m","ephemeralStorage":null},"replicas":null,"podTemplate":null},"logConfiguration":null,"mode":null},"resource_metadata":{"apiVersion":"flink.apache.org/v1beta1","metadata":{"generation":2},"firstDeployment":true}}'
26+
lastStableSpec: '{"spec":{"job":{"jarURI":"local:///opt/flink/examples/streaming/StateMachineExample.jar","parallelism":2,"entryClass":null,"args":[],"state":"running","savepointTriggerNonce":null,"initialSavepointPath":null,"checkpointTriggerNonce":null,"upgradeMode":"stateless","allowNonRestoredState":null,"savepointRedeployNonce":null},"restartNonce":null,"flinkConfiguration":{"taskmanager.numberOfTaskSlots":"2"},"image":"flink:1.17","imagePullPolicy":null,"serviceAccount":"flink","flinkVersion":"v1_17","ingress":null,"podTemplate":null,"jobManager":{"resource":{"cpu":1.0,"memory":"2048m","ephemeralStorage":null},"replicas":1,"podTemplate":null},"taskManager":{"resource":{"cpu":1.0,"memory":"2048m","ephemeralStorage":null},"replicas":null,"podTemplate":null},"logConfiguration":null,"mode":null},"resource_metadata":{"apiVersion":"flink.apache.org/v1beta1","metadata":{"generation":2},"firstDeployment":true}}'
27+
reconciliationTimestamp: 1717599148930
28+
state: DEPLOYED
29+
taskManager:
30+
labelSelector: component=taskmanager,app=basic-example
31+
replicas: 1

0 commit comments

Comments
 (0)