@@ -11,6 +11,7 @@ import (
11
11
errorwrap "github.com/pkg/errors"
12
12
"github.com/sirupsen/logrus"
13
13
"google.golang.org/grpc/connectivity"
14
+ batchv1 "k8s.io/api/batch/v1"
14
15
corev1 "k8s.io/api/core/v1"
15
16
rbacv1 "k8s.io/api/rbac/v1"
16
17
"k8s.io/apiextensions-apiserver/pkg/apis/apiextensions"
@@ -31,6 +32,7 @@ import (
31
32
32
33
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/grpc"
33
34
sharedtime "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/time"
35
+ "github.com/operator-framework/operator-registry/pkg/configmap"
34
36
35
37
"github.com/operator-framework/operator-lifecycle-manager/pkg/api/apis/operators/reference"
36
38
"github.com/operator-framework/operator-lifecycle-manager/pkg/api/apis/operators/v1alpha1"
@@ -85,6 +87,8 @@ type Operator struct {
85
87
catalogSubscriberIndexer map [string ]cache.Indexer
86
88
clientAttenuator * scoped.ClientAttenuator
87
89
serviceAccountQuerier * scoped.UserDefinedServiceAccountQuerier
90
+ bundleLoader * configmap.BundleLoader
91
+ configmapRegistryImage string
88
92
}
89
93
90
94
type CatalogSourceSyncFunc func (logger * logrus.Entry , in * v1alpha1.CatalogSource ) (out * v1alpha1.CatalogSource , continueSync bool , syncError error )
@@ -140,6 +144,8 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
140
144
catalogSubscriberIndexer : map [string ]cache.Indexer {},
141
145
serviceAccountQuerier : scoped .NewUserDefinedServiceAccountQuerier (logger , crClient ),
142
146
clientAttenuator : scoped .NewClientAttenuator (logger , config , opClient , crClient ),
147
+ bundleLoader : configmap .NewBundleLoader (),
148
+ configmapRegistryImage : configmapRegistryImage ,
143
149
}
144
150
op .sources = grpc .NewSourceStore (logger , 10 * time .Second , 10 * time .Minute , op .syncSourceState )
145
151
op .reconciler = reconciler .NewRegistryReconcilerFactory (lister , opClient , configmapRegistryImage , op .now )
@@ -167,10 +173,13 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
167
173
// Wire InstallPlans
168
174
ipInformer := crInformerFactory .Operators ().V1alpha1 ().InstallPlans ()
169
175
op .lister .OperatorsV1alpha1 ().RegisterInstallPlanLister (namespace , ipInformer .Lister ())
176
+ ipQueue := workqueue .NewNamedRateLimitingQueue (workqueue .DefaultControllerRateLimiter (), fmt .Sprintf ("%s/ips" , namespace ))
177
+ op .ipQueueSet .Set (namespace , ipQueue )
170
178
ipQueueInformer , err := queueinformer .NewQueueInformer (
171
179
ctx ,
172
180
queueinformer .WithMetricsProvider (metrics .NewMetricsInstallPlan (op .client )),
173
181
queueinformer .WithLogger (op .logger ),
182
+ queueinformer .WithQueue (ipQueue ),
174
183
queueinformer .WithInformer (ipInformer .Informer ()),
175
184
queueinformer .WithSyncer (queueinformer .LegacySyncHandler (op .syncInstallPlans ).ToSyncer ()),
176
185
)
@@ -772,7 +781,7 @@ func (o *Operator) syncResolvingNamespace(obj interface{}) error {
772
781
logger .Debug ("resolving subscriptions in namespace" )
773
782
774
783
// resolve a set of steps to apply to a cluster, a set of subscriptions to create/update, and any errors
775
- steps , updatedSubs , err := o .resolver .ResolveSteps (namespace , querier )
784
+ steps , bundleLookups , updatedSubs , err := o .resolver .ResolveSteps (namespace , querier )
776
785
if err != nil {
777
786
return err
778
787
}
@@ -790,7 +799,7 @@ func (o *Operator) syncResolvingNamespace(obj interface{}) error {
790
799
}
791
800
}
792
801
793
- installPlanReference , err := o .ensureInstallPlan (logger , namespace , subs , installPlanApproval , steps )
802
+ installPlanReference , err := o .ensureInstallPlan (logger , namespace , subs , installPlanApproval , steps , bundleLookups )
794
803
if err != nil {
795
804
logger .WithError (err ).Debug ("error ensuring installplan" )
796
805
return err
@@ -931,8 +940,8 @@ func (o *Operator) updateSubscriptionStatus(namespace string, subs []*v1alpha1.S
931
940
return err
932
941
}
933
942
934
- func (o * Operator ) ensureInstallPlan (logger * logrus.Entry , namespace string , subs []* v1alpha1.Subscription , installPlanApproval v1alpha1.Approval , steps []* v1alpha1.Step ) (* corev1.ObjectReference , error ) {
935
- if len (steps ) == 0 {
943
+ func (o * Operator ) ensureInstallPlan (logger * logrus.Entry , namespace string , subs []* v1alpha1.Subscription , installPlanApproval v1alpha1.Approval , steps []* v1alpha1.Step , bundleLookups [] * v1alpha1. BundleLookup ) (* corev1.ObjectReference , error ) {
944
+ if len (steps ) == 0 && len ( bundleLookups ) == 0 {
936
945
return nil , nil
937
946
}
938
947
@@ -978,11 +987,11 @@ func (o *Operator) ensureInstallPlan(logger *logrus.Entry, namespace string, sub
978
987
}
979
988
logger .Warn ("no installplan found with matching manifests, creating new one" )
980
989
981
- return o .createInstallPlan (namespace , subs , installPlanApproval , steps )
990
+ return o .createInstallPlan (namespace , subs , installPlanApproval , steps , bundleLookups )
982
991
}
983
992
984
- func (o * Operator ) createInstallPlan (namespace string , subs []* v1alpha1.Subscription , installPlanApproval v1alpha1.Approval , steps []* v1alpha1.Step ) (* corev1.ObjectReference , error ) {
985
- if len (steps ) == 0 {
993
+ func (o * Operator ) createInstallPlan (namespace string , subs []* v1alpha1.Subscription , installPlanApproval v1alpha1.Approval , steps []* v1alpha1.Step , bundleLookups [] * v1alpha1. BundleLookup ) (* corev1.ObjectReference , error ) {
994
+ if len (steps ) == 0 && len ( bundleLookups ) == 0 {
986
995
return nil , nil
987
996
}
988
997
@@ -1027,6 +1036,7 @@ func (o *Operator) createInstallPlan(namespace string, subs []*v1alpha1.Subscrip
1027
1036
Phase : phase ,
1028
1037
Plan : steps ,
1029
1038
CatalogSources : catalogSources ,
1039
+ BundleLookups : bundleLookups ,
1030
1040
}
1031
1041
res , err = o .client .OperatorsV1alpha1 ().InstallPlans (namespace ).UpdateStatus (res )
1032
1042
if err != nil {
@@ -1036,6 +1046,108 @@ func (o *Operator) createInstallPlan(namespace string, subs []*v1alpha1.Subscrip
1036
1046
return reference .GetReference (res )
1037
1047
}
1038
1048
1049
+ func (o * Operator ) checkBundleLookups (plan * v1alpha1.InstallPlan ) (bool , error ) {
1050
+ for _ , bundleLookup := range plan .Status .BundleLookups {
1051
+ if bundleLookup .BundleJob == nil {
1052
+ configmap , job , err := configmap .LaunchBundleImage (o .opClient .KubernetesInterface (), bundleLookup .Image , o .configmapRegistryImage , o .namespace )
1053
+ if err != nil {
1054
+ return false , err
1055
+ }
1056
+ logrus .Infof ("Launched bundle job for image %v" , bundleLookup .Image )
1057
+
1058
+ bundleLookup .BundleJob = & v1alpha1.BundleJob {
1059
+ // job condition and completion time will be filled out later (installplan sync)
1060
+ Name : job .GetName (),
1061
+ Namespace : job .GetNamespace (),
1062
+ }
1063
+ bundleLookup .ConfigMapRef = & v1alpha1.ConfigMapResourceReference {
1064
+ Name : configmap .GetName (),
1065
+ Namespace : configmap .GetNamespace (),
1066
+ UID : configmap .GetUID (),
1067
+ ResourceVersion : configmap .GetResourceVersion (),
1068
+ }
1069
+ _ , err = o .client .OperatorsV1alpha1 ().InstallPlans (plan .GetNamespace ()).UpdateStatus (plan )
1070
+ if err != nil {
1071
+ return false , err
1072
+ }
1073
+
1074
+ return false , nil
1075
+ }
1076
+
1077
+ if bundleLookup .BundleJob .CompletionTime != nil {
1078
+ // already processed
1079
+ continue
1080
+ }
1081
+
1082
+ // TODO: instead of doing a get, should just watch all jobs (add ownerref) and update the installplan from there
1083
+ job , err := o .opClient .KubernetesInterface ().BatchV1 ().Jobs (bundleLookup .BundleJob .Namespace ).Get (bundleLookup .BundleJob .Name , metav1.GetOptions {})
1084
+ if err != nil {
1085
+ return false , err
1086
+ }
1087
+ if len (job .Status .Conditions ) == 0 || len (job .Status .Conditions ) > 0 && job .Status .Conditions [0 ].Type != batchv1 .JobComplete {
1088
+ logrus .Infof ("Job '%v' for '%v' not yet completed" , job .GetName (), bundleLookup .Image )
1089
+ return false , nil
1090
+ }
1091
+ bundleLookup .BundleJob .Condition = job .Status .Conditions [0 ].Type
1092
+ bundleLookup .BundleJob .CompletionTime = job .Status .CompletionTime
1093
+
1094
+ configmap , err := o .lister .CoreV1 ().ConfigMapLister ().ConfigMaps (bundleLookup .ConfigMapRef .Namespace ).Get (bundleLookup .ConfigMapRef .Name )
1095
+ if err != nil {
1096
+ return false , err
1097
+ }
1098
+
1099
+ // extract data from configmap and write to install plan
1100
+ manifest , err := o .bundleLoader .Load (configmap )
1101
+ if err != nil {
1102
+ return false , err
1103
+ }
1104
+
1105
+ // combine data from the bundle image into what's already known from the registry
1106
+ bundleLookup .BundleFromRegistry .CsvName = manifest .Bundle .Name
1107
+ bundleLookup .BundleFromRegistry .PackageName = manifest .Bundle .Package
1108
+ bundleLookup .BundleFromRegistry .ChannelName = manifest .Bundle .Channel
1109
+ _ , jsonCSV , _ , err := manifest .Bundle .Serialize ()
1110
+ if err != nil {
1111
+ return false , fmt .Errorf ("serialize failed: %s" , err .Error ())
1112
+ }
1113
+ bundleLookup .BundleFromRegistry .CsvJson = string (jsonCSV )
1114
+ bundleLookup .BundleFromRegistry .Object = []string {string (jsonCSV )}
1115
+ for _ , item := range manifest .Bundle .Objects {
1116
+ bytes , err := item .MarshalJSON ()
1117
+ if err != nil {
1118
+ return false , fmt .Errorf ("marshall failed: %v" , err )
1119
+ }
1120
+ bundleLookup .BundleFromRegistry .Object = append (bundleLookup .BundleFromRegistry .Object , string (bytes ))
1121
+ }
1122
+
1123
+ var olmCSV v1alpha1.ClusterServiceVersion
1124
+ err = json .Unmarshal (jsonCSV , & olmCSV )
1125
+ if err != nil {
1126
+ return false , fmt .Errorf ("csv retrieval failed: %s" , err .Error ())
1127
+ }
1128
+
1129
+ // TODO: refactor with resolver code (and call the subscription stuff too)
1130
+ bundleSteps , err := resolver .NewStepResourceFromBundle (bundleLookup .BundleFromRegistry , plan .GetNamespace (), olmCSV .Spec .Replaces , bundleLookup .CatalogName , bundleLookup .CatalogNamespace )
1131
+ if err != nil {
1132
+ return false , fmt .Errorf ("failed to turn bundle into steps: %s" , err .Error ())
1133
+ }
1134
+
1135
+ // TODO: could this add duplicate steps?
1136
+ for _ , s := range bundleSteps {
1137
+ plan .Status .Plan = append (plan .Status .Plan , & v1alpha1.Step {
1138
+ Resolving : olmCSV .GetName (),
1139
+ Resource : s ,
1140
+ Status : v1alpha1 .StepStatusUnknown ,
1141
+ })
1142
+ }
1143
+ }
1144
+
1145
+ if _ , err := o .client .OperatorsV1alpha1 ().InstallPlans (plan .GetNamespace ()).UpdateStatus (plan ); err != nil {
1146
+ return false , err
1147
+ }
1148
+ return true , nil
1149
+ }
1150
+
1039
1151
func (o * Operator ) syncInstallPlans (obj interface {}) (syncError error ) {
1040
1152
plan , ok := obj .(* v1alpha1.InstallPlan )
1041
1153
if ! ok {
@@ -1052,11 +1164,29 @@ func (o *Operator) syncInstallPlans(obj interface{}) (syncError error) {
1052
1164
1053
1165
logger .Info ("syncing" )
1054
1166
1055
- if len (plan .Status .Plan ) == 0 {
1167
+ if len (plan .Status .Plan ) == 0 && len ( plan . Status . BundleLookups ) == 0 {
1056
1168
logger .Info ("skip processing installplan without status - subscription sync responsible for initial status" )
1057
1169
return
1058
1170
}
1059
1171
1172
+ // handle bundle data before trying to install
1173
+ if len (plan .Status .BundleLookups ) != 0 {
1174
+ finished , err := o .checkBundleLookups (plan )
1175
+ if err != nil {
1176
+ syncError = fmt .Errorf ("checkBundleLookups failed: %v" , err )
1177
+ return
1178
+ }
1179
+ if ! finished {
1180
+ err := o .ipQueueSet .RequeueAfter (plan .GetNamespace (), plan .GetName (), 5 * time .Second )
1181
+ if err != nil {
1182
+ syncError = err
1183
+ return
1184
+ }
1185
+ o .logger .Debug ("install plan not yet populated from bundle image, requeueing" )
1186
+ return
1187
+ }
1188
+ }
1189
+
1060
1190
querier := o .serviceAccountQuerier .NamespaceQuerier (plan .GetNamespace ())
1061
1191
reference , err := querier ()
1062
1192
if err != nil {
0 commit comments