Skip to content

Commit cc02d79

Browse files
dmageopenshift-cherrypick-robot
authored and committed
pkg/operator: add AzurePathFix controller to migrate blobs
this fixes the path bug on the storage level by migrating blobs from the `docker` virtual directory into the `/docker` virtual directory instead. * azurepathfix: give job required env vars * handle different job conditions on azure path fix controller * azurepathfixjob: set restart policy to never * azurepathfixjob: export account key from secret * azurepathfixjob: add azure account key env var to job container * pass account key to path fix job (this time for reals) * azurepathfix: only allow new controller to run on Azure * also mount trusted-ca volume to job so the script works when a cluster-wide proxy is in use. * fix a couple of misspellings * manifests: export operator image env var to container * manifests: update generated files * mount volumes for proxy certs on azure path fix job * azurepathfixcontroller: return error if account name is not present
1 parent b8094c8 commit cc02d79

File tree

5 files changed

+571
-0
lines changed

5 files changed

+571
-0
lines changed

manifests/07-operator-ibm-cloud-managed.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ spec:
3838
fieldPath: metadata.name
3939
- name: OPERATOR_NAME
4040
value: cluster-image-registry-operator
41+
- name: OPERATOR_IMAGE
42+
value: docker.io/openshift/origin-cluster-image-registry-operator:latest
4143
- name: IMAGE
4244
value: docker.io/openshift/origin-docker-registry:latest
4345
- name: IMAGE_PRUNER

