@@ -59,8 +59,10 @@ var flinkSysProps = map[string]struct{}{
5959
6060// Gets the desired state of a cluster.
6161func getDesiredClusterState (
62- cluster * v1beta1. FlinkCluster ,
62+ observed * ObservedClusterState ,
6363 now time.Time ) model.DesiredClusterState {
64+ var cluster = observed .cluster
65+
6466 // The cluster has been deleted, all resources should be cleaned up.
6567 if cluster == nil {
6668 return model.DesiredClusterState {}
@@ -71,7 +73,7 @@ func getDesiredClusterState(
7173 JmService : getDesiredJobManagerService (cluster ),
7274 JmIngress : getDesiredJobManagerIngress (cluster ),
7375 TmDeployment : getDesiredTaskManagerDeployment (cluster ),
74- Job : getDesiredJob (cluster ),
76+ Job : getDesiredJob (observed ),
7577 }
7678}
7779
@@ -584,18 +586,19 @@ func getDesiredConfigMap(
584586}
585587
586588// Gets the desired job spec from a cluster spec.
587- func getDesiredJob (
588- flinkCluster * v1beta1. FlinkCluster ) * batchv1. Job {
589+ func getDesiredJob (observed * ObservedClusterState ) * batchv1. Job {
590+ var flinkCluster = observed . cluster
589591 var jobSpec = flinkCluster .Spec .Job
592+ var jobStatus = flinkCluster .Status .Components .Job
590593
591594 if jobSpec == nil {
592595 return nil
593596 }
594597
595- if ! isUpdateTriggered ( flinkCluster . Status ) {
596- // We need to watch whether job is cancelled already if jobSpec.CancelRequested is deprecated
597- var jobStatus = flinkCluster . Status . Components . Job
598- if isJobCancelRequested (* flinkCluster ) || (jobStatus != nil && jobStatus . State == v1beta1 . JobStateCancelled ) {
598+ // Unless update has been triggered or the job needs to be restarted, keep the job to be stopped in that state.
599+ if ! ( isUpdateTriggered ( flinkCluster . Status ) || shouldRestartJob ( jobSpec .RestartPolicy , jobStatus )) {
600+ // Job cancel requested or stopped already
601+ if isJobCancelRequested (* flinkCluster ) || isJobStopped (jobStatus ) {
599602 return nil
600603 }
601604 }
@@ -619,8 +622,8 @@ func getDesiredJob(
619622 jobArgs = append (jobArgs , "--class" , * jobSpec .ClassName )
620623 }
621624
622- var fromSavepoint = convertFromSavepoint (jobSpec , & flinkCluster .Status )
623- if fromSavepoint != nil && * fromSavepoint != "" {
625+ var fromSavepoint = convertFromSavepoint (jobSpec , flinkCluster .Status . Components . Job )
626+ if fromSavepoint != nil {
624627 jobArgs = append (jobArgs , "--fromSavepoint" , * fromSavepoint )
625628 }
626629
@@ -640,7 +643,7 @@ func getDesiredJob(
640643
641644 var securityContext = jobSpec .SecurityContext
642645
643- var envVars = []corev1.EnvVar {}
646+ var envVars []corev1.EnvVar
644647
645648 // If the JAR file is remote, put the URI in the env variable
646649 // FLINK_JOB_JAR_URI and rewrite the JAR path to a local path. The entrypoint
@@ -654,6 +657,12 @@ func getDesiredJob(
654657 Value : jobSpec .JarFile ,
655658 })
656659 }
660+ envVars = append (envVars ,
661+ corev1.EnvVar {
662+ Name : "FLINK_JM_ADDR" ,
663+ Value : jobManagerAddress ,
664+ })
665+
657666 jobArgs = append (jobArgs , jarPath )
658667 jobArgs = append (jobArgs , jobSpec .Args ... )
659668
@@ -716,7 +725,7 @@ func getDesiredJob(
716725 ServiceAccountName : getServiceAccountName (serviceAccount ),
717726 }
718727
719- // Disable the retry mechanism of k8s Job, all retires should be initiated
728+ // Disable the retry mechanism of k8s Job, all retries should be initiated
720729 // by the operator based on the job restart policy. This is because Flink
721730 // jobs are stateful, if a job fails after running for 10 hours, we probably
722731 // don't want to start over from the beginning, instead we want to resume
@@ -752,28 +761,28 @@ func getDesiredJob(
752761// When FlinkCluster is created or updated, if spec.job.fromSavepoint is specified, Flink job will be restored from it.
753762//
754763// case 2) Restore Flink job from the latest savepoint.
755- // When FlinkCluster is updated not specifying spec.job.fromSavepoint, or job is restarted from the failed state,
756- // Flink job will be restored from the latest savepoint created by the operator or the savepoint from which current job was restored.
757- func convertFromSavepoint (jobSpec * v1beta1.JobSpec , clusterStatus * v1beta1.FlinkClusterStatus ) * string {
758- var jobStatus = clusterStatus .Components .Job
764+ // When FlinkCluster is updated with no spec.job.fromSavepoint, or job is restarted from the failed state,
765+ // Flink job will be restored from the latest savepoint created by the operator.
766+ //
767+ // case 3) When latest created savepoint is unavailable, use the savepoint from which current job was restored.
768+ func convertFromSavepoint (jobSpec * v1beta1.JobSpec , jobStatus * v1beta1.JobStatus ) * string {
759769 switch {
760- case shouldRestartJob (jobSpec .RestartPolicy , jobStatus ):
761- return & jobStatus .SavepointLocation
762- case isUpdateTriggered (* clusterStatus ) && (jobSpec .FromSavepoint == nil || * jobSpec .FromSavepoint == "" ):
763- if jobStatus == nil {
764- return nil
765- }
766- // Latest savepoint created by Flink operator
767- if jobStatus .SavepointLocation != "" {
768- return & jobStatus .SavepointLocation
769- }
770- // The savepoint from which current running job was restored
771- if jobStatus .FromSavepoint != "" {
772- return & jobStatus .FromSavepoint
770+ // Creating for the first time
771+ case jobStatus == nil :
772+ if jobSpec .FromSavepoint != nil && * jobSpec .FromSavepoint != "" {
773+ return jobSpec .FromSavepoint
773774 }
774- return nil
775+ // Updating with FromSavepoint provided
776+ case jobStatus .State == v1beta1 .JobStateUpdating && jobSpec .FromSavepoint != nil && * jobSpec .FromSavepoint != "" :
777+ return jobSpec .FromSavepoint
778+ // Latest savepoint
779+ case jobStatus .SavepointLocation != "" :
780+ return & jobStatus .SavepointLocation
781+ // The savepoint from which current job was restored
782+ case jobStatus .FromSavepoint != "" :
783+ return & jobStatus .FromSavepoint
775784 }
776- return jobSpec . FromSavepoint
785+ return nil
777786}
778787
779788// Copy any non-duplicate volume mounts to the specified initContainers
0 commit comments