@@ -31,6 +31,7 @@ limitations under the License.
3131package queuejob
3232
3333import (
34+ "errors"
3435 "fmt"
3536 "math"
3637 "math/rand"
@@ -69,7 +70,7 @@ import (
6970
7071 "k8s.io/apimachinery/pkg/runtime"
7172 "k8s.io/apimachinery/pkg/runtime/schema"
72- "k8s.io/apimachinery/pkg/runtime/serializer/json"
73+ runtimeJson "k8s.io/apimachinery/pkg/runtime/serializer/json"
7374
7475 "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/controller/queuejobresources"
7576 resconfigmap "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/controller/queuejobresources/configmap" // ConfigMap
@@ -675,7 +676,7 @@ func GetPodTemplate(qjobRes *arbv1.AppWrapperResource) (*v1.PodTemplateSpec, err
675676 rtScheme := runtime .NewScheme ()
676677 v1 .AddToScheme (rtScheme )
677678
678- jsonSerializer := json .NewYAMLSerializer (json .DefaultMetaFactory , rtScheme , rtScheme )
679+ jsonSerializer := runtimeJson .NewYAMLSerializer (runtimeJson .DefaultMetaFactory , rtScheme , rtScheme )
679680
680681 podGVK := schema.GroupVersion {Group : v1 .GroupName , Version : "v1" }.WithKind ("PodTemplate" )
681682
@@ -1903,20 +1904,19 @@ func (cc *XController) worker() {
19031904
19041905 // sync AppWrapper
19051906 if err := cc .syncQueueJob (queuejob ); err != nil {
1906- klog .Errorf ("[worker] Failed to sync AppWrapper '%s/%s', err %#v" , queuejob .Namespace , queuejob .Name , err )
19071907 // If any error, requeue it.
19081908 return err
19091909 }
19101910
19111911 klog .V (10 ).Infof ("[worker] Ending %s Delay=%.6f seconds &newQJ=%p Version=%s Status=%+v" , queuejob .Name , time .Now ().Sub (queuejob .Status .ControllerFirstTimestamp .Time ).Seconds (), queuejob , queuejob .ResourceVersion , queuejob .Status )
19121912 return nil
19131913 })
1914- if err != nil {
1914+ if err != nil && ! CanIgnoreAPIError ( err ) && ! IsJsonSyntaxError ( err ) {
19151915 klog .Warningf ("[worker] Fail to process item from eventQueue, err %v. Attempting to re-enqueque..." , err )
19161916 if err00 := cc .enqueueIfNotPresent (item ); err00 != nil {
1917- klog .Errorf ("[worker] Fatal error railed to re-enqueue item, err %v" , err00 )
1917+ klog .Errorf ("[worker] Fatal error trying to re-enqueue item, err = %v" , err00 )
19181918 } else {
1919- klog .Warning ("[worker] Item re-enqueued" )
1919+ klog .Warning ("[worker] Item re-enqueued. " )
19201920 }
19211921 return
19221922 }
@@ -2140,7 +2140,6 @@ func (cc *XController) manageQueueJob(qj *arbv1.AppWrapper, podPhaseChanges bool
21402140 klog .Errorf ("[manageQueueJob] Error dispatching generic item for app wrapper='%s/%s' type=%v err=%v" , qj .Namespace , qj .Name , err00 )
21412141 }
21422142 dispatchFailureMessage = fmt .Sprintf ("%s/%s creation failure: %+v" , qj .Namespace , qj .Name , err00 )
2143- klog .Errorf ("[manageQueueJob] Error dispatching job=%s Status=%+v err=%+v" , qj .Name , qj .Status , err00 )
21442143 dispatched = false
21452144 }
21462145 }
@@ -2337,7 +2336,7 @@ func (cc *XController) Cleanup(appwrapper *arbv1.AppWrapper) error {
23372336 // we call clean-up for each controller
23382337 for _ , ar := range appwrapper .Spec .AggrResources .Items {
23392338 err00 := cc .qjobResControls [ar .Type ].Cleanup (appwrapper , & ar )
2340- if err00 != nil && ! apierrors . IsNotFound (err00 ) {
2339+ if err00 != nil && ! CanIgnoreAPIError ( err00 ) && ! IsJsonSyntaxError (err00 ) {
23412340 klog .Errorf ("[Cleanup] Error deleting item %s from app wrapper='%s/%s' err=%v." ,
23422341 ar .Type , appwrapper .Namespace , appwrapper .Name , err00 )
23432342 err = multierror .Append (err , err00 )
@@ -2350,14 +2349,19 @@ func (cc *XController) Cleanup(appwrapper *arbv1.AppWrapper) error {
23502349 if appwrapper .Spec .AggrResources .GenericItems != nil {
23512350 for _ , ar := range appwrapper .Spec .AggrResources .GenericItems {
23522351 genericResourceName , gvk , err00 := cc .genericresources .Cleanup (appwrapper , & ar )
2353- if err00 != nil && ! apierrors . IsNotFound (err00 ) {
2352+ if err00 != nil && ! CanIgnoreAPIError ( err00 ) && ! IsJsonSyntaxError (err00 ) {
23542353 klog .Errorf ("[Cleanup] Error deleting generic item %s, from app wrapper='%s/%s' err=%v." ,
23552354 genericResourceName , appwrapper .Namespace , appwrapper .Name , err00 )
23562355 err = multierror .Append (err , err00 )
23572356 continue
23582357 }
2359- klog .V (3 ).Infof ("[Cleanup] Deleted generic item %s, GVK=%s.%s.%s from app wrapper='%s/%s'" ,
2360- genericResourceName , gvk .Group , gvk .Version , gvk .Kind , appwrapper .Namespace , appwrapper .Name )
2358+ if gvk != nil {
2359+ klog .V (3 ).Infof ("[Cleanup] Deleted generic item '%s', GVK=%s.%s.%s from app wrapper='%s/%s'" ,
2360+ genericResourceName , gvk .Group , gvk .Version , gvk .Kind , appwrapper .Namespace , appwrapper .Name )
2361+ } else {
2362+ klog .V (3 ).Infof ("[Cleanup] Deleted generic item '%s' from app wrapper='%s/%s'" ,
2363+ genericResourceName , appwrapper .Namespace , appwrapper .Name )
2364+ }
23612365 }
23622366 }
23632367
@@ -2443,3 +2447,21 @@ func (qjm *XController) schedulingAWAtomicSet(qj *arbv1.AppWrapper) {
24432447 qjm .schedulingAW = qj
24442448 qjm .schedulingMutex .Unlock ()
24452449}
2450+
// IsJsonSyntaxError reports whether err should be treated as a malformed
// template/JSON problem.
//
// It returns false for a nil error. It returns true when the error's message
// is exactly one of the two fixed strings matched below, or when a
// *jsons.SyntaxError can be found in err's chain via errors.As. Every other
// error yields false.
//
// NOTE(review): jsons is presumably this file's import alias for
// encoding/json — confirm against the import block.
func IsJsonSyntaxError(err error) bool {
	if err == nil {
		return false
	}

	// Exact-match comparison on the message text; wrapped or decorated
	// variants of these messages do not match here.
	switch err.Error() {
	case "Job resource template item not define as a PodTemplate",
		"name is required":
		return true
	}

	var syntaxErr *jsons.SyntaxError
	return errors.As(err, &syntaxErr)
}
2465+ func CanIgnoreAPIError (err error ) bool {
2466+ return err == nil || apierrors .IsNotFound (err ) || apierrors .IsInvalid (err )
2467+ }
0 commit comments