7 changes: 4 additions & 3 deletions api/v1alpha1/computeflinkdeployment_types.go
@@ -25,9 +25,10 @@ import (

 // ComputeFlinkDeploymentSpec defines the desired state of ComputeFlinkDeployment
 type ComputeFlinkDeploymentSpec struct {
-	// APIServerRef is the reference to the StreamNativeCloudConnection
-	// +required
-	APIServerRef corev1.LocalObjectReference `json:"apiServerRef"`
+	// APIServerRef is the reference to the StreamNativeCloudConnection.
+	// If not specified, the APIServerRef from the referenced ComputeWorkspace will be used.
+	// +optional
+	APIServerRef corev1.LocalObjectReference `json:"apiServerRef,omitempty"`

 	// WorkspaceName is the reference to the workspace, and is required
 	// +kubebuilder:validation:Required
55 changes: 55 additions & 0 deletions config/samples/resource_v1alpha1_computeflinkdeployment.yaml
@@ -66,3 +66,58 @@ spec:
              log4jLoggers:
                "": DEBUG
                com.company: DEBUG

---
# Example 2: Using Workspace's APIServerRef
apiVersion: resource.streamnative.io/v1alpha1
kind: ComputeFlinkDeployment
metadata:
  name: operator-test-v2
  namespace: default
spec:
  workspaceName: test-operator-workspace # Will use APIServerRef from this workspace
  template:
    syncingMode: PATCH
    deployment:
      userMetadata:
        name: operator-test-v2
        namespace: default
        displayName: operator-test-v2
      spec:
        state: RUNNING
        deploymentTargetName: default
        maxJobCreationAttempts: 99
        template:
          metadata:
            annotations:
              flink.queryable-state.enabled: 'false'
              flink.security.ssl.enabled: 'false'
          spec:
            artifact:
              jarUri: function://public/default/[email protected]
              mainArgs: --runner=FlinkRunner --attachedMode=false --checkpointingInterval=60000
              entryClass: org.apache.beam.examples.WordCount
              kind: JAR
              flinkVersion: "1.18.1"
              flinkImageTag: "1.18.1-stream3-scala_2.12-java17"
            flinkConfiguration:
              execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
              execution.checkpointing.interval: 1min
              execution.checkpointing.timeout: 10min
              high-availability.type: kubernetes
              state.backend: filesystem
              taskmanager.memory.managed.fraction: '0.2'
            parallelism: 1
            numberOfTaskManagers: 1
            resources:
              jobmanager:
                cpu: "1"
                memory: 2G
              taskmanager:
                cpu: "1"
                memory: 2G
            logging:
              loggingProfile: default
              log4jLoggers:
                "": DEBUG
                com.company: DEBUG
19 changes: 18 additions & 1 deletion controllers/flinkdeployment_controller.go
@@ -52,6 +52,7 @@ type FlinkDeploymentReconciler struct {
 //+kubebuilder:rbac:groups=resource.streamnative.io,resources=computeflinkdeployments/status,verbs=get;update;patch
 //+kubebuilder:rbac:groups=resource.streamnative.io,resources=computeflinkdeployments/finalizers,verbs=update
 //+kubebuilder:rbac:groups=resource.streamnative.io,resources=streamnativecloudconnections,verbs=get;list;watch
+//+kubebuilder:rbac:groups=resource.streamnative.io,resources=computeworkspaces,verbs=get;list;watch

 // handleWatchEvents processes events from the watch interface
 func (r *FlinkDeploymentReconciler) handleWatchEvents(ctx context.Context, namespacedName types.NamespacedName, watcher watch.Interface) {
@@ -167,11 +168,27 @@ func (r *FlinkDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Request
 		return ctrl.Result{}, err
 	}

+	// Get APIServerRef from ComputeWorkspace if not specified in FlinkDeployment
+	apiServerRef := deployment.Spec.APIServerRef
+	if apiServerRef.Name == "" {
+		// Get the ComputeWorkspace
+		workspace := &resourcev1alpha1.ComputeWorkspace{}
+		if err := r.Get(ctx, types.NamespacedName{
+			Namespace: req.Namespace,
+			Name:      deployment.Spec.WorkspaceName,
+		}, workspace); err != nil {
+			r.updateDeploymentStatus(ctx, deployment, err, "GetWorkspaceFailed",
+				fmt.Sprintf("Failed to get ComputeWorkspace: %v", err))
+			return ctrl.Result{}, err
+		}
+		apiServerRef = workspace.Spec.APIServerRef
+	}
+
 	// Get the APIServerConnection
 	apiConn := &resourcev1alpha1.StreamNativeCloudConnection{}
 	if err := r.Get(ctx, types.NamespacedName{
 		Namespace: req.Namespace,
-		Name:      deployment.Spec.APIServerRef.Name,
+		Name:      apiServerRef.Name,
 	}, apiConn); err != nil {
 		r.updateDeploymentStatus(ctx, deployment, err, "GetAPIServerConnectionFailed",
 			fmt.Sprintf("Failed to get APIServerConnection: %v", err))
74 changes: 72 additions & 2 deletions docs/compute_flink_deployment.md
@@ -8,7 +8,7 @@ The `ComputeFlinkDeployment` resource defines a Flink deployment in StreamNative

 | Field                | Description                                                                                  | Required |
 |----------------------|----------------------------------------------------------------------------------------------|----------|
-| `apiServerRef`       | Reference to the StreamNativeCloudConnection resource for API server access                 | Yes      |
+| `apiServerRef`       | Reference to the StreamNativeCloudConnection resource for API server access. If not specified, the APIServerRef from the referenced ComputeWorkspace will be used. | No |
 | `workspaceName`      | Name of the ComputeWorkspace where the Flink deployment will run                            | Yes      |
 | `labels`             | Labels to add to the deployment                                                              | No       |
 | `annotations`        | Annotations to add to the deployment                                                         | No       |
@@ -18,6 +18,19 @@ The `ComputeFlinkDeployment` resource defines a Flink deployment in StreamNative

*Note*: Either `template` or `communityTemplate` must be specified, but not both.

## APIServerRef Inheritance

The `ComputeFlinkDeployment` resource can inherit the `APIServerRef` from its referenced `ComputeWorkspace`. This simplifies configuration and reduces duplication. Here's how it works:

1. If `apiServerRef` is specified in the `ComputeFlinkDeployment`, that value will be used.
2. If `apiServerRef` is not specified, the operator will use the `APIServerRef` from the referenced `ComputeWorkspace`.
3. The `workspaceName` field is required and must reference a valid `ComputeWorkspace` in the same namespace.

This inheritance mechanism allows you to:
- Reduce configuration duplication
- Centralize API server configuration at the workspace level
- Easily change the API server configuration for multiple deployments by updating the workspace, as sketched below
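
A minimal sketch of the pattern, assuming the workspace and connection live in the same namespace as the deployment. The connection name `test-cloud-connection` is illustrative, and the workspace spec is abbreviated; neither is taken verbatim from this PR:

```yaml
# The workspace carries the API server reference once...
apiVersion: resource.streamnative.io/v1alpha1
kind: ComputeWorkspace
metadata:
  name: test-operator-workspace
  namespace: default
spec:
  apiServerRef:
    name: test-cloud-connection # hypothetical connection name
  # other workspace fields omitted
---
# ...and the deployment omits apiServerRef, so the operator falls back
# to the workspace's apiServerRef at reconcile time.
apiVersion: resource.streamnative.io/v1alpha1
kind: ComputeFlinkDeployment
metadata:
  name: operator-test-v2
  namespace: default
spec:
  workspaceName: test-operator-workspace
  # template omitted for brevity; see the full examples below
```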

### VVP Deployment Template

| Field | Description | Required |
@@ -97,7 +110,7 @@ The `ComputeFlinkDeployment` resource defines a Flink deployment in StreamNative

## Example

-1. Create a ComputeFlinkDeployment with VVP template:
+1. Create a ComputeFlinkDeployment with explicit APIServerRef:

```yaml
apiVersion: resource.streamnative.io/v1alpha1
@@ -175,6 +188,63 @@ NAME READY AGE
operator-test-v1 True 1m
```

2. Create a ComputeFlinkDeployment using Workspace's APIServerRef:

```yaml
apiVersion: resource.streamnative.io/v1alpha1
kind: ComputeFlinkDeployment
metadata:
  name: operator-test-v2
  namespace: default
spec:
  workspaceName: test-operator-workspace # Will use APIServerRef from this workspace
  template:
    syncingMode: PATCH
    deployment:
      userMetadata:
        name: operator-test-v2
        namespace: default
        displayName: operator-test-v2
      spec:
        state: RUNNING
        deploymentTargetName: default
        maxJobCreationAttempts: 99
        template:
          metadata:
            annotations:
              flink.queryable-state.enabled: 'false'
              flink.security.ssl.enabled: 'false'
          spec:
            artifact:
              jarUri: function://public/default/[email protected]
              mainArgs: --runner=FlinkRunner --attachedMode=false --checkpointingInterval=60000
              entryClass: org.apache.beam.examples.WordCount
              kind: JAR
              flinkVersion: "1.18.1"
              flinkImageTag: "1.18.1-stream3-scala_2.12-java17"
            flinkConfiguration:
              execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
              execution.checkpointing.interval: 1min
              execution.checkpointing.timeout: 10min
              high-availability.type: kubernetes
              state.backend: filesystem
              taskmanager.memory.managed.fraction: '0.2'
            parallelism: 1
            numberOfTaskManagers: 1
            resources:
              jobmanager:
                cpu: "1"
                memory: 2G
              taskmanager:
                cpu: "1"
                memory: 2G
            logging:
              loggingProfile: default
              log4jLoggers:
                "": DEBUG
                com.company: DEBUG
```

## Update Deployment

You can update the deployment by modifying the YAML file and reapplying it; most fields can be updated.
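
For example, to scale out the deployment from Example 2 — a sketch, assuming `parallelism` is among the updatable fields — change the value and reapply the manifest:

```yaml
# Only the changed portion of the Example 2 manifest is shown here.
spec:
  template:
    deployment:
      spec:
        template:
          spec:
            parallelism: 2 # previously 1
```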
Expand Down