-
Notifications
You must be signed in to change notification settings - Fork 0
Dual Storage Architecture #9
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: dual-storage-baseline
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -2,8 +2,9 @@ package rest | |||||||||||||||||||
|
|
||||||||||||||||||||
| import ( | ||||||||||||||||||||
| "context" | ||||||||||||||||||||
| "errors" | ||||||||||||||||||||
| "time" | ||||||||||||||||||||
|
|
||||||||||||||||||||
| apierrors "k8s.io/apimachinery/pkg/api/errors" | ||||||||||||||||||||
| metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion" | ||||||||||||||||||||
| metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||||||||||||||||||||
| "k8s.io/apimachinery/pkg/runtime" | ||||||||||||||||||||
|
|
@@ -21,114 +22,151 @@ type DualWriterMode3 struct { | |||||||||||||||||||
| // newDualWriterMode3 returns a new DualWriter in mode 3. | ||||||||||||||||||||
| // Mode 3 represents writing to LegacyStorage and Storage and reading from Storage. | ||||||||||||||||||||
| func newDualWriterMode3(legacy LegacyStorage, storage Storage, dwm *dualWriterMetrics) *DualWriterMode3 { | ||||||||||||||||||||
| return &DualWriterMode3{Legacy: legacy, Storage: storage, Log: klog.NewKlogr().WithName("DualWriterMode3"), dualWriterMetrics: dwm} | ||||||||||||||||||||
| return &DualWriterMode3{Legacy: legacy, Storage: storage, Log: klog.NewKlogr().WithName("DualWriterMode3").WithValues("mode", mode3Str), dualWriterMetrics: dwm} | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| // Mode returns the mode of the dual writer. | ||||||||||||||||||||
| func (d *DualWriterMode3) Mode() DualWriterMode { | ||||||||||||||||||||
| return Mode3 | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| const mode3Str = "3" | ||||||||||||||||||||
|
|
||||||||||||||||||||
| // Create overrides the behavior of the generic DualWriter and writes to LegacyStorage and Storage. | ||||||||||||||||||||
| func (d *DualWriterMode3) Create(ctx context.Context, obj runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error) { | ||||||||||||||||||||
| log := klog.FromContext(ctx) | ||||||||||||||||||||
| var method = "create" | ||||||||||||||||||||
| log := d.Log.WithValues("kind", options.Kind, "method", method) | ||||||||||||||||||||
| ctx = klog.NewContext(ctx, log) | ||||||||||||||||||||
|
|
||||||||||||||||||||
| startStorage := time.Now() | ||||||||||||||||||||
| created, err := d.Storage.Create(ctx, obj, createValidation, options) | ||||||||||||||||||||
| if err != nil { | ||||||||||||||||||||
| log.Error(err, "unable to create object in storage") | ||||||||||||||||||||
| d.recordLegacyDuration(true, mode3Str, options.Kind, method, startStorage) | ||||||||||||||||||||
| return created, err | ||||||||||||||||||||
| } | ||||||||||||||||||||
| d.recordStorageDuration(false, mode3Str, options.Kind, method, startStorage) | ||||||||||||||||||||
|
|
||||||||||||||||||||
| if _, err := d.Legacy.Create(ctx, obj, createValidation, options); err != nil { | ||||||||||||||||||||
| log.WithValues("object", created).Error(err, "unable to create object in legacy storage") | ||||||||||||||||||||
| } | ||||||||||||||||||||
| return created, nil | ||||||||||||||||||||
| go func() { | ||||||||||||||||||||
| ctx, cancel := context.WithTimeoutCause(ctx, time.Second*10, errors.New("legacy create timeout")) | ||||||||||||||||||||
| defer cancel() | ||||||||||||||||||||
|
|
||||||||||||||||||||
| startLegacy := time.Now() | ||||||||||||||||||||
| _, errObjectSt := d.Legacy.Create(ctx, obj, createValidation, options) | ||||||||||||||||||||
| d.recordLegacyDuration(errObjectSt != nil, mode3Str, options.Kind, method, startLegacy) | ||||||||||||||||||||
| }() | ||||||||||||||||||||
|
|
||||||||||||||||||||
| return created, err | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| // Get overrides the behavior of the generic DualWriter and retrieves an object from Storage. | ||||||||||||||||||||
| func (d *DualWriterMode3) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) { | ||||||||||||||||||||
| return d.Storage.Get(ctx, name, &metav1.GetOptions{}) | ||||||||||||||||||||
| var method = "get" | ||||||||||||||||||||
| log := d.Log.WithValues("kind", options.Kind, "name", name, "method", method) | ||||||||||||||||||||
| ctx = klog.NewContext(ctx, log) | ||||||||||||||||||||
|
|
||||||||||||||||||||
| startStorage := time.Now() | ||||||||||||||||||||
| res, err := d.Storage.Get(ctx, name, options) | ||||||||||||||||||||
| if err != nil { | ||||||||||||||||||||
| log.Error(err, "unable to get object in storage") | ||||||||||||||||||||
| } | ||||||||||||||||||||
| d.recordStorageDuration(err != nil, mode3Str, options.Kind, method, startStorage) | ||||||||||||||||||||
|
|
||||||||||||||||||||
| return res, err | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| func (d *DualWriterMode3) Delete(ctx context.Context, name string, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions) (runtime.Object, bool, error) { | ||||||||||||||||||||
| log := d.Log.WithValues("name", name) | ||||||||||||||||||||
| // List overrides the behavior of the generic DualWriter and reads only from Unified Store. | ||||||||||||||||||||
| func (d *DualWriterMode3) List(ctx context.Context, options *metainternalversion.ListOptions) (runtime.Object, error) { | ||||||||||||||||||||
| var method = "list" | ||||||||||||||||||||
| log := d.Log.WithValues("kind", options.Kind, "resourceVersion", options.ResourceVersion, "method", method) | ||||||||||||||||||||
| ctx = klog.NewContext(ctx, log) | ||||||||||||||||||||
|
|
||||||||||||||||||||
| deleted, async, err := d.Storage.Delete(ctx, name, deleteValidation, options) | ||||||||||||||||||||
| startStorage := time.Now() | ||||||||||||||||||||
| res, err := d.Storage.List(ctx, options) | ||||||||||||||||||||
| if err != nil { | ||||||||||||||||||||
| if !apierrors.IsNotFound(err) { | ||||||||||||||||||||
| log.Error(err, "could not delete from unified store") | ||||||||||||||||||||
| return deleted, async, err | ||||||||||||||||||||
| } | ||||||||||||||||||||
| log.Error(err, "unable to list object in storage") | ||||||||||||||||||||
| } | ||||||||||||||||||||
| d.recordStorageDuration(err != nil, mode3Str, options.Kind, method, startStorage) | ||||||||||||||||||||
|
|
||||||||||||||||||||
| _, _, errLS := d.Legacy.Delete(ctx, name, deleteValidation, options) | ||||||||||||||||||||
| if errLS != nil { | ||||||||||||||||||||
| if !apierrors.IsNotFound(errLS) { | ||||||||||||||||||||
| log.WithValues("deleted", deleted).Error(errLS, "could not delete from legacy store") | ||||||||||||||||||||
| } | ||||||||||||||||||||
| return res, err | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| func (d *DualWriterMode3) Delete(ctx context.Context, name string, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions) (runtime.Object, bool, error) { | ||||||||||||||||||||
| var method = "delete" | ||||||||||||||||||||
| log := d.Log.WithValues("name", name, "kind", options.Kind, "method", method) | ||||||||||||||||||||
| ctx = klog.NewContext(ctx, d.Log) | ||||||||||||||||||||
|
|
||||||||||||||||||||
| startStorage := time.Now() | ||||||||||||||||||||
| res, async, err := d.Storage.Delete(ctx, name, deleteValidation, options) | ||||||||||||||||||||
| if err != nil { | ||||||||||||||||||||
| log.Error(err, "unable to delete object in storage") | ||||||||||||||||||||
| d.recordStorageDuration(true, mode3Str, options.Kind, method, startStorage) | ||||||||||||||||||||
| return res, async, err | ||||||||||||||||||||
| } | ||||||||||||||||||||
| d.recordStorageDuration(false, mode3Str, name, method, startStorage) | ||||||||||||||||||||
|
|
||||||||||||||||||||
| return deleted, async, err | ||||||||||||||||||||
| go func() { | ||||||||||||||||||||
| startLegacy := time.Now() | ||||||||||||||||||||
| ctx, cancel := context.WithTimeoutCause(ctx, time.Second*10, errors.New("legacy delete timeout")) | ||||||||||||||||||||
| defer cancel() | ||||||||||||||||||||
| _, _, err := d.Legacy.Delete(ctx, name, deleteValidation, options) | ||||||||||||||||||||
| d.recordLegacyDuration(err != nil, mode3Str, options.Kind, method, startLegacy) | ||||||||||||||||||||
| }() | ||||||||||||||||||||
|
|
||||||||||||||||||||
| return res, async, err | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| // Update overrides the behavior of the generic DualWriter and writes first to Storage and then to LegacyStorage. | ||||||||||||||||||||
| func (d *DualWriterMode3) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) { | ||||||||||||||||||||
| log := d.Log.WithValues("name", name) | ||||||||||||||||||||
| var method = "update" | ||||||||||||||||||||
| log := d.Log.WithValues("name", name, "kind", options.Kind, "method", method) | ||||||||||||||||||||
| ctx = klog.NewContext(ctx, log) | ||||||||||||||||||||
| old, err := d.Storage.Get(ctx, name, &metav1.GetOptions{}) | ||||||||||||||||||||
| if err != nil { | ||||||||||||||||||||
| log.WithValues("object", old).Error(err, "could not get object to update") | ||||||||||||||||||||
| return nil, false, err | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| updated, err := objInfo.UpdatedObject(ctx, old) | ||||||||||||||||||||
| startStorage := time.Now() | ||||||||||||||||||||
| res, async, err := d.Storage.Update(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate, options) | ||||||||||||||||||||
| if err != nil { | ||||||||||||||||||||
| log.WithValues("object", updated).Error(err, "could not update or create object") | ||||||||||||||||||||
| return nil, false, err | ||||||||||||||||||||
| } | ||||||||||||||||||||
| objInfo = &updateWrapper{ | ||||||||||||||||||||
| upstream: objInfo, | ||||||||||||||||||||
| updated: updated, | ||||||||||||||||||||
| log.Error(err, "unable to update in storage") | ||||||||||||||||||||
| d.recordLegacyDuration(true, mode3Str, options.Kind, method, startStorage) | ||||||||||||||||||||
| return res, async, err | ||||||||||||||||||||
| } | ||||||||||||||||||||
| d.recordStorageDuration(false, mode3Str, options.Kind, method, startStorage) | ||||||||||||||||||||
|
|
||||||||||||||||||||
| obj, created, err := d.Storage.Update(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate, options) | ||||||||||||||||||||
| if err != nil { | ||||||||||||||||||||
| log.WithValues("object", obj).Error(err, "could not write to US") | ||||||||||||||||||||
| return obj, created, err | ||||||||||||||||||||
| } | ||||||||||||||||||||
| go func() { | ||||||||||||||||||||
| ctx, cancel := context.WithTimeoutCause(ctx, time.Second*10, errors.New("legacy update timeout")) | ||||||||||||||||||||
|
|
||||||||||||||||||||
| _, _, errLeg := d.Legacy.Update(ctx, name, &updateWrapper{ | ||||||||||||||||||||
| upstream: objInfo, | ||||||||||||||||||||
| updated: obj, | ||||||||||||||||||||
| }, createValidation, updateValidation, forceAllowCreate, options) | ||||||||||||||||||||
| if errLeg != nil { | ||||||||||||||||||||
| log.Error(errLeg, "could not update object in legacy store") | ||||||||||||||||||||
| } | ||||||||||||||||||||
| return obj, created, err | ||||||||||||||||||||
| startLegacy := time.Now() | ||||||||||||||||||||
| defer cancel() | ||||||||||||||||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||||||||
| _, _, errObjectSt := d.Legacy.Update(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate, options) | ||||||||||||||||||||
| d.recordLegacyDuration(errObjectSt != nil, mode3Str, options.Kind, method, startLegacy) | ||||||||||||||||||||
| }() | ||||||||||||||||||||
|
|
||||||||||||||||||||
| return res, async, err | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| // DeleteCollection overrides the behavior of the generic DualWriter and deletes from both LegacyStorage and Storage. | ||||||||||||||||||||
| func (d *DualWriterMode3) DeleteCollection(ctx context.Context, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions, listOptions *metainternalversion.ListOptions) (runtime.Object, error) { | ||||||||||||||||||||
| log := d.Log.WithValues("kind", options.Kind, "resourceVersion", listOptions.ResourceVersion) | ||||||||||||||||||||
| var method = "delete-collection" | ||||||||||||||||||||
| log := d.Log.WithValues("kind", options.Kind, "resourceVersion", listOptions.ResourceVersion, "method", method) | ||||||||||||||||||||
| ctx = klog.NewContext(ctx, log) | ||||||||||||||||||||
|
|
||||||||||||||||||||
| deleted, err := d.Storage.DeleteCollection(ctx, deleteValidation, options, listOptions) | ||||||||||||||||||||
| startStorage := time.Now() | ||||||||||||||||||||
| res, err := d.Storage.DeleteCollection(ctx, deleteValidation, options, listOptions) | ||||||||||||||||||||
| if err != nil { | ||||||||||||||||||||
| log.Error(err, "failed to delete collection successfully from Storage") | ||||||||||||||||||||
| log.Error(err, "unable to delete collection in storage") | ||||||||||||||||||||
| d.recordStorageDuration(true, mode3Str, options.Kind, method, startStorage) | ||||||||||||||||||||
| return res, err | ||||||||||||||||||||
| } | ||||||||||||||||||||
| d.recordStorageDuration(false, mode3Str, options.Kind, method, startStorage) | ||||||||||||||||||||
|
|
||||||||||||||||||||
| if deleted, err := d.Legacy.DeleteCollection(ctx, deleteValidation, options, listOptions); err != nil { | ||||||||||||||||||||
| log.WithValues("deleted", deleted).Error(err, "failed to delete collection successfully from LegacyStorage") | ||||||||||||||||||||
| } | ||||||||||||||||||||
| go func() { | ||||||||||||||||||||
| startLegacy := time.Now() | ||||||||||||||||||||
| ctx, cancel := context.WithTimeoutCause(ctx, time.Second*10, errors.New("legacy deletecollection timeout")) | ||||||||||||||||||||
| defer cancel() | ||||||||||||||||||||
| _, err := d.Legacy.DeleteCollection(ctx, deleteValidation, options, listOptions) | ||||||||||||||||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🔷 Medium: This records legacy DeleteCollection timing using the storage metrics function; use the legacy recorder for correct attribution.
Suggested change
|
||||||||||||||||||||
| d.recordStorageDuration(err != nil, mode3Str, options.Kind, method, startLegacy) | ||||||||||||||||||||
| }() | ||||||||||||||||||||
|
|
||||||||||||||||||||
| return deleted, err | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| func (d *DualWriterMode3) List(ctx context.Context, options *metainternalversion.ListOptions) (runtime.Object, error) { | ||||||||||||||||||||
| //TODO: implement List | ||||||||||||||||||||
| klog.Error("List not implemented") | ||||||||||||||||||||
| return nil, nil | ||||||||||||||||||||
| return res, err | ||||||||||||||||||||
| } | ||||||||||||||||||||
|
|
||||||||||||||||||||
| func (d *DualWriterMode3) Destroy() { | ||||||||||||||||||||
|
|
||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🔷 Medium: This loses the per-call fields (name/kind/method) by attaching the base logger instead of the enriched one; also the storage metrics label uses name instead of kind. Attach the enriched logger and use options.Kind for metrics consistency.
go
ctx = klog.NewContext(ctx, log)
// ...
d.recordStorageDuration(false, mode3Str, options.Kind, method, startStorage)