Commit 3cde793

make flink deployment apiServerRef optional (#282)
make flink deployment apiRef optional
1 parent 58c2044 commit 3cde793

File tree

4 files changed: +149 -6 lines

api/v1alpha1/computeflinkdeployment_types.go
config/samples/resource_v1alpha1_computeflinkdeployment.yaml
controllers/flinkdeployment_controller.go
docs/compute_flink_deployment.md
api/v1alpha1/computeflinkdeployment_types.go
Lines changed: 4 additions & 3 deletions

@@ -25,9 +25,10 @@ import (
 
 // ComputeFlinkDeploymentSpec defines the desired state of ComputeFlinkDeployment
 type ComputeFlinkDeploymentSpec struct {
-	// APIServerRef is the reference to the StreamNativeCloudConnection
-	// +required
-	APIServerRef corev1.LocalObjectReference `json:"apiServerRef"`
+	// APIServerRef is the reference to the StreamNativeCloudConnection.
+	// If not specified, the APIServerRef from the referenced ComputeWorkspace will be used.
+	// +optional
+	APIServerRef corev1.LocalObjectReference `json:"apiServerRef,omitempty"`
 
 	// WorkspaceName is the reference to the workspace, and is required
 	// +kubebuilder:validation:Required

config/samples/resource_v1alpha1_computeflinkdeployment.yaml
Lines changed: 55 additions & 0 deletions

@@ -66,3 +66,58 @@ spec:
             log4jLoggers:
               "": DEBUG
               com.company: DEBUG
+
+---
+# Example 2: Using Workspace's APIServerRef
+apiVersion: resource.streamnative.io/v1alpha1
+kind: ComputeFlinkDeployment
+metadata:
+  name: operator-test-v2
+  namespace: default
+spec:
+  workspaceName: test-operator-workspace # Will use APIServerRef from this workspace
+  template:
+    syncingMode: PATCH
+    deployment:
+      userMetadata:
+        name: operator-test-v2
+        namespace: default
+        displayName: operator-test-v2
+      spec:
+        state: RUNNING
+        deploymentTargetName: default
+        maxJobCreationAttempts: 99
+        template:
+          metadata:
+            annotations:
+              flink.queryable-state.enabled: 'false'
+              flink.security.ssl.enabled: 'false'
+          spec:
+            artifact:
+              jarUri: function://public/default/flink-operator-test-beam-pulsar-io@1.19-snapshot
+              mainArgs: --runner=FlinkRunner --attachedMode=false --checkpointingInterval=60000
+              entryClass: org.apache.beam.examples.WordCount
+              kind: JAR
+              flinkVersion: "1.18.1"
+              flinkImageTag: "1.18.1-stream3-scala_2.12-java17"
+            flinkConfiguration:
+              execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
+              execution.checkpointing.interval: 1min
+              execution.checkpointing.timeout: 10min
+              high-availability.type: kubernetes
+              state.backend: filesystem
+              taskmanager.memory.managed.fraction: '0.2'
+            parallelism: 1
+            numberOfTaskManagers: 1
+            resources:
+              jobmanager:
+                cpu: "1"
+                memory: 2G
+              taskmanager:
+                cpu: "1"
+                memory: 2G
+            logging:
+              loggingProfile: default
+              log4jLoggers:
+                "": DEBUG
+                com.company: DEBUG

controllers/flinkdeployment_controller.go
Lines changed: 18 additions & 1 deletion

@@ -52,6 +52,7 @@ type FlinkDeploymentReconciler struct {
 //+kubebuilder:rbac:groups=resource.streamnative.io,resources=computeflinkdeployments/status,verbs=get;update;patch
 //+kubebuilder:rbac:groups=resource.streamnative.io,resources=computeflinkdeployments/finalizers,verbs=update
 //+kubebuilder:rbac:groups=resource.streamnative.io,resources=streamnativecloudconnections,verbs=get;list;watch
+//+kubebuilder:rbac:groups=resource.streamnative.io,resources=computeworkspaces,verbs=get;list;watch
 
 // handleWatchEvents processes events from the watch interface
 func (r *FlinkDeploymentReconciler) handleWatchEvents(ctx context.Context, namespacedName types.NamespacedName, watcher watch.Interface) {
@@ -167,11 +168,27 @@ func (r *FlinkDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Requ
 		return ctrl.Result{}, err
 	}
 
+	// Get APIServerRef from ComputeWorkspace if not specified in FlinkDeployment
+	apiServerRef := deployment.Spec.APIServerRef
+	if apiServerRef.Name == "" {
+		// Get the ComputeWorkspace
+		workspace := &resourcev1alpha1.ComputeWorkspace{}
+		if err := r.Get(ctx, types.NamespacedName{
+			Namespace: req.Namespace,
+			Name:      deployment.Spec.WorkspaceName,
+		}, workspace); err != nil {
+			r.updateDeploymentStatus(ctx, deployment, err, "GetWorkspaceFailed",
+				fmt.Sprintf("Failed to get ComputeWorkspace: %v", err))
+			return ctrl.Result{}, err
+		}
+		apiServerRef = workspace.Spec.APIServerRef
+	}
+
 	// Get the APIServerConnection
 	apiConn := &resourcev1alpha1.StreamNativeCloudConnection{}
 	if err := r.Get(ctx, types.NamespacedName{
 		Namespace: req.Namespace,
-		Name:      deployment.Spec.APIServerRef.Name,
+		Name:      apiServerRef.Name,
 	}, apiConn); err != nil {
 		r.updateDeploymentStatus(ctx, deployment, err, "GetAPIServerConnectionFailed",
 			fmt.Sprintf("Failed to get APIServerConnection: %v", err))

docs/compute_flink_deployment.md
Lines changed: 72 additions & 2 deletions

@@ -8,7 +8,7 @@ The `ComputeFlinkDeployment` resource defines a Flink deployment in StreamNative
 
 | Field | Description | Required |
 |----------------------|--------------------------------------------------------------------------------------------|----------|
-| `apiServerRef` | Reference to the StreamNativeCloudConnection resource for API server access | Yes |
+| `apiServerRef` | Reference to the StreamNativeCloudConnection resource for API server access. If not specified, the APIServerRef from the referenced ComputeWorkspace will be used. | No |
 | `workspaceName` | Name of the ComputeWorkspace where the Flink deployment will run | Yes |
 | `labels` | Labels to add to the deployment | No |
 | `annotations` | Annotations to add to the deployment | No |
@@ -18,6 +18,19 @@ The `ComputeFlinkDeployment` resource defines a Flink deployment in StreamNative
 
 *Note: Either `template` or `communityTemplate` must be specified, but not both.
 
+## APIServerRef Inheritance
+
+The `ComputeFlinkDeployment` resource can inherit the `APIServerRef` from its referenced `ComputeWorkspace`. This simplifies configuration and reduces duplication. Here's how it works:
+
+1. If `apiServerRef` is specified in the `ComputeFlinkDeployment`, that value will be used.
+2. If `apiServerRef` is not specified, the operator will use the `APIServerRef` from the referenced `ComputeWorkspace`.
+3. The `workspaceName` field is required and must reference a valid `ComputeWorkspace` in the same namespace.
+
+This inheritance mechanism allows you to:
+- Reduce configuration duplication
+- Centralize API server configuration at the workspace level
+- Easily change API server configuration for multiple deployments by updating the workspace
+
 ### VVP Deployment Template
 
 | Field | Description | Required |
@@ -97,7 +110,7 @@ The `ComputeFlinkDeployment` resource defines a Flink deployment in StreamNative
 
 ## Example
 
-1. Create a ComputeFlinkDeployment with VVP template:
+1. Create a ComputeFlinkDeployment with explicit APIServerRef:
 
 ```yaml
 apiVersion: resource.streamnative.io/v1alpha1
@@ -175,6 +188,63 @@ NAME READY AGE
 operator-test-v1 True 1m
 ```
 
+2. Create a ComputeFlinkDeployment using Workspace's APIServerRef:
+
+```yaml
+apiVersion: resource.streamnative.io/v1alpha1
+kind: ComputeFlinkDeployment
+metadata:
+  name: operator-test-v2
+  namespace: default
+spec:
+  workspaceName: test-operator-workspace # Will use APIServerRef from this workspace
+  template:
+    syncingMode: PATCH
+    deployment:
+      userMetadata:
+        name: operator-test-v2
+        namespace: default
+        displayName: operator-test-v2
+      spec:
+        state: RUNNING
+        deploymentTargetName: default
+        maxJobCreationAttempts: 99
+        template:
+          metadata:
+            annotations:
+              flink.queryable-state.enabled: 'false'
+              flink.security.ssl.enabled: 'false'
+          spec:
+            artifact:
+              jarUri: function://public/default/flink-operator-test-beam-pulsar-io@1.19-snapshot
+              mainArgs: --runner=FlinkRunner --attachedMode=false --checkpointingInterval=60000
+              entryClass: org.apache.beam.examples.WordCount
+              kind: JAR
+              flinkVersion: "1.18.1"
+              flinkImageTag: "1.18.1-stream3-scala_2.12-java17"
+            flinkConfiguration:
+              execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
+              execution.checkpointing.interval: 1min
+              execution.checkpointing.timeout: 10min
+              high-availability.type: kubernetes
+              state.backend: filesystem
+              taskmanager.memory.managed.fraction: '0.2'
+            parallelism: 1
+            numberOfTaskManagers: 1
+            resources:
+              jobmanager:
+                cpu: "1"
+                memory: 2G
+              taskmanager:
+                cpu: "1"
+                memory: 2G
+            logging:
+              loggingProfile: default
+              log4jLoggers:
+                "": DEBUG
+                com.company: DEBUG
+```
+
 ## Update Deployment
 
 You can update the deployment by modifying the YAML file and reapplying it. Most fields can be updated, including:
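The inheritance rules documented above (an explicit `apiServerRef` wins, otherwise the workspace's `APIServerRef` is used) can be exercised with a small test sketch against a fake client. This is illustrative only and not part of the commit; it reuses the hypothetical `resolveAPIServerRef` helper sketched after the controller diff, assumes the same placeholder import path, and assumes the kubebuilder-generated `AddToScheme` that such projects normally provide.

```go
package controllers

import (
	"context"
	"testing"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"sigs.k8s.io/controller-runtime/pkg/client/fake"

	// Repo-local API package; the exact module path is assumed here.
	resourcev1alpha1 "example.com/operator/api/v1alpha1"
)

func TestResolveAPIServerRef(t *testing.T) {
	scheme := runtime.NewScheme()
	if err := resourcev1alpha1.AddToScheme(scheme); err != nil { // assumed kubebuilder helper
		t.Fatal(err)
	}

	// A workspace that carries the connection reference to inherit.
	workspace := &resourcev1alpha1.ComputeWorkspace{
		ObjectMeta: metav1.ObjectMeta{Name: "test-operator-workspace", Namespace: "default"},
		Spec: resourcev1alpha1.ComputeWorkspaceSpec{
			APIServerRef: corev1.LocalObjectReference{Name: "workspace-connection"},
		},
	}
	c := fake.NewClientBuilder().WithScheme(scheme).WithObjects(workspace).Build()

	// Rule 2: apiServerRef omitted on the deployment, so the workspace's ref is inherited.
	deployment := &resourcev1alpha1.ComputeFlinkDeployment{
		ObjectMeta: metav1.ObjectMeta{Name: "operator-test-v2", Namespace: "default"},
		Spec:       resourcev1alpha1.ComputeFlinkDeploymentSpec{WorkspaceName: "test-operator-workspace"},
	}
	ref, err := resolveAPIServerRef(context.Background(), c, deployment)
	if err != nil || ref.Name != "workspace-connection" {
		t.Fatalf("expected inherited ref workspace-connection, got %q (err: %v)", ref.Name, err)
	}

	// Rule 1: an explicit apiServerRef on the deployment takes precedence.
	deployment.Spec.APIServerRef = corev1.LocalObjectReference{Name: "explicit-connection"}
	ref, err = resolveAPIServerRef(context.Background(), c, deployment)
	if err != nil || ref.Name != "explicit-connection" {
		t.Fatalf("expected explicit ref explicit-connection, got %q (err: %v)", ref.Name, err)
	}
}
```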
