@@ -16,6 +16,8 @@ import (
16
16
"github.com/pkg/errors"
17
17
"github.com/sirupsen/logrus"
18
18
corev1 "k8s.io/api/core/v1"
19
+ v1 "k8s.io/api/core/v1"
20
+ k8serror "k8s.io/apimachinery/pkg/api/errors"
19
21
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
20
22
"k8s.io/apimachinery/pkg/labels"
21
23
"k8s.io/apimachinery/pkg/util/intstr"
@@ -86,8 +88,34 @@ func (s *grpcCatalogSourceDecorator) Service() *corev1.Service {
86
88
return svc
87
89
}
88
90
89
- func (s * grpcCatalogSourceDecorator ) Pod () * corev1.Pod {
90
- pod := Pod (s .CatalogSource , "registry-server" , s .Spec .Image , s .Labels (), 5 , 10 )
91
+ func (s * grpcCatalogSourceDecorator ) ServiceAccount () * corev1.ServiceAccount {
92
+ var secrets []corev1.LocalObjectReference
93
+ blockOwnerDeletion := true
94
+ isController := true
95
+ for _ , secretName := range s .CatalogSource .Spec .Secrets {
96
+ secrets = append (secrets , corev1.LocalObjectReference {Name : secretName })
97
+ }
98
+ return & corev1.ServiceAccount {
99
+ ObjectMeta : metav1.ObjectMeta {
100
+ Name : s .GetName (),
101
+ Namespace : s .GetNamespace (),
102
+ OwnerReferences : []metav1.OwnerReference {
103
+ {
104
+ Name : s .GetName (),
105
+ Kind : v1alpha1 .CatalogSourceKind ,
106
+ APIVersion : v1alpha1 .CatalogSourceCRDAPIVersion ,
107
+ UID : s .GetUID (),
108
+ Controller : & isController ,
109
+ BlockOwnerDeletion : & blockOwnerDeletion ,
110
+ },
111
+ },
112
+ },
113
+ ImagePullSecrets : secrets ,
114
+ }
115
+ }
116
+
117
+ func (s * grpcCatalogSourceDecorator ) Pod (saName string ) * corev1.Pod {
118
+ pod := Pod (s .CatalogSource , "registry-server" , s .Spec .Image , saName , s .Labels (), 5 , 10 )
91
119
ownerutil .AddOwner (pod , s .CatalogSource , false , false )
92
120
return pod
93
121
}
@@ -160,14 +188,18 @@ func (c *GrpcRegistryReconciler) EnsureRegistryServer(catalogSource *v1alpha1.Ca
160
188
overwritePod := overwrite || len (c .currentPodsWithCorrectImage (source )) == 0
161
189
162
190
//TODO: if any of these error out, we should write a status back (possibly set RegistryServiceStatus to nil so they get recreated)
163
- if err := c .ensurePod (source , overwritePod ); err != nil {
164
- return errors .Wrapf (err , "error ensuring pod: %s" , source .Pod ().GetName ())
191
+ sa , err := c .ensureSA (source )
192
+ if err != nil && ! k8serror .IsAlreadyExists (err ) {
193
+ return errors .Wrapf (err , "error ensuring service account: %s" , source .GetName ())
194
+ }
195
+ if err := c .ensurePod (source , sa .GetName (), overwritePod ); err != nil {
196
+ return errors .Wrapf (err , "error ensuring pod: %s" , source .Pod (sa .Name ).GetName ())
165
197
}
166
- if err := c .ensureUpdatePod (source ); err != nil {
198
+ if err := c .ensureUpdatePod (source , sa . Name ); err != nil {
167
199
if _ , ok := err .(UpdateNotReadyErr ); ok {
168
200
return err
169
201
}
170
- return errors .Wrapf (err , "error ensuring updated catalog source pod: %s" , source .Pod ().GetName ())
202
+ return errors .Wrapf (err , "error ensuring updated catalog source pod: %s" , source .Pod (sa . Name ).GetName ())
171
203
}
172
204
if err := c .ensureService (source , overwrite ); err != nil {
173
205
return errors .Wrapf (err , "error ensuring service: %s" , source .Service ().GetName ())
@@ -186,7 +218,7 @@ func (c *GrpcRegistryReconciler) EnsureRegistryServer(catalogSource *v1alpha1.Ca
186
218
return nil
187
219
}
188
220
189
- func (c * GrpcRegistryReconciler ) ensurePod (source grpcCatalogSourceDecorator , overwrite bool ) error {
221
+ func (c * GrpcRegistryReconciler ) ensurePod (source grpcCatalogSourceDecorator , saName string , overwrite bool ) error {
190
222
// currentLivePods refers to the currently live instances of the catalog source
191
223
currentLivePods := c .currentPods (source )
192
224
if len (currentLivePods ) > 0 {
@@ -199,16 +231,16 @@ func (c *GrpcRegistryReconciler) ensurePod(source grpcCatalogSourceDecorator, ov
199
231
}
200
232
}
201
233
}
202
- _ , err := c .OpClient .KubernetesInterface ().CoreV1 ().Pods (source .GetNamespace ()).Create (context .TODO (), source .Pod (), metav1.CreateOptions {})
234
+ _ , err := c .OpClient .KubernetesInterface ().CoreV1 ().Pods (source .GetNamespace ()).Create (context .TODO (), source .Pod (saName ), metav1.CreateOptions {})
203
235
if err != nil {
204
- return errors .Wrapf (err , "error creating new pod: %s" , source .Pod ().GetGenerateName ())
236
+ return errors .Wrapf (err , "error creating new pod: %s" , source .Pod (saName ).GetGenerateName ())
205
237
}
206
238
207
239
return nil
208
240
}
209
241
210
242
// ensureUpdatePod checks that for the same catalog source version the same container imageID is running
211
- func (c * GrpcRegistryReconciler ) ensureUpdatePod (source grpcCatalogSourceDecorator ) error {
243
+ func (c * GrpcRegistryReconciler ) ensureUpdatePod (source grpcCatalogSourceDecorator , saName string ) error {
212
244
if ! source .Poll () {
213
245
return nil
214
246
}
@@ -218,7 +250,7 @@ func (c *GrpcRegistryReconciler) ensureUpdatePod(source grpcCatalogSourceDecorat
218
250
219
251
if source .Update () && len (currentUpdatePods ) == 0 {
220
252
logrus .WithField ("CatalogSource" , source .GetName ()).Infof ("catalog update required at %s" , time .Now ().String ())
221
- pod , err := c .createUpdatePod (source )
253
+ pod , err := c .createUpdatePod (source , saName )
222
254
if err != nil {
223
255
return errors .Wrapf (err , "creating update catalog source pod" )
224
256
}
@@ -283,6 +315,14 @@ func (c *GrpcRegistryReconciler) ensureService(source grpcCatalogSourceDecorator
283
315
return err
284
316
}
285
317
318
+ func (c * GrpcRegistryReconciler ) ensureSA (source grpcCatalogSourceDecorator ) (* v1.ServiceAccount , error ) {
319
+ sa := source .ServiceAccount ()
320
+ if _ , err := c .OpClient .CreateServiceAccount (sa ); err != nil {
321
+ return sa , err
322
+ }
323
+ return sa , nil
324
+ }
325
+
286
326
// ServiceHashMatch will check the hash info in existing Service to ensure its
287
327
// hash info matches the desired Service's hash.
288
328
func ServiceHashMatch (existing , new * corev1.Service ) bool {
@@ -317,14 +357,14 @@ func HashServiceSpec(spec corev1.ServiceSpec) string {
317
357
}
318
358
319
359
// createUpdatePod is an internal method that creates a pod using the latest catalog source.
320
- func (c * GrpcRegistryReconciler ) createUpdatePod (source grpcCatalogSourceDecorator ) (* corev1.Pod , error ) {
360
+ func (c * GrpcRegistryReconciler ) createUpdatePod (source grpcCatalogSourceDecorator , saName string ) (* corev1.Pod , error ) {
321
361
// remove label from pod to ensure service does not accidentally route traffic to the pod
322
- p := source .Pod ()
362
+ p := source .Pod (saName )
323
363
p = swapLabels (p , "" , source .Name )
324
364
325
365
pod , err := c .OpClient .KubernetesInterface ().CoreV1 ().Pods (source .GetNamespace ()).Create (context .TODO (), p , metav1.CreateOptions {})
326
366
if err != nil {
327
- logrus .WithField ("pod" , source .Pod ().GetName ()).Warn ("couldn't create new catalogsource pod" )
367
+ logrus .WithField ("pod" , source .Pod (saName ).GetName ()).Warn ("couldn't create new catalogsource pod" )
328
368
return nil , err
329
369
}
330
370
0 commit comments