Skip to content
This repository was archived by the owner on May 8, 2025. It is now read-only.

Commit 17040f1

Browse files
authored
Add recreateOnUpdate options when update cluster (#365)
related-issue: #360
1 parent 37e09fb commit 17040f1

File tree

7 files changed

+63
-6
lines changed

7 files changed

+63
-6
lines changed

api/v1beta1/flinkcluster_default.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,10 @@ import (
2323

2424
// Sets default values for unspecified FlinkCluster properties.
2525
func _SetDefault(cluster *FlinkCluster) {
26+
if cluster.Spec.RecreateOnUpdate == nil {
27+
cluster.Spec.RecreateOnUpdate = new(bool)
28+
*cluster.Spec.RecreateOnUpdate = true
29+
}
2630
_SetImageDefault(&cluster.Spec.Image)
2731
_SetJobManagerDefault(&cluster.Spec.JobManager)
2832
_SetTaskManagerDefault(&cluster.Spec.TaskManager)

api/v1beta1/flinkcluster_default_test.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ func TestSetDefault(t *testing.T) {
5555
var defatulJobManagerIngressTLSUse = false
5656
var defaultMemoryOffHeapRatio = int32(25)
5757
var defaultMemoryOffHeapMin = resource.MustParse("600M")
58+
defaultRecreateOnUpdate := new(bool)
59+
*defaultRecreateOnUpdate = true
5860
var expectedCluster = FlinkCluster{
5961
TypeMeta: metav1.TypeMeta{},
6062
ObjectMeta: metav1.ObjectMeta{},
@@ -112,7 +114,8 @@ func TestSetDefault(t *testing.T) {
112114
HadoopConfig: &HadoopConfig{
113115
MountPath: "/etc/hadoop/conf",
114116
},
115-
EnvVars: nil,
117+
EnvVars: nil,
118+
RecreateOnUpdate: defaultRecreateOnUpdate,
116119
},
117120
Status: FlinkClusterStatus{},
118121
}
@@ -146,6 +149,8 @@ func TestSetNonDefault(t *testing.T) {
146149
RunAsUser: &securityContextUserGroup,
147150
RunAsGroup: &securityContextUserGroup,
148151
}
152+
defaultRecreateOnUpdate := new(bool)
153+
*defaultRecreateOnUpdate = true
149154
var cluster = FlinkCluster{
150155
TypeMeta: metav1.TypeMeta{},
151156
ObjectMeta: metav1.ObjectMeta{},
@@ -267,7 +272,8 @@ func TestSetNonDefault(t *testing.T) {
267272
HadoopConfig: &HadoopConfig{
268273
MountPath: "/opt/flink/hadoop/conf",
269274
},
270-
EnvVars: nil,
275+
EnvVars: nil,
276+
RecreateOnUpdate: defaultRecreateOnUpdate,
271277
},
272278
Status: FlinkClusterStatus{},
273279
}

api/v1beta1/flinkcluster_types.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -459,6 +459,9 @@ type FlinkClusterSpec struct {
459459

460460
// The maximum number of revision history to keep, default: 10.
461461
RevisionHistoryLimit *int32 `json:"revisionHistoryLimit,omitempty"`
462+
463+
// Recreate components when updating flinkcluster, default: true.
464+
RecreateOnUpdate *bool `json:"recreateOnUpdate,omitempty"`
462465
}
463466

464467
// HadoopConfig defines configs for Hadoop.

api/v1beta1/zz_generated.deepcopy.go

Lines changed: 5 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

config/crd/bases/flinkoperator.k8s.io_flinkclusters.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3279,6 +3279,8 @@ spec:
32793279
additionalProperties:
32803280
type: string
32813281
type: object
3282+
recreateOnUpdate:
3283+
type: boolean
32823284
revisionHistoryLimit:
32833285
format: int32
32843286
type: integer

controllers/flinkcluster_reconciler.go

Lines changed: 39 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,12 @@ func (reconciler *ClusterReconciler) reconcileStatefulSet(
140140
if desiredStatefulSet != nil && observedStatefulSet != nil {
141141
if getUpdateState(reconciler.observed) == UpdateStateInProgress {
142142
updateComponent := fmt.Sprintf("%v StatefulSet", component)
143-
err := reconciler.deleteOldComponent(desiredStatefulSet, observedStatefulSet, updateComponent)
143+
var err error
144+
if *reconciler.observed.cluster.Spec.RecreateOnUpdate {
145+
err = reconciler.deleteOldComponent(desiredStatefulSet, observedStatefulSet, updateComponent)
146+
} else {
147+
err = reconciler.updateComponent(desiredStatefulSet, updateComponent)
148+
}
144149
if err != nil {
145150
return err
146151
}
@@ -192,6 +197,21 @@ func (reconciler *ClusterReconciler) deleteOldComponent(desired runtime.Object,
192197
return nil
193198
}
194199

200+
func (reconciler *ClusterReconciler) updateComponent(desired runtime.Object, component string) error {
201+
var log = reconciler.log.WithValues("component", component)
202+
var context = reconciler.context
203+
var k8sClient = reconciler.k8sClient
204+
205+
log.Info("Update component", "component", desired)
206+
err := k8sClient.Update(context, desired)
207+
if err != nil {
208+
log.Error(err, "Failed to update component for update")
209+
return err
210+
}
211+
log.Info("Component update successfully")
212+
return nil
213+
}
214+
195215
func (reconciler *ClusterReconciler) updateStatefulSet(
196216
statefulSet *appsv1.StatefulSet, component string) error {
197217
var context = reconciler.context
@@ -238,7 +258,12 @@ func (reconciler *ClusterReconciler) reconcileJobManagerService() error {
238258
// v1.Service API does not handle update correctly when below values are empty.
239259
desiredJmService.SetResourceVersion(observedJmService.GetResourceVersion())
240260
desiredJmService.Spec.ClusterIP = observedJmService.Spec.ClusterIP
241-
err := reconciler.deleteOldComponent(desiredJmService, observedJmService, "JobManager service")
261+
var err error
262+
if *reconciler.observed.cluster.Spec.RecreateOnUpdate {
263+
err = reconciler.deleteOldComponent(desiredJmService, observedJmService, "JobManager service")
264+
} else {
265+
err = reconciler.updateComponent(desiredJmService, "JobManager service")
266+
}
242267
if err != nil {
243268
return err
244269
}
@@ -298,7 +323,12 @@ func (reconciler *ClusterReconciler) reconcileJobManagerIngress() error {
298323

299324
if desiredJmIngress != nil && observedJmIngress != nil {
300325
if getUpdateState(reconciler.observed) == UpdateStateInProgress {
301-
err := reconciler.deleteOldComponent(desiredJmIngress, observedJmIngress, "JobManager ingress")
326+
var err error
327+
if *reconciler.observed.cluster.Spec.RecreateOnUpdate {
328+
err = reconciler.deleteOldComponent(desiredJmIngress, observedJmIngress, "JobManager ingress")
329+
} else {
330+
err = reconciler.updateComponent(desiredJmIngress, "JobManager ingress")
331+
}
302332
if err != nil {
303333
return err
304334
}
@@ -358,7 +388,12 @@ func (reconciler *ClusterReconciler) reconcileConfigMap() error {
358388

359389
if desiredConfigMap != nil && observedConfigMap != nil {
360390
if getUpdateState(reconciler.observed) == UpdateStateInProgress {
361-
err := reconciler.deleteOldComponent(desiredConfigMap, observedConfigMap, "ConfigMap")
391+
var err error
392+
if *reconciler.observed.cluster.Spec.RecreateOnUpdate {
393+
err = reconciler.deleteOldComponent(desiredConfigMap, observedConfigMap, "ConfigMap")
394+
} else {
395+
err = reconciler.updateComponent(desiredConfigMap, "ConfigMap")
396+
}
362397
if err != nil {
363398
return err
364399
}

docs/crd.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ FlinkCluster
9999
|__ mountPath
100100
|__ logConfig
101101
|__ revisionHistoryLimit
102+
|__ recreateOnUpdate
102103
|__ status
103104
|__ state
104105
|__ components
@@ -313,6 +314,7 @@ FlinkCluster
313314
to stdout will be provided.
314315
* Other arbitrary keys are also allowed, and will become part of the ConfigMap.
315316
* **revisionHistoryLimit** (optional): The maximum number of revision history to keep, default: 10.
317+
* **recreateOnUpdate** (optional): Recreate components when updating flinkcluster, default: true.
316318
* **status**: Flink job or session cluster status.
317319
* **state**: The overall state of the Flink cluster.
318320
* **components**: The status of the components.

0 commit comments

Comments
 (0)