Skip to content

Commit 09e27bd

Browse files
authored
Merge pull request crossplane#6407 from negz/pollution
Promote realtime composition to beta - don't poll in the XR reconciler
2 parents e172321 + a555222 commit 09e27bd

File tree

36 files changed

+604
-698
lines changed

36 files changed

+604
-698
lines changed

.github/workflows/ci.yml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,6 @@ jobs:
227227
matrix:
228228
test-suite:
229229
- base
230-
- realtime-compositions
231230
- package-dependency-updates
232231
- package-signature-verification
233232

apis/apiextensions/fn/proto/v1/run_function.pb.go

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

apis/apiextensions/fn/proto/v1/run_function.proto

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ message RunFunctionRequest {
5555
// of the 'input' block of the relevant entry in a Composition's pipeline.
5656
optional google.protobuf.Struct input = 4;
5757

58-
// Optional context. Crossplane may pass arbitary contextual information to a
58+
// Optional context. Crossplane may pass arbitrary contextual information to a
5959
// Function. A Function may also return context in its RunFunctionResponse,
6060
// and that context will be passed to subsequent Functions. Crossplane
6161
// discards all context returned by the last Function in the pipeline.
@@ -170,9 +170,9 @@ message ResponseMeta {
170170
// meta.tag of the corresponding RunFunctionRequest.
171171
string tag = 1;
172172

173-
// Time-to-live of this response. Deterministic Functions with no side-effects
174-
// (e.g. simple templating Functions) may specify a TTL. Crossplane may choose
175-
// to cache responses until the TTL expires.
173+
// Time-to-live of this response. Crossplane will call the function again when
174+
// the TTL expires. Crossplane may cache the response to avoid calling the
175+
// function again until the TTL expires.
176176
optional google.protobuf.Duration ttl = 2;
177177
}
178178

apis/apiextensions/fn/proto/v1beta1/zz_generated_run_function.pb.go

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

apis/apiextensions/fn/proto/v1beta1/zz_generated_run_function.proto

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ message RunFunctionRequest {
5757
// of the 'input' block of the relevant entry in a Composition's pipeline.
5858
optional google.protobuf.Struct input = 4;
5959

60-
// Optional context. Crossplane may pass arbitary contextual information to a
60+
// Optional context. Crossplane may pass arbitrary contextual information to a
6161
// Function. A Function may also return context in its RunFunctionResponse,
6262
// and that context will be passed to subsequent Functions. Crossplane
6363
// discards all context returned by the last Function in the pipeline.
@@ -172,9 +172,9 @@ message ResponseMeta {
172172
// meta.tag of the corresponding RunFunctionRequest.
173173
string tag = 1;
174174

175-
// Time-to-live of this response. Deterministic Functions with no side-effects
176-
// (e.g. simple templating Functions) may specify a TTL. Crossplane may choose
177-
// to cache responses until the TTL expires.
175+
// Time-to-live of this response. Crossplane will call the function again when
176+
// the TTL expires. Crossplane may cache the response to avoid calling the
177+
// function again until the TTL expires.
178178
optional google.protobuf.Duration ttl = 2;
179179
}
180180

cmd/crossplane/core/core.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -116,14 +116,14 @@ type startCommand struct {
116116
TLSClientCertsDir string `env:"TLS_CLIENT_CERTS_DIR" help:"The path of the folder which will store TLS client certificate of Crossplane."`
117117

118118
EnableExternalSecretStores bool `group:"Alpha Features:" help:"Enable support for External Secret Stores."`
119-
EnableRealtimeCompositions bool `group:"Alpha Features:" help:"Enable support for realtime compositions, i.e. watching composed resources and reconciling compositions immediately when any of the composed resources is updated."`
120119
EnableDependencyVersionUpgrades bool `group:"Alpha Features:" help:"Enable support for upgrading dependency versions when the parent package is updated."`
121120
EnableSignatureVerification bool `group:"Alpha Features:" help:"Enable support for package signature verification via ImageConfig API."`
122121

123122
EnableCompositionWebhookSchemaValidation bool `default:"true" group:"Beta Features:" help:"Enable support for Composition validation using schemas."`
124123
EnableDeploymentRuntimeConfigs bool `default:"true" group:"Beta Features:" help:"Enable support for Deployment Runtime Configs."`
125124
EnableUsages bool `default:"true" group:"Beta Features:" help:"Enable support for deletion ordering and resource protection with Usages."`
126125
EnableSSAClaims bool `default:"true" group:"Beta Features:" help:"Enable support for using Kubernetes server-side apply to sync claims with composite resources (XRs)."`
126+
EnableRealtimeCompositions bool `default:"true" group:"Beta Features:" help:"Enable support for realtime compositions, i.e. watching composed resources and reconciling compositions immediately when any of the composed resources is updated."`
127127

128128
// These are GA features that previously had alpha or beta feature flags.
129129
// You can't turn off a GA feature. We maintain the flags to avoid breaking
@@ -244,14 +244,14 @@ func (c *startCommand) Run(s *runtime.Scheme, log logging.Logger) error { //noli
244244
return errors.Wrap(err, "cannot load client TLS certificates")
245245
}
246246

247-
m := xfn.NewMetrics()
248-
metrics.Registry.MustRegister(m)
247+
xfnm := xfn.NewPrometheusMetrics()
248+
metrics.Registry.MustRegister(xfnm)
249249

250250
// We want all XR controllers to share the same gRPC clients.
251251
functionRunner := xfn.NewPackagedFunctionRunner(mgr.GetClient(),
252252
xfn.WithLogger(log),
253253
xfn.WithTLSConfig(clienttls),
254-
xfn.WithInterceptorCreators(m),
254+
xfn.WithInterceptorCreators(xfnm),
255255
)
256256

257257
// Periodically remove clients for Functions that no longer exist.
@@ -283,8 +283,8 @@ func (c *startCommand) Run(s *runtime.Scheme, log logging.Logger) error { //noli
283283
}
284284
}
285285
if c.EnableRealtimeCompositions {
286-
o.Features.Enable(features.EnableAlphaRealtimeCompositions)
287-
log.Info("Alpha feature enabled", "flag", features.EnableAlphaRealtimeCompositions)
286+
o.Features.Enable(features.EnableBetaRealtimeCompositions)
287+
log.Info("Beta feature enabled", "flag", features.EnableBetaRealtimeCompositions)
288288
}
289289
if c.EnableDeploymentRuntimeConfigs {
290290
o.Features.Enable(features.EnableBetaDeploymentRuntimeConfigs)
@@ -375,6 +375,9 @@ func (c *startCommand) Run(s *runtime.Scheme, log logging.Logger) error { //noli
375375
return errors.Wrap(err, "cannot create uncached client for API extension controllers")
376376
}
377377

378+
cem := engine.NewPrometheusMetrics()
379+
metrics.Registry.MustRegister(cem)
380+
378381
// It's important the engine's client is wrapped with unstructured.NewClient
379382
// because controller-runtime always caches *unstructured.Unstructured, not
380383
// our wrapper types like *composite.Unstructured. This client takes care of
@@ -384,6 +387,7 @@ func (c *startCommand) Run(s *runtime.Scheme, log logging.Logger) error { //noli
384387
unstructured.NewClient(cached),
385388
unstructured.NewClient(uncached),
386389
engine.WithLogger(log),
390+
engine.WithMetrics(cem),
387391
)
388392

389393
// TODO(negz): Garbage collect informers for CRs that are still defined

internal/controller/apiextensions/claim/reconciler.go

Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ type CompositeSyncer interface {
9898
// composite resource (XR).
9999
type CompositeSyncerFn func(ctx context.Context, cm *claim.Unstructured, xr *composite.Unstructured) error
100100

101-
// Sync the supplied claim with the supplied composite resource..
101+
// Sync the supplied claim with the supplied composite resource.
102102
func (fn CompositeSyncerFn) Sync(ctx context.Context, cm *claim.Unstructured, xr *composite.Unstructured) error {
103103
return fn(ctx, cm, xr)
104104
}
@@ -188,9 +188,8 @@ type Reconciler struct {
188188
composite crComposite
189189
claim crClaim
190190

191-
log logging.Logger
192-
record event.Recorder
193-
pollInterval time.Duration
191+
log logging.Logger
192+
record event.Recorder
194193
}
195194

196195
type crComposite struct {
@@ -275,17 +274,6 @@ func WithRecorder(er event.Recorder) ReconcilerOption {
275274
}
276275
}
277276

278-
// WithPollInterval specifies how long the Reconciler should wait before queueing
279-
// a new reconciliation after a successful reconcile. The Reconciler requeues
280-
// after a specified duration when it is not actively waiting for an external
281-
// operation, but wishes to check whether resources it does not have a watch on
282-
// (i.e. composed resources) need to be reconciled.
283-
func WithPollInterval(after time.Duration) ReconcilerOption {
284-
return func(r *Reconciler) {
285-
r.pollInterval = after
286-
}
287-
}
288-
289277
// NewReconciler returns a Reconciler that reconciles composite resource claims of
290278
// the supplied CompositeClaimKind with resources of the supplied CompositeKind.
291279
// The returned Reconciler will apply only the ObjectMetaConfigurator by

internal/controller/apiextensions/composite/composed.go

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,6 @@ import (
2222
v1 "github.com/crossplane/crossplane/apis/apiextensions/v1"
2323
)
2424

25-
// A CompositeResource is an output of the composition process.
26-
type CompositeResource struct { //nolint:revive // stick with CompositeResource
27-
// Ready indicated whether the composite resource should be marked as
28-
// ready or unready regardless of the state of the composed resoureces.
29-
// If it is nil the readiness of the composite is determined by the
30-
// readiness of the composed resources.
31-
Ready *bool
32-
}
33-
3425
// A ResourceName uniquely identifies the composed resource within a Composition
3526
// and within Composition Function gRPC calls. This is not the metadata.name of
3627
// the actual composed resource instance; rather it is the name of an entry in a

internal/controller/apiextensions/composite/composition_functions.go

Lines changed: 33 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"fmt"
2222
"sort"
2323
"strings"
24+
"time"
2425

2526
"google.golang.org/protobuf/encoding/protojson"
2627
"google.golang.org/protobuf/types/known/structpb"
@@ -269,6 +270,11 @@ func (c *FunctionComposer) Compose(ctx context.Context, xr *composite.Unstructur
269270
return CompositionResult{}, errors.Wrap(err, errBuildObserved)
270271
}
271272

273+
// Time-to-live for this composition pipeline run. Each function returns
274+
// a TTL. The pipeline's TTL will be the shortest non-zero TTL returned
275+
// by any function. A TTL of zero means unlimited TTL.
276+
var ttl time.Duration
277+
272278
// The Function pipeline starts with empty desired state.
273279
d := &fnv1.State{}
274280

@@ -319,6 +325,13 @@ func (c *FunctionComposer) Compose(ctx context.Context, xr *composite.Unstructur
319325
return CompositionResult{}, errors.Wrapf(err, errFmtRunPipelineStep, fn.Step)
320326
}
321327

328+
// If this Function specified a non-zero TTL that's less than
329+
// the current recorded TTL for the pipeline, it's the new TTL
330+
// for the pipeline.
331+
if d := rsp.GetMeta().GetTtl().AsDuration(); d > 0 && (ttl == 0 || d < ttl) {
332+
ttl = d
333+
}
334+
322335
// Pass the desired state returned by this Function to the next one.
323336
d = rsp.GetDesired()
324337

@@ -426,16 +439,6 @@ func (c *FunctionComposer) Compose(ctx context.Context, xr *composite.Unstructur
426439
}
427440
}
428441

429-
compositeRes := CompositeResource{}
430-
431-
// Consider the explicit composite unready state in the function response:
432-
switch d.GetComposite().GetReady() { //nolint:exhaustive // only check for false or true
433-
case fnv1.Ready_READY_TRUE:
434-
compositeRes.Ready = ptr.To(true)
435-
case fnv1.Ready_READY_FALSE:
436-
compositeRes.Ready = ptr.To(false)
437-
}
438-
439442
// Garbage collect any observed resources that aren't part of our final
440443
// desired state. We must do this before we update the XR's resource
441444
// references to ensure that we don't forget and leak them if a delete
@@ -550,7 +553,26 @@ func (c *FunctionComposer) Compose(ctx context.Context, xr *composite.Unstructur
550553
return CompositionResult{}, errors.Wrap(err, errApplyXRStatus)
551554
}
552555

553-
return CompositionResult{ConnectionDetails: d.GetComposite().GetConnectionDetails(), Composite: compositeRes, Composed: resources, Events: events, Conditions: conditions}, nil
556+
var ready *bool
557+
switch d.GetComposite().GetReady() {
558+
case fnv1.Ready_READY_TRUE:
559+
ready = ptr.To(true)
560+
case fnv1.Ready_READY_FALSE:
561+
ready = ptr.To(false)
562+
case fnv1.Ready_READY_UNSPECIFIED:
563+
// Remains nil.
564+
}
565+
566+
result := CompositionResult{
567+
Composed: resources,
568+
ConnectionDetails: d.GetComposite().GetConnectionDetails(),
569+
Ready: ready,
570+
Events: events,
571+
Conditions: conditions,
572+
TTL: ttl,
573+
}
574+
575+
return result, nil
554576
}
555577

556578
// ComposedFieldOwnerName generates a unique field owner name

internal/controller/apiextensions/composite/composition_functions_test.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,13 @@ package composite
1919
import (
2020
"context"
2121
"testing"
22+
"time"
2223

2324
"github.com/google/go-cmp/cmp"
2425
"github.com/google/go-cmp/cmp/cmpopts"
2526
"google.golang.org/protobuf/encoding/protojson"
2627
"google.golang.org/protobuf/testing/protocmp"
28+
"google.golang.org/protobuf/types/known/durationpb"
2729
"google.golang.org/protobuf/types/known/structpb"
2830
corev1 "k8s.io/api/core/v1"
2931
kerrors "k8s.io/apimachinery/pkg/api/errors"
@@ -698,6 +700,9 @@ func TestFunctionCompose(t *testing.T) {
698700
},
699701
r: FunctionRunnerFn(func(_ context.Context, _ string, _ *fnv1.RunFunctionRequest) (*fnv1.RunFunctionResponse, error) {
700702
rsp := &fnv1.RunFunctionResponse{
703+
Meta: &fnv1.ResponseMeta{
704+
Ttl: durationpb.New(5 * time.Minute),
705+
},
701706
Desired: &fnv1.State{
702707
Composite: &fnv1.Resource{
703708
Resource: MustStruct(map[string]any{
@@ -890,6 +895,7 @@ func TestFunctionCompose(t *testing.T) {
890895
Target: CompositionTargetCompositeAndClaim,
891896
},
892897
},
898+
TTL: 5 * time.Minute,
893899
},
894900
err: nil,
895901
},

0 commit comments

Comments
 (0)