Skip to content

Commit 3b2bc34

Browse files
authored
Add support for resource-specific resync periods and default drift remediation period (#106)
Part of: aws-controllers-k8s/community#1367 This patch introduces the ability to specify resource-specific resync periods in the drift remediation configuration, as well as a default drift remediation period in the controller configuration. The resync period for each reconciler is determined by trying to retrieve it from the following sources, in this order: 1. A resource-specific period specified in the drift remediation configuration. 2. A resource-specific requeue on success period specified by the resource manager factory. 3. The default drift remediation period specified in the controller configuration. 4. The default resync period defined in the ACK runtime package. This allows users to customize the drift remediation behavior for different resources as needed, while still providing a fallback option for resources that do not have a specific period specified. Signed-off-by: Amine Hilaly <[email protected]> By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
1 parent 7152103 commit 3b2bc34

File tree

5 files changed

+235
-59
lines changed

5 files changed

+235
-59
lines changed

pkg/config/config.go

Lines changed: 105 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@ import (
1717
"errors"
1818
"fmt"
1919
"net/url"
20+
"strconv"
21+
"strings"
22+
"time"
2023

2124
"github.com/aws/aws-sdk-go/aws"
2225
"github.com/aws/aws-sdk-go/aws/session"
@@ -32,20 +35,22 @@ import (
3235
)
3336

3437
const (
35-
flagEnableLeaderElection = "enable-leader-election"
36-
flagMetricAddr = "metrics-addr"
37-
flagEnableDevLogging = "enable-development-logging"
38-
flagAWSRegion = "aws-region"
39-
flagAWSEndpointURL = "aws-endpoint-url"
40-
flagAWSIdentityEndpointURL = "aws-identity-endpoint-url"
41-
flagUnsafeAWSEndpointURLs = "allow-unsafe-aws-endpoint-urls"
42-
flagLogLevel = "log-level"
43-
flagResourceTags = "resource-tags"
44-
flagWatchNamespace = "watch-namespace"
45-
flagEnableWebhookServer = "enable-webhook-server"
46-
flagWebhookServerAddr = "webhook-server-addr"
47-
flagDeletionPolicy = "deletion-policy"
48-
envVarAWSRegion = "AWS_REGION"
38+
flagEnableLeaderElection = "enable-leader-election"
39+
flagMetricAddr = "metrics-addr"
40+
flagEnableDevLogging = "enable-development-logging"
41+
flagAWSRegion = "aws-region"
42+
flagAWSEndpointURL = "aws-endpoint-url"
43+
flagAWSIdentityEndpointURL = "aws-identity-endpoint-url"
44+
flagUnsafeAWSEndpointURLs = "allow-unsafe-aws-endpoint-urls"
45+
flagLogLevel = "log-level"
46+
flagResourceTags = "resource-tags"
47+
flagWatchNamespace = "watch-namespace"
48+
flagEnableWebhookServer = "enable-webhook-server"
49+
flagWebhookServerAddr = "webhook-server-addr"
50+
flagDeletionPolicy = "deletion-policy"
51+
flagReconcileDefaultResyncSeconds = "reconcile-default-resync-seconds"
52+
flagReconcileResourceResyncSeconds = "reconcile-resource-resync-seconds"
53+
envVarAWSRegion = "AWS_REGION"
4954
)
5055

5156
var (
@@ -63,20 +68,22 @@ var (
6368

6469
// Config contains configuration options for ACK service controllers
6570
type Config struct {
66-
MetricsAddr string
67-
EnableLeaderElection bool
68-
EnableDevelopmentLogging bool
69-
AccountID string
70-
Region string
71-
IdentityEndpointURL string
72-
EndpointURL string
73-
AllowUnsafeEndpointURL bool
74-
LogLevel string
75-
ResourceTags []string
76-
WatchNamespace string
77-
EnableWebhookServer bool
78-
WebhookServerAddr string
79-
DeletionPolicy ackv1alpha1.DeletionPolicy
71+
MetricsAddr string
72+
EnableLeaderElection bool
73+
EnableDevelopmentLogging bool
74+
AccountID string
75+
Region string
76+
IdentityEndpointURL string
77+
EndpointURL string
78+
AllowUnsafeEndpointURL bool
79+
LogLevel string
80+
ResourceTags []string
81+
WatchNamespace string
82+
EnableWebhookServer bool
83+
WebhookServerAddr string
84+
DeletionPolicy ackv1alpha1.DeletionPolicy
85+
ReconcileDefaultResyncSeconds int
86+
ReconcileResourceResyncSeconds []string
8087
}
8188

8289
// BindFlags defines CLI/runtime configuration options
@@ -152,6 +159,19 @@ func (cfg *Config) BindFlags() {
152159
&cfg.DeletionPolicy, flagDeletionPolicy,
153160
"The default deletion policy for all resources managed by the controller",
154161
)
162+
flag.IntVar(
163+
&cfg.ReconcileDefaultResyncSeconds, flagReconcileDefaultResyncSeconds,
164+
0,
165+
"The default duration, in seconds, to wait before resyncing desired state of custom resources. "+
166+
"This value is used if no resource-specific override has been specified. Default is 10 hours.",
167+
)
168+
flag.StringArrayVar(
169+
&cfg.ReconcileResourceResyncSeconds, flagReconcileResourceResyncSeconds,
170+
[]string{},
171+
"A Key/Value list of strings representing the reconcile resync configuration for each resource. This"+
172+
" configuration maps resource kinds to drift remediation periods in seconds. If provided, "+
173+
" resource-specific resync periods take precedence over the default period.",
174+
)
155175
}
156176

157177
// SetupLogger initializes the logger used in the service controller
@@ -233,6 +253,16 @@ func (cfg *Config) Validate() error {
233253
if cfg.DeletionPolicy == "" {
234254
cfg.DeletionPolicy = ackv1alpha1.DeletionPolicyDelete
235255
}
256+
257+
if cfg.ReconcileDefaultResyncSeconds < 0 {
258+
return fmt.Errorf("invalid value for flag '%s': resync seconds default must be greater than 0", flagReconcileDefaultResyncSeconds)
259+
}
260+
261+
_, err := cfg.ParseReconcileResourceResyncSeconds()
262+
if err != nil {
263+
return fmt.Errorf("invalid value for flag '%s': %v", flagReconcileResourceResyncSeconds, err)
264+
}
265+
236266
return nil
237267
}
238268

@@ -244,3 +274,50 @@ func (cfg *Config) checkUnsafeEndpoint(endpoint *url.URL) error {
244274
}
245275
return nil
246276
}
277+
278+
// ParseReconcileResourceResyncSeconds parses the values of the --reconcile-resource-resync-seconds
279+
// flag and returns a map that maps resource names to resync periods.
280+
// The flag arguments are expected to have the format "resource=seconds", where "resource" is the
281+
// name of the resource and "seconds" is the number of seconds that the reconciler should wait before
282+
// reconciling the resource again.
283+
func (cfg *Config) ParseReconcileResourceResyncSeconds() (map[string]time.Duration, error) {
284+
resourceResyncPeriods := make(map[string]time.Duration, len(cfg.ReconcileResourceResyncSeconds))
285+
for _, resourceResyncSecondsFlag := range cfg.ReconcileResourceResyncSeconds {
286+
// Parse the resource name and resync period from the flag argument
287+
resourceName, resyncSeconds, err := parseReconcileFlagArgument(resourceResyncSecondsFlag)
288+
if err != nil {
289+
return nil, fmt.Errorf("error parsing flag argument '%v': %v. Expected format: resource=seconds", resourceResyncSecondsFlag, err)
290+
}
291+
resourceResyncPeriods[strings.ToLower(resourceName)] = time.Duration(resyncSeconds)
292+
}
293+
return resourceResyncPeriods, nil
294+
}
295+
296+
// parseReconcileFlagArgument parses a flag argument of the form "key=value" into
297+
// its individual elements. The key must be a non-empty string and the value must be
298+
// a non-empty positive integer. If the flag argument is not in the expected format
299+
// or has invalid elements, an error is returned.
300+
//
301+
// The function returns the parsed key and value as separate elements.
302+
func parseReconcileFlagArgument(flagArgument string) (string, int, error) {
303+
delimiter := "="
304+
elements := strings.Split(flagArgument, delimiter)
305+
if len(elements) != 2 {
306+
return "", 0, fmt.Errorf("invalid flag argument format: expected key=value")
307+
}
308+
if elements[0] == "" {
309+
return "", 0, fmt.Errorf("missing key in flag argument")
310+
}
311+
if elements[1] == "" {
312+
return "", 0, fmt.Errorf("missing value in flag argument")
313+
}
314+
315+
resyncSeconds, err := strconv.Atoi(elements[1])
316+
if err != nil {
317+
return "", 0, fmt.Errorf("invalid value in flag argument: %v", err)
318+
}
319+
if resyncSeconds < 0 {
320+
return "", 0, fmt.Errorf("invalid value in flag argument: expected non-negative integer, got %d", resyncSeconds)
321+
}
322+
return elements[0], resyncSeconds, nil
323+
}

pkg/config/config_test.go

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
// Copyright Amazon.com Inc. or its affiliates. All Rights Reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License"). You may
4+
// not use this file except in compliance with the License. A copy of the
5+
// License is located at
6+
//
7+
// http://aws.amazon.com/apache2.0/
8+
//
9+
// or in the "license" file accompanying this file. This file is distributed
10+
// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
11+
// express or implied. See the License for the specific language governing
12+
// permissions and limitations under the License.
13+
14+
package config
15+
16+
import "testing"
17+
18+
func TestParseReconcileFlagArgument(t *testing.T) {
19+
tests := []struct {
20+
flagArgument string
21+
expectedKey string
22+
expectedVal int
23+
expectedErr bool
24+
expectedErrMsg string
25+
}{
26+
// Test valid flag arguments
27+
{"key=1", "key", 1, false, ""},
28+
{"key=123456", "key", 123456, false, ""},
29+
{"key=600", "key", 600, false, ""},
30+
{"k=1", "k", 1, false, ""},
31+
{"ke_y=123456", "ke_y", 123456, false, ""},
32+
33+
// Test invalid flag arguments
34+
{"key", "", 0, true, "invalid flag argument format: expected key=value"},
35+
{"key=", "", 0, true, "missing value in flag argument"},
36+
{"=value", "", 0, true, "missing key in flag argument"},
37+
{"key=value1=value2", "", 0, true, "invalid flag argument format: expected key=value"},
38+
{"key=a", "", 0, true, "invalid value in flag argument: strconv.Atoi: parsing \"a\": invalid syntax"},
39+
{"key=-1", "", 0, true, "invalid value in flag argument: expected non-negative integer, got -1"},
40+
{"key=-123456", "", 0, true, "invalid value in flag argument: expected non-negative integer, got -123456"},
41+
{"key=1.1", "", 0, true, "invalid value in flag argument: strconv.Atoi: parsing \"1.1\": invalid syntax"},
42+
}
43+
for _, test := range tests {
44+
key, val, err := parseReconcileFlagArgument(test.flagArgument)
45+
if err != nil && !test.expectedErr {
46+
t.Errorf("unexpected error for flag argument '%s': %v", test.flagArgument, err)
47+
}
48+
if err == nil && test.expectedErr {
49+
t.Errorf("expected error for flag argument '%s', got nil", test.flagArgument)
50+
}
51+
if err != nil && err.Error() != test.expectedErrMsg {
52+
t.Errorf("unexpected error message for flag argument '%s': expected '%s', got '%v'", test.flagArgument, test.expectedErrMsg, err)
53+
}
54+
if key != test.expectedKey {
55+
t.Errorf("unexpected key for flag argument '%s': expected '%s', got '%s'", test.flagArgument, test.expectedKey, key)
56+
}
57+
if val != test.expectedVal {
58+
t.Errorf("unexpected value for flag argument '%s': expected %d, got %d", test.flagArgument, test.expectedVal, val)
59+
}
60+
}
61+
}

pkg/runtime/reconciler.go

Lines changed: 67 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"context"
1818
"encoding/json"
1919
"fmt"
20+
"strings"
2021
"time"
2122

2223
backoff "github.com/cenkalti/backoff/v4"
@@ -47,7 +48,7 @@ const (
4748
// The default duration to trigger the sync for an ACK resource after
4849
// the successful reconciliation. This behavior for a resource can be
4950
// overriden by RequeueOnSuccessSeconds configuration for that resource.
50-
resyncPeriod = 10 * time.Hour
51+
defaultResyncPeriod = 10 * time.Hour
5152
)
5253

5354
// reconciler describes a generic reconciler within ACK.
@@ -70,8 +71,9 @@ type reconciler struct {
7071
// object)s and sharing watch and informer queues across those controllers.
7172
type resourceReconciler struct {
7273
reconciler
73-
rmf acktypes.AWSResourceManagerFactory
74-
rd acktypes.AWSResourceDescriptor
74+
rmf acktypes.AWSResourceManagerFactory
75+
rd acktypes.AWSResourceDescriptor
76+
resyncPeriod time.Duration
7577
}
7678

7779
// GroupKind returns the string containing the API group and kind reconciled by
@@ -887,31 +889,8 @@ func (r *resourceReconciler) handleRequeues(
887889
}
888890
// The code below only executes for "ConditionTypeResourceSynced"
889891
if condition.Status == corev1.ConditionTrue {
890-
if duration := r.rmf.RequeueOnSuccessSeconds(); duration > 0 {
891-
rlog.Debug(
892-
"requeueing resource after resource synced condition true",
893-
)
894-
return latest, requeue.NeededAfter(nil, time.Duration(duration)*time.Second)
895-
}
896-
// Since RequeueOnSuccessSeconds <= 0, requeue the resource
897-
// with "resyncPeriod" to perform periodic drift detection and
898-
// sync the desired state.
899-
//
900-
// Upstream controller-runtime provides SyncPeriod functionality
901-
// which flushes the go-client cache and triggers Sync for all
902-
// the objects in cache every 10 hours by default.
903-
//
904-
// ACK controller use non-cached client to read objects
905-
// from API Server, hence controller-runtime's SyncPeriod
906-
// functionality does not work.
907-
// https://github.com/aws-controllers-k8s/community/issues/1355
908-
//
909-
// ACK controllers use api-reader(non-cached client) to avoid
910-
// reading stale copies of ACK resource that can cause
911-
// duplicate resource creation when resource identifier is
912-
// not present in stale copy of resource.
913-
// https://github.com/aws-controllers-k8s/community/issues/894#issuecomment-911876354
914-
return latest, requeue.NeededAfter(nil, resyncPeriod)
892+
rlog.Debug("requeuing", "after", r.resyncPeriod)
893+
return latest, requeue.NeededAfter(nil, r.resyncPeriod)
915894
} else {
916895
rlog.Debug(
917896
"requeueing resource after finding resource synced condition false",
@@ -1103,6 +1082,56 @@ func (r *resourceReconciler) getEndpointURL(
11031082
return r.cfg.EndpointURL
11041083
}
11051084

1085+
// getResyncPeriod returns the period of the recurring reconciler process which ensures the desired
1086+
// state of custom resources is maintained.
1087+
// It attempts to retrieve the duration from the following sources, in this order:
1088+
// 1. A resource-specific reconciliation resync period specified in the reconciliation resync
1089+
// configuration map (--reconcile-default-resync-seconds).
1090+
// 2. A resource-specific requeue on success period specified by the resource manager factory.
1091+
// The resource manager factory is controller-specific, and thus this period is to specified
1092+
// by controller authors (using ack-generate).
1093+
// 3. The default reconciliation resync period period specified in the controller binary flags.
1094+
// (--reconcile-resource-resync-seconds)
1095+
// 4. The default resync period defined in the ACK runtime package. Defined in defaultResyncPeriod
1096+
// within the same file
1097+
//
1098+
// Each reconciler has a unique value to use. This function should only be called during the
1099+
// instantiation of an AWSResourceReconciler and should not be called during the reconciliation
1100+
// function r.Sync
1101+
func getResyncPeriod(rmf acktypes.AWSResourceManagerFactory, cfg ackcfg.Config) time.Duration {
1102+
// The reconciliation resync period configuration has already been validated as
1103+
// a clean map. Therefore, we can safely ignore any errors that may occur while
1104+
// parsing it and avoid changing the signature of NewReconcilerWithClient.
1105+
drc, _ := cfg.ParseReconcileResourceResyncSeconds()
1106+
1107+
// First, try to use a resource-specific resync period if provided in the resource
1108+
// resync period configuration.
1109+
resourceKind := rmf.ResourceDescriptor().GroupKind().Kind
1110+
if duration, ok := drc[strings.ToLower(resourceKind)]; ok && duration > 0 {
1111+
return time.Duration(duration) * time.Second
1112+
}
1113+
1114+
// Second, try to use a resource-specific requeue on success period specified by the
1115+
// resource manager factory. This value is set during the code generation of the
1116+
// controller and takes precedence over the default resync period period because
1117+
// it allows existing controllers that rely on this value to maintain their intended
1118+
// behavior.
1119+
if duration := rmf.RequeueOnSuccessSeconds(); duration > 0 {
1120+
return time.Duration(duration) * time.Second
1121+
}
1122+
1123+
// Third, try to use the default resync period resync period specified during controller
1124+
// start-up.
1125+
if cfg.ReconcileDefaultResyncSeconds > 0 {
1126+
return time.Duration(cfg.ReconcileDefaultResyncSeconds) * time.Second
1127+
}
1128+
1129+
// If none of the above values are present or valid, use the default resync period
1130+
// defined in the ACK runtime package. Defined in `defaultResyncPeriod` within the
1131+
// same file
1132+
return defaultResyncPeriod
1133+
}
1134+
11061135
// NewReconciler returns a new reconciler object
11071136
func NewReconciler(
11081137
sc acktypes.ServiceController,
@@ -1126,16 +1155,23 @@ func NewReconcilerWithClient(
11261155
metrics *ackmetrics.Metrics,
11271156
cache ackrtcache.Caches,
11281157
) acktypes.AWSResourceReconciler {
1158+
rtLog := log.WithName("ackrt")
1159+
resyncPeriod := getResyncPeriod(rmf, cfg)
1160+
rtLog.V(1).Info("Initiating reconciler",
1161+
"reconciler kind", rmf.ResourceDescriptor().GroupKind().Kind,
1162+
"resync period seconds", resyncPeriod.Seconds(),
1163+
)
11291164
return &resourceReconciler{
11301165
reconciler: reconciler{
11311166
sc: sc,
11321167
kc: kc,
1133-
log: log.WithName("ackrt"),
1168+
log: rtLog,
11341169
cfg: cfg,
11351170
metrics: metrics,
11361171
cache: cache,
11371172
},
1138-
rmf: rmf,
1139-
rd: rmf.ResourceDescriptor(),
1173+
rmf: rmf,
1174+
rd: rmf.ResourceDescriptor(),
1175+
resyncPeriod: resyncPeriod,
11401176
}
11411177
}

pkg/runtime/reconciler_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,7 @@ func managerFactoryMocks(
136136

137137
rmf := &ackmocks.AWSResourceManagerFactory{}
138138
rmf.On("ResourceDescriptor").Return(rd)
139+
rmf.On("RequeueOnSuccessSeconds").Return(0)
139140

140141
reg := ackrt.NewRegistry()
141142
reg.RegisterResourceManagerFactory(rmf)

pkg/runtime/service_controller_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,7 @@ func TestServiceController(t *testing.T) {
143143

144144
rmf := &mocks.AWSResourceManagerFactory{}
145145
rmf.On("ResourceDescriptor").Return(rd)
146+
rmf.On("RequeueOnSuccessSeconds").Return(0)
146147

147148
reg := ackrt.NewRegistry()
148149
reg.RegisterResourceManagerFactory(rmf)

0 commit comments

Comments
 (0)