Skip to content
This repository was archived by the owner on May 8, 2025. It is now read-only.

Commit 5cfd66a

Browse files
authored
Add podLabels to JM, TM, Job (#321)
1 parent a6e3c54 commit 5cfd66a

File tree

4 files changed

+31
-0
lines changed

4 files changed

+31
-0
lines changed

api/v1beta1/flinkcluster_types.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,9 @@ type JobManagerSpec struct {
221221

222222
// JobManager Deployment pod template annotations.
223223
PodAnnotations map[string]string `json:"podAnnotations,omitempty"`
224+
225+
// JobManager Deployment pod template labels.
226+
PodLabels map[string]string `json:"podLabels,omitempty"`
224227
}
225228

226229
// TaskManagerPorts defines ports of TaskManager.
@@ -288,6 +291,9 @@ type TaskManagerSpec struct {
288291

289292
// TaskManager Deployment pod template annotations.
290293
PodAnnotations map[string]string `json:"podAnnotations,omitempty"`
294+
295+
// TaskManager Deployment pod template labels.
296+
PodLabels map[string]string `json:"podLabels,omitempty"`
291297
}
292298

293299
// CleanupAction defines the action to take after job finishes.
@@ -382,6 +388,9 @@ type JobSpec struct {
382388
// Job pod template annotations.
383389
PodAnnotations map[string]string `json:"podAnnotations,omitempty"`
384390

391+
// Job pod template labels.
392+
PodLabels map[string]string `json:"podLabels,omitempty"`
393+
385394
// Compute resources required by each Job container.
386395
// If omitted, a default value will be used.
387396
// Cannot be updated.

config/crd/bases/flinkoperator.k8s.io_flinkclusters.yaml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -723,6 +723,10 @@ spec:
723723
additionalProperties:
724724
type: string
725725
type: object
726+
podLabels:
727+
additionalProperties:
728+
type: string
729+
type: object
726730
resources:
727731
properties:
728732
limits:
@@ -1956,6 +1960,10 @@ spec:
19561960
additionalProperties:
19571961
type: string
19581962
type: object
1963+
podLabels:
1964+
additionalProperties:
1965+
type: string
1966+
type: object
19591967
ports:
19601968
properties:
19611969
blob:
@@ -3743,6 +3751,10 @@ spec:
37433751
additionalProperties:
37443752
type: string
37453753
type: object
3754+
podLabels:
3755+
additionalProperties:
3756+
type: string
3757+
type: object
37463758
ports:
37473759
properties:
37483760
data:

controllers/flinkcluster_converter.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ func getDesiredJobManagerDeployment(
9898
}
9999
var jobManagerDeploymentName = getJobManagerDeploymentName(clusterName)
100100
var podLabels = getComponentLabels(*flinkCluster, "jobmanager")
101+
podLabels = mergeLabels(podLabels, jobManagerSpec.PodLabels)
101102
var deploymentLabels = mergeLabels(podLabels, getRevisionHashLabels(flinkCluster.Status))
102103
// Make Volume, VolumeMount to use configMap data for flink-conf.yaml, if flinkProperties is provided.
103104
var volumes []corev1.Volume
@@ -253,6 +254,7 @@ func getDesiredJobManagerService(
253254
TargetPort: intstr.FromString("ui")}
254255
var jobManagerServiceName = getJobManagerServiceName(clusterName)
255256
var podLabels = getComponentLabels(*flinkCluster, "jobmanager")
257+
podLabels = mergeLabels(podLabels, jobManagerSpec.PodLabels)
256258
var serviceLabels = mergeLabels(podLabels, getRevisionHashLabels(flinkCluster.Status))
257259
var jobManagerService = &corev1.Service{
258260
ObjectMeta: metav1.ObjectMeta{
@@ -388,6 +390,7 @@ func getDesiredTaskManagerDeployment(
388390
}
389391
var taskManagerDeploymentName = getTaskManagerDeploymentName(clusterName)
390392
var podLabels = getComponentLabels(*flinkCluster, "taskmanager")
393+
podLabels = mergeLabels(podLabels, taskManagerSpec.PodLabels)
391394
var deploymentLabels = mergeLabels(podLabels, getRevisionHashLabels(flinkCluster.Status))
392395
// Make Volume, VolumeMount to use configMap data for flink-conf.yaml
393396
var volumes []corev1.Volume
@@ -598,6 +601,7 @@ func getDesiredJob(
598601
var jobManagerAddress = fmt.Sprintf(
599602
"%s:%d", jobManagerServiceName, *jobManagerSpec.Ports.UI)
600603
var podLabels = getClusterLabels(*flinkCluster)
604+
podLabels = mergeLabels(podLabels, jobManagerSpec.PodLabels)
601605
var jobLabels = mergeLabels(podLabels, getRevisionHashLabels(flinkCluster.Status))
602606
var jobArgs = []string{"bash", "/opt/flink-operator/submit-job.sh"}
603607
jobArgs = append(jobArgs, "--jobmanager", jobManagerAddress)

docs/crd.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ FlinkCluster
3939
|__ tolerations
4040
|__ sidecars
4141
|__ podAnnotations
42+
|__ podLabels
4243
|__ taskManager
4344
|__ replicas
4445
|__ ports
@@ -56,6 +57,7 @@ FlinkCluster
5657
|__ tolerations
5758
|__ sidecars
5859
|__ podAnnotations
60+
|__ podLabels
5961
|__ job
6062
|__ jarFile
6163
|__ className
@@ -77,6 +79,7 @@ FlinkCluster
7779
|__ afterJobCancelled
7880
|__ cancelRequested
7981
|__ podAnnotations
82+
|__ podLabels
8083
|__ envVars
8184
|__ envFrom
8285
|__ flinkProperties
@@ -191,6 +194,7 @@ FlinkCluster
191194
See [more info](https://kubernetes.io/docs/concepts/containers/) about containers.
192195
* **podAnnotations** (optional): Pod template annotations for the JobManager deployment.
193196
See [more info](https://kubernetes.io/docs/concepts/overview/working-with-objects/annotations/) about annotations.
197+
* **podLabels** (optional): Pod template labels for the JobManager deployment.
194198
* **taskManager** (required): TaskManager spec.
195199
* **replicas** (required): The number of TaskManager replicas.
196200
* **ports** (optional): Ports that TaskManager listening on.
@@ -227,6 +231,7 @@ FlinkCluster
227231
See [more info](https://kubernetes.io/docs/concepts/containers/) about containers.
228232
* **podAnnotations** (optional): Pod template annotations for the TaskManager deployment.
229233
See [more info](https://kubernetes.io/docs/concepts/overview/working-with-objects/annotations/) about annotations.
234+
* **podLabels** (optional): Pod template labels for the TaskManager deployment.
230235
* **job** (optional): Job spec. If specified, the cluster is a Flink job cluster; otherwise, it is a Flink
231236
session cluster.
232237
* **jarFile** (required): JAR file of the job. It could be a local file or remote URI, depending on which
@@ -267,6 +272,7 @@ FlinkCluster
267272
`savePointsDir` is provided, a savepoint will be taken before stopping the job.
268273
* **podAnnotations** (optional): Pod template annotations for the job.
269274
See [more info](https://kubernetes.io/docs/concepts/overview/working-with-objects/annotations/) about annotations.
275+
* **podLabels** (optional): Pod template labels for the job.
270276
* **envVars** (optional): Environment variables shared by all JobManager, TaskManager and job containers.
271277
* **envFrom** (optional): Environment variables from ConfigMaps or Secrets shared by all JobManager, TaskManager and job containers.
272278
* **flinkProperties** (optional): Flink properties which are appened to flink-conf.yaml.

0 commit comments

Comments
 (0)