Skip to content

Commit d76b6e3

Browse files
committed
refactor: make QRuntime support extended resource attributes
This also fixes a generic consistency issue: and item in the queue and in the dedup process should always handle uniqueness by (namespace, type, id) triple. Allow additional non-deduplicated "last" data to be attached to the queue items, so that reconcile/map steps can access more input data. Signed-off-by: Andrey Smirnov <[email protected]>
1 parent 23b4690 commit d76b6e3

File tree

14 files changed

+434
-182
lines changed

14 files changed

+434
-182
lines changed

pkg/controller/conformance/qcontrollers.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ func (ctrl *QIntToStrController) Reconcile(ctx context.Context, _ *zap.Logger, r
141141
}
142142

143143
// MapInput implements controller.QController interface.
144-
func (ctrl *QIntToStrController) MapInput(_ context.Context, _ *zap.Logger, _ controller.QRuntime, ptr resource.Pointer) ([]resource.Pointer, error) {
144+
func (ctrl *QIntToStrController) MapInput(_ context.Context, _ *zap.Logger, _ controller.QRuntime, ptr controller.ReducedResourceMetadata) ([]resource.Pointer, error) {
145145
switch {
146146
case ptr.Namespace() == ctrl.TargetNamespace && ptr.Type() == StrResourceType:
147147
// remap output to input to recheck on finalizer removal
@@ -219,7 +219,7 @@ func (ctrl *QFailingController) Reconcile(ctx context.Context, _ *zap.Logger, r
219219
}
220220

221221
// MapInput implements controller.QController interface.
222-
func (ctrl *QFailingController) MapInput(context.Context, *zap.Logger, controller.QRuntime, resource.Pointer) ([]resource.Pointer, error) {
222+
func (ctrl *QFailingController) MapInput(context.Context, *zap.Logger, controller.QRuntime, controller.ReducedResourceMetadata) ([]resource.Pointer, error) {
223223
panic("not going to map anything")
224224
}
225225

@@ -335,7 +335,7 @@ func (ctrl *QIntToStrSleepingController) Reconcile(ctx context.Context, logger *
335335
}
336336

337337
// MapInput implements controller.QController interface.
338-
func (ctrl *QIntToStrSleepingController) MapInput(_ context.Context, _ *zap.Logger, _ controller.QRuntime, ptr resource.Pointer) ([]resource.Pointer, error) {
338+
func (ctrl *QIntToStrSleepingController) MapInput(_ context.Context, _ *zap.Logger, _ controller.QRuntime, ptr controller.ReducedResourceMetadata) ([]resource.Pointer, error) {
339339
switch {
340340
case ptr.Namespace() == ctrl.TargetNamespace && ptr.Type() == StrResourceType:
341341
// remap output to input to recheck on finalizer removal

pkg/controller/generic/destroy/destroy.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,6 @@ func (ctrl *Controller[Input]) Reconcile(ctx context.Context, logger *zap.Logger
9393
}
9494

9595
// MapInput implements controller.QController interface.
96-
func (ctrl *Controller[Input]) MapInput(context.Context, *zap.Logger, controller.QRuntime, resource.Pointer) ([]resource.Pointer, error) {
96+
func (ctrl *Controller[Input]) MapInput(context.Context, *zap.Logger, controller.QRuntime, controller.ReducedResourceMetadata) ([]resource.Pointer, error) {
9797
return nil, nil
9898
}

pkg/controller/generic/qtransform/options.go

Lines changed: 52 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -14,44 +14,76 @@ import (
1414
"github.com/cosi-project/runtime/pkg/controller"
1515
"github.com/cosi-project/runtime/pkg/controller/generic"
1616
"github.com/cosi-project/runtime/pkg/resource"
17+
"github.com/cosi-project/runtime/pkg/safe"
18+
"github.com/cosi-project/runtime/pkg/state"
1719
)
1820

1921
type namespaceType struct {
2022
Namespace resource.Namespace
2123
Type resource.Type
2224
}
2325

24-
type mapperFunc func(context.Context, *zap.Logger, controller.QRuntime, resource.Resource) ([]resource.Pointer, error)
26+
type mapperFunc func(context.Context, *zap.Logger, controller.QRuntime, controller.ReducedResourceMetadata) ([]resource.Pointer, error)
2527

26-
// MapperFuncGeneric is a generic version of mapperFunc.
27-
type MapperFuncGeneric[I generic.ResourceWithRD] func(context.Context, *zap.Logger, controller.QRuntime, I) ([]resource.Pointer, error)
28+
// MapperFunc is the most generic version of mapper function: it accepts pointer to resource and some metadata.
29+
type MapperFunc = mapperFunc
30+
31+
// MapperFuncTyped is a generic version of MapperFunc which requires resource I to exist.
32+
type MapperFuncTyped[I generic.ResourceWithRD] func(context.Context, *zap.Logger, controller.QRuntime, I) ([]resource.Pointer, error)
2833

2934
// MapperSameID is a mapper that returns the same namespace ID as the input resource, but uses output resource type.
30-
func MapperSameID[I generic.ResourceWithRD, O generic.ResourceWithRD]() MapperFuncGeneric[I] {
35+
func MapperSameID[O generic.ResourceWithRD]() mapperFunc {
3136
var zeroOutput O
3237

3338
outputRD := zeroOutput.ResourceDefinition()
3439

35-
return func(_ context.Context, _ *zap.Logger, _ controller.QRuntime, v I) ([]resource.Pointer, error) {
36-
return []resource.Pointer{resource.NewMetadata(outputRD.DefaultNamespace, outputRD.Type, v.Metadata().ID(), resource.VersionUndefined)}, nil
40+
return func(_ context.Context, _ *zap.Logger, _ controller.QRuntime, ptr controller.ReducedResourceMetadata) ([]resource.Pointer, error) {
41+
return []resource.Pointer{resource.NewMetadata(outputRD.DefaultNamespace, outputRD.Type, ptr.ID(), resource.VersionUndefined)}, nil
42+
}
43+
}
44+
45+
// MapExtractLabelValue returns a mapper that extracts a label value from a resource, potentially evaluating a condition on labels.
46+
func MapExtractLabelValue[O generic.ResourceWithRD](extractableLabel string, labelTerms ...resource.LabelTerm) MapperFunc {
47+
var zeroOutput O
48+
49+
outputNamespace := zeroOutput.ResourceDefinition().DefaultNamespace
50+
outputType := zeroOutput.ResourceDefinition().Type
51+
52+
return func(_ context.Context, _ *zap.Logger, _ controller.QRuntime, ptr controller.ReducedResourceMetadata) ([]resource.Pointer, error) {
53+
for _, labelTerm := range labelTerms {
54+
if !ptr.Labels().Matches(labelTerm) {
55+
return nil, nil
56+
}
57+
}
58+
59+
value, ok := ptr.Labels().Get(extractableLabel)
60+
if !ok {
61+
return nil, nil
62+
}
63+
64+
return []resource.Pointer{resource.NewMetadata(outputNamespace, outputType, value, resource.VersionUndefined)}, nil
3765
}
3866
}
3967

4068
// MapperNone is a mapper that returns no pointers.
41-
func MapperNone[I generic.ResourceWithRD]() MapperFuncGeneric[I] {
42-
return func(context.Context, *zap.Logger, controller.QRuntime, I) ([]resource.Pointer, error) {
69+
func MapperNone() MapperFunc {
70+
return func(context.Context, *zap.Logger, controller.QRuntime, controller.ReducedResourceMetadata) ([]resource.Pointer, error) {
4371
return nil, nil
4472
}
4573
}
4674

47-
func mapperFuncFromGeneric[I generic.ResourceWithRD](generic MapperFuncGeneric[I]) mapperFunc {
48-
return func(ctx context.Context, logger *zap.Logger, r controller.QRuntime, res resource.Resource) ([]resource.Pointer, error) {
49-
v, ok := res.(I)
50-
if !ok {
51-
return nil, fmt.Errorf("unexpected resource type in mapFunc %T", res)
75+
func MapperFuncFromTyped[I generic.ResourceWithRD](typedMapper MapperFuncTyped[I]) MapperFunc {
76+
return func(ctx context.Context, logger *zap.Logger, r controller.QRuntime, ptr controller.ReducedResourceMetadata) ([]resource.Pointer, error) {
77+
in, err := safe.ReaderGet[I](ctx, r, ptr)
78+
if err != nil {
79+
if state.IsNotFoundError(err) {
80+
return nil, nil
81+
}
82+
83+
return nil, fmt.Errorf("failed to read mapped input: %w", err)
5284
}
5385

54-
return generic(ctx, logger, r, v)
86+
return typedMapper(ctx, logger, r, in)
5587
}
5688
}
5789

@@ -70,7 +102,7 @@ type ControllerOptions struct {
70102
type ControllerOption func(*ControllerOptions)
71103

72104
// WithExtraMappedInputKind adds an extra input with the given kind to the controller.
73-
func WithExtraMappedInputKind[I generic.ResourceWithRD](mapFunc MapperFuncGeneric[I], inputKind controller.InputKind) ControllerOption {
105+
func WithExtraMappedInputKind[I generic.ResourceWithRD](mapFunc MapperFunc, inputKind controller.InputKind) ControllerOption {
74106
return func(o *ControllerOptions) {
75107
var zeroInput I
76108

@@ -91,7 +123,7 @@ func WithExtraMappedInputKind[I generic.ResourceWithRD](mapFunc MapperFuncGeneri
91123
panic(fmt.Errorf("duplicate mapper for %q", nsType))
92124
}
93125

94-
o.mappers[nsType] = mapperFuncFromGeneric(mapFunc)
126+
o.mappers[nsType] = mapFunc
95127

96128
o.extraInputs = append(o.extraInputs, controller.Input{
97129
Namespace: zeroInput.ResourceDefinition().DefaultNamespace,
@@ -102,13 +134,13 @@ func WithExtraMappedInputKind[I generic.ResourceWithRD](mapFunc MapperFuncGeneri
102134
}
103135

104136
// WithExtraMappedInput adds an extra mapped input to the controller.
105-
func WithExtraMappedInput[I generic.ResourceWithRD](mapFunc MapperFuncGeneric[I]) ControllerOption {
106-
return WithExtraMappedInputKind(mapFunc, controller.InputQMapped)
137+
func WithExtraMappedInput[I generic.ResourceWithRD](mapFunc MapperFunc) ControllerOption {
138+
return WithExtraMappedInputKind[I](mapFunc, controller.InputQMapped)
107139
}
108140

109141
// WithExtraMappedDestroyReadyInput adds an extra mapped destroy-ready input to the controller.
110-
func WithExtraMappedDestroyReadyInput[I generic.ResourceWithRD](mapFunc MapperFuncGeneric[I]) ControllerOption {
111-
return WithExtraMappedInputKind(mapFunc, controller.InputQMappedDestroyReady)
142+
func WithExtraMappedDestroyReadyInput[I generic.ResourceWithRD](mapFunc MapperFunc) ControllerOption {
143+
return WithExtraMappedInputKind[I](mapFunc, controller.InputQMappedDestroyReady)
112144
}
113145

114146
// WithExtraOutputs adds extra outputs to the controller.

pkg/controller/generic/qtransform/qtransform.go

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -358,17 +358,20 @@ func (ctrl *QController[Input, Output]) reconcileTearingDown(ctx context.Context
358358
}
359359

360360
// MapInput implements controller.QController interface.
361-
func (ctrl *QController[Input, Output]) MapInput(ctx context.Context, logger *zap.Logger, r controller.QRuntime, ptr resource.Pointer) ([]resource.Pointer, error) {
362-
in, err := r.Get(ctx, ptr)
363-
if err != nil {
364-
if state.IsNotFoundError(err) {
365-
return nil, nil
366-
}
361+
func (ctrl *QController[Input, Output]) MapInput(ctx context.Context, logger *zap.Logger, r controller.QRuntime, ptr controller.ReducedResourceMetadata) ([]resource.Pointer, error) {
362+
var zeroOut Output
363+
364+
// this is our output, destroy ready
365+
if ptr.Type() == zeroOut.ResourceDefinition().Type {
366+
out, err := safe.ReaderGet[Output](ctx, r, ptr)
367+
if err != nil {
368+
if state.IsNotFoundError(err) {
369+
return nil, nil
370+
}
367371

368-
return nil, fmt.Errorf("error reading input mapped resource: %w", err)
369-
}
372+
return nil, fmt.Errorf("error reading output mapped resource: %w", err)
373+
}
370374

371-
if out, ok := in.(Output); ok {
372375
// output destroy ready, remap to input
373376
input := ctrl.unmapFunc(out)
374377

@@ -386,5 +389,5 @@ func (ctrl *QController[Input, Output]) MapInput(ctx context.Context, logger *za
386389
return nil, fmt.Errorf("no mapper for %q", nsType)
387390
}
388391

389-
return mapperFunc(ctx, logger, r, in)
392+
return mapperFunc(ctx, logger, r, ptr)
390393
}

pkg/controller/generic/qtransform/qtransform_test.go

Lines changed: 126 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -162,11 +162,41 @@ func NewABCController(opts ...qtransform.ControllerOption) *ABController {
162162
},
163163
append(
164164
opts,
165-
qtransform.WithExtraMappedInput(func(_ context.Context, _ *zap.Logger, _ controller.QRuntime, c *C) ([]resource.Pointer, error) {
166-
return []resource.Pointer{
167-
NewA(c.Metadata().ID(), ASpec{}).Metadata(),
168-
}, nil
169-
}),
165+
qtransform.WithExtraMappedInput[*C](qtransform.MapperSameID[*A]()),
166+
)...,
167+
)
168+
}
169+
170+
func NewABCLabelsController(opts ...qtransform.ControllerOption) *ABController {
171+
return qtransform.NewQController(
172+
qtransform.Settings[*A, *B]{
173+
Name: "QTransformABCController",
174+
MapMetadataFunc: func(in *A) *B {
175+
return NewB("transformed-"+in.Metadata().ID(), BSpec{})
176+
},
177+
UnmapMetadataFunc: func(in *B) *A {
178+
return NewA(strings.TrimPrefix(in.Metadata().ID(), "transformed-"), ASpec{})
179+
},
180+
TransformFunc: func(ctx context.Context, r controller.Reader, _ *zap.Logger, in *A, out *B) error {
181+
cList, err := safe.ReaderListAll[*C](ctx, r, state.WithLabelQuery(resource.LabelEqual("a", in.Metadata().ID())))
182+
if err != nil && !state.IsNotFoundError(err) {
183+
return err
184+
}
185+
186+
out.TypedSpec().Out = fmt.Sprintf("%q-%d", in.TypedSpec().Str, in.TypedSpec().Int)
187+
188+
for c := range cList.All() {
189+
out.TypedSpec().Out += fmt.Sprintf("-%d", c.TypedSpec().Aux)
190+
}
191+
192+
out.TypedSpec().TransformCount++
193+
194+
return nil
195+
},
196+
},
197+
append(
198+
opts,
199+
qtransform.WithExtraMappedInput[*C](qtransform.MapExtractLabelValue[*A]("a")),
170200
)...,
171201
)
172202
}
@@ -722,6 +752,19 @@ func TestRemappedInput(t *testing.T) {
722752
assert.Equal(`"baz"-3`, r.TypedSpec().Out)
723753
}
724754
})
755+
756+
require.NoError(t, st.Destroy(ctx, NewC("1", CSpec{}).Metadata()))
757+
758+
rtestutils.AssertResources(ctx, t, st, []resource.ID{"transformed-1", "transformed-2", "transformed-3"}, func(r *B, assert *assert.Assertions) {
759+
switch r.Metadata().ID() {
760+
case "transformed-1":
761+
assert.Equal(`"foo"-1`, r.TypedSpec().Out)
762+
case "transformed-2":
763+
assert.Equal(`"bar"-2-22`, r.TypedSpec().Out)
764+
case "transformed-3":
765+
assert.Equal(`"baz"-3`, r.TypedSpec().Out)
766+
}
767+
})
725768
})
726769
}
727770

@@ -767,6 +810,84 @@ func TestRequeueErrorBackoff(t *testing.T) {
767810
})
768811
}
769812

813+
func TestMappedByLabelInput(t *testing.T) {
814+
setup(t, func(ctx context.Context, st state.State, runtime *runtime.Runtime) {
815+
require.NoError(t, runtime.RegisterQController(NewABCLabelsController()))
816+
817+
for _, a := range []*A{
818+
NewA("1", ASpec{Str: "foo", Int: 1}),
819+
NewA("2", ASpec{Str: "bar", Int: 2}),
820+
NewA("3", ASpec{Str: "baz", Int: 3}),
821+
} {
822+
require.NoError(t, st.Create(ctx, a))
823+
}
824+
825+
rtestutils.AssertResources(ctx, t, st, []resource.ID{"transformed-1", "transformed-2", "transformed-3"}, func(r *B, assert *assert.Assertions) {
826+
switch r.Metadata().ID() {
827+
case "transformed-1":
828+
assert.Equal(`"foo"-1`, r.TypedSpec().Out)
829+
case "transformed-2":
830+
assert.Equal(`"bar"-2`, r.TypedSpec().Out)
831+
case "transformed-3":
832+
assert.Equal(`"baz"-3`, r.TypedSpec().Out)
833+
}
834+
})
835+
836+
c1 := NewC("cA", CSpec{Aux: 11})
837+
c1.Metadata().Labels().Set("a", "1")
838+
require.NoError(t, st.Create(ctx, c1))
839+
840+
c2 := NewC("cB", CSpec{Aux: 22})
841+
c2.Metadata().Labels().Set("a", "2")
842+
require.NoError(t, st.Create(ctx, c2))
843+
844+
c3 := NewC("cC", CSpec{Aux: 33})
845+
c3.Metadata().Labels().Set("a", "1")
846+
require.NoError(t, st.Create(ctx, c3))
847+
848+
rtestutils.AssertResources(ctx, t, st, []resource.ID{"transformed-1", "transformed-2", "transformed-3"}, func(r *B, assert *assert.Assertions) {
849+
switch r.Metadata().ID() {
850+
case "transformed-1":
851+
assert.Equal(`"foo"-1-11-33`, r.TypedSpec().Out)
852+
case "transformed-2":
853+
assert.Equal(`"bar"-2-22`, r.TypedSpec().Out)
854+
case "transformed-3":
855+
assert.Equal(`"baz"-3`, r.TypedSpec().Out)
856+
}
857+
})
858+
859+
require.NoError(t, st.Destroy(ctx, c2.Metadata()))
860+
861+
rtestutils.AssertResources(ctx, t, st, []resource.ID{"transformed-1", "transformed-2", "transformed-3"}, func(r *B, assert *assert.Assertions) {
862+
switch r.Metadata().ID() {
863+
case "transformed-1":
864+
assert.Equal(`"foo"-1-11-33`, r.TypedSpec().Out)
865+
case "transformed-2":
866+
assert.Equal(`"bar"-2`, r.TypedSpec().Out)
867+
case "transformed-3":
868+
assert.Equal(`"baz"-3`, r.TypedSpec().Out)
869+
}
870+
})
871+
872+
require.NoError(t, st.Destroy(ctx, c1.Metadata()))
873+
874+
c4 := NewC("cD", CSpec{Aux: 44})
875+
c4.Metadata().Labels().Set("a", "3")
876+
require.NoError(t, st.Create(ctx, c4))
877+
878+
rtestutils.AssertResources(ctx, t, st, []resource.ID{"transformed-1", "transformed-2", "transformed-3"}, func(r *B, assert *assert.Assertions) {
879+
switch r.Metadata().ID() {
880+
case "transformed-1":
881+
assert.Equal(`"foo"-1-33`, r.TypedSpec().Out)
882+
case "transformed-2":
883+
assert.Equal(`"bar"-2`, r.TypedSpec().Out)
884+
case "transformed-3":
885+
assert.Equal(`"baz"-3-44`, r.TypedSpec().Out)
886+
}
887+
})
888+
})
889+
}
890+
770891
func setup(t *testing.T, f func(ctx context.Context, st state.State, rt *runtime.Runtime), opts ...options.Option) {
771892
defer goleak.VerifyNone(t, goleak.IgnoreCurrent())
772893

pkg/controller/qcontroller.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ type QController interface {
5252
// For example, MapInput might convert watch events in secondary input to primary input items.
5353
//
5454
// MapInput failures are treated in the same way as Reconcile failures.
55-
MapInput(context.Context, *zap.Logger, QRuntime, resource.Pointer) ([]resource.Pointer, error)
55+
MapInput(context.Context, *zap.Logger, QRuntime, ReducedResourceMetadata) ([]resource.Pointer, error)
5656
}
5757

5858
// QRuntime interface as presented to the QController.
@@ -70,6 +70,16 @@ type QSettings struct {
7070
Concurrency optional.Optional[uint]
7171
}
7272

73+
// ReducedResourceMetadata is the input type for MapInput.
74+
//
75+
// It consists of mandatory resource.Pointer and some additional metadata properties.
76+
type ReducedResourceMetadata interface {
77+
resource.Pointer
78+
Phase() resource.Phase
79+
Labels() *resource.Labels
80+
FinalizersEmpty() bool
81+
}
82+
7383
// RequeueError is returned by QController.Reconcile to requeue the item with specified backoff.
7484
//
7585
// RequeueError might contain or might not contain an actual error, in either case the item is requeued

pkg/controller/runtime/internal/qruntime/backoff.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import (
1010
"github.com/cenkalti/backoff/v4"
1111
)
1212

13-
func (adapter *Adapter) getBackoffInterval(item QItem) time.Duration {
13+
func (adapter *Adapter) getBackoffInterval(item QKey) time.Duration {
1414
adapter.backoffsMu.Lock()
1515
defer adapter.backoffsMu.Unlock()
1616

@@ -24,7 +24,7 @@ func (adapter *Adapter) getBackoffInterval(item QItem) time.Duration {
2424
return bckoff.NextBackOff()
2525
}
2626

27-
func (adapter *Adapter) clearBackoff(item QItem) {
27+
func (adapter *Adapter) clearBackoff(item QKey) {
2828
adapter.backoffsMu.Lock()
2929
defer adapter.backoffsMu.Unlock()
3030

0 commit comments

Comments
 (0)