@@ -7,11 +7,9 @@ import (
7
7
"fmt"
8
8
"reflect"
9
9
"strings"
10
+ "sync"
10
11
"time"
11
12
12
- "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
13
- "k8s.io/apimachinery/pkg/util/yaml"
14
-
15
13
errorwrap "github.com/pkg/errors"
16
14
"github.com/sirupsen/logrus"
17
15
"google.golang.org/grpc/connectivity"
@@ -23,15 +21,18 @@ import (
23
21
extinf "k8s.io/apiextensions-apiserver/pkg/client/informers/externalversions"
24
22
k8serrors "k8s.io/apimachinery/pkg/api/errors"
25
23
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
24
+ "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
26
25
"k8s.io/apimachinery/pkg/runtime/schema"
27
26
utilclock "k8s.io/apimachinery/pkg/util/clock"
28
27
utilerrors "k8s.io/apimachinery/pkg/util/errors"
29
28
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
30
29
"k8s.io/apimachinery/pkg/util/validation/field"
30
+ "k8s.io/apimachinery/pkg/util/yaml"
31
31
"k8s.io/client-go/dynamic"
32
32
"k8s.io/client-go/informers"
33
33
"k8s.io/client-go/tools/cache"
34
34
"k8s.io/client-go/tools/clientcmd"
35
+ "k8s.io/client-go/util/retry"
35
36
"k8s.io/client-go/util/workqueue"
36
37
37
38
"github.com/operator-framework/operator-lifecycle-manager/pkg/api/apis/operators/reference"
@@ -871,7 +872,7 @@ func (o *Operator) ensureSubscriptionInstallPlanState(logger *logrus.Entry, sub
871
872
return sub , false , nil
872
873
}
873
874
874
- ip , err := o .lister .OperatorsV1alpha1 ().InstallPlanLister (). InstallPlans (sub .GetNamespace ()).Get (ipName )
875
+ ip , err := o .client .OperatorsV1alpha1 ().InstallPlans (sub .GetNamespace ()).Get (ipName , metav1. GetOptions {} )
875
876
if err != nil {
876
877
logger .WithField ("installplan" , ipName ).Warn ("unable to get installplan from cache" )
877
878
return nil , false , err
@@ -942,21 +943,48 @@ func (o *Operator) ensureSubscriptionCSVState(logger *logrus.Entry, sub *v1alpha
942
943
}
943
944
944
945
func (o * Operator ) updateSubscriptionStatus (namespace string , gen int , subs []* v1alpha1.Subscription , installPlanRef * corev1.ObjectReference ) error {
945
- // TODO: parallel, sync waitgroup
946
- var err error
946
+ var (
947
+ errs []error
948
+ mu sync.Mutex
949
+ wg sync.WaitGroup
950
+ getOpts = metav1.GetOptions {}
951
+ lastUpdated = o .now ()
952
+ )
947
953
for _ , sub := range subs {
948
- sub .Status .LastUpdated = o . now ()
954
+ sub .Status .LastUpdated = lastUpdated
949
955
if installPlanRef != nil {
950
956
sub .Status .InstallPlanRef = installPlanRef
951
957
sub .Status .Install = v1alpha1 .NewInstallPlanReference (installPlanRef )
952
958
sub .Status .State = v1alpha1 .SubscriptionStateUpgradePending
953
959
sub .Status .InstallPlanGeneration = gen
954
960
}
955
- if _ , subErr := o .client .OperatorsV1alpha1 ().Subscriptions (namespace ).UpdateStatus (sub ); subErr != nil {
956
- err = subErr
957
- }
961
+
962
+ wg .Add (1 )
963
+ go func (s v1alpha1.Subscription ) {
964
+ defer wg .Done ()
965
+
966
+ update := func () error {
967
+ // Update the status of the latest revision
968
+ latest , err := o .client .OperatorsV1alpha1 ().Subscriptions (s .GetNamespace ()).Get (s .GetName (), getOpts )
969
+ if err != nil {
970
+ return err
971
+ }
972
+
973
+ latest .Status = s .Status
974
+ _ , err = o .client .OperatorsV1alpha1 ().Subscriptions (namespace ).UpdateStatus (latest )
975
+
976
+ return err
977
+ }
978
+ if err := retry .RetryOnConflict (retry .DefaultRetry , update ); err != nil {
979
+ mu .Lock ()
980
+ defer mu .Unlock ()
981
+ errs = append (errs , err )
982
+ }
983
+ }(* sub )
958
984
}
959
- return err
985
+ wg .Wait ()
986
+
987
+ return utilerrors .NewAggregate (errs )
960
988
}
961
989
962
990
func (o * Operator ) ensureInstallPlan (logger * logrus.Entry , namespace string , gen int , subs []* v1alpha1.Subscription , installPlanApproval v1alpha1.Approval , steps []* v1alpha1.Step , bundleLookups []v1alpha1.BundleLookup ) (* corev1.ObjectReference , error ) {
0 commit comments