Skip to content

Commit 510c8c9

Browse files
committed
add continuous backup support (#1790)
1 parent 148510a commit 510c8c9

File tree

3 files changed

+174
-28
lines changed

3 files changed

+174
-28
lines changed

generator.yaml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,10 @@ resources:
2323
from:
2424
operation: UpdateTimeToLive
2525
path: TimeToLiveSpecification
26+
ContinuousBackups:
27+
from:
28+
operation: UpdateContinuousBackups
29+
path: PointInTimeRecoverySpecification
2630
AttributeDefinitions:
2731
compare:
2832
is_ignored: true
@@ -134,4 +138,4 @@ resources:
134138
type: string
135139
- name: STATUS
136140
json_path: .status.backupStatus
137-
type: string
141+
type: string

pkg/resource/table/hooks.go

Lines changed: 86 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -32,20 +32,30 @@ import (
3232
)
3333

3434
var (
35-
ErrTableDeleting = fmt.Errorf("Table in '%v' state, cannot be modified or deleted", svcsdk.TableStatusDeleting)
36-
ErrTableCreating = fmt.Errorf("Table in '%v' state, cannot be modified or deleted", svcsdk.TableStatusCreating)
37-
ErrTableUpdating = fmt.Errorf("Table in '%v' state, cannot be modified or deleted", svcsdk.TableStatusUpdating)
38-
ErrTableGSIsUpdating = fmt.Errorf("Table GSIs in '%v' state, cannot be modified or deleted", svcsdk.IndexStatusCreating)
35+
ErrTableDeleting = fmt.Errorf(
36+
"Table in '%v' state, cannot be modified or deleted",
37+
svcsdk.TableStatusDeleting,
38+
)
39+
ErrTableCreating = fmt.Errorf(
40+
"Table in '%v' state, cannot be modified or deleted",
41+
svcsdk.TableStatusCreating,
42+
)
43+
ErrTableUpdating = fmt.Errorf(
44+
"Table in '%v' state, cannot be modified or deleted",
45+
svcsdk.TableStatusUpdating,
46+
)
47+
ErrTableGSIsUpdating = fmt.Errorf(
48+
"Table GSIs in '%v' state, cannot be modified or deleted",
49+
svcsdk.IndexStatusCreating,
50+
)
3951
)
4052

41-
var (
42-
// TerminalStatuses are the status strings that are terminal states for a
43-
// DynamoDB table
44-
TerminalStatuses = []v1alpha1.TableStatus_SDK{
45-
v1alpha1.TableStatus_SDK_ARCHIVING,
46-
v1alpha1.TableStatus_SDK_DELETING,
47-
}
48-
)
53+
// TerminalStatuses are the status strings that are terminal states for a
54+
// DynamoDB table
55+
var TerminalStatuses = []v1alpha1.TableStatus_SDK{
56+
v1alpha1.TableStatus_SDK_ARCHIVING,
57+
v1alpha1.TableStatus_SDK_DELETING,
58+
}
4959

5060
var DefaultTTLEnabledValue = false
5161

@@ -124,7 +134,10 @@ func (rm *resourceManager) customUpdateTable(
124134
defer func(err error) { exit(err) }(err)
125135

126136
if immutableFieldChanges := rm.getImmutableFieldChanges(delta); len(immutableFieldChanges) > 0 {
127-
msg := fmt.Sprintf("Immutable Spec fields have been modified: %s", strings.Join(immutableFieldChanges, ","))
137+
msg := fmt.Sprintf(
138+
"Immutable Spec fields have been modified: %s",
139+
strings.Join(immutableFieldChanges, ","),
140+
)
128141
return nil, ackerr.NewTerminalError(fmt.Errorf(msg))
129142
}
130143

@@ -187,6 +200,14 @@ func (rm *resourceManager) customUpdateTable(
187200
}
188201
}
189202

203+
if delta.DifferentAt("Spec.ContinuousBackups") ||
204+
delta.DifferentAt("Spec.ContinuousBackups.PointInTimeRecoveryEnabled") {
205+
err = rm.syncContinuousBackup(ctx, desired)
206+
if err != nil {
207+
return nil, fmt.Errorf("cannot update table %v", err)
208+
}
209+
}
210+
190211
// We want to update fast fields first
191212
// Then attributes
192213
// then GSI
@@ -202,7 +223,8 @@ func (rm *resourceManager) customUpdateTable(
202223
}
203224
case delta.DifferentAt("Spec.GlobalSecondaryIndexes") && delta.DifferentAt("Spec.AttributeDefinitions"):
204225
if err := rm.syncTableGlobalSecondaryIndexes(ctx, latest, desired); err != nil {
205-
if awsErr, ok := ackerr.AWSError(err); ok && awsErr.Code() == "LimitExceededException" {
226+
if awsErr, ok := ackerr.AWSError(err); ok &&
227+
awsErr.Code() == "LimitExceededException" {
206228
return nil, requeueWaitGSIReady
207229
}
208230
return nil, err
@@ -257,13 +279,17 @@ func (rm *resourceManager) newUpdateTablePayload(
257279
input.ProvisionedThroughput = &svcsdk.ProvisionedThroughput{}
258280
if r.ko.Spec.ProvisionedThroughput != nil {
259281
if r.ko.Spec.ProvisionedThroughput.ReadCapacityUnits != nil {
260-
input.ProvisionedThroughput.ReadCapacityUnits = aws.Int64(*r.ko.Spec.ProvisionedThroughput.ReadCapacityUnits)
282+
input.ProvisionedThroughput.ReadCapacityUnits = aws.Int64(
283+
*r.ko.Spec.ProvisionedThroughput.ReadCapacityUnits,
284+
)
261285
} else {
262286
input.ProvisionedThroughput.ReadCapacityUnits = aws.Int64(0)
263287
}
264288

265289
if r.ko.Spec.ProvisionedThroughput.WriteCapacityUnits != nil {
266-
input.ProvisionedThroughput.WriteCapacityUnits = aws.Int64(*r.ko.Spec.ProvisionedThroughput.WriteCapacityUnits)
290+
input.ProvisionedThroughput.WriteCapacityUnits = aws.Int64(
291+
*r.ko.Spec.ProvisionedThroughput.WriteCapacityUnits,
292+
)
267293
} else {
268294
input.ProvisionedThroughput.WriteCapacityUnits = aws.Int64(0)
269295
}
@@ -277,8 +303,11 @@ func (rm *resourceManager) newUpdateTablePayload(
277303
StreamEnabled: aws.Bool(*r.ko.Spec.StreamSpecification.StreamEnabled),
278304
}
279305
// Only set streamViewType when streamSpefication is enabled and streamViewType is non-nil.
280-
if *r.ko.Spec.StreamSpecification.StreamEnabled && r.ko.Spec.StreamSpecification.StreamViewType != nil {
281-
input.StreamSpecification.StreamViewType = aws.String(*r.ko.Spec.StreamSpecification.StreamViewType)
306+
if *r.ko.Spec.StreamSpecification.StreamEnabled &&
307+
r.ko.Spec.StreamSpecification.StreamViewType != nil {
308+
input.StreamSpecification.StreamViewType = aws.String(
309+
*r.ko.Spec.StreamSpecification.StreamViewType,
310+
)
282311
}
283312
} else {
284313
input.StreamSpecification = &svcsdk.StreamSpecification{
@@ -317,7 +346,9 @@ func (rm *resourceManager) syncTableSSESpecification(
317346
input.SSESpecification.SSEType = aws.String(*r.ko.Spec.SSESpecification.SSEType)
318347
}
319348
if r.ko.Spec.SSESpecification.KMSMasterKeyID != nil {
320-
input.SSESpecification.KMSMasterKeyId = aws.String(*r.ko.Spec.SSESpecification.KMSMasterKeyID)
349+
input.SSESpecification.KMSMasterKeyId = aws.String(
350+
*r.ko.Spec.SSESpecification.KMSMasterKeyID,
351+
)
321352
}
322353
}
323354
} else {
@@ -350,13 +381,17 @@ func (rm *resourceManager) syncTableProvisionedThroughput(
350381
}
351382
if r.ko.Spec.ProvisionedThroughput != nil {
352383
if r.ko.Spec.ProvisionedThroughput.ReadCapacityUnits != nil {
353-
input.ProvisionedThroughput.ReadCapacityUnits = aws.Int64(*r.ko.Spec.ProvisionedThroughput.ReadCapacityUnits)
384+
input.ProvisionedThroughput.ReadCapacityUnits = aws.Int64(
385+
*r.ko.Spec.ProvisionedThroughput.ReadCapacityUnits,
386+
)
354387
} else {
355388
input.ProvisionedThroughput.ReadCapacityUnits = aws.Int64(0)
356389
}
357390

358391
if r.ko.Spec.ProvisionedThroughput.WriteCapacityUnits != nil {
359-
input.ProvisionedThroughput.WriteCapacityUnits = aws.Int64(*r.ko.Spec.ProvisionedThroughput.WriteCapacityUnits)
392+
input.ProvisionedThroughput.WriteCapacityUnits = aws.Int64(
393+
*r.ko.Spec.ProvisionedThroughput.WriteCapacityUnits,
394+
)
360395
} else {
361396
input.ProvisionedThroughput.WriteCapacityUnits = aws.Int64(0)
362397
}
@@ -395,6 +430,12 @@ func (rm *resourceManager) setResourceAdditionalFields(
395430
ko.Spec.TimeToLive = ttlSpec
396431
}
397432

433+
if pitrSpec, err := rm.getResourcePointInTimeRecoveryWithContext(ctx, ko.Spec.TableName); err != nil {
434+
return err
435+
} else {
436+
ko.Spec.ContinuousBackups = pitrSpec
437+
}
438+
398439
return nil
399440
}
400441

@@ -403,11 +444,14 @@ func customPreCompare(
403444
a *resource,
404445
b *resource,
405446
) {
406-
407447
if ackcompare.HasNilDifference(a.ko.Spec.SSESpecification, b.ko.Spec.SSESpecification) {
408448
if a.ko.Spec.SSESpecification != nil && b.ko.Spec.SSESpecification == nil {
409449
if *a.ko.Spec.SSESpecification.Enabled {
410-
delta.Add("Spec.SSESpecification", a.ko.Spec.SSESpecification, b.ko.Spec.SSESpecification)
450+
delta.Add(
451+
"Spec.SSESpecification",
452+
a.ko.Spec.SSESpecification,
453+
b.ko.Spec.SSESpecification,
454+
)
411455
}
412456
} else {
413457
delta.Add("Spec.SSESpecification", a.ko.Spec.SSESpecification, b.ko.Spec.SSESpecification)
@@ -447,23 +491,35 @@ func customPreCompare(
447491
}
448492

449493
if len(a.ko.Spec.AttributeDefinitions) != len(b.ko.Spec.AttributeDefinitions) {
450-
delta.Add("Spec.AttributeDefinitions", a.ko.Spec.AttributeDefinitions, b.ko.Spec.AttributeDefinitions)
494+
delta.Add(
495+
"Spec.AttributeDefinitions",
496+
a.ko.Spec.AttributeDefinitions,
497+
b.ko.Spec.AttributeDefinitions,
498+
)
451499
} else if a.ko.Spec.AttributeDefinitions != nil && b.ko.Spec.AttributeDefinitions != nil {
452500
if !equalAttributeDefinitions(a.ko.Spec.AttributeDefinitions, b.ko.Spec.AttributeDefinitions) {
453501
delta.Add("Spec.AttributeDefinitions", a.ko.Spec.AttributeDefinitions, b.ko.Spec.AttributeDefinitions)
454502
}
455503
}
456504

457505
if len(a.ko.Spec.GlobalSecondaryIndexes) != len(b.ko.Spec.GlobalSecondaryIndexes) {
458-
delta.Add("Spec.GlobalSecondaryIndexes", a.ko.Spec.GlobalSecondaryIndexes, b.ko.Spec.GlobalSecondaryIndexes)
506+
delta.Add(
507+
"Spec.GlobalSecondaryIndexes",
508+
a.ko.Spec.GlobalSecondaryIndexes,
509+
b.ko.Spec.GlobalSecondaryIndexes,
510+
)
459511
} else if a.ko.Spec.GlobalSecondaryIndexes != nil && b.ko.Spec.GlobalSecondaryIndexes != nil {
460512
if !equalGlobalSecondaryIndexesArrays(a.ko.Spec.GlobalSecondaryIndexes, b.ko.Spec.GlobalSecondaryIndexes) {
461513
delta.Add("Spec.GlobalSecondaryIndexes", a.ko.Spec.GlobalSecondaryIndexes, b.ko.Spec.GlobalSecondaryIndexes)
462514
}
463515
}
464516

465517
if len(a.ko.Spec.LocalSecondaryIndexes) != len(b.ko.Spec.LocalSecondaryIndexes) {
466-
delta.Add("Spec.LocalSecondaryIndexes", a.ko.Spec.LocalSecondaryIndexes, b.ko.Spec.LocalSecondaryIndexes)
518+
delta.Add(
519+
"Spec.LocalSecondaryIndexes",
520+
a.ko.Spec.LocalSecondaryIndexes,
521+
b.ko.Spec.LocalSecondaryIndexes,
522+
)
467523
} else if a.ko.Spec.LocalSecondaryIndexes != nil && b.ko.Spec.LocalSecondaryIndexes != nil {
468524
if !equalLocalSecondaryIndexesArrays(a.ko.Spec.LocalSecondaryIndexes, b.ko.Spec.LocalSecondaryIndexes) {
469525
delta.Add("Spec.LocalSecondaryIndexes", a.ko.Spec.LocalSecondaryIndexes, b.ko.Spec.LocalSecondaryIndexes)
@@ -614,7 +670,10 @@ func equalLocalSecondaryIndexes(
614670
if !equalStrings(a.Projection.ProjectionType, b.Projection.ProjectionType) {
615671
return false
616672
}
617-
if !ackcompare.SliceStringPEqual(a.Projection.NonKeyAttributes, b.Projection.NonKeyAttributes) {
673+
if !ackcompare.SliceStringPEqual(
674+
a.Projection.NonKeyAttributes,
675+
b.Projection.NonKeyAttributes,
676+
) {
618677
return false
619678
}
620679
}
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
// Copyright Amazon.com Inc. or its affiliates. All Rights Reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License"). You may
4+
// not use this file except in compliance with the License. A copy of the
5+
// License is located at
6+
//
7+
// http://aws.amazon.com/apache2.0/
8+
//
9+
// or in the "license" file accompanying this file. This file is distributed
10+
// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
11+
// express or implied. See the License for the specific language governing
12+
// permissions and limitations under the License.
13+
14+
package table
15+
16+
import (
17+
"context"
18+
19+
ackrtlog "github.com/aws-controllers-k8s/runtime/pkg/runtime/log"
20+
svcsdk "github.com/aws/aws-sdk-go/service/dynamodb"
21+
22+
"github.com/aws-controllers-k8s/dynamodb-controller/apis/v1alpha1"
23+
)
24+
25+
// syncContinuousBackup syncs the PointInTimeRecoverySpecification of the dynamodb table.
26+
func (rm *resourceManager) syncContinuousBackup(
27+
ctx context.Context,
28+
desired *resource,
29+
) (err error) {
30+
rlog := ackrtlog.FromContext(ctx)
31+
exit := rlog.Trace("rm.syncContinuousBackup")
32+
defer func(err error) { exit(err) }(err)
33+
34+
pitrSpec := &svcsdk.PointInTimeRecoverySpecification{}
35+
if desired.ko.Spec.ContinuousBackups != nil &&
36+
desired.ko.Spec.ContinuousBackups.PointInTimeRecoveryEnabled != nil {
37+
pitrSpec.SetPointInTimeRecoveryEnabled(
38+
*desired.ko.Spec.ContinuousBackups.PointInTimeRecoveryEnabled,
39+
)
40+
}
41+
42+
_, err = rm.sdkapi.UpdateContinuousBackupsWithContext(
43+
ctx,
44+
&svcsdk.UpdateContinuousBackupsInput{
45+
TableName: desired.ko.Spec.TableName,
46+
PointInTimeRecoverySpecification: pitrSpec,
47+
},
48+
)
49+
rm.metrics.RecordAPICall("UPDATE", "UpdateContinuousBackups", err)
50+
return err
51+
}
52+
53+
// getResourcePointInTimeRecoveryWithContext gets the PointInTimeRecoverySpecification of the dynamodb table.
54+
func (rm *resourceManager) getResourcePointInTimeRecoveryWithContext(
55+
ctx context.Context,
56+
tableName *string,
57+
) (*v1alpha1.PointInTimeRecoverySpecification, error) {
58+
var err error
59+
rlog := ackrtlog.FromContext(ctx)
60+
exit := rlog.Trace("rm.getResourcePointInTimeRecoveryWithContext")
61+
defer func(err error) { exit(err) }(err)
62+
63+
res, err := rm.sdkapi.DescribeContinuousBackupsWithContext(
64+
ctx,
65+
&svcsdk.DescribeContinuousBackupsInput{
66+
TableName: tableName,
67+
},
68+
)
69+
70+
rm.metrics.RecordAPICall("GET", "DescribeContinuousBackups", err)
71+
if err != nil {
72+
return nil, err
73+
}
74+
75+
isEnabled := false
76+
if res.ContinuousBackupsDescription != nil {
77+
isEnabled = *res.ContinuousBackupsDescription.PointInTimeRecoveryDescription.PointInTimeRecoveryStatus == svcsdk.PointInTimeRecoveryStatusEnabled
78+
}
79+
80+
return &v1alpha1.PointInTimeRecoverySpecification{
81+
PointInTimeRecoveryEnabled: &isEnabled,
82+
}, nil
83+
}

0 commit comments

Comments
 (0)