@@ -15,8 +15,11 @@ package db_parameter_group
15
15
16
16
import (
17
17
"context"
18
+ "fmt"
19
+ "sync"
18
20
19
21
ackcompare "github.com/aws-controllers-k8s/runtime/pkg/compare"
22
+ ackerr "github.com/aws-controllers-k8s/runtime/pkg/errors"
20
23
ackrtlog "github.com/aws-controllers-k8s/runtime/pkg/runtime/log"
21
24
22
25
svcapitypes "github.com/aws-controllers-k8s/rds-controller/apis/v1alpha1"
@@ -27,10 +30,34 @@ import (
27
30
)
28
31
29
32
const (
33
+ applyTypeStatic = "static"
30
34
sourceUser = "user"
31
35
maxResetParametersSize = 20
32
36
)
33
37
38
+ var (
39
+ errUnknownParameter = fmt .Errorf ("unknown parameter" )
40
+ errUnmodifiableParameter = fmt .Errorf ("parameter is not modifiable" )
41
+ )
42
+
43
+ func newErrUnknownParameter (name string ) error {
44
+ // This is a terminal error because unless the user removes this parameter
45
+ // from their list of parameter overrides, we will not be able to get the
46
+ // resource into a synced state.
47
+ return ackerr .NewTerminalError (
48
+ fmt .Errorf ("%w: %s" , errUnknownParameter , name ),
49
+ )
50
+ }
51
+
52
+ func newErrUnmodifiableParameter (name string ) error {
53
+ // This is a terminal error because unless the user removes this parameter
54
+ // from their list of parameter overrides, we will not be able to get the
55
+ // resource into a synced state.
56
+ return ackerr .NewTerminalError (
57
+ fmt .Errorf ("%w: %s" , errUnmodifiableParameter , name ),
58
+ )
59
+ }
60
+
34
61
// customUpdate is required to fix
35
62
// https://github.com/aws-controllers-k8s/community/issues/869.
36
63
//
@@ -206,7 +233,8 @@ func (rm *resourceManager) syncParameters(
206
233
exit := rlog .Trace ("rm.syncParameters" )
207
234
defer func () { exit (err ) }()
208
235
209
- groupName := (* string )(latest .ko .Spec .Name )
236
+ groupName := latest .ko .Spec .Name
237
+ family := latest .ko .Spec .Family
210
238
211
239
toModify , toDelete := computeParametersDelta (
212
240
desired .ko .Spec .ParameterOverrides , latest .ko .Spec .ParameterOverrides ,
@@ -219,7 +247,8 @@ func (rm *resourceManager) syncParameters(
219
247
if len (toDelete ) > 0 {
220
248
chunks := sliceStringChunks (toDelete , maxResetParametersSize )
221
249
for _ , chunk := range chunks {
222
- if err = rm .resetParameters (ctx , groupName , chunk ); err != nil {
250
+ err = rm .resetParameters (ctx , family , groupName , chunk )
251
+ if err != nil {
223
252
return err
224
253
}
225
254
}
@@ -228,7 +257,8 @@ func (rm *resourceManager) syncParameters(
228
257
if len (toModify ) > 0 {
229
258
chunks := mapStringChunks (toModify , maxResetParametersSize )
230
259
for _ , chunk := range chunks {
231
- if err = rm .modifyParameters (ctx , groupName , chunk ); err != nil {
260
+ err = rm .modifyParameters (ctx , family , groupName , chunk )
261
+ if err != nil {
232
262
return err
233
263
}
234
264
}
@@ -283,20 +313,37 @@ func (rm *resourceManager) getParameters(
283
313
// no more than 20 parameters to reset.
284
314
func (rm * resourceManager ) resetParameters (
285
315
ctx context.Context ,
316
+ family * string ,
286
317
groupName * string ,
287
318
toDelete []string ,
288
319
) (err error ) {
289
320
rlog := ackrtlog .FromContext (ctx )
290
321
exit := rlog .Trace ("rm.resetParameters" )
291
322
defer func () { exit (err ) }()
292
323
324
+ var pMeta * paramMeta
293
325
inputParams := []* svcsdk.Parameter {}
294
326
for _ , paramName := range toDelete {
327
+ // default to this if something goes wrong looking up parameter
328
+ // defaults
329
+ applyMethod := svcsdk .ApplyMethodImmediate
330
+ pMeta , err = cachedParamMeta .get (
331
+ ctx , * family , paramName , rm .getFamilyParameters ,
332
+ )
333
+ if err != nil {
334
+ return err
335
+ }
336
+ if ! pMeta .isModifiable {
337
+ return newErrUnmodifiableParameter (paramName )
338
+ }
339
+ if ! pMeta .isDynamic {
340
+ applyMethod = svcsdk .ApplyMethodPendingReboot
341
+ }
295
342
p := & svcsdk.Parameter {
296
343
ParameterName : aws .String (paramName ),
297
344
// TODO(jaypipes): Look up appropriate apply method for this
298
345
// parameter...
299
- ApplyMethod : aws .String (svcsdk . ApplyMethodImmediate ),
346
+ ApplyMethod : aws .String (applyMethod ),
300
347
}
301
348
inputParams = append (inputParams , p )
302
349
}
@@ -323,21 +370,38 @@ func (rm *resourceManager) resetParameters(
323
370
// no more than 20 parameters to modify.
324
371
func (rm * resourceManager ) modifyParameters (
325
372
ctx context.Context ,
373
+ family * string ,
326
374
groupName * string ,
327
375
toModify map [string ]* string ,
328
376
) (err error ) {
329
377
rlog := ackrtlog .FromContext (ctx )
330
378
exit := rlog .Trace ("rm.resetParameters" )
331
379
defer func () { exit (err ) }()
332
380
381
+ var pMeta * paramMeta
333
382
inputParams := []* svcsdk.Parameter {}
334
383
for paramName , paramValue := range toModify {
384
+ // default to this if something goes wrong looking up parameter
385
+ // defaults
386
+ applyMethod := svcsdk .ApplyMethodImmediate
387
+ pMeta , err = cachedParamMeta .get (
388
+ ctx , * family , paramName , rm .getFamilyParameters ,
389
+ )
390
+ if err != nil {
391
+ return err
392
+ }
393
+ if ! pMeta .isModifiable {
394
+ return newErrUnmodifiableParameter (paramName )
395
+ }
396
+ if ! pMeta .isDynamic {
397
+ applyMethod = svcsdk .ApplyMethodPendingReboot
398
+ }
335
399
p := & svcsdk.Parameter {
336
400
ParameterName : aws .String (paramName ),
337
401
ParameterValue : paramValue ,
338
402
// TODO(jaypipes): Look up appropriate apply method for this
339
403
// parameter...
340
- ApplyMethod : aws .String (svcsdk . ApplyMethodImmediate ),
404
+ ApplyMethod : aws .String (applyMethod ),
341
405
}
342
406
inputParams = append (inputParams , p )
343
407
}
@@ -360,6 +424,42 @@ func (rm *resourceManager) modifyParameters(
360
424
return nil
361
425
}
362
426
427
+ // getFamilyParameters calls the RDS DescribeEngineDefaultParameters API to
428
+ // retrieve the set of parameter information for a DB parameter group family.
429
+ func (rm * resourceManager ) getFamilyParameters (
430
+ ctx context.Context ,
431
+ family string ,
432
+ ) (map [string ]paramMeta , error ) {
433
+ var marker * string
434
+ familyMeta := map [string ]paramMeta {}
435
+
436
+ for {
437
+ resp , err := rm .sdkapi .DescribeEngineDefaultParametersWithContext (
438
+ ctx ,
439
+ & svcsdk.DescribeEngineDefaultParametersInput {
440
+ DBParameterGroupFamily : aws .String (family ),
441
+ Marker : marker ,
442
+ },
443
+ )
444
+ rm .metrics .RecordAPICall ("GET" , "DescribeEngineDefaultParameters" , err )
445
+ if err != nil {
446
+ return nil , err
447
+ }
448
+ for _ , param := range resp .EngineDefaults .Parameters {
449
+ pName := * param .ParameterName
450
+ familyMeta [pName ] = paramMeta {
451
+ isModifiable : * param .IsModifiable ,
452
+ isDynamic : * param .ApplyType != applyTypeStatic ,
453
+ }
454
+ }
455
+ marker = resp .EngineDefaults .Marker
456
+ if marker == nil {
457
+ break
458
+ }
459
+ }
460
+ return familyMeta , nil
461
+ }
462
+
363
463
// computeParametersDelta compares two Parameter arrays and returns the new
364
464
// parameters to add, to update and the parameter identifiers to delete
365
465
func computeParametersDelta (
@@ -431,3 +531,79 @@ func mapStringChunks(
431
531
432
532
return chunks
433
533
}
534
+
535
+ type paramMeta struct {
536
+ isModifiable bool
537
+ isDynamic bool
538
+ }
539
+
540
+ // metaFetcher is the functor we pass to the paramMetaCache that allows it to
541
+ // fetch engine default parameter information
542
+ type metaFetcher func (ctx context.Context , family string ) (map [string ]paramMeta , error )
543
+
544
+ // paramMetaCache stores information about a parameter for a DB parameter group
545
+ // family. We use this cached information to determine whether a parameter is
546
+ // statically or dynamically defined (whether changes can be applied
547
+ // immediately or pending a reboot) and whether a parameter is modifiable.
548
+ //
549
+ // Keeping things super simple for now and not adding any TTL or expiration
550
+ // behaviour to the cache. Engine defaults are pretty static information...
551
+ type paramMetaCache struct {
552
+ sync.RWMutex
553
+ hits uint64
554
+ misses uint64
555
+ cache map [string ]map [string ]paramMeta
556
+ }
557
+
558
+ // get retrieves the metadata for a named parameter group family and parameter
559
+ // name.
560
+ func (c * paramMetaCache ) get (
561
+ ctx context.Context ,
562
+ family string ,
563
+ name string ,
564
+ fetcher metaFetcher ,
565
+ ) (* paramMeta , error ) {
566
+ c .RLock ()
567
+ defer c .RUnlock ()
568
+
569
+ var err error
570
+ var found bool
571
+ var metas map [string ]paramMeta
572
+ var meta paramMeta
573
+
574
+ metas , found = c .cache [family ]
575
+ if ! found {
576
+ c .misses ++
577
+ metas , err = c .loadFamily (ctx , family , fetcher )
578
+ if err != nil {
579
+ return nil , err
580
+ }
581
+ }
582
+ meta , found = metas [name ]
583
+ if ! found {
584
+ return nil , newErrUnknownParameter (name )
585
+ }
586
+ c .hits ++
587
+ return & meta , nil
588
+ }
589
+
590
+ // loadFamily fetches parameter information from the AWS RDS
591
+ // DescribeEngineDefaultParameters API and caches that information.
592
+ func (c * paramMetaCache ) loadFamily (
593
+ ctx context.Context ,
594
+ family string ,
595
+ fetcher metaFetcher ,
596
+ ) (map [string ]paramMeta , error ) {
597
+ familyMeta , err := fetcher (ctx , family )
598
+ if err != nil {
599
+ return nil , err
600
+ }
601
+ c .Lock ()
602
+ defer c .Unlock ()
603
+ c .cache [family ] = familyMeta
604
+ return familyMeta , nil
605
+ }
606
+
607
+ var cachedParamMeta = paramMetaCache {
608
+ cache : map [string ]map [string ]paramMeta {},
609
+ }
0 commit comments