diff --git a/api/v1alpha1/computeflinkdeployment_types.go b/api/v1alpha1/computeflinkdeployment_types.go index 70b37f61..a03ac545 100644 --- a/api/v1alpha1/computeflinkdeployment_types.go +++ b/api/v1alpha1/computeflinkdeployment_types.go @@ -25,9 +25,10 @@ import ( // ComputeFlinkDeploymentSpec defines the desired state of ComputeFlinkDeployment type ComputeFlinkDeploymentSpec struct { - // APIServerRef is the reference to the StreamNativeCloudConnection - // +required - APIServerRef corev1.LocalObjectReference `json:"apiServerRef"` + // APIServerRef is the reference to the StreamNativeCloudConnection. + // If not specified, the APIServerRef from the referenced ComputeWorkspace will be used. + // +optional + APIServerRef corev1.LocalObjectReference `json:"apiServerRef,omitempty"` // WorkspaceName is the reference to the workspace, and is required // +kubebuilder:validation:Required diff --git a/config/samples/resource_v1alpha1_computeflinkdeployment.yaml b/config/samples/resource_v1alpha1_computeflinkdeployment.yaml index e568049d..90722a55 100644 --- a/config/samples/resource_v1alpha1_computeflinkdeployment.yaml +++ b/config/samples/resource_v1alpha1_computeflinkdeployment.yaml @@ -66,3 +66,58 @@ spec: log4jLoggers: "": DEBUG com.company: DEBUG + +--- +# Example 2: Using Workspace's APIServerRef +apiVersion: resource.streamnative.io/v1alpha1 +kind: ComputeFlinkDeployment +metadata: + name: operator-test-v2 + namespace: default +spec: + workspaceName: test-operator-workspace # Will use APIServerRef from this workspace + template: + syncingMode: PATCH + deployment: + userMetadata: + name: operator-test-v2 + namespace: default + displayName: operator-test-v2 + spec: + state: RUNNING + deploymentTargetName: default + maxJobCreationAttempts: 99 + template: + metadata: + annotations: + flink.queryable-state.enabled: 'false' + flink.security.ssl.enabled: 'false' + spec: + artifact: + jarUri: function://public/default/flink-operator-test-beam-pulsar-io@1.19-snapshot + mainArgs: --runner=FlinkRunner --attachedMode=false --checkpointingInterval=60000 + entryClass: org.apache.beam.examples.WordCount + kind: JAR + flinkVersion: "1.18.1" + flinkImageTag: "1.18.1-stream3-scala_2.12-java17" + flinkConfiguration: + execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION + execution.checkpointing.interval: 1min + execution.checkpointing.timeout: 10min + high-availability.type: kubernetes + state.backend: filesystem + taskmanager.memory.managed.fraction: '0.2' + parallelism: 1 + numberOfTaskManagers: 1 + resources: + jobmanager: + cpu: "1" + memory: 2G + taskmanager: + cpu: "1" + memory: 2G + logging: + loggingProfile: default + log4jLoggers: + "": DEBUG + com.company: DEBUG diff --git a/controllers/flinkdeployment_controller.go b/controllers/flinkdeployment_controller.go index 8da3c424..db2f720f 100644 --- a/controllers/flinkdeployment_controller.go +++ b/controllers/flinkdeployment_controller.go @@ -52,6 +52,7 @@ type FlinkDeploymentReconciler struct { //+kubebuilder:rbac:groups=resource.streamnative.io,resources=computeflinkdeployments/status,verbs=get;update;patch //+kubebuilder:rbac:groups=resource.streamnative.io,resources=computeflinkdeployments/finalizers,verbs=update //+kubebuilder:rbac:groups=resource.streamnative.io,resources=streamnativecloudconnections,verbs=get;list;watch +//+kubebuilder:rbac:groups=resource.streamnative.io,resources=computeworkspaces,verbs=get;list;watch // handleWatchEvents processes events from the watch interface func (r *FlinkDeploymentReconciler) handleWatchEvents(ctx context.Context, namespacedName types.NamespacedName, watcher watch.Interface) { @@ -167,11 +168,27 @@ func (r *FlinkDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Requ return ctrl.Result{}, err } + // Get APIServerRef from ComputeWorkspace if not specified in FlinkDeployment + apiServerRef := deployment.Spec.APIServerRef + if apiServerRef.Name == "" { + // Get the ComputeWorkspace + workspace := &resourcev1alpha1.ComputeWorkspace{} + if err := r.Get(ctx, types.NamespacedName{ + Namespace: req.Namespace, + Name: deployment.Spec.WorkspaceName, + }, workspace); err != nil { + r.updateDeploymentStatus(ctx, deployment, err, "GetWorkspaceFailed", + fmt.Sprintf("Failed to get ComputeWorkspace: %v", err)) + return ctrl.Result{}, err + } + apiServerRef = workspace.Spec.APIServerRef + } + // Get the APIServerConnection apiConn := &resourcev1alpha1.StreamNativeCloudConnection{} if err := r.Get(ctx, types.NamespacedName{ Namespace: req.Namespace, - Name: deployment.Spec.APIServerRef.Name, + Name: apiServerRef.Name, }, apiConn); err != nil { r.updateDeploymentStatus(ctx, deployment, err, "GetAPIServerConnectionFailed", fmt.Sprintf("Failed to get APIServerConnection: %v", err)) diff --git a/docs/compute_flink_deployment.md b/docs/compute_flink_deployment.md index 4de555b4..35bec34c 100644 --- a/docs/compute_flink_deployment.md +++ b/docs/compute_flink_deployment.md @@ -8,7 +8,7 @@ The `ComputeFlinkDeployment` resource defines a Flink deployment in StreamNative | Field | Description | Required | |----------------------|--------------------------------------------------------------------------------------------|----------| -| `apiServerRef` | Reference to the StreamNativeCloudConnection resource for API server access | Yes | +| `apiServerRef` | Reference to the StreamNativeCloudConnection resource for API server access. If not specified, the APIServerRef from the referenced ComputeWorkspace will be used. | No | | `workspaceName` | Name of the ComputeWorkspace where the Flink deployment will run | Yes | | `labels` | Labels to add to the deployment | No | | `annotations` | Annotations to add to the deployment | No | @@ -18,6 +18,19 @@ The `ComputeFlinkDeployment` resource defines a Flink deployment in StreamNative *Note: Either `template` or `communityTemplate` must be specified, but not both. +## APIServerRef Inheritance + +The `ComputeFlinkDeployment` resource can inherit the `APIServerRef` from its referenced `ComputeWorkspace`. This simplifies configuration and reduces duplication. Here's how it works: + +1. If `apiServerRef` is specified in the `ComputeFlinkDeployment`, that value will be used. +2. If `apiServerRef` is not specified, the operator will use the `APIServerRef` from the referenced `ComputeWorkspace`. +3. The `workspaceName` field is required and must reference a valid `ComputeWorkspace` in the same namespace. + +This inheritance mechanism allows you to: +- Reduce configuration duplication +- Centralize API server configuration at the workspace level +- Easily change API server configuration for multiple deployments by updating the workspace + ### VVP Deployment Template | Field | Description | Required | @@ -97,7 +110,7 @@ The `ComputeFlinkDeployment` resource defines a Flink deployment in StreamNative ## Example -1. Create a ComputeFlinkDeployment with VVP template: +1. Create a ComputeFlinkDeployment with explicit APIServerRef: ```yaml apiVersion: resource.streamnative.io/v1alpha1 @@ -175,6 +188,63 @@ NAME READY AGE operator-test-v1 True 1m ``` +2. Create a ComputeFlinkDeployment using Workspace's APIServerRef: + +```yaml +apiVersion: resource.streamnative.io/v1alpha1 +kind: ComputeFlinkDeployment +metadata: + name: operator-test-v2 + namespace: default +spec: + workspaceName: test-operator-workspace # Will use APIServerRef from this workspace + template: + syncingMode: PATCH + deployment: + userMetadata: + name: operator-test-v2 + namespace: default + displayName: operator-test-v2 + spec: + state: RUNNING + deploymentTargetName: default + maxJobCreationAttempts: 99 + template: + metadata: + annotations: + flink.queryable-state.enabled: 'false' + flink.security.ssl.enabled: 'false' + spec: + artifact: + jarUri: function://public/default/flink-operator-test-beam-pulsar-io@1.19-snapshot + mainArgs: --runner=FlinkRunner --attachedMode=false --checkpointingInterval=60000 + entryClass: org.apache.beam.examples.WordCount + kind: JAR + flinkVersion: "1.18.1" + flinkImageTag: "1.18.1-stream3-scala_2.12-java17" + flinkConfiguration: + execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION + execution.checkpointing.interval: 1min + execution.checkpointing.timeout: 10min + high-availability.type: kubernetes + state.backend: filesystem + taskmanager.memory.managed.fraction: '0.2' + parallelism: 1 + numberOfTaskManagers: 1 + resources: + jobmanager: + cpu: "1" + memory: 2G + taskmanager: + cpu: "1" + memory: 2G + logging: + loggingProfile: default + log4jLoggers: + "": DEBUG + com.company: DEBUG +``` + ## Update Deployment You can update the deployment by modifying the YAML file and reapplying it. Most fields can be updated, including: