Skip to content

Commit 6a6771b

Browse files
committed
svm: set UID and RV on SSA patch to cause conflict on logical create
When a resource gets deleted during migration, the SVM SSA patch calls are interpreted as a logical create request. Since the object from storage is nil, the merged result is just a type meta object, which lacks a name in the body. This fails when the API server checks that the name from the request URL and the body are the same. Note that a create request is something that SVM controller should never do. Once the UID is set on the patch, the API server will fail the request at a slightly earlier point with an "uid mismatch" conflict error, which the SVM controller can handle gracefully. Setting UID by itself is not sufficient. When a resource gets deleted and recreated, if RV is not set but UID is set, we would get an immutable field validation error for attempting to update the UID. To address this, we set the resource version on the SSA patch as well. This will cause that update request to also fail with a conflict error. Added the create verb on all resources for SVM controller RBAC as otherwise the API server will reject the request before it fails with a conflict error. The change addresses a host of other issues with the SVM controller: 1. Include failure message in SVM resource 2. Do not block forever on unsynced GC monitor 3. Do not immediately fail on GC monitor being missing, allow for a grace period since discovery may be out of sync 4. Set higher QPS and burst to handle large migrations Test changes: 1. Clean up CRD webhook convertor logs 2. Allow SVM tests to be run multiple times to make finding flakes easier 3. Create and delete CRs during CRD test to force out any flakes 4. Add a stress test with multiple parallel migrations 5. Enable RBAC on KAS 6. Run KCM directly to exercise wiring and RBAC 7. Better logs during CRD migration 8. Scan audit logs to confirm SVM controller never creates Signed-off-by: Monis Khan <[email protected]>
1 parent f820301 commit 6a6771b

File tree

11 files changed

+411
-187
lines changed

11 files changed

+411
-187
lines changed

cmd/kube-controller-manager/app/storageversionmigrator.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,11 @@ func startSVMController(
5454
return nil, true, fmt.Errorf("storage version migrator requires garbage collector")
5555
}
5656

57+
// svm controller can make a lot of requests during migration, keep it fast
5758
config := controllerContext.ClientBuilder.ConfigOrDie(controllerName)
59+
config.QPS *= 20
60+
config.Burst *= 100
61+
5862
client := controllerContext.ClientBuilder.ClientOrDie(controllerName)
5963
informer := controllerContext.InformerFactory.Storagemigration().V1alpha1().StorageVersionMigrations()
6064

pkg/controller/storageversionmigrator/resourceversion.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ func (rv *ResourceVersionController) sync(ctx context.Context, key string) error
199199
StorageVersionMigrations().
200200
UpdateStatus(
201201
ctx,
202-
setStatusConditions(toBeProcessedSVM, svmv1alpha1.MigrationFailed, migrationFailedStatusReason),
202+
setStatusConditions(toBeProcessedSVM, svmv1alpha1.MigrationFailed, migrationFailedStatusReason, "resource does not exist in discovery"),
203203
metav1.UpdateOptions{},
204204
)
205205
if err != nil {

pkg/controller/storageversionmigrator/storageversionmigrator.go

Lines changed: 66 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -204,27 +204,35 @@ func (svmc *SVMController) sync(ctx context.Context, key string) error {
204204
}
205205
gvr := getGVRFromResource(toBeProcessedSVM)
206206

207-
resourceMonitor, err := svmc.dependencyGraphBuilder.GetMonitor(ctx, gvr)
207+
// prevent unsynced monitor from blocking forever
208+
// use a short timeout so that we can fail quickly and possibly handle other migrations while this monitor gets ready.
209+
monCtx, monCtxCancel := context.WithTimeout(ctx, 10*time.Second)
210+
defer monCtxCancel()
211+
resourceMonitor, errMonitor := svmc.dependencyGraphBuilder.GetMonitor(monCtx, gvr)
208212
if resourceMonitor != nil {
209-
if err != nil {
213+
if errMonitor != nil {
210214
// non nil monitor indicates that error is due to resource not being synced
211-
return fmt.Errorf("dependency graph is not synced, requeuing to attempt again")
215+
return fmt.Errorf("dependency graph is not synced, requeuing to attempt again: %w", errMonitor)
212216
}
213217
} else {
218+
logger.V(4).Error(errMonitor, "resource does not exist in GC", "gvr", gvr.String())
219+
220+
// our GC cache could be missing a recently created custom resource, so give it some time to catch up
221+
// we resync discovery every 30 seconds so twice that should be sufficient
222+
if toBeProcessedSVM.CreationTimestamp.Add(time.Minute).After(time.Now()) {
223+
return fmt.Errorf("resource does not exist in GC, requeuing to attempt again: %w", errMonitor)
224+
}
225+
214226
// we can't migrate a resource that doesn't exist in the GC
215-
_, err = svmc.kubeClient.StoragemigrationV1alpha1().
227+
_, errStatus := svmc.kubeClient.StoragemigrationV1alpha1().
216228
StorageVersionMigrations().
217229
UpdateStatus(
218230
ctx,
219-
setStatusConditions(toBeProcessedSVM, svmv1alpha1.MigrationFailed, migrationFailedStatusReason),
231+
setStatusConditions(toBeProcessedSVM, svmv1alpha1.MigrationFailed, migrationFailedStatusReason, "resource not found"),
220232
metav1.UpdateOptions{},
221233
)
222-
if err != nil {
223-
return err
224-
}
225-
logger.V(4).Error(fmt.Errorf("error migrating the resource"), "resource does not exist in GC", "gvr", gvr.String())
226234

227-
return nil
235+
return errStatus
228236
}
229237

230238
gcListResourceVersion, err := convertResourceVersionToInt(resourceMonitor.Controller.LastSyncResourceVersion())
@@ -244,7 +252,7 @@ func (svmc *SVMController) sync(ctx context.Context, key string) error {
244252
StorageVersionMigrations().
245253
UpdateStatus(
246254
ctx,
247-
setStatusConditions(toBeProcessedSVM, svmv1alpha1.MigrationRunning, migrationRunningStatusReason),
255+
setStatusConditions(toBeProcessedSVM, svmv1alpha1.MigrationRunning, migrationRunningStatusReason, ""),
248256
metav1.UpdateOptions{},
249257
)
250258
if err != nil {
@@ -255,60 +263,72 @@ func (svmc *SVMController) sync(ctx context.Context, key string) error {
255263
if err != nil {
256264
return err
257265
}
258-
typeMeta := metav1.TypeMeta{}
259-
typeMeta.APIVersion, typeMeta.Kind = gvk.ToAPIVersionAndKind()
260-
data, err := json.Marshal(typeMeta)
261-
if err != nil {
262-
return err
263-
}
264266

265267
// ToDo: implement a mechanism to resume migration from the last migrated resource in case of a failure
266268
// process storage migration
267-
for _, gvrKey := range resourceMonitor.Store.ListKeys() {
268-
namespace, name, err := cache.SplitMetaNamespaceKey(gvrKey)
269+
for _, obj := range resourceMonitor.Store.List() {
270+
accessor, err := meta.Accessor(obj)
269271
if err != nil {
270272
return err
271273
}
272274

273-
_, err = svmc.dynamicClient.Resource(gvr).
274-
Namespace(namespace).
275+
typeMeta := typeMetaUIDRV{}
276+
typeMeta.APIVersion, typeMeta.Kind = gvk.ToAPIVersionAndKind()
277+
// set UID so that when a resource gets deleted, we get an "uid mismatch"
278+
// conflict error instead of trying to create it.
279+
typeMeta.UID = accessor.GetUID()
280+
// set RV so that when a resources gets updated or deleted+recreated, we get an "object has been modified"
281+
// conflict error. we do not actually need to do anything special for the updated case because if RV
282+
// was not set, it would just result in no-op request. but for the deleted+recreated case, if RV is
283+
// not set but UID is set, we would get an immutable field validation error. hence we must set both.
284+
typeMeta.ResourceVersion = accessor.GetResourceVersion()
285+
data, err := json.Marshal(typeMeta)
286+
if err != nil {
287+
return err
288+
}
289+
290+
_, errPatch := svmc.dynamicClient.Resource(gvr).
291+
Namespace(accessor.GetNamespace()).
275292
Patch(ctx,
276-
name,
293+
accessor.GetName(),
277294
types.ApplyPatchType,
278295
data,
279296
metav1.PatchOptions{
280297
FieldManager: svmc.controllerName,
281298
},
282299
)
283-
if err != nil {
284-
// in case of NotFound or Conflict, we can stop processing migration for that resource
285-
if apierrors.IsNotFound(err) || apierrors.IsConflict(err) {
286-
continue
287-
}
288300

289-
_, err = svmc.kubeClient.StoragemigrationV1alpha1().
301+
// in case of conflict, we can stop processing migration for that resource because it has either been
302+
// - updated, meaning that migration has already been performed
303+
// - deleted, meaning that migration is not needed
304+
// - deleted and recreated, meaning that migration has already been performed
305+
if apierrors.IsConflict(errPatch) {
306+
logger.V(6).Info("Resource ignored due to conflict", "namespace", accessor.GetNamespace(), "name", accessor.GetName(), "gvr", gvr.String(), "err", errPatch)
307+
continue
308+
}
309+
310+
if errPatch != nil {
311+
logger.V(4).Error(errPatch, "Failed to migrate the resource", "namespace", accessor.GetNamespace(), "name", accessor.GetName(), "gvr", gvr.String(), "reason", apierrors.ReasonForError(errPatch))
312+
313+
_, errStatus := svmc.kubeClient.StoragemigrationV1alpha1().
290314
StorageVersionMigrations().
291315
UpdateStatus(
292316
ctx,
293-
setStatusConditions(toBeProcessedSVM, svmv1alpha1.MigrationFailed, migrationFailedStatusReason),
317+
setStatusConditions(toBeProcessedSVM, svmv1alpha1.MigrationFailed, migrationFailedStatusReason, "migration encountered unhandled error"),
294318
metav1.UpdateOptions{},
295319
)
296-
if err != nil {
297-
return err
298-
}
299-
logger.V(4).Error(err, "Failed to migrate the resource", "name", gvrKey, "gvr", gvr.String(), "reason", apierrors.ReasonForError(err))
300320

301-
return nil
321+
return errStatus
302322
// Todo: add retry for scenarios where API server returns rate limiting error
303323
}
304-
logger.V(4).Info("Successfully migrated the resource", "name", gvrKey, "gvr", gvr.String())
324+
logger.V(4).Info("Successfully migrated the resource", "namespace", accessor.GetNamespace(), "name", accessor.GetName(), "gvr", gvr.String())
305325
}
306326

307327
_, err = svmc.kubeClient.StoragemigrationV1alpha1().
308328
StorageVersionMigrations().
309329
UpdateStatus(
310330
ctx,
311-
setStatusConditions(toBeProcessedSVM, svmv1alpha1.MigrationSucceeded, migrationSuccessStatusReason),
331+
setStatusConditions(toBeProcessedSVM, svmv1alpha1.MigrationSucceeded, migrationSuccessStatusReason, ""),
312332
metav1.UpdateOptions{},
313333
)
314334
if err != nil {
@@ -318,3 +338,13 @@ func (svmc *SVMController) sync(ctx context.Context, key string) error {
318338
logger.V(4).Info("Finished syncing svm resource", "key", key, "gvr", gvr.String(), "elapsed", time.Since(startTime))
319339
return nil
320340
}
341+
342+
type typeMetaUIDRV struct {
343+
metav1.TypeMeta `json:",inline"`
344+
objectMetaUIDandRV `json:"metadata,omitempty"`
345+
}
346+
347+
type objectMetaUIDandRV struct {
348+
UID types.UID `json:"uid,omitempty"`
349+
ResourceVersion string `json:"resourceVersion,omitempty"`
350+
}

pkg/controller/storageversionmigrator/util.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ func indexOfCondition(svm *svmv1alpha1.StorageVersionMigration, conditionType sv
6262
func setStatusConditions(
6363
toBeUpdatedSVM *svmv1alpha1.StorageVersionMigration,
6464
conditionType svmv1alpha1.MigrationConditionType,
65-
reason string,
65+
reason, message string,
6666
) *svmv1alpha1.StorageVersionMigration {
6767
if !IsConditionTrue(toBeUpdatedSVM, conditionType) {
6868
if conditionType == svmv1alpha1.MigrationSucceeded || conditionType == svmv1alpha1.MigrationFailed {
@@ -77,6 +77,7 @@ func setStatusConditions(
7777
Status: corev1.ConditionTrue,
7878
LastUpdateTime: metav1.Now(),
7979
Reason: reason,
80+
Message: message,
8081
})
8182
}
8283

plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -486,7 +486,10 @@ func buildControllerRoles() ([]rbacv1.ClusterRole, []rbacv1.ClusterRoleBinding)
486486
Name: saRolePrefix + "storage-version-migrator-controller",
487487
},
488488
Rules: []rbacv1.PolicyRule{
489-
rbacv1helpers.NewRule("list", "patch").Groups("*").Resources("*").RuleOrDie(),
489+
// need list to get current RV for any resource
490+
// need patch for SSA of any resource
491+
// need create because SSA of a deleted resource will be interpreted as a create request, these always fail with a conflict error because UID is set
492+
rbacv1helpers.NewRule("list", "create", "patch").Groups("*").Resources("*").RuleOrDie(),
490493
rbacv1helpers.NewRule("update").Groups(storageVersionMigrationGroup).Resources("storageversionmigrations/status").RuleOrDie(),
491494
},
492495
})

test/images/agnhost/crd-conversion-webhook/converter/framework.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package converter
1818

1919
import (
20+
"bytes"
2021
"fmt"
2122
"io"
2223
"net/http"
@@ -131,7 +132,7 @@ func serve(w http.ResponseWriter, r *http.Request, convert convertFunc) {
131132
return
132133
}
133134

134-
klog.V(2).Infof("handling request: %v", body)
135+
klog.V(2).Infof("handling request: %s", string(body))
135136
obj, gvk, err := serializer.Decode(body, nil, nil)
136137
if err != nil {
137138
msg := fmt.Sprintf("failed to deserialize body (%v) with error %v", string(body), err)
@@ -152,7 +153,6 @@ func serve(w http.ResponseWriter, r *http.Request, convert convertFunc) {
152153
}
153154
convertReview.Response = doConversionV1beta1(convertReview.Request, convert)
154155
convertReview.Response.UID = convertReview.Request.UID
155-
klog.V(2).Info(fmt.Sprintf("sending response: %v", convertReview.Response))
156156

157157
// reset the request, it is not needed in a response.
158158
convertReview.Request = &v1beta1.ConversionRequest{}
@@ -167,7 +167,6 @@ func serve(w http.ResponseWriter, r *http.Request, convert convertFunc) {
167167
}
168168
convertReview.Response = doConversionV1(convertReview.Request, convert)
169169
convertReview.Response.UID = convertReview.Request.UID
170-
klog.V(2).Info(fmt.Sprintf("sending response: %v", convertReview.Response))
171170

172171
// reset the request, it is not needed in a response.
173172
convertReview.Request = &v1.ConversionRequest{}
@@ -187,12 +186,14 @@ func serve(w http.ResponseWriter, r *http.Request, convert convertFunc) {
187186
http.Error(w, msg, http.StatusBadRequest)
188187
return
189188
}
190-
err = outSerializer.Encode(responseObj, w)
189+
var buf bytes.Buffer
190+
err = outSerializer.Encode(responseObj, io.MultiWriter(w, &buf))
191191
if err != nil {
192192
klog.Error(err)
193193
http.Error(w, err.Error(), http.StatusInternalServerError)
194194
return
195195
}
196+
klog.V(2).Infof("sending response: %s", buf.String())
196197
}
197198

198199
// ServeExampleConvert servers endpoint for the example converter defined as convertExampleCRD function.

0 commit comments

Comments
 (0)