manifests/07-operator.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,8 @@ spec:
6666
fieldPath: metadata.name
6767
- name: OPERATOR_NAME
6868
value: "cluster-image-registry-operator"
69+
- name: OPERATOR_IMAGE
70+
value: docker.io/openshift/origin-cluster-image-registry-operator:latest
6971
- name: IMAGE
7072
value: docker.io/openshift/origin-docker-registry:latest
7173
- name: IMAGE_PRUNER
Lines changed: 310 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,310 @@
1+
package operator
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"time"
7+
8+
batchv1 "k8s.io/api/batch/v1"
9+
corev1 "k8s.io/api/core/v1"
10+
"k8s.io/apimachinery/pkg/api/errors"
11+
"k8s.io/apimachinery/pkg/labels"
12+
"k8s.io/apimachinery/pkg/selection"
13+
utilerrors "k8s.io/apimachinery/pkg/util/errors"
14+
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
15+
"k8s.io/apimachinery/pkg/util/wait"
16+
batchv1informers "k8s.io/client-go/informers/batch/v1"
17+
corev1informers "k8s.io/client-go/informers/core/v1"
18+
batchv1client "k8s.io/client-go/kubernetes/typed/batch/v1"
19+
batchv1listers "k8s.io/client-go/listers/batch/v1"
20+
corev1listers "k8s.io/client-go/listers/core/v1"
21+
restclient "k8s.io/client-go/rest"
22+
"k8s.io/client-go/tools/cache"
23+
"k8s.io/client-go/util/workqueue"
24+
"k8s.io/klog/v2"
25+
26+
configapiv1 "github.com/openshift/api/config/v1"
27+
operatorv1 "github.com/openshift/api/operator/v1"
28+
configv1informers "github.com/openshift/client-go/config/informers/externalversions/config/v1"
29+
configlisters "github.com/openshift/client-go/config/listers/config/v1"
30+
imageregistryv1informers "github.com/openshift/client-go/imageregistry/informers/externalversions/imageregistry/v1"
31+
imageregistryv1listers "github.com/openshift/client-go/imageregistry/listers/imageregistry/v1"
32+
"github.com/openshift/library-go/pkg/operator/v1helpers"
33+
34+
"github.com/openshift/cluster-image-registry-operator/pkg/defaults"
35+
"github.com/openshift/cluster-image-registry-operator/pkg/resource"
36+
"github.com/openshift/cluster-image-registry-operator/pkg/storage/util"
37+
)
38+
39+
// AzurePathFixController manages a Job that migrates registry blobs from
// the `docker` virtual directory to the `/docker` virtual directory on the
// cluster's Azure storage account (per the commit description), and surfaces
// the migration progress through operator conditions.
type AzurePathFixController struct {
	// batchClient creates/updates the migration Job.
	batchClient    batchv1client.BatchV1Interface
	// operatorClient is used to publish Progressing/Degraded conditions.
	operatorClient v1helpers.OperatorClient

	// Listers scoped to the operator namespace (jobs, secrets, pods) and
	// cluster-scoped config listers (registry config, infrastructure, proxy).
	jobLister                 batchv1listers.JobNamespaceLister
	imageRegistryConfigLister imageregistryv1listers.ConfigLister
	secretLister              corev1listers.SecretNamespaceLister
	podLister                 corev1listers.PodNamespaceLister
	infrastructureLister      configlisters.InfrastructureLister
	proxyLister               configlisters.ProxyLister

	// kubeconfig is forwarded to the job generator.
	kubeconfig *restclient.Config

	// cachesToSync is waited on in Run before the worker starts.
	cachesToSync []cache.InformerSynced
	// queue coalesces all informer events into single-key reconciliations.
	queue workqueue.RateLimitingInterface
}
53+
54+
func NewAzurePathFixController(
55+
kubeconfig *restclient.Config,
56+
batchClient batchv1client.BatchV1Interface,
57+
operatorClient v1helpers.OperatorClient,
58+
jobInformer batchv1informers.JobInformer,
59+
imageRegistryConfigInformer imageregistryv1informers.ConfigInformer,
60+
infrastructureInformer configv1informers.InfrastructureInformer,
61+
secretInformer corev1informers.SecretInformer,
62+
proxyInformer configv1informers.ProxyInformer,
63+
podInformer corev1informers.PodInformer,
64+
) (*AzurePathFixController, error) {
65+
c := &AzurePathFixController{
66+
batchClient: batchClient,
67+
operatorClient: operatorClient,
68+
jobLister: jobInformer.Lister().Jobs(defaults.ImageRegistryOperatorNamespace),
69+
imageRegistryConfigLister: imageRegistryConfigInformer.Lister(),
70+
infrastructureLister: infrastructureInformer.Lister(),
71+
secretLister: secretInformer.Lister().Secrets(defaults.ImageRegistryOperatorNamespace),
72+
podLister: podInformer.Lister().Pods(defaults.ImageRegistryOperatorNamespace),
73+
proxyLister: proxyInformer.Lister(),
74+
kubeconfig: kubeconfig,
75+
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "AzurePathFixController"),
76+
}
77+
78+
if _, err := jobInformer.Informer().AddEventHandler(c.eventHandler()); err != nil {
79+
return nil, err
80+
}
81+
c.cachesToSync = append(c.cachesToSync, jobInformer.Informer().HasSynced)
82+
83+
if _, err := imageRegistryConfigInformer.Informer().AddEventHandler(c.eventHandler()); err != nil {
84+
return nil, err
85+
}
86+
c.cachesToSync = append(c.cachesToSync, imageRegistryConfigInformer.Informer().HasSynced)
87+
88+
if _, err := infrastructureInformer.Informer().AddEventHandler(c.eventHandler()); err != nil {
89+
return nil, err
90+
}
91+
c.cachesToSync = append(c.cachesToSync, infrastructureInformer.Informer().HasSynced)
92+
93+
if _, err := secretInformer.Informer().AddEventHandler(c.eventHandler()); err != nil {
94+
return nil, err
95+
}
96+
c.cachesToSync = append(c.cachesToSync, secretInformer.Informer().HasSynced)
97+
98+
if _, err := podInformer.Informer().AddEventHandler(c.eventHandler()); err != nil {
99+
return nil, err
100+
}
101+
c.cachesToSync = append(c.cachesToSync, podInformer.Informer().HasSynced)
102+
103+
if _, err := proxyInformer.Informer().AddEventHandler(c.eventHandler()); err != nil {
104+
return nil, err
105+
}
106+
c.cachesToSync = append(c.cachesToSync, proxyInformer.Informer().HasSynced)
107+
108+
// bootstrap the job if it doesn't exist
109+
c.queue.Add("instance")
110+
111+
return c, nil
112+
}
113+
114+
func (c *AzurePathFixController) eventHandler() cache.ResourceEventHandler {
115+
const workQueueKey = "instance"
116+
return cache.ResourceEventHandlerFuncs{
117+
AddFunc: func(obj interface{}) { c.queue.Add(workQueueKey) },
118+
UpdateFunc: func(old, new interface{}) { c.queue.Add(workQueueKey) },
119+
DeleteFunc: func(obj interface{}) { c.queue.Add(workQueueKey) },
120+
}
121+
}
122+
123+
func (c *AzurePathFixController) runWorker() {
124+
for c.processNextWorkItem() {
125+
}
126+
}
127+
128+
func (c *AzurePathFixController) processNextWorkItem() bool {
129+
obj, shutdown := c.queue.Get()
130+
if shutdown {
131+
return false
132+
}
133+
defer c.queue.Done(obj)
134+
135+
klog.V(4).Infof("get event from workqueue: %s", obj)
136+
137+
// the workqueueKey we reference here is different than the one we use in eventHandler
138+
// use that to identify we are processing an item that was added back to the queue
139+
// can remove if not useful but curious why this didn't seem to be working for the
140+
// caches not synced error
141+
if obj == workqueueKey {
142+
klog.V(2).Infof("AzurePathFixController processing requeued item %s", obj)
143+
}
144+
145+
if err := c.sync(); err != nil {
146+
c.queue.AddRateLimited(workqueueKey)
147+
klog.Errorf("AzurePathFixController: unable to sync: %s, requeuing", err)
148+
} else {
149+
c.queue.Forget(obj)
150+
klog.V(4).Infof("AzurePathFixController: event from workqueue successfully processed")
151+
}
152+
return true
153+
}
154+
155+
func (c *AzurePathFixController) sync() error {
156+
// this controller was made to run specifically on Azure,
157+
// so if we detect a different cloud, skip it.
158+
infra, err := util.GetInfrastructure(c.infrastructureLister)
159+
if err != nil {
160+
return fmt.Errorf("unable to get infrastructure object: %s", err)
161+
}
162+
if infra.Status.PlatformStatus.Type != configapiv1.AzurePlatformType {
163+
return nil
164+
}
165+
166+
ctx := context.TODO()
167+
imageRegistryConfig, err := c.imageRegistryConfigLister.Get("cluster")
168+
if err != nil {
169+
return err
170+
}
171+
azureStorage := imageRegistryConfig.Status.Storage.Azure
172+
if azureStorage == nil || len(azureStorage.AccountName) == 0 {
173+
return fmt.Errorf("storage account not yet provisioned")
174+
}
175+
if azureStorage == nil || len(azureStorage.Container) == 0 {
176+
return fmt.Errorf("storage container not yet provisioned")
177+
}
178+
179+
gen := resource.NewGeneratorAzurePathFixJob(
180+
c.jobLister,
181+
c.batchClient,
182+
c.secretLister,
183+
c.infrastructureLister,
184+
c.proxyLister,
185+
imageRegistryConfig,
186+
c.kubeconfig,
187+
)
188+
189+
progressingCondition := operatorv1.OperatorCondition{
190+
Type: "AzurePathFixProgressing",
191+
Status: operatorv1.ConditionUnknown,
192+
}
193+
degradedCondition := operatorv1.OperatorCondition{
194+
Type: "AzurePathFixControllerDegraded",
195+
Status: operatorv1.ConditionFalse,
196+
Reason: "AsExpected",
197+
}
198+
199+
jobObj, err := gen.Get()
200+
if errors.IsNotFound(err) {
201+
progressingCondition.Status = operatorv1.ConditionTrue
202+
progressingCondition.Reason = "NotFound"
203+
progressingCondition.Message = "The job does not exist"
204+
} else if err != nil {
205+
progressingCondition.Reason = "Unknown"
206+
progressingCondition.Message = fmt.Sprintf("Unable to check job progress: %s", err)
207+
} else {
208+
job := jobObj.(*batchv1.Job)
209+
jobProgressing := true
210+
var jobCondition batchv1.JobConditionType
211+
for _, cond := range job.Status.Conditions {
212+
if (cond.Type == batchv1.JobComplete || cond.Type == batchv1.JobFailed) && cond.Status == corev1.ConditionTrue {
213+
jobProgressing = false
214+
jobCondition = cond.Type
215+
break
216+
}
217+
}
218+
219+
if jobProgressing {
220+
progressingCondition.Reason = "Migrating"
221+
progressingCondition.Message = fmt.Sprintf("Azure path fix job is progressing: %d pods active; %d pods failed", job.Status.Active, job.Status.Failed)
222+
progressingCondition.Status = operatorv1.ConditionTrue
223+
}
224+
225+
if jobCondition == batchv1.JobComplete {
226+
progressingCondition.Reason = "AsExpected"
227+
progressingCondition.Status = operatorv1.ConditionFalse
228+
}
229+
230+
if jobCondition == batchv1.JobFailed {
231+
progressingCondition.Reason = "Failed"
232+
progressingCondition.Status = operatorv1.ConditionFalse
233+
degradedCondition.Reason = "Failed"
234+
degradedCondition.Status = operatorv1.ConditionTrue
235+
236+
// if the job still executing (i.e there are attempts left before backoff),
237+
// we don't want to report degraded, but we let users know that some attempt(s)
238+
// failed, and the job is still progressing.
239+
240+
requirement, err := labels.NewRequirement("batch.kubernetes.io/job-name", selection.Equals, []string{gen.GetName()})
241+
if err != nil {
242+
// this is extremely unlikely to happen
243+
return err
244+
}
245+
pods, err := c.podLister.List(labels.NewSelector().Add(*requirement))
246+
if err != nil {
247+
// there's not much that can be done about an error here,
248+
// the next reconciliation(s) are likely to succeed.
249+
return err
250+
}
251+
252+
if len(pods) == 0 {
253+
msg := "Migration failed but no job pods are left to inspect"
254+
progressingCondition.Message = msg
255+
degradedCondition.Message = msg
256+
}
257+
258+
if len(pods) > 0 {
259+
mostRecentPod := pods[0]
260+
for _, pod := range pods {
261+
if mostRecentPod.CreationTimestamp.Before(&pod.CreationTimestamp) {
262+
mostRecentPod = pod
263+
}
264+
}
265+
266+
if len(mostRecentPod.Status.ContainerStatuses) > 0 {
267+
status := mostRecentPod.Status.ContainerStatuses[0]
268+
msg := fmt.Sprintf("Migration failed: %s", status.State.Terminated.Message)
269+
progressingCondition.Message = msg
270+
degradedCondition.Message = msg
271+
}
272+
}
273+
}
274+
}
275+
276+
err = resource.ApplyMutator(gen)
277+
if err != nil {
278+
_, _, updateError := v1helpers.UpdateStatus(
279+
ctx,
280+
c.operatorClient,
281+
v1helpers.UpdateConditionFn(progressingCondition),
282+
v1helpers.UpdateConditionFn(degradedCondition),
283+
)
284+
return utilerrors.NewAggregate([]error{err, updateError})
285+
}
286+
287+
_, _, err = v1helpers.UpdateStatus(
288+
ctx,
289+
c.operatorClient,
290+
v1helpers.UpdateConditionFn(progressingCondition),
291+
v1helpers.UpdateConditionFn(degradedCondition),
292+
)
293+
return err
294+
}
295+
296+
func (c *AzurePathFixController) Run(stopCh <-chan struct{}) {
297+
defer utilruntime.HandleCrash()
298+
defer c.queue.ShutDown()
299+
300+
klog.Infof("Starting AzurePathFixController")
301+
if !cache.WaitForCacheSync(stopCh, c.cachesToSync...) {
302+
return
303+
}
304+
305+
go wait.Until(c.runWorker, time.Second, stopCh)
306+
307+
klog.Infof("Started AzurePathFixController")
308+
<-stopCh
309+
klog.Infof("Shutting down AzurePathFixController")
310+
}

