Skip to content

Commit 7b3798a

Browse files
committed
Using json-safe clone to prevent shared table references in FlinkDeployment luaresult
Signed-off-by: mszacillo <[email protected]>
1 parent e7ecc93 commit 7b3798a

File tree

1 file changed

+53
-23
lines changed
  • pkg/resourceinterpreter/default/thirdparty/resourcecustomizations/flink.apache.org/v1beta1/FlinkDeployment

1 file changed

+53
-23
lines changed

pkg/resourceinterpreter/default/thirdparty/resourcecustomizations/flink.apache.org/v1beta1/FlinkDeployment/customizations.yaml

Lines changed: 53 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -33,43 +33,71 @@ spec:
3333
return observedObj.status.error ~= nil
3434
end
3535
componentResource:
36-
luaScript: >
36+
luaScript: |
3737
local kube = require("kube")
3838
3939
local function isempty(s)
4040
return s == nil or s == ''
4141
end
4242
43-
local function qty(v)
44-
if v == nil then return nil end
45-
return kube.getResourceQuantity(v)
46-
end
47-
4843
-- Safe fetch of deeply nested table fields.
4944
local function get(obj, path)
5045
local cur = obj
5146
for i = 1, #path do
52-
if cur == nil then return nil end
47+
if cur == nil then
48+
return nil
49+
end
5350
cur = cur[path[i]]
5451
end
5552
return cur
5653
end
5754
5855
-- Normalize possibly-string numbers with a default.
5956
local function to_num(v, default)
60-
if v == nil or v == '' then return default end
57+
if v == nil or v == '' then
58+
return default
59+
end
6160
local n = tonumber(v)
62-
return n ~= nil and n or default
61+
if n ~= nil then
62+
return n
63+
end
64+
return default
65+
end
66+
67+
-- JSON-safe deep clone: strings/numbers/booleans/tables. Needed to prevent shared table references.
68+
local function clone_plain(val, seen)
69+
local tv = type(val)
70+
if tv ~= "table" then
71+
if tv == "string" or tv == "number" or tv == "boolean" or tv == "nil" then
72+
return val
73+
end
74+
return nil
75+
end
76+
seen = seen or {}
77+
if seen[val] then return nil end
78+
seen[val] = true
79+
local out = {}
80+
for k, v in pairs(val) do
81+
local tk = type(k)
82+
if tk == "string" or tk == "number" then
83+
local cv = clone_plain(v, seen)
84+
if cv ~= nil then out[k] = cv end
85+
end
86+
end
87+
seen[val] = nil
88+
return out
6389
end
6490
6591
local function apply_pod_template(pt_spec, requires)
66-
if pt_spec == nil then return end
92+
if pt_spec == nil then
93+
return
94+
end
6795
68-
local nodeSelector = pt_spec.nodeSelector
69-
local tolerations = pt_spec.tolerations
96+
local nodeSelector = clone_plain(pt_spec.nodeSelector)
97+
local tolerations = clone_plain(pt_spec.tolerations)
7098
local priority = pt_spec.priorityClassName
7199
72-
-- Only create nodeClaim if there’s content
100+
-- Only create nodeClaim if there is content
73101
if nodeSelector ~= nil or tolerations ~= nil then
74102
requires.nodeClaim = requires.nodeClaim or {}
75103
requires.nodeClaim.nodeSelector = nodeSelector
@@ -85,27 +113,29 @@ spec:
85113
local components = {}
86114
local pt_spec = get(observedObj, {"spec","podTemplate","spec"})
87115
88-
-- ===== JobManager =====
116+
-- JobManager
117+
89118
local jm_replicas = to_num(get(observedObj, {"spec","jobManager","replicas"}), 1)
90119
91120
local jm_requires = {
92-
resourceRequest = {},
121+
resourceRequest = {}
93122
}
94123
95124
local jm_cpu = get(observedObj, {"spec","jobManager","resource","cpu"})
96125
local jm_memory = get(observedObj, {"spec","jobManager","resource","memory"})
97-
jm_requires.resourceRequest.cpu = qty(jm_cpu)
98-
jm_requires.resourceRequest.memory = qty(jm_memory)
126+
jm_requires.resourceRequest.cpu = jm_cpu
127+
jm_requires.resourceRequest.memory = kube.getResourceQuantity(jm_memory)
99128
apply_pod_template(pt_spec, jm_requires)
100129
101130
local jobManagerComponent = {
102131
name = "jobmanager",
103132
replicas = jm_replicas,
104-
replicaRequirements = jm_requires,
133+
replicaRequirements = jm_requires
105134
}
106135
table.insert(components, jobManagerComponent)
107136
108-
-- ===== TaskManager =====
137+
-- TaskManager
138+
109139
local tm_replicas = to_num(get(observedObj, {"spec","taskManager","replicas"}), nil)
110140
if tm_replicas == nil then
111141
local parallelism = to_num(get(observedObj, {"spec","job","parallelism"}), nil)
@@ -118,19 +148,19 @@ spec:
118148
end
119149
120150
local tm_requires = {
121-
resourceRequest = {},
151+
resourceRequest = {}
122152
}
123153
124154
local tm_cpu = get(observedObj, {"spec","taskManager","resource","cpu"})
125155
local tm_memory = get(observedObj, {"spec","taskManager","resource","memory"})
126-
tm_requires.resourceRequest.cpu = qty(tm_cpu)
127-
tm_requires.resourceRequest.memory = qty(tm_memory)
156+
tm_requires.resourceRequest.cpu = tm_cpu
157+
tm_requires.resourceRequest.memory = kube.getResourceQuantity(tm_memory)
128158
apply_pod_template(pt_spec, tm_requires)
129159
130160
local taskManagerComponent = {
131161
name = "taskmanager",
132162
replicas = tm_replicas,
133-
replicaRequirements = tm_requires,
163+
replicaRequirements = tm_requires
134164
}
135165
table.insert(components, taskManagerComponent)
136166

0 commit comments

Comments
 (0)