@@ -2,26 +2,38 @@ package action
22
33import  (
44	"context" 
5+ 	"encoding/json" 
56	"fmt" 
7+ 	"strings" 
68	"time" 
79
810	"github.com/operator-framework/api/pkg/operators/v1alpha1" 
911	"github.com/operator-framework/operator-registry/pkg/image" 
1012	"github.com/operator-framework/operator-registry/pkg/image/containerdregistry" 
1113	"github.com/spf13/pflag" 
14+ 	corev1 "k8s.io/api/core/v1" 
15+ 	apierrors "k8s.io/apimachinery/pkg/api/errors" 
16+ 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" 
1217	"k8s.io/apimachinery/pkg/types" 
18+ 	"k8s.io/apimachinery/pkg/util/rand" 
1319	"k8s.io/apimachinery/pkg/util/wait" 
20+ 	"k8s.io/client-go/util/retry" 
1421	"sigs.k8s.io/controller-runtime/pkg/client" 
22+ 	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" 
1523
1624	"github.com/joelanford/kubectl-operator/internal/pkg/catalog" 
1725	"github.com/joelanford/kubectl-operator/internal/pkg/log" 
1826)
1927
28+ const  grpcPort  =  "50051" 
29+ 
2030type  CatalogAdd  struct  {
2131	config  * Configuration 
2232
2333	CatalogSourceName  string 
2434	IndexImage         string 
35+ 	InjectBundles      []string 
36+ 	InjectBundleMode   string 
2537	DisplayName        string 
2638	Publisher          string 
2739	AddTimeout         time.Duration 
@@ -43,6 +55,10 @@ func (a *CatalogAdd) BindFlags(fs *pflag.FlagSet) {
4355	fs .StringVarP (& a .Publisher , "publisher" , "p" , "" , "publisher of the index" )
4456	fs .DurationVarP (& a .AddTimeout , "timeout" , "t" , time .Minute , "the amount of time to wait before cancelling the catalog addition" )
4557	fs .DurationVar (& a .CleanupTimeout , "cleanup-timeout" , time .Minute , "the amount to time to wait before cancelling cleanup" )
58+ 
59+ 	fs .StringArrayVarP (& a .InjectBundles , "inject-bundles" , "b" , nil , "inject extra bundles into the index at runtime" )
60+ 	fs .StringVarP (& a .InjectBundleMode , "inject-bundle-mode" , "m" , "" , "mode to use to inject bundles" )
61+ 	_  =  fs .MarkHidden ("inject-bundle-mode" )
4662}
4763
4864func  (a  * CatalogAdd ) Run (ctx  context.Context ) (* v1alpha1.CatalogSource , error ) {
@@ -70,13 +86,37 @@ func (a *CatalogAdd) Run(ctx context.Context) (*v1alpha1.CatalogSource, error) {
7086
7187	a .setDefaults (labels )
7288
89+ 	var  registryPod  * corev1.Pod 
90+ 	if  len (a .InjectBundles ) >  0  {
91+ 		if  registryPod , err  =  a .createRegistryPod (ctx ); err  !=  nil  {
92+ 			return  nil , err 
93+ 		}
94+ 	}
95+ 
7396	opts  :=  []catalog.Option {
74- 		catalog .Image (a .IndexImage ),
7597		catalog .DisplayName (a .DisplayName ),
7698		catalog .Publisher (a .Publisher ),
7799	}
100+ 
101+ 	if  registryPod  ==  nil  {
102+ 		opts  =  append (opts , catalog .Image (a .IndexImage ))
103+ 	} else  {
104+ 		address  :=  fmt .Sprintf ("%s:%s" , registryPod .Status .PodIP , grpcPort )
105+ 		injectedBundlesJSON , err  :=  json .Marshal (a .InjectBundles )
106+ 		if  err  !=  nil  {
107+ 			return  nil , fmt .Errorf ("json marshal injected bundles: %v" , err )
108+ 		}
109+ 		annotations  :=  map [string ]string {
110+ 			"operators.operatorframework.io/injected-bundles" : string (injectedBundlesJSON ),
111+ 		}
112+ 		opts  =  append (opts ,
113+ 			catalog .Address (address ),
114+ 			catalog .Annotations (annotations ),
115+ 		)
116+ 	}
117+ 
78118	cs  :=  catalog .Build (csKey , opts ... )
79- 	if  err  :=  a .add (ctx , cs ); err  !=  nil  {
119+ 	if  err  :=  a .add (ctx , cs ,  registryPod ); err  !=  nil  {
80120		defer  a .cleanup (cs )
81121		return  nil , err 
82122	}
@@ -106,13 +146,85 @@ func (a *CatalogAdd) setDefaults(labels map[string]string) {
106146			a .Publisher  =  v 
107147		}
108148	}
149+ 	if  a .InjectBundleMode  ==  ""  {
150+ 		if  strings .HasPrefix (a .IndexImage , "quay.io/operator-framework/upstream-opm-builder" ) {
151+ 			a .InjectBundleMode  =  "semver" 
152+ 		} else  {
153+ 			a .InjectBundleMode  =  "replaces" 
154+ 		}
155+ 	}
156+ }
157+ 
158+ func  (a  * CatalogAdd ) createRegistryPod (ctx  context.Context ) (* corev1.Pod , error ) {
159+ 	command  :=  []string {
160+ 		"/bin/sh" ,
161+ 		"-c" ,
162+ 		fmt .Sprintf (`mkdir -p /database && \ 
163+ /bin/opm registry add   -d /database/index.db --mode=%s -b %s && \ 
164+ /bin/opm registry serve -d /database/index.db -p %s` , a .InjectBundleMode , strings .Join (a .InjectBundles , "," ), grpcPort ),
165+ 	}
166+ 
167+ 	pod  :=  & corev1.Pod {
168+ 		ObjectMeta : metav1.ObjectMeta {
169+ 			Name :      fmt .Sprintf ("%s-%s" , a .CatalogSourceName , rand .String (4 )),
170+ 			Namespace : a .config .Namespace ,
171+ 		},
172+ 		Spec : corev1.PodSpec {
173+ 			Containers : []corev1.Container {
174+ 				{
175+ 					Name :    "registry" ,
176+ 					Image :   a .IndexImage ,
177+ 					Command : command ,
178+ 				},
179+ 			},
180+ 		},
181+ 	}
182+ 	if  err  :=  a .config .Client .Create (ctx , pod ); err  !=  nil  {
183+ 		return  nil , err 
184+ 	}
185+ 
186+ 	if  err  :=  wait .PollImmediateUntil (time .Millisecond * 250 , func () (bool , error ) {
187+ 		podKey , err  :=  client .ObjectKeyFromObject (pod )
188+ 		if  err  !=  nil  {
189+ 			return  false , fmt .Errorf ("get pod key: %v" , err )
190+ 		}
191+ 		if  err  :=  a .config .Client .Get (ctx , podKey , pod ); err  !=  nil  {
192+ 			return  false , err 
193+ 		}
194+ 		if  pod .Status .Phase  ==  corev1 .PodRunning  &&  pod .Status .PodIP  !=  ""  {
195+ 			return  true , nil 
196+ 		}
197+ 		return  false , nil 
198+ 	}, ctx .Done ()); err  !=  nil  {
199+ 		return  nil , fmt .Errorf ("registry pod not ready: %v" , err )
200+ 	}
201+ 	return  pod , nil 
109202}
110203
111- func  (a  * CatalogAdd ) add (ctx  context.Context , cs  * v1alpha1.CatalogSource ) error  {
204+ func  (a  * CatalogAdd ) add (ctx  context.Context , cs  * v1alpha1.CatalogSource ,  pod   * corev1. Pod ) error  {
112205	if  err  :=  a .config .Client .Create (ctx , cs ); err  !=  nil  {
113206		return  fmt .Errorf ("create catalogsource: %v" , err )
114207	}
115208
209+ 	if  pod  !=  nil  {
210+ 		retry .RetryOnConflict (retry .DefaultBackoff , func () error  {
211+ 			podKey , err  :=  client .ObjectKeyFromObject (pod )
212+ 			if  err  !=  nil  {
213+ 				return  fmt .Errorf ("get pod key: %v" , err )
214+ 			}
215+ 			if  err  :=  a .config .Client .Get (ctx , podKey , pod ); err  !=  nil  {
216+ 				return  fmt .Errorf ("get registry pod: %v" , err )
217+ 			}
218+ 			if  err  :=  controllerutil .SetOwnerReference (cs , pod , a .config .Scheme ); err  !=  nil  {
219+ 				return  fmt .Errorf ("set registry pod owner reference: %v" , err )
220+ 			}
221+ 			if  err  :=  a .config .Client .Update (ctx , pod ); err  !=  nil  {
222+ 				return  fmt .Errorf ("update registry pod owner reference: %w" , err )
223+ 			}
224+ 			return  nil 
225+ 		})
226+ 	}
227+ 
116228	csKey , err  :=  client .ObjectKeyFromObject (cs )
117229	if  err  !=  nil  {
118230		return  fmt .Errorf ("get catalogsource key: %v" , err )
@@ -136,7 +248,7 @@ func (a *CatalogAdd) add(ctx context.Context, cs *v1alpha1.CatalogSource) error
136248func  (a  * CatalogAdd ) cleanup (cs  * v1alpha1.CatalogSource ) {
137249	ctx , cancel  :=  context .WithTimeout (context .Background (), a .CleanupTimeout )
138250	defer  cancel ()
139- 	if  err  :=  a .config .Client .Delete (ctx , cs ); err  !=  nil  {
251+ 	if  err  :=  a .config .Client .Delete (ctx , cs ); err  !=  nil  &&   ! apierrors . IsNotFound ( err )  {
140252		log .Printf ("delete catalogsource %q: %v" , cs .Name , err )
141253	}
142254}
0 commit comments