Skip to content

Commit 67b74e1

Browse files
leonorfmartinssuntalagonveefilewalkwithme
authored
Dual writer: mode 3 (#90045)
* Dual writer: mode 3 * Add integration tests for playlits in mode 3 * Remove todo * Update pkg/apiserver/rest/dualwriter_mode3.go Co-authored-by: Arati R. <[email protected]> * Admin: Fixes an issue where user accounts could not be enabled (#88117) Fix: unable to enable user * [REVIEW] FInish mode 3 and add tests * Improve logging * Update dependencies * Update pkg/apiserver/rest/dualwriter_mode3_test.go Co-authored-by: maicon <[email protected]> * remove test assertion * Use mode log when dual writer is initiated --------- Co-authored-by: Arati R. <[email protected]> Co-authored-by: gonvee <[email protected]> Co-authored-by: maicon <[email protected]>
1 parent 5e3a5b3 commit 67b74e1

File tree

5 files changed

+524
-131
lines changed

5 files changed

+524
-131
lines changed

go.work.sum

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -402,6 +402,12 @@ github.com/grafana/e2e v0.1.1/go.mod h1:RpNLgae5VT+BUHvPE+/zSypmOXKwEu4t+tnEMS1A
402402
github.com/grafana/grafana-cloud-migration-snapshot v1.2.0 h1:FCUWASPPzGGbF2jTutR5i3rmoQdmnC4bypwJswdW3fI=
403403
github.com/grafana/grafana-cloud-migration-snapshot v1.2.0/go.mod h1:bd6Cm06EK0MzRO5ahUpbDz1SxNOKu+fzladbaRPHZPY=
404404
github.com/grafana/grafana/pkg/apimachinery v0.0.0-20240701135906-559738ce6ae1/go.mod h1:DkxMin+qOh1Fgkxfbt+CUfBqqsCQJMG9op8Os/irBPA=
405+
github.com/grafana/grafana-azure-sdk-go/v2 v2.1.0 h1:lajVqTWaE96MpbjZToj7EshvqgRWOfYNkD4MbIZizaY=
406+
github.com/grafana/grafana-azure-sdk-go/v2 v2.1.0/go.mod h1:aKlFPE36IDa8qccRg3KbgZX3MQ5xymS3RelT4j6kkVU=
407+
github.com/grafana/grafana-plugin-sdk-go v0.235.0/go.mod h1:6n9LbrjGL3xAATntYVNcIi90G9BVHRJjzHKz5FXVfWw=
408+
github.com/grafana/grafana/pkg/apimachinery v0.0.0-20240701135906-559738ce6ae1/go.mod h1:DkxMin+qOh1Fgkxfbt+CUfBqqsCQJMG9op8Os/irBPA=
409+
github.com/grafana/prometheus-alertmanager v0.25.1-0.20240422145632-c33c6b5b6e6b h1:HCbWyVL6vi7gxyO76gQksSPH203oBJ1MJ3JcG1OQlsg=
410+
github.com/grafana/prometheus-alertmanager v0.25.1-0.20240422145632-c33c6b5b6e6b/go.mod h1:01sXtHoRwI8W324IPAzuxDFOmALqYLCOhvSC2fUHWXc=
405411
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7 h1:pdN6V1QBWetyv/0+wjACpqVH+eVULgEjkurDLq3goeM=
406412
github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645 h1:MJG/KsmcqMwFAkh8mTnAwhyKoB+sTAnY4CACC110tbU=
407413
github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645/go.mod h1:6iZfnjpejD4L/4DwD7NryNaJyCQdzwWwH2MWhCA90Kw=
@@ -793,15 +799,24 @@ go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.23.1 h1:ZqR
793799
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.23.1/go.mod h1:D7ynngPWlGJrqyGSDOdscuv7uqttfCE3jcBvffDv9y4=
794800
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.23.1 h1:q/Nj5/2TZRIt6PderQ9oU0M00fzoe8UZuINGw6ETGTw=
795801
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.23.1/go.mod h1:DTE9yAu6r08jU3xa68GiSeI7oRcSEQ2RpKbbQGO+dWM=
802+
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.26.0/go.mod h1:z46paqbJ9l7c9fIPCXTqTGwhQZ5XoTIsfeFYWboizjs=
803+
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.26.0/go.mod h1:wnJIG4fOqyynOnnQF/eQb4/16VlX2EJAHhHgqIqWfAo=
804+
go.opentelemetry.io/otel/exporters/prometheus v0.37.0 h1:NQc0epfL0xItsmGgSXgfbH2C1fq2VLXkZoDFsfRNHpc=
805+
go.opentelemetry.io/otel/exporters/prometheus v0.37.0/go.mod h1:hB8qWjsStK36t50/R0V2ULFb4u95X/Q6zupXLgvjTh8=
796806
go.opentelemetry.io/otel/exporters/prometheus v0.46.0 h1:I8WIFXR351FoLJYuloU4EgXbtNX2URfU/85pUPheIEQ=
797807
go.opentelemetry.io/otel/exporters/prometheus v0.46.0/go.mod h1:ztwVUHe5DTR/1v7PeuGRnU5Bbd4QKYwApWmuutKsJSs=
798808
go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.23.1 h1:C8r95vDR125t815KD+b1tI0Fbc1pFnwHTBxkbIZ6Szc=
799809
go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.23.1/go.mod h1:Qr0qomr64jentMtOjWMbtYeJMSuMSlsPEjmnRA2sWZ4=
800810
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.24.0 h1:s0PHtIkN+3xrbDOpt2M8OTG92cWqUESvzh2MxiR5xY8=
801811
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.24.0/go.mod h1:hZlFbDbRt++MMPCCfSJfmhkGIWnX1h3XjkfxZUjLrIA=
812+
go.opentelemetry.io/otel/metric v1.26.0/go.mod h1:SY+rHOI4cEawI9a7N1A4nIg/nTQXe1ccCNWYOJUrpX4=
813+
go.opentelemetry.io/otel/sdk v1.26.0/go.mod h1:0p8MXpqLeJ0pzcszQQN4F0S5FVjBLgypeGSngLsmirs=
814+
go.opentelemetry.io/otel/sdk/metric v0.39.0 h1:Kun8i1eYf48kHH83RucG93ffz0zGV1sh46FAScOTuDI=
815+
go.opentelemetry.io/otel/sdk/metric v0.39.0/go.mod h1:piDIRgjcK7u0HCL5pCA4e74qpK/jk3NiUoAHATVAmiI=
802816
go.opentelemetry.io/otel/sdk/metric v1.26.0 h1:cWSks5tfriHPdWFnl+qpX3P681aAYqlZHcAyHw5aU9Y=
803817
go.opentelemetry.io/otel/sdk/metric v1.26.0/go.mod h1:ClMFFknnThJCksebJwz7KIyEDHO+nTB6gK8obLy8RyE=
804818
go.opentelemetry.io/otel/trace v1.26.0/go.mod h1:4iDxvGDQuUkHve82hJJ8UqrwswHYsZuWCBllGV2U2y0=
819+
go.opentelemetry.io/proto/otlp v1.2.0/go.mod h1:gGpR8txAl5M03pDhMC79G6SdqNV26naRm/KDsgaHD8A=
805820
go.uber.org/automaxprocs v1.5.3 h1:kWazyxZUrS3Gs4qUpbwo5kEIMGe/DAvi5Z4tl2NW4j8=
806821
go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee h1:0mgffUl7nfd+FpvXMVz4IDEaUSmT1ysygQC7qYo7sG4=
807822
golang.org/x/arch v0.4.0 h1:A8WCeEWhLwPBKNbFi5Wv5UTCBx5zzubnXDlMOFAzFMc=
@@ -825,6 +840,7 @@ google.golang.org/genproto/googleapis/api v0.0.0-20240318140521-94a12d6c2237/go.
825840
google.golang.org/genproto/googleapis/api v0.0.0-20240604185151-ef581f913117/go.mod h1:OimBR/bc1wPO9iV4NC2bpyjy3VnAwZh5EBPQdtaE5oo=
826841
google.golang.org/genproto/googleapis/bytestream v0.0.0-20240325203815-454cdb8f5daa h1:wBkzraZsSqhj1M4L/nMrljUU6XasJkgHvUsq8oRGwF0=
827842
google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237/go.mod h1:WtryC6hu0hhx87FDGxWCDptyssuo68sk10vYjF+T9fY=
843+
google.golang.org/genproto/googleapis/rpc v0.0.0-20240513163218-0867130af1f8/go.mod h1:I7Y+G38R2bu5j1aLzfFmQfTcU/WnFuqDwLZAbvKTKpM=
828844
google.golang.org/genproto/googleapis/rpc v0.0.0-20240604185151-ef581f913117/go.mod h1:EfXuqaE1J41VCDicxHzUDm+8rk+7ZdXzHV0IhO/I6s0=
829845
google.golang.org/genproto/googleapis/rpc v0.0.0-20240624140628-dc46fd24d27d/go.mod h1:Ue6ibwXGpU+dqIcODieyLOcgj7z8+IcskoNIgZxtrFY=
830846
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.1.0 h1:M1YKkFIboKNieVO5DLUEVzQfGwJD30Nv2jfUgzb5UcE=

pkg/apiserver/rest/dualwriter_mode1_test.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,6 @@ func TestMode1_Get(t *testing.T) {
135135
tt.setupStorageFn(m, tt.input)
136136
}
137137

138-
p := prometheus.NewRegistry()
139138
dw := NewDualWriter(Mode1, ls, us, p)
140139

141140
obj, err := dw.Get(context.Background(), tt.input, &metav1.GetOptions{})

pkg/apiserver/rest/dualwriter_mode3.go

Lines changed: 98 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,9 @@ package rest
22

33
import (
44
"context"
5+
"errors"
6+
"time"
57

6-
apierrors "k8s.io/apimachinery/pkg/api/errors"
78
metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
89
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
910
"k8s.io/apimachinery/pkg/runtime"
@@ -21,114 +22,151 @@ type DualWriterMode3 struct {
2122
// newDualWriterMode3 returns a new DualWriter in mode 3.
2223
// Mode 3 represents writing to LegacyStorage and Storage and reading from Storage.
2324
func newDualWriterMode3(legacy LegacyStorage, storage Storage, dwm *dualWriterMetrics) *DualWriterMode3 {
24-
return &DualWriterMode3{Legacy: legacy, Storage: storage, Log: klog.NewKlogr().WithName("DualWriterMode3"), dualWriterMetrics: dwm}
25+
return &DualWriterMode3{Legacy: legacy, Storage: storage, Log: klog.NewKlogr().WithName("DualWriterMode3").WithValues("mode", mode3Str), dualWriterMetrics: dwm}
2526
}
2627

2728
// Mode returns the mode of the dual writer.
2829
func (d *DualWriterMode3) Mode() DualWriterMode {
2930
return Mode3
3031
}
3132

33+
const mode3Str = "3"
34+
3235
// Create overrides the behavior of the generic DualWriter and writes to LegacyStorage and Storage.
3336
func (d *DualWriterMode3) Create(ctx context.Context, obj runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error) {
34-
log := klog.FromContext(ctx)
37+
var method = "create"
38+
log := d.Log.WithValues("kind", options.Kind, "method", method)
39+
ctx = klog.NewContext(ctx, log)
3540

41+
startStorage := time.Now()
3642
created, err := d.Storage.Create(ctx, obj, createValidation, options)
3743
if err != nil {
3844
log.Error(err, "unable to create object in storage")
45+
d.recordLegacyDuration(true, mode3Str, options.Kind, method, startStorage)
3946
return created, err
4047
}
48+
d.recordStorageDuration(false, mode3Str, options.Kind, method, startStorage)
4149

42-
if _, err := d.Legacy.Create(ctx, obj, createValidation, options); err != nil {
43-
log.WithValues("object", created).Error(err, "unable to create object in legacy storage")
44-
}
45-
return created, nil
50+
go func() {
51+
ctx, cancel := context.WithTimeoutCause(ctx, time.Second*10, errors.New("legacy create timeout"))
52+
defer cancel()
53+
54+
startLegacy := time.Now()
55+
_, errObjectSt := d.Legacy.Create(ctx, obj, createValidation, options)
56+
d.recordLegacyDuration(errObjectSt != nil, mode3Str, options.Kind, method, startLegacy)
57+
}()
58+
59+
return created, err
4660
}
4761

4862
// Get overrides the behavior of the generic DualWriter and retrieves an object from Storage.
4963
func (d *DualWriterMode3) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) {
50-
return d.Storage.Get(ctx, name, &metav1.GetOptions{})
64+
var method = "get"
65+
log := d.Log.WithValues("kind", options.Kind, "name", name, "method", method)
66+
ctx = klog.NewContext(ctx, log)
67+
68+
startStorage := time.Now()
69+
res, err := d.Storage.Get(ctx, name, options)
70+
if err != nil {
71+
log.Error(err, "unable to get object in storage")
72+
}
73+
d.recordStorageDuration(err != nil, mode3Str, options.Kind, method, startStorage)
74+
75+
return res, err
5176
}
5277

53-
func (d *DualWriterMode3) Delete(ctx context.Context, name string, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions) (runtime.Object, bool, error) {
54-
log := d.Log.WithValues("name", name)
78+
// List overrides the behavior of the generic DualWriter and reads only from Unified Store.
79+
func (d *DualWriterMode3) List(ctx context.Context, options *metainternalversion.ListOptions) (runtime.Object, error) {
80+
var method = "list"
81+
log := d.Log.WithValues("kind", options.Kind, "resourceVersion", options.ResourceVersion, "method", method)
5582
ctx = klog.NewContext(ctx, log)
5683

57-
deleted, async, err := d.Storage.Delete(ctx, name, deleteValidation, options)
84+
startStorage := time.Now()
85+
res, err := d.Storage.List(ctx, options)
5886
if err != nil {
59-
if !apierrors.IsNotFound(err) {
60-
log.Error(err, "could not delete from unified store")
61-
return deleted, async, err
62-
}
87+
log.Error(err, "unable to list object in storage")
6388
}
89+
d.recordStorageDuration(err != nil, mode3Str, options.Kind, method, startStorage)
6490

65-
_, _, errLS := d.Legacy.Delete(ctx, name, deleteValidation, options)
66-
if errLS != nil {
67-
if !apierrors.IsNotFound(errLS) {
68-
log.WithValues("deleted", deleted).Error(errLS, "could not delete from legacy store")
69-
}
91+
return res, err
92+
}
93+
94+
func (d *DualWriterMode3) Delete(ctx context.Context, name string, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions) (runtime.Object, bool, error) {
95+
var method = "delete"
96+
log := d.Log.WithValues("name", name, "kind", options.Kind, "method", method)
97+
ctx = klog.NewContext(ctx, d.Log)
98+
99+
startStorage := time.Now()
100+
res, async, err := d.Storage.Delete(ctx, name, deleteValidation, options)
101+
if err != nil {
102+
log.Error(err, "unable to delete object in storage")
103+
d.recordStorageDuration(true, mode3Str, options.Kind, method, startStorage)
104+
return res, async, err
70105
}
106+
d.recordStorageDuration(false, mode3Str, name, method, startStorage)
71107

72-
return deleted, async, err
108+
go func() {
109+
startLegacy := time.Now()
110+
ctx, cancel := context.WithTimeoutCause(ctx, time.Second*10, errors.New("legacy delete timeout"))
111+
defer cancel()
112+
_, _, err := d.Legacy.Delete(ctx, name, deleteValidation, options)
113+
d.recordLegacyDuration(err != nil, mode3Str, options.Kind, method, startLegacy)
114+
}()
115+
116+
return res, async, err
73117
}
74118

75119
// Update overrides the behavior of the generic DualWriter and writes first to Storage and then to LegacyStorage.
76120
func (d *DualWriterMode3) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) {
77-
log := d.Log.WithValues("name", name)
121+
var method = "update"
122+
log := d.Log.WithValues("name", name, "kind", options.Kind, "method", method)
78123
ctx = klog.NewContext(ctx, log)
79-
old, err := d.Storage.Get(ctx, name, &metav1.GetOptions{})
80-
if err != nil {
81-
log.WithValues("object", old).Error(err, "could not get object to update")
82-
return nil, false, err
83-
}
84124

85-
updated, err := objInfo.UpdatedObject(ctx, old)
125+
startStorage := time.Now()
126+
res, async, err := d.Storage.Update(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate, options)
86127
if err != nil {
87-
log.WithValues("object", updated).Error(err, "could not update or create object")
88-
return nil, false, err
89-
}
90-
objInfo = &updateWrapper{
91-
upstream: objInfo,
92-
updated: updated,
128+
log.Error(err, "unable to update in storage")
129+
d.recordLegacyDuration(true, mode3Str, options.Kind, method, startStorage)
130+
return res, async, err
93131
}
132+
d.recordStorageDuration(false, mode3Str, options.Kind, method, startStorage)
94133

95-
obj, created, err := d.Storage.Update(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate, options)
96-
if err != nil {
97-
log.WithValues("object", obj).Error(err, "could not write to US")
98-
return obj, created, err
99-
}
134+
go func() {
135+
ctx, cancel := context.WithTimeoutCause(ctx, time.Second*10, errors.New("legacy update timeout"))
100136

101-
_, _, errLeg := d.Legacy.Update(ctx, name, &updateWrapper{
102-
upstream: objInfo,
103-
updated: obj,
104-
}, createValidation, updateValidation, forceAllowCreate, options)
105-
if errLeg != nil {
106-
log.Error(errLeg, "could not update object in legacy store")
107-
}
108-
return obj, created, err
137+
startLegacy := time.Now()
138+
defer cancel()
139+
_, _, errObjectSt := d.Legacy.Update(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate, options)
140+
d.recordLegacyDuration(errObjectSt != nil, mode3Str, options.Kind, method, startLegacy)
141+
}()
142+
143+
return res, async, err
109144
}
110145

111146
// DeleteCollection overrides the behavior of the generic DualWriter and deletes from both LegacyStorage and Storage.
112147
func (d *DualWriterMode3) DeleteCollection(ctx context.Context, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions, listOptions *metainternalversion.ListOptions) (runtime.Object, error) {
113-
log := d.Log.WithValues("kind", options.Kind, "resourceVersion", listOptions.ResourceVersion)
148+
var method = "delete-collection"
149+
log := d.Log.WithValues("kind", options.Kind, "resourceVersion", listOptions.ResourceVersion, "method", method)
114150
ctx = klog.NewContext(ctx, log)
115151

116-
deleted, err := d.Storage.DeleteCollection(ctx, deleteValidation, options, listOptions)
152+
startStorage := time.Now()
153+
res, err := d.Storage.DeleteCollection(ctx, deleteValidation, options, listOptions)
117154
if err != nil {
118-
log.Error(err, "failed to delete collection successfully from Storage")
155+
log.Error(err, "unable to delete collection in storage")
156+
d.recordStorageDuration(true, mode3Str, options.Kind, method, startStorage)
157+
return res, err
119158
}
159+
d.recordStorageDuration(false, mode3Str, options.Kind, method, startStorage)
120160

121-
if deleted, err := d.Legacy.DeleteCollection(ctx, deleteValidation, options, listOptions); err != nil {
122-
log.WithValues("deleted", deleted).Error(err, "failed to delete collection successfully from LegacyStorage")
123-
}
161+
go func() {
162+
startLegacy := time.Now()
163+
ctx, cancel := context.WithTimeoutCause(ctx, time.Second*10, errors.New("legacy deletecollection timeout"))
164+
defer cancel()
165+
_, err := d.Legacy.DeleteCollection(ctx, deleteValidation, options, listOptions)
166+
d.recordStorageDuration(err != nil, mode3Str, options.Kind, method, startLegacy)
167+
}()
124168

125-
return deleted, err
126-
}
127-
128-
func (d *DualWriterMode3) List(ctx context.Context, options *metainternalversion.ListOptions) (runtime.Object, error) {
129-
//TODO: implement List
130-
klog.Error("List not implemented")
131-
return nil, nil
169+
return res, err
132170
}
133171

134172
func (d *DualWriterMode3) Destroy() {

0 commit comments

Comments
 (0)