Skip to content

Commit 892acaa

Browse files
authored
Merge pull request kubernetes#126107 from enj/enj/i/svm_not_found_err
svm: set UID and RV on SSA patch to cause conflict on logical create
2 parents b293ca9 + 6a6771b commit 892acaa

File tree

11 files changed

+411
-187
lines changed

11 files changed

+411
-187
lines changed

cmd/kube-controller-manager/app/storageversionmigrator.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,11 @@ func startSVMController(
5454
return nil, true, fmt.Errorf("storage version migrator requires garbage collector")
5555
}
5656

57+
// svm controller can make a lot of requests during migration, keep it fast
5758
config := controllerContext.ClientBuilder.ConfigOrDie(controllerName)
59+
config.QPS *= 20
60+
config.Burst *= 100
61+
5862
client := controllerContext.ClientBuilder.ClientOrDie(controllerName)
5963
informer := controllerContext.InformerFactory.Storagemigration().V1alpha1().StorageVersionMigrations()
6064

pkg/controller/storageversionmigrator/resourceversion.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ func (rv *ResourceVersionController) sync(ctx context.Context, key string) error
199199
StorageVersionMigrations().
200200
UpdateStatus(
201201
ctx,
202-
setStatusConditions(toBeProcessedSVM, svmv1alpha1.MigrationFailed, migrationFailedStatusReason),
202+
setStatusConditions(toBeProcessedSVM, svmv1alpha1.MigrationFailed, migrationFailedStatusReason, "resource does not exist in discovery"),
203203
metav1.UpdateOptions{},
204204
)
205205
if err != nil {

pkg/controller/storageversionmigrator/storageversionmigrator.go

Lines changed: 66 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -204,27 +204,35 @@ func (svmc *SVMController) sync(ctx context.Context, key string) error {
204204
}
205205
gvr := getGVRFromResource(toBeProcessedSVM)
206206

207-
resourceMonitor, err := svmc.dependencyGraphBuilder.GetMonitor(ctx, gvr)
207+
// prevent unsynced monitor from blocking forever
208+
// use a short timeout so that we can fail quickly and possibly handle other migrations while this monitor gets ready.
209+
monCtx, monCtxCancel := context.WithTimeout(ctx, 10*time.Second)
210+
defer monCtxCancel()
211+
resourceMonitor, errMonitor := svmc.dependencyGraphBuilder.GetMonitor(monCtx, gvr)
208212
if resourceMonitor != nil {
209-
if err != nil {
213+
if errMonitor != nil {
210214
// non nil monitor indicates that error is due to resource not being synced
211-
return fmt.Errorf("dependency graph is not synced, requeuing to attempt again")
215+
return fmt.Errorf("dependency graph is not synced, requeuing to attempt again: %w", errMonitor)
212216
}
213217
} else {
218+
logger.V(4).Error(errMonitor, "resource does not exist in GC", "gvr", gvr.String())
219+
220+
// our GC cache could be missing a recently created custom resource, so give it some time to catch up
221+
// we resync discovery every 30 seconds so twice that should be sufficient
222+
if toBeProcessedSVM.CreationTimestamp.Add(time.Minute).After(time.Now()) {
223+
return fmt.Errorf("resource does not exist in GC, requeuing to attempt again: %w", errMonitor)
224+
}
225+
214226
// we can't migrate a resource that doesn't exist in the GC
215-
_, err = svmc.kubeClient.StoragemigrationV1alpha1().
227+
_, errStatus := svmc.kubeClient.StoragemigrationV1alpha1().
216228
StorageVersionMigrations().
217229
UpdateStatus(
218230
ctx,
219-
setStatusConditions(toBeProcessedSVM, svmv1alpha1.MigrationFailed, migrationFailedStatusReason),
231+
setStatusConditions(toBeProcessedSVM, svmv1alpha1.MigrationFailed, migrationFailedStatusReason, "resource not found"),
220232
metav1.UpdateOptions{},
221233
)
222-
if err != nil {
223-
return err
224-
}
225-
logger.V(4).Error(fmt.Errorf("error migrating the resource"), "resource does not exist in GC", "gvr", gvr.String())
226234

227-
return nil
235+
return errStatus
228236
}
229237

230238
gcListResourceVersion, err := convertResourceVersionToInt(resourceMonitor.Controller.LastSyncResourceVersion())
@@ -244,7 +252,7 @@ func (svmc *SVMController) sync(ctx context.Context, key string) error {
244252
StorageVersionMigrations().
245253
UpdateStatus(
246254
ctx,
247-
setStatusConditions(toBeProcessedSVM, svmv1alpha1.MigrationRunning, migrationRunningStatusReason),
255+
setStatusConditions(toBeProcessedSVM, svmv1alpha1.MigrationRunning, migrationRunningStatusReason, ""),
248256
metav1.UpdateOptions{},
249257
)
250258
if err != nil {
@@ -255,60 +263,72 @@ func (svmc *SVMController) sync(ctx context.Context, key string) error {
255263
if err != nil {
256264
return err
257265
}
258-
typeMeta := metav1.TypeMeta{}
259-
typeMeta.APIVersion, typeMeta.Kind = gvk.ToAPIVersionAndKind()
260-
data, err := json.Marshal(typeMeta)
261-
if err != nil {
262-
return err
263-
}
264266

265267
// ToDo: implement a mechanism to resume migration from the last migrated resource in case of a failure
266268
// process storage migration
267-
for _, gvrKey := range resourceMonitor.Store.ListKeys() {
268-
namespace, name, err := cache.SplitMetaNamespaceKey(gvrKey)
269+
for _, obj := range resourceMonitor.Store.List() {
270+
accessor, err := meta.Accessor(obj)
269271
if err != nil {
270272
return err
271273
}
272274

273-
_, err = svmc.dynamicClient.Resource(gvr).
274-
Namespace(namespace).
275+
typeMeta := typeMetaUIDRV{}
276+
typeMeta.APIVersion, typeMeta.Kind = gvk.ToAPIVersionAndKind()
277+
// set UID so that when a resource gets deleted, we get an "uid mismatch"
278+
// conflict error instead of trying to create it.
279+
typeMeta.UID = accessor.GetUID()
280+
// set RV so that when a resources gets updated or deleted+recreated, we get an "object has been modified"
281+
// conflict error. we do not actually need to do anything special for the updated case because if RV
282+
// was not set, it would just result in no-op request. but for the deleted+recreated case, if RV is
283+
// not set but UID is set, we would get an immutable field validation error. hence we must set both.
284+
typeMeta.ResourceVersion = accessor.GetResourceVersion()
285+
data, err := json.Marshal(typeMeta)
286+
if err != nil {
287+
return err
288+
}
289+
290+
_, errPatch := svmc.dynamicClient.Resource(gvr).
291+
Namespace(accessor.GetNamespace()).
275292
Patch(ctx,
276-
name,
293+
accessor.GetName(),
277294
types.ApplyPatchType,
278295
data,
279296
metav1.PatchOptions{
280297
FieldManager: svmc.controllerName,
281298
},
282299
)
283-
if err != nil {
284-
// in case of NotFound or Conflict, we can stop processing migration for that resource
285-
if apierrors.IsNotFound(err) || apierrors.IsConflict(err) {
286-
continue
287-
}
288300

289-
_, err = svmc.kubeClient.StoragemigrationV1alpha1().
301+
// in case of conflict, we can stop processing migration for that resource because it has either been
302+
// - updated, meaning that migration has already been performed
303+
// - deleted, meaning that migration is not needed
304+
// - deleted and recreated, meaning that migration has already been performed
305+
if apierrors.IsConflict(errPatch) {
306+
logger.V(6).Info("Resource ignored due to conflict", "namespace", accessor.GetNamespace(), "name", accessor.GetName(), "gvr", gvr.String(), "err", errPatch)
307+
continue
308+
}
309+
310+
if errPatch != nil {
311+
logger.V(4).Error(errPatch, "Failed to migrate the resource", "namespace", accessor.GetNamespace(), "name", accessor.GetName(), "gvr", gvr.String(), "reason", apierrors.ReasonForError(errPatch))
312+
313+
_, errStatus := svmc.kubeClient.StoragemigrationV1alpha1().
290314
StorageVersionMigrations().
291315
UpdateStatus(
292316
ctx,
293-
setStatusConditions(toBeProcessedSVM, svmv1alpha1.MigrationFailed, migrationFailedStatusReason),
317+
setStatusConditions(toBeProcessedSVM, svmv1alpha1.MigrationFailed, migrationFailedStatusReason, "migration encountered unhandled error"),
294318
metav1.UpdateOptions{},
295319
)
296-
if err != nil {
297-
return err
298-
}
299-
logger.V(4).Error(err, "Failed to migrate the resource", "name", gvrKey, "gvr", gvr.String(), "reason", apierrors.ReasonForError(err))
300320

301-
return nil
321+
return errStatus
302322
// Todo: add retry for scenarios where API server returns rate limiting error
303323
}
304-
logger.V(4).Info("Successfully migrated the resource", "name", gvrKey, "gvr", gvr.String())
324+
logger.V(4).Info("Successfully migrated the resource", "namespace", accessor.GetNamespace(), "name", accessor.GetName(), "gvr", gvr.String())
305325
}
306326

307327
_, err = svmc.kubeClient.StoragemigrationV1alpha1().
308328
StorageVersionMigrations().
309329
UpdateStatus(
310330
ctx,
311-
setStatusConditions(toBeProcessedSVM, svmv1alpha1.MigrationSucceeded, migrationSuccessStatusReason),
331+
setStatusConditions(toBeProcessedSVM, svmv1alpha1.MigrationSucceeded, migrationSuccessStatusReason, ""),
312332
metav1.UpdateOptions{},
313333
)
314334
if err != nil {
@@ -318,3 +338,13 @@ func (svmc *SVMController) sync(ctx context.Context, key string) error {
318338
logger.V(4).Info("Finished syncing svm resource", "key", key, "gvr", gvr.String(), "elapsed", time.Since(startTime))
319339
return nil
320340
}
341+
342+
type typeMetaUIDRV struct {
343+
metav1.TypeMeta `json:",inline"`
344+
objectMetaUIDandRV `json:"metadata,omitempty"`
345+
}
346+
347+
type objectMetaUIDandRV struct {
348+
UID types.UID `json:"uid,omitempty"`
349+
ResourceVersion string `json:"resourceVersion,omitempty"`
350+
}

pkg/controller/storageversionmigrator/util.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ func indexOfCondition(svm *svmv1alpha1.StorageVersionMigration, conditionType sv
6262
func setStatusConditions(
6363
toBeUpdatedSVM *svmv1alpha1.StorageVersionMigration,
6464
conditionType svmv1alpha1.MigrationConditionType,
65-
reason string,
65+
reason, message string,
6666
) *svmv1alpha1.StorageVersionMigration {
6767
if !IsConditionTrue(toBeUpdatedSVM, conditionType) {
6868
if conditionType == svmv1alpha1.MigrationSucceeded || conditionType == svmv1alpha1.MigrationFailed {
@@ -77,6 +77,7 @@ func setStatusConditions(
7777
Status: corev1.ConditionTrue,
7878
LastUpdateTime: metav1.Now(),
7979
Reason: reason,
80+
Message: message,
8081
})
8182
}
8283

plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -486,7 +486,10 @@ func buildControllerRoles() ([]rbacv1.ClusterRole, []rbacv1.ClusterRoleBinding)
486486
Name: saRolePrefix + "storage-version-migrator-controller",
487487
},
488488
Rules: []rbacv1.PolicyRule{
489-
rbacv1helpers.NewRule("list", "patch").Groups("*").Resources("*").RuleOrDie(),
489+
// need list to get current RV for any resource
490+
// need patch for SSA of any resource
491+
// need create because SSA of a deleted resource will be interpreted as a create request, these always fail with a conflict error because UID is set
492+
rbacv1helpers.NewRule("list", "create", "patch").Groups("*").Resources("*").RuleOrDie(),
490493
rbacv1helpers.NewRule("update").Groups(storageVersionMigrationGroup).Resources("storageversionmigrations/status").RuleOrDie(),
491494
},
492495
})

test/images/agnhost/crd-conversion-webhook/converter/framework.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package converter
1818

1919
import (
20+
"bytes"
2021
"fmt"
2122
"io"
2223
"net/http"
@@ -131,7 +132,7 @@ func serve(w http.ResponseWriter, r *http.Request, convert convertFunc) {
131132
return
132133
}
133134

134-
klog.V(2).Infof("handling request: %v", body)
135+
klog.V(2).Infof("handling request: %s", string(body))
135136
obj, gvk, err := serializer.Decode(body, nil, nil)
136137
if err != nil {
137138
msg := fmt.Sprintf("failed to deserialize body (%v) with error %v", string(body), err)
@@ -152,7 +153,6 @@ func serve(w http.ResponseWriter, r *http.Request, convert convertFunc) {
152153
}
153154
convertReview.Response = doConversionV1beta1(convertReview.Request, convert)
154155
convertReview.Response.UID = convertReview.Request.UID
155-
klog.V(2).Info(fmt.Sprintf("sending response: %v", convertReview.Response))
156156

157157
// reset the request, it is not needed in a response.
158158
convertReview.Request = &v1beta1.ConversionRequest{}
@@ -167,7 +167,6 @@ func serve(w http.ResponseWriter, r *http.Request, convert convertFunc) {
167167
}
168168
convertReview.Response = doConversionV1(convertReview.Request, convert)
169169
convertReview.Response.UID = convertReview.Request.UID
170-
klog.V(2).Info(fmt.Sprintf("sending response: %v", convertReview.Response))
171170

172171
// reset the request, it is not needed in a response.
173172
convertReview.Request = &v1.ConversionRequest{}
@@ -187,12 +186,14 @@ func serve(w http.ResponseWriter, r *http.Request, convert convertFunc) {
187186
http.Error(w, msg, http.StatusBadRequest)
188187
return
189188
}
190-
err = outSerializer.Encode(responseObj, w)
189+
var buf bytes.Buffer
190+
err = outSerializer.Encode(responseObj, io.MultiWriter(w, &buf))
191191
if err != nil {
192192
klog.Error(err)
193193
http.Error(w, err.Error(), http.StatusInternalServerError)
194194
return
195195
}
196+
klog.V(2).Infof("sending response: %s", buf.String())
196197
}
197198

198199
// ServeExampleConvert servers endpoint for the example converter defined as convertExampleCRD function.

0 commit comments

Comments
 (0)