pkg/operator/starter.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,21 @@ func RunOperator(ctx context.Context, kubeconfig *restclient.Config) error {
168168
return err
169169
}
170170

171+
azurePathFixController, err := NewAzurePathFixController(
172+
kubeconfig,
173+
kubeClient.BatchV1(),
174+
configOperatorClient,
175+
kubeInformers.Batch().V1().Jobs(),
176+
imageregistryInformers.Imageregistry().V1().Configs(),
177+
configInformers.Config().V1().Infrastructures(),
178+
kubeInformers.Core().V1().Secrets(),
179+
configInformers.Config().V1().Proxies(),
180+
kubeInformers.Core().V1().Pods(),
181+
)
182+
if err != nil {
183+
return err
184+
}
185+
171186
metricsController := NewMetricsController(imageInformers.Image().V1().ImageStreams())
172187

173188
kubeInformers.Start(ctx.Done())
@@ -187,6 +202,7 @@ func RunOperator(ctx context.Context, kubeconfig *restclient.Config) error {
187202
go imagePrunerController.Run(ctx.Done())
188203
go loggingController.Run(ctx, 1)
189204
go azureStackCloudController.Run(ctx)
205+
go azurePathFixController.Run(ctx.Done())
190206
go metricsController.Run(ctx)
191207

192208
<-ctx.Done()

0 commit comments

Comments
 (0)