
Commit 6b59348

support service account name (#334)
1 parent 045cc66 commit 6b59348

File tree

5 files changed (+46 additions, -19 deletions)


api/v1beta1/flinkcluster_types.go

Lines changed: 3 additions & 0 deletions
@@ -411,6 +411,9 @@ type FlinkClusterSpec struct {
 	// Flink image spec for the cluster's components.
 	Image ImageSpec `json:"image"`
 
+	// The service account assigned to JobManager, TaskManager and Job submitter Pods. If empty, the default service account in the namespace will be used.
+	ServiceAccountName *string `json:"serviceAccountName,omitempty"`
+
 	// BatchSchedulerName specifies the batch scheduler name for JobManager, TaskManager.
 	// If empty, no batch scheduling is enabled.
 	BatchSchedulerName *string `json:"batchSchedulerName,omitempty"`

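For context, a minimal sketch of how the new field would appear in a FlinkCluster manifest. The metadata name flinkjobcluster-sample and the service account name flink-sa are illustrative assumptions, and the other required spec fields (jobManager, taskManager, job, and so on) are omitted:

    apiVersion: flinkoperator.k8s.io/v1beta1
    kind: FlinkCluster
    metadata:
      name: flinkjobcluster-sample    # illustrative name, not from this commit
      namespace: default
    spec:
      image:
        name: flink:1.8.1
      # Optional: service account assigned to JobManager, TaskManager and Job
      # submitter Pods. If omitted, the namespace default service account is used.
      serviceAccountName: flink-sa    # illustrative name, not from this commit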
config/crd/bases/flinkoperator.k8s.io_flinkclusters.yaml

Lines changed: 2 additions & 0 deletions
@@ -30,6 +30,8 @@ spec:
             properties:
               batchSchedulerName:
                 type: string
+              serviceAccountName:
+                type: string
               envFrom:
                 items:
                   properties:

controllers/flinkcluster_converter.go

Lines changed: 32 additions & 18 deletions
@@ -87,6 +87,7 @@ func getDesiredJobManagerDeployment(
 	var clusterName = flinkCluster.ObjectMeta.Name
 	var clusterSpec = flinkCluster.Spec
 	var imageSpec = clusterSpec.Image
+	var serviceAccount = clusterSpec.ServiceAccountName
 	var jobManagerSpec = clusterSpec.JobManager
 	var rpcPort = corev1.ContainerPort{Name: "rpc", ContainerPort: *jobManagerSpec.Ports.RPC}
 	var blobPort = corev1.ContainerPort{Name: "blob", ContainerPort: *jobManagerSpec.Ports.Blob}
@@ -196,13 +197,14 @@ func getDesiredJobManagerDeployment(
 	containers = append(containers, jobManagerSpec.Sidecars...)
 
 	var podSpec = corev1.PodSpec{
-		InitContainers:   convertJobManagerInitContainers(&jobManagerSpec),
-		Containers:       containers,
-		Volumes:          volumes,
-		NodeSelector:     jobManagerSpec.NodeSelector,
-		Tolerations:      jobManagerSpec.Tolerations,
-		ImagePullSecrets: imageSpec.PullSecrets,
-		SecurityContext:  securityContext,
+		InitContainers:     convertJobManagerInitContainers(&jobManagerSpec),
+		Containers:         containers,
+		Volumes:            volumes,
+		NodeSelector:       jobManagerSpec.NodeSelector,
+		Tolerations:        jobManagerSpec.Tolerations,
+		ImagePullSecrets:   imageSpec.PullSecrets,
+		SecurityContext:    securityContext,
+		ServiceAccountName: getServiceAccountName(serviceAccount),
 	}
 	var jobManagerDeployment = &appsv1.Deployment{
 		ObjectMeta: metav1.ObjectMeta{
@@ -381,6 +383,7 @@ func getDesiredTaskManagerDeployment(
 	var clusterName = flinkCluster.ObjectMeta.Name
 	var clusterSpec = flinkCluster.Spec
 	var imageSpec = flinkCluster.Spec.Image
+	var serviceAccount = clusterSpec.ServiceAccountName
 	var taskManagerSpec = flinkCluster.Spec.TaskManager
 	var dataPort = corev1.ContainerPort{Name: "data", ContainerPort: *taskManagerSpec.Ports.Data}
 	var rpcPort = corev1.ContainerPort{Name: "rpc", ContainerPort: *taskManagerSpec.Ports.RPC}
@@ -490,13 +493,14 @@ func getDesiredTaskManagerDeployment(
 	}}
 	containers = append(containers, taskManagerSpec.Sidecars...)
 	var podSpec = corev1.PodSpec{
-		InitContainers:   convertTaskManagerInitContainers(&taskManagerSpec),
-		Containers:       containers,
-		Volumes:          volumes,
-		NodeSelector:     taskManagerSpec.NodeSelector,
-		Tolerations:      taskManagerSpec.Tolerations,
-		ImagePullSecrets: imageSpec.PullSecrets,
-		SecurityContext:  securityContext,
+		InitContainers:     convertTaskManagerInitContainers(&taskManagerSpec),
+		Containers:         containers,
+		Volumes:            volumes,
+		NodeSelector:       taskManagerSpec.NodeSelector,
+		Tolerations:        taskManagerSpec.Tolerations,
+		ImagePullSecrets:   imageSpec.PullSecrets,
+		SecurityContext:    securityContext,
+		ServiceAccountName: getServiceAccountName(serviceAccount),
 	}
 	var taskManagerDeployment = &appsv1.Deployment{
 		ObjectMeta: metav1.ObjectMeta{
@@ -598,6 +602,7 @@ func getDesiredJob(
 
 	var clusterSpec = flinkCluster.Spec
 	var imageSpec = clusterSpec.Image
+	var serviceAccount = clusterSpec.ServiceAccountName
 	var jobManagerSpec = clusterSpec.JobManager
 	var clusterNamespace = flinkCluster.ObjectMeta.Namespace
 	var clusterName = flinkCluster.ObjectMeta.Name
@@ -704,10 +709,11 @@ func getDesiredJob(
 				Resources: jobSpec.Resources,
 			},
 		},
-		RestartPolicy:    corev1.RestartPolicyNever,
-		Volumes:          volumes,
-		ImagePullSecrets: imageSpec.PullSecrets,
-		SecurityContext:  securityContext,
+		RestartPolicy:      corev1.RestartPolicyNever,
+		Volumes:            volumes,
+		ImagePullSecrets:   imageSpec.PullSecrets,
+		SecurityContext:    securityContext,
+		ServiceAccountName: getServiceAccountName(serviceAccount),
 	}
 
 	// Disable the retry mechanism of k8s Job, all retires should be initiated
@@ -1035,6 +1041,14 @@ func getClusterLabels(cluster v1beta1.FlinkCluster) map[string]string {
 	}
 }
 
+func getServiceAccountName(serviceAccount *string) string {
+	if serviceAccount != nil {
+		return *serviceAccount
+	}
+
+	return ""
+}
+
 func getComponentLabels(cluster v1beta1.FlinkCluster, component string) map[string]string {
 	return mergeLabels(getClusterLabels(cluster), map[string]string{
 		"component": component,

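Note that getServiceAccountName returns an empty string when the field is unset, so the rendered Pod specs simply inherit the namespace's default service account. When the field is set, the referenced service account has to exist beforehand (per docs/crd.md below). A hedged sketch of creating one; the name flink-sa and namespace default are assumptions for illustration:

    apiVersion: v1
    kind: ServiceAccount
    metadata:
      name: flink-sa        # must match spec.serviceAccountName on the FlinkCluster
      namespace: default    # same namespace as the FlinkCluster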
controllers/flinkcluster_converter_test.go

Lines changed: 6 additions & 1 deletion
@@ -50,6 +50,7 @@ func TestGetDesiredClusterState(t *testing.T) {
 	var tolerationSeconds int64 = 30
 	var restartPolicy = v1beta1.JobRestartPolicyFromSavepointOnFailure
 	var className = "org.apache.flink.examples.java.wordcount.WordCount"
+	var serviceAccount = "default"
 	var hostFormat = "{{$clusterName}}.example.com"
 	var memoryOffHeapRatio int32 = 25
 	var memoryOffHeapMin = resource.MustParse("600M")
@@ -131,7 +132,8 @@ func TestGetDesiredClusterState(t *testing.T) {
 			Namespace: "default",
 		},
 		Spec: v1beta1.FlinkClusterSpec{
-			Image: v1beta1.ImageSpec{Name: "flink:1.8.1"},
+			Image:              v1beta1.ImageSpec{Name: "flink:1.8.1"},
+			ServiceAccountName: &serviceAccount,
 			Job: &v1beta1.JobSpec{
 				Args:      []string{"--input", "./README.txt"},
 				ClassName: &className,
@@ -407,6 +409,7 @@ func TestGetDesiredClusterState(t *testing.T) {
 					RunAsUser:  &userAndGroupId,
 					RunAsGroup: &userAndGroupId,
 				},
+				ServiceAccountName: serviceAccount,
 				Volumes: []corev1.Volume{
 					{
 						Name: "flink-config-volume",
@@ -709,6 +712,7 @@ func TestGetDesiredClusterState(t *testing.T) {
 					RunAsUser:  &userAndGroupId,
 					RunAsGroup: &userAndGroupId,
 				},
+				ServiceAccountName: serviceAccount,
 			},
 		},
 	},
@@ -871,6 +875,7 @@ func TestGetDesiredClusterState(t *testing.T) {
 					RunAsUser:  &userAndGroupId,
 					RunAsGroup: &userAndGroupId,
 				},
+				ServiceAccountName: serviceAccount,
 			},
 		},
 		BackoffLimit: &jobBackoffLimit,

docs/crd.md

Lines changed: 3 additions & 0 deletions
@@ -16,6 +16,7 @@ FlinkCluster
 |__ pullPolicy
 |__ pullSecrets
 |__ batchSchedulerName
+|__ serviceAccountName
 |__ jobManager
     |__ accessScope
     |__ ports
@@ -151,6 +152,8 @@ FlinkCluster
 * **pullSecrets** (optional): Secrets for image pull.
 * **batchSchedulerName** (optional): BatchSchedulerName specifies the batch scheduler name for JobManager, TaskManager.
   If empty, no batch scheduling is enabled.
+* **serviceAccountName** (optional): The name of the service account (which must already exist in the namespace).
+  If empty, the default service account in the namespace will be used.
 * **jobManager** (required): JobManager spec.
 * **accessScope** (optional): Access scope of the JobManager service. `enum("Cluster", "VPC", "External",
   "NodePort", "Headless")`. `Cluster`: accessible from within the same cluster; `VPC`: accessible from within the same VPC;
