@@ -420,6 +420,25 @@ func getReplicaSpace(flow *client.Flow, gc interfaces.GraphContext) string {
420420 return copier .EvaluateString (name , getArgFunc (gc ))
421421}
422422
423+ func (sched * scheduler ) getAllReplicas (flow * client.Flow , gc interfaces.GraphContext ) ([]client.Replica , error ) {
424+ replicaSpace := getReplicaSpace (flow , gc )
425+ label := labels.Set {"replicaspace" : replicaSpace }
426+ options := gc .Graph ().Options ()
427+ if options .FlowInstanceName != "" {
428+ label ["context" ] = options .FlowInstanceName
429+ }
430+ existingReplicas , err := sched .client .Replicas ().List (api.ListOptions {
431+ LabelSelector : labels .SelectorFromSet (label ),
432+ })
433+ if err != nil {
434+ return nil , err
435+ }
436+
437+ sortableReplicas := sortableReplicaList (existingReplicas .Items )
438+ sort .Stable (sortableReplicas )
439+ return sortableReplicas , nil
440+ }
441+
423442// allocateReplicas allocates Replica objects for either creation or deletion.
424443// it returns list of replicas to be constructed, list of replicas to be destructed and an error.
425444// new Replica objects are created in this function as a side effect, but not deleted since this can happen only after
@@ -437,10 +456,16 @@ func (sched *scheduler) allocateReplicas(flow *client.Flow, gc interfaces.GraphC
437456 if err != nil {
438457 return nil , nil , err
439458 }
459+ initialCount := 0
460+ for _ , replica := range existingReplicas .Items {
461+ if replica .Deployed {
462+ initialCount ++
463+ }
464+ }
440465
441466 targetCount := options .ReplicaCount // absolute number of replicas that we want to have
442467 if ! options .FixedNumberOfReplicas {
443- targetCount += len ( existingReplicas . Items )
468+ targetCount += initialCount
444469 }
445470 if targetCount < options .MinReplicaCount {
446471 targetCount = options .MinReplicaCount
@@ -455,7 +480,6 @@ func (sched *scheduler) allocateReplicas(flow *client.Flow, gc interfaces.GraphC
455480 return nil , nil , err
456481 }
457482
458- initialCount := len (existingReplicas .Items )
459483 if targetCount < initialCount {
460484 return nil , adjustedReplicas [targetCount :], nil
461485 }
@@ -475,34 +499,54 @@ func (sched *scheduler) createReplicas(
475499 maxCurrentTime = item .CreationTimestamp .Time
476500 }
477501 }
502+ replicaList := make (sortableReplicaList , 0 , len (existingReplicas ))
503+ unusedReplicas := make (sortableReplicaList , 0 , len (existingReplicas ))
478504
479- sortableReplicaList := sortableReplicaList (existingReplicas )
480- for len (sortableReplicaList ) < desiredCount {
481- replica := & client.Replica {
482- ObjectMeta : api.ObjectMeta {
483- GenerateName : "replica-" ,
484- Labels : label ,
485- Namespace : sched .client .Namespace (),
486- },
487- FlowName : flowName ,
488- ReplicaSpace : replicaSpace ,
489- }
490- replica , err := sched .client .Replicas ().Create (replica )
491- if err != nil {
492- return nil , err
505+ for _ , replica := range existingReplicas {
506+ if replica .Deployed {
507+ replicaList = append (replicaList , replica )
508+ } else {
509+ unusedReplicas = append (unusedReplicas , replica )
493510 }
494- if ! replica .CreationTimestamp .After (maxCurrentTime ) {
495- // ensure that new elements in the list have timestamp that exceeds all the timestamps of existing items
496- // this guarantees that after the sort all new elements will still go after old ones in the list
497- time .Sleep (time .Second )
498- sched .client .Replicas ().Delete (replica .Name )
499- continue
511+ }
512+ sort .Stable (& unusedReplicas )
513+
514+ for len (replicaList ) < desiredCount {
515+ var replica * client.Replica
516+ if len (unusedReplicas ) > 0 {
517+ replica = & unusedReplicas [0 ]
518+ unusedReplicas = unusedReplicas [1 :]
519+ } else {
520+ replica = & client.Replica {
521+ ObjectMeta : api.ObjectMeta {
522+ GenerateName : "replica-" ,
523+ Labels : label ,
524+ Namespace : sched .client .Namespace (),
525+ },
526+ FlowName : flowName ,
527+ ReplicaSpace : replicaSpace ,
528+ }
529+ var err error
530+ replica , err = sched .client .Replicas ().Create (replica )
531+ if err != nil {
532+ return nil , err
533+ }
534+ if ! replica .CreationTimestamp .After (maxCurrentTime ) {
535+ // ensure that new elements in the list have timestamp that exceeds all the timestamps of existing items
536+ // this guarantees that after the sort all new elements will still go after old ones in the list
537+ time .Sleep (time .Second )
538+ sched .client .Replicas ().Delete (replica .Name )
539+ continue
540+ }
500541 }
501- sortableReplicaList = append (sortableReplicaList , * replica )
542+ replicaList = append (replicaList , * replica )
543+ }
544+ for _ , replica := range unusedReplicas {
545+ sched .client .Replicas ().Delete (replica .Name )
502546 }
503547
504- sort .Stable (& sortableReplicaList )
505- return sortableReplicaList , nil
548+ sort .Stable (& replicaList )
549+ return replicaList , nil
506550}
507551
508552// BuildDependencyGraph loads dependencies data and creates the DependencyGraph
@@ -559,7 +603,12 @@ func (sched *scheduler) BuildDependencyGraph(options interfaces.DependencyGraphO
559603 depGraph := newDependencyGraph (sched , options )
560604 rootContext := & graphContext {scheduler : sched , graph : depGraph , flow : flow , args : options .Args }
561605
562- replicas , deleteReplicas , err := sched .allocateReplicas (flow , rootContext )
606+ var replicas , deleteReplicas []client.Replica
607+ if options .ReplicaCount < 0 && options .FixedNumberOfReplicas {
608+ replicas , err = sched .getAllReplicas (flow , rootContext )
609+ } else {
610+ replicas , deleteReplicas , err = sched .allocateReplicas (flow , rootContext )
611+ }
563612 if err != nil {
564613 return nil , err
565614 }
@@ -580,12 +629,16 @@ func (sched *scheduler) BuildDependencyGraph(options interfaces.DependencyGraphO
580629 }
581630
582631 if len (deleteReplicas ) > 0 {
583- dryRunOptions := getDryRunDependencyGraphOptions (options )
632+ silentOptions := options
633+ silentOptions .Silent = true
584634
585635 // the options are only made silent here; getAllReplicas just lists the replicas that already exist
586- allReplicasGraph := newDependencyGraph (sched , dryRunOptions )
587- context := & graphContext {scheduler : sched , graph : allReplicasGraph , flow : flow , args : dryRunOptions .Args }
588- allReplicas , _ , err := sched .allocateReplicas (flow , context )
636+ allReplicasGraph := newDependencyGraph (sched , silentOptions )
637+ context := & graphContext {scheduler : sched , graph : allReplicasGraph , flow : flow , args : silentOptions .Args }
638+ allReplicas , err := sched .getAllReplicas (flow , context )
639+ if err != nil {
640+ return nil , err
641+ }
589642
590643 // create dependency graph that has all the replicas (both those that we are about to delete and those that
591644 // remain to see what resources belong exclusively to deleted replicas and what resources are shared with
@@ -597,22 +650,14 @@ func (sched *scheduler) BuildDependencyGraph(options interfaces.DependencyGraphO
597650
598651 // compose finalizer method that will delete all the resources belonging to deleted replicas and those
599652 // that were created specially for destruction (for example, jobs with cleanup scripts)
600- depGraph .finalizer = sched .composeFinalizer (allReplicasGraph , depGraph , deleteReplicas )
653+ depGraph .finalizer = sched .composeDeletingFinalizer (allReplicasGraph , depGraph , deleteReplicas )
654+ } else {
655+ depGraph .finalizer = sched .composeAcknowledgingFinalizer (replicas )
601656 }
602657
603658 return depGraph , nil
604659}
605660
606- // getDryRunDependencyGraphOptions returns dependency graph options that will not produce side effects
607- // (i.e. no new or deleted replicas and no logging)
608- func getDryRunDependencyGraphOptions (options interfaces.DependencyGraphOptions ) interfaces.DependencyGraphOptions {
609- options .ReplicaCount = 0
610- options .MinReplicaCount = 0
611- options .FixedNumberOfReplicas = false
612- options .Silent = true
613- return options
614- }
615-
616661func (sched * scheduler ) fillDependencyGraph (rootContext * graphContext ,
617662 resDefs map [string ]client.ResourceDefinition ,
618663 dependencies map [string ][]client.Dependency ,
@@ -775,7 +820,7 @@ readFailed:
775820 }
776821}
777822
778- func (sched * scheduler ) composeFinalizer (construction , destruction * dependencyGraph , replicas []client.Replica ) func () {
823+ func (sched * scheduler ) composeDeletingFinalizer (construction , destruction * dependencyGraph , replicas []client.Replica ) func () {
779824 replicaMap := map [string ]client.Replica {}
780825 for _ , replica := range replicas {
781826 replicaMap [replica .ReplicaName ()] = replica
@@ -785,6 +830,34 @@ func (sched *scheduler) composeFinalizer(construction, destruction *dependencyGr
785830 destructors := getResourceDestructors (construction , destruction , replicaMap , & failed )
786831
787832 return func () {
833+ log .Print ("Performing resource cleanup" )
788834 deleteReplicaResources (sched , destructors , replicaMap , & failed )
789835 }
790836}
837+
838+ func makeAcknowledgeReplicaFunc (replica client.Replica , api client.ReplicasInterface ) func () bool {
839+ return func () bool {
840+ replica .Deployed = true
841+ log .Printf ("%s flow: Marking replica %s as deployed" , replica .FlowName , replica .ReplicaName ())
842+ if err := api .Update (& replica ); err != nil {
843+ log .Printf ("failed to update replica %s: %v" , replica .Name , err )
844+ return false
845+ }
846+ return true
847+ }
848+ }
849+
850+ func (sched * scheduler ) composeAcknowledgingFinalizer (replicas []client.Replica ) func () {
851+ var funcs []func () bool
852+ for _ , replica := range replicas {
853+ if ! replica .Deployed {
854+ funcs = append (funcs , makeAcknowledgeReplicaFunc (replica , sched .client .Replicas ()))
855+ }
856+ }
857+
858+ return func () {
859+ if ! runConcurrently (funcs , sched .concurrency ) {
860+ log .Println ("Some of the replicas were not updated!" )
861+ }
862+ }
863+ }
0 commit comments