@@ -169,6 +169,9 @@ func NewJobController(config *rest.Config, serverOption *options.ServerOption) *
169169 cache : clusterstatecache .New (config ),
170170 schedulingAW : nil ,
171171 }
172+ //metrics adapter is implemented through dynamic client which looks at all the
173+ //resources installed in the cluster to construct cache. May be this is need in
174+ //multi-cluster mode, so for now it is turned-off
172175 //cc.metricsAdapter = adapter.New(serverOption, config, cc.cache)
173176
174177 cc .genericresources = genericresource .NewAppWrapperGenericResource (config )
@@ -249,7 +252,7 @@ func NewJobController(config *rest.Config, serverOption *options.ServerOption) *
249252 if jobClusterAgent != nil {
250253 cc .agentMap ["/root/kubernetes/" + agentData [0 ]] = jobClusterAgent
251254 cc .agentList = append (cc .agentList , "/root/kubernetes/" + agentData [0 ])
252- }
255+ }
253256 }
254257
255258 if cc .isDispatcher && len (cc .agentMap ) == 0 {
@@ -263,6 +266,8 @@ func NewJobController(config *rest.Config, serverOption *options.ServerOption) *
263266 return cc
264267}
265268
269+ //TODO: We can use informer to filter AWs that do not meet the minScheduling spec.
270+ //we still need a thread for dispatch duration but minScheduling spec can definetly be moved to an informer
266271func (qjm * XController ) PreemptQueueJobs () {
267272 ctx := context .Background ()
268273
@@ -919,23 +924,15 @@ func (qjm *XController) nodeChecks(histograms map[string]*dto.Metric, aw *arbv1.
919924// Thread to find queue-job(QJ) for next schedule
920925func (qjm * XController ) ScheduleNext (qj * arbv1.AppWrapper ) {
921926 ctx := context .Background ()
922- // get next QJ from the queue
923- // check if we have enough compute resources for it
924- // if we have enough compute resources then we set the AllocatedReplicas to the total
925- // amount of resources asked by the job
926- // qj, err := qjm.qjqueue.Pop()
927- // if err != nil {
928- // klog.Errorf("[ScheduleNext] Cannot pop QueueJob from qjqueue! err=%#v", err)
929- // return // Try to pop qjqueue again
930- // }
931927 var err error = nil
928+ //TODO: do we really need locking now since we have a single thread processing an AW ?
932929 qjm .schedulingMutex .Lock ()
933930 qjm .schedulingAW = qj
934931 qjm .schedulingMutex .Unlock ()
935932 // ensure that current active appwrapper is reset at the end of this function, to prevent
936933 // the appwrapper from being added in syncjob
937934 defer qjm .schedulingAWAtomicSet (nil )
938-
935+ //TODO: Retry value is set to 1, do we really need retries?
939936 scheduleNextRetrier := retrier .New (retrier .ExponentialBackoff (1 , 100 * time .Millisecond ), & EtcdErrorClassifier {})
940937 scheduleNextRetrier .SetJitter (0.05 )
941938 // Retry the execution
@@ -1073,17 +1070,6 @@ func (qjm *XController) ScheduleNext(qj *arbv1.AppWrapper) {
10731070 }
10741071 return retryErr
10751072 }
1076- //Remove stale copy
1077- // qjm.eventQueue.Delete(qj)
1078- // if err00 := qjm.eventQueue.Add(qj); err00 != nil { // unsuccessful add to eventQueue, add back to activeQ
1079- // klog.Errorf("[ScheduleNext] [Dispatcher Mode] Fail to add %s to eventQueue, activeQ.Add_toSchedulingQueue &qj=%p Version=%s Status=%+v err=%#v", qj.Name, qj, qj.ResourceVersion, qj.Status, err)
1080- // qjm.qjqueue.MoveToActiveQueueIfExists(qj)
1081- // } else { // successful add to eventQueue, remove from qjqueue
1082- // if qjm.qjqueue.IfExist(qj) {
1083- // klog.V(10).Infof("[ScheduleNext] [Dispatcher Mode] AppWrapper %s will be deleted from priority queue and sent to event queue", qj.Name)
1084- // }
1085- // qjm.qjqueue.Delete(qj)
1086- // }
10871073 klog .V (10 ).Infof ("[ScheduleNext] [Dispatcher Mode] %s, %s: ScheduleNextAfterEtcd" , qj .Name , time .Now ().Sub (qj .CreationTimestamp .Time ))
10881074 return nil
10891075 } else {
@@ -1195,10 +1181,6 @@ func (qjm *XController) ScheduleNext(qj *arbv1.AppWrapper) {
11951181 } else { // Not enough free quota to dispatch appwrapper
11961182 dispatchFailedMessage = "Insufficient quota to dispatch AppWrapper."
11971183 dispatchFailedReason = "quota limit exceeded"
1198- // if len(msg) > 0 {
1199- // dispatchFailedReason += " "
1200- // dispatchFailedReason += msg
1201- // }
12021184 klog .Infof ("[ScheduleNext] [Agent Mode] Blocking dispatch for app wrapper '%s/%s' due to quota limits, activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v msg=%s" ,
12031185 qj .Namespace , qj .Name , time .Now ().Sub (HOLStartTime ), qjm .qjqueue .IfExistActiveQ (qj ), qjm .qjqueue .IfExistUnschedulableQ (qj ), qj , qj .ResourceVersion , qj .Status , msg )
12041186 //call update etcd here to retrigger AW execution for failed quota
@@ -1250,20 +1232,7 @@ func (qjm *XController) ScheduleNext(qj *arbv1.AppWrapper) {
12501232 return retryErr
12511233 }
12521234 tempAW .DeepCopyInto (qj )
1253- // add to eventQueue for dispatching to Etcd
1254- // Remove stale copy
1255- // This is a pointer so removing copy will cause the execution to stop
1256- //qjm.eventQueue.Delete(qj)
1257- // if err00 := qjm.eventQueue.Add(qj); err00 != nil { // unsuccessful add to eventQueue, add back to activeQ
1258- // klog.Errorf("[ScheduleNext] [Agent Mode] Failed to add '%s/%s' to eventQueue, activeQ.Add_toSchedulingQueue &qj=%p Version=%s Status=%+v err=%#v", qj.Namespace,
1259- // qj.Name, qj, qj.ResourceVersion, qj.Status, err)
1260- // qjm.qjqueue.MoveToActiveQueueIfExists(qj)
1261- // } else { // successful add to eventQueue, remove from qjqueue
1262- // qjm.qjqueue.Delete(qj)
12631235 forwarded = true
1264- // klog.Infof("[ScheduleNext] [Agent Mode] Successfully dispatched app wrapper '%s/%s' activeQ=%t, Unsched=%t &aw=%p Version=%s Status=%+v",
1265- // qj.Namespace, qj.Name, qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj, qj.ResourceVersion, qj.Status)
1266- // }
12671236 } // fits
12681237 } else { // Not enough free resources to dispatch HOL
12691238 dispatchFailedMessage = "Insufficient resources to dispatch AppWrapper."
@@ -1306,8 +1275,6 @@ func (qjm *XController) ScheduleNext(qj *arbv1.AppWrapper) {
13061275 go qjm .backoff (ctx , qj , dispatchFailedReason , dispatchFailedMessage )
13071276 }
13081277 }
1309- //At this point we should delete item from event queue
1310- //qjm.eventQueue.Delete(qj)
13111278 return nil
13121279 })
13131280 if apierrors .IsNotFound (err ) {
@@ -1459,12 +1426,7 @@ func (cc *XController) Run(stopCh <-chan struct{}) {
14591426 // update snapshot of ClientStateCache every second
14601427 cc .cache .Run (stopCh )
14611428
1462- // go wait.Until(cc.ScheduleNext, 2*time.Second, stopCh)
1463- //go wait.Until(cc.ScheduleNext, 2*time.Second, stopCh)
1464- // start preempt thread based on preemption of pods
1465-
1466- // TODO - scheduleNext...Job....
1467- // start preempt thread based on preemption of pods
1429+ // start preempt thread is used to preempt AWs that have partial pods or have reached dispatch duration
14681430 go wait .Until (cc .PreemptQueueJobs , 60 * time .Second , stopCh )
14691431
14701432 // This thread is used to update AW that has completionstatus set to Complete or RunningHoldCompletion
@@ -1799,7 +1761,7 @@ func (cc *XController) worker() {
17991761 }
18001762
18011763 //asmalvan - starts
1802-
1764+ //TODO: Should this be part of ScheduleNext() method?
18031765 if queuejob .Status .State == arbv1 .AppWrapperStateCompleted {
18041766 return nil
18051767 }
@@ -1812,23 +1774,17 @@ func (cc *XController) worker() {
18121774 // If this the first time seeing this AW, no need to delete.
18131775 stateLen := len (queuejob .Status .State )
18141776 if stateLen > 0 {
1815- klog .V (2 ).Infof ("[manageQueueJob ] Deleting resources for AppWrapper Job '%s/%s' because it was preempted, status.CanRun=%t, status.State=%s" , queuejob .Namespace , queuejob .Name , queuejob .Status .CanRun , queuejob .Status .State )
1777+ klog .V (2 ).Infof ("[worker ] Deleting resources for AppWrapper Job '%s/%s' because it was preempted, status.CanRun=%t, status.State=%s" , queuejob .Namespace , queuejob .Name , queuejob .Status .CanRun , queuejob .Status .State )
18161778 err00 := cc .Cleanup (ctx , queuejob )
18171779 if err00 != nil {
1818- klog .Errorf ("[manageQueueJob ] Failed to delete resources for AppWrapper Job '%s/%s', err=%v" , queuejob .Namespace , queuejob .Name , err00 )
1780+ klog .Errorf ("[worker ] Failed to delete resources for AppWrapper Job '%s/%s', err=%v" , queuejob .Namespace , queuejob .Name , err00 )
18191781 return err00
18201782 }
1821- klog .V (2 ).Infof ("[manageQueueJob ] Delete resources for AppWrapper Job '%s/%s' due to preemption was sucessfull, status.CanRun=%t, status.State=%s" , queuejob .Namespace , queuejob .Name , queuejob .Status .CanRun , queuejob .Status .State )
1783+ klog .V (2 ).Infof ("[worker ] Delete resources for AppWrapper Job '%s/%s' due to preemption was sucessfull, status.CanRun=%t, status.State=%s" , queuejob .Namespace , queuejob .Name , queuejob .Status .CanRun , queuejob .Status .State )
18221784 }
18231785
18241786 queuejob .Status .State = arbv1 .AppWrapperStateEnqueued
1825- // add qj to qjqueue only when it is not in UnschedulableQ
1826- // if cc.qjqueue.IfExistUnschedulableQ(queuejob) {
1827- // klog.V(10).Infof("[manageQueueJob] leaving '%s/%s' to qjqueue.UnschedulableQ activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v", queuejob.Namespace, queuejob.Name, cc.qjqueue.IfExistActiveQ(queuejob), cc.qjqueue.IfExistUnschedulableQ(queuejob), queuejob, queuejob.ResourceVersion, queuejob.Status)
1828- // return nil
1829- // }
1830-
1831- klog .V (10 ).Infof ("[manageQueueJob] before add to activeQ %s activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v" , queuejob .Name , cc .qjqueue .IfExistActiveQ (queuejob ), cc .qjqueue .IfExistUnschedulableQ (queuejob ), queuejob , queuejob .ResourceVersion , queuejob .Status )
1787+ klog .V (10 ).Infof ("[worker] before add to activeQ %s activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v" , queuejob .Name , cc .qjqueue .IfExistActiveQ (queuejob ), cc .qjqueue .IfExistUnschedulableQ (queuejob ), queuejob , queuejob .ResourceVersion , queuejob .Status )
18321788 index := getIndexOfMatchedCondition (queuejob , arbv1 .AppWrapperCondQueueing , "AwaitingHeadOfLine" )
18331789 if index < 0 {
18341790 queuejob .Status .QueueJobState = arbv1 .AppWrapperCondQueueing
@@ -1840,26 +1796,24 @@ func (cc *XController) worker() {
18401796 }
18411797
18421798 queuejob .Status .FilterIgnore = true // Update Queueing status, add to qjqueue for ScheduleNext
1843- err := cc .updateStatusInEtcdWithRetry (ctx , queuejob , "manageQueueJob - setQueueing" )
1799+ err := cc .updateStatusInEtcdWithRetry (ctx , queuejob , "worker - setQueueing" )
18441800 if err != nil {
1845- klog .Errorf ("[manageQueueJob ] Error updating status 'setQueueing' AppWrapper: '%s/%s',Status=%+v, err=%+v." , queuejob .Namespace , queuejob .Name , queuejob .Status , err )
1801+ klog .Errorf ("[worker ] Error updating status 'setQueueing' AppWrapper: '%s/%s',Status=%+v, err=%+v." , queuejob .Namespace , queuejob .Name , queuejob .Status , err )
18461802 return err
18471803 }
1848- // klog.V(10).Infof("[manageQueueJob] before add to activeQ %s activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v", queuejob.Name, cc.qjqueue.IfExistActiveQ(queuejob), cc.qjqueue.IfExistUnschedulableQ(queuejob), queuejob, queuejob.ResourceVersion, queuejob.Status)
1849- // if err00 := cc.qjqueue.AddIfNotPresent(queuejob); err00 != nil {
1850- // klog.Errorf("manageQueueJob] Failed to add '%s/%s' to activeQueue. Back to eventQueue activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v err=%#v",
1851- // queuejob.Namespace, queuejob.Name, cc.qjqueue.IfExistActiveQ(queuejob), cc.qjqueue.IfExistUnschedulableQ(queuejob), queuejob, queuejob.ResourceVersion, queuejob.Status, err00)
1852- // }
1804+
18531805 return nil
18541806 }
1855-
1807+ //scheduleNext method takes a dispatched AW which has not been evaluated, extract resources requested by AW
1808+ //compares it with available unallocated cluster resources, performs quota check
1809+ //if everything passes then CanRun is set to true and AW is ready for dispatch
18561810 if ! queuejob .Status .CanRun && (queuejob .Status .State != arbv1 .AppWrapperStateActive ) {
18571811 cc .ScheduleNext (queuejob )
1858- // sync AppWrapper
18591812 return nil
18601813
18611814 }
1862-
1815+ //When an AW passes ScheduleNext gate then we want to progress AW to Running to begin with
1816+ //Sync queuejob will not unwrap an AW to spawn genericItems
18631817 if queuejob .Status .CanRun {
18641818 if err := cc .syncQueueJob (ctx , queuejob ); err != nil {
18651819 // If any error, requeue it.
0 commit comments