Skip to content

Commit 5bdaef5

Browse files
authored
Fix/restore dynamodump from backup with gsis (#94)
* Checks provided read and write capacity units in schema to be greater than 0 for table and GSIs, otherwise fallback to default RESTORE_READ_CAPACITY to avoid the process crashing * Adds functions to prepare payload passed to create_table boto3 API call in on_restore function * Adds .idea to gitignore * Fixes typo in original_gsi_read_capacity * Lints files * Fix failing tests * Lints files
1 parent a0f8159 commit 5bdaef5

File tree

2 files changed

+110
-32
lines changed

2 files changed

+110
-32
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,3 +39,4 @@ dump
3939
dynamodump.iml
4040
env
4141
.vscode
42+
.idea

dynamodump/dynamodump.py

Lines changed: 109 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
MAX_RETRY = 6
4545
METADATA_URL = "http://169.254.169.254/latest/meta-data/"
4646
RESTORE_WRITE_CAPACITY = 25
47+
RESTORE_READ_CAPACITY = 25
4748
SCHEMA_FILE = "schema.json"
4849
THREAD_START_DELAY = 1 # seconds
4950

@@ -747,6 +748,33 @@ def do_backup(dynamo, read_capacity, tableQueue=None, srcTable=None):
747748
tableQueue.task_done()
748749

749750

751+
def prepare_provisioned_throughput_for_restore(provisioned_throughput):
752+
"""
753+
This function makes sure that the payload returned for the boto3 API call create_table is compatible
754+
with the provisioned throughput attribute
755+
See: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html
756+
"""
757+
return {
758+
"ReadCapacityUnits": provisioned_throughput["ReadCapacityUnits"],
759+
"WriteCapacityUnits": provisioned_throughput["WriteCapacityUnits"],
760+
}
761+
762+
763+
def prepare_gsi_for_restore(gsi):
764+
"""
765+
This function makes sure that the payload returned for the boto3 API call create_table is compatible
766+
See: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html
767+
"""
768+
return {
769+
"IndexName": gsi["IndexName"],
770+
"KeySchema": gsi["KeySchema"],
771+
"Projection": gsi["Projection"],
772+
"ProvisionedThroughput": prepare_provisioned_throughput_for_restore(
773+
gsi["ProvisionedThroughput"]
774+
),
775+
}
776+
777+
750778
def do_restore(dynamo, sleep_interval, source_table, destination_table, write_capacity):
751779
"""
752780
Restore table
@@ -792,21 +820,58 @@ def do_restore(dynamo, sleep_interval, source_table, destination_table, write_ca
792820
else:
793821
write_capacity = original_write_capacity
794822

823+
if original_write_capacity == 0:
824+
original_write_capacity = RESTORE_WRITE_CAPACITY
825+
826+
# ensure that read capacity is at least RESTORE_READ_CAPACITY
827+
if original_read_capacity < RESTORE_READ_CAPACITY:
828+
read_capacity = RESTORE_WRITE_CAPACITY
829+
else:
830+
read_capacity = original_read_capacity
831+
832+
if original_read_capacity == 0:
833+
original_read_capacity = RESTORE_READ_CAPACITY
834+
795835
# override GSI write capacities if specified, else use RESTORE_WRITE_CAPACITY if original
796836
# write capacity is lower
797837
original_gsi_write_capacities = []
838+
original_gsi_read_capacities = []
798839
if table_global_secondary_indexes is not None:
799840
for gsi in table_global_secondary_indexes:
800-
original_gsi_write_capacities.append(
801-
gsi["ProvisionedThroughput"]["WriteCapacityUnits"]
802-
)
841+
# keeps track of original gsi write capacity units. If provisioned capacity is 0, set to
842+
# RESTORE_WRITE_CAPACITY as fallback given that 0 is not allowed for write capacities
843+
original_gsi_write_capacity = gsi["ProvisionedThroughput"][
844+
"WriteCapacityUnits"
845+
]
846+
if original_gsi_write_capacity == 0:
847+
original_gsi_write_capacity = RESTORE_WRITE_CAPACITY
848+
849+
original_gsi_write_capacities.append(original_gsi_write_capacity)
803850

804851
if gsi["ProvisionedThroughput"]["WriteCapacityUnits"] < int(write_capacity):
805852
gsi["ProvisionedThroughput"]["WriteCapacityUnits"] = int(write_capacity)
806853

854+
# keeps track of original gsi read capacity units. If provisioned capacity is 0, set to
855+
# RESTORE_READ_CAPACITY as fallback given that 0 is not allowed for read capacities
856+
original_gsi_read_capacity = gsi["ProvisionedThroughput"][
857+
"ReadCapacityUnits"
858+
]
859+
if original_gsi_read_capacity == 0:
860+
original_gsi_read_capacity = RESTORE_READ_CAPACITY
861+
862+
original_gsi_read_capacities.append(original_gsi_read_capacity)
863+
864+
if (
865+
gsi["ProvisionedThroughput"]["ReadCapacityUnits"]
866+
< RESTORE_READ_CAPACITY
867+
):
868+
gsi["ProvisionedThroughput"][
869+
"ReadCapacityUnits"
870+
] = RESTORE_READ_CAPACITY
871+
807872
# temp provisioned throughput for restore
808873
table_provisioned_throughput = {
809-
"ReadCapacityUnits": int(original_read_capacity),
874+
"ReadCapacityUnits": int(read_capacity),
810875
"WriteCapacityUnits": int(write_capacity),
811876
}
812877

@@ -824,7 +889,9 @@ def do_restore(dynamo, sleep_interval, source_table, destination_table, write_ca
824889
optional_args["LocalSecondaryIndexes"] = table_local_secondary_indexes
825890

826891
if table_global_secondary_indexes is not None:
827-
optional_args["GlobalSecondaryIndexes"] = table_global_secondary_indexes
892+
optional_args["GlobalSecondaryIndexes"] = [
893+
prepare_gsi_for_restore(gsi) for gsi in table_global_secondary_indexes
894+
]
828895

829896
while True:
830897
try:
@@ -914,7 +981,10 @@ def do_restore(dynamo, sleep_interval, source_table, destination_table, write_ca
914981

915982
if not args.skipThroughputUpdate:
916983
# revert to original table write capacity if it has been modified
917-
if int(write_capacity) != original_write_capacity:
984+
if (
985+
int(write_capacity) != original_write_capacity
986+
or int(read_capacity) != original_read_capacity
987+
):
918988
update_provisioned_throughput(
919989
dynamo,
920990
destination_table,
@@ -930,13 +1000,19 @@ def do_restore(dynamo, sleep_interval, source_table, destination_table, write_ca
9301000
wcu = gsi["ProvisionedThroughput"]["WriteCapacityUnits"]
9311001
rcu = gsi["ProvisionedThroughput"]["ReadCapacityUnits"]
9321002
original_gsi_write_capacity = original_gsi_write_capacities.pop(0)
933-
if original_gsi_write_capacity != wcu:
1003+
original_gsi_read_capacity = original_gsi_read_capacities.pop(0)
1004+
if (
1005+
original_gsi_write_capacity != wcu
1006+
or original_gsi_read_capacity != rcu
1007+
):
9341008
gsi_data.append(
9351009
{
9361010
"Update": {
9371011
"IndexName": gsi["IndexName"],
9381012
"ProvisionedThroughput": {
939-
"ReadCapacityUnits": int(rcu),
1013+
"ReadCapacityUnits": int(
1014+
original_gsi_read_capacity
1015+
),
9401016
"WriteCapacityUnits": int(
9411017
original_gsi_write_capacity
9421018
),
@@ -945,30 +1021,31 @@ def do_restore(dynamo, sleep_interval, source_table, destination_table, write_ca
9451021
}
9461022
)
9471023

948-
logging.info(
949-
"Updating "
950-
+ destination_table
951-
+ " global secondary indexes write capacities as necessary.."
952-
)
953-
while True:
954-
try:
955-
dynamo.update_table(
956-
TableName=destination_table,
957-
GlobalSecondaryIndexUpdates=gsi_data,
958-
)
959-
break
960-
except dynamo.exceptions.LimitExceededException:
961-
logging.info(
962-
"Limit exceeded, retrying updating throughput of"
963-
"GlobalSecondaryIndexes in " + destination_table + ".."
964-
)
965-
time.sleep(sleep_interval)
966-
except dynamo.exceptions.ProvisionedThroughputExceededException:
967-
logging.info(
968-
"Control plane limit exceeded, retrying updating throughput of"
969-
"GlobalSecondaryIndexes in " + destination_table + ".."
970-
)
971-
time.sleep(sleep_interval)
1024+
if gsi_data:
1025+
logging.info(
1026+
"Updating "
1027+
+ destination_table
1028+
+ " global secondary indexes write and read capacities as necessary.."
1029+
)
1030+
while True:
1031+
try:
1032+
dynamo.update_table(
1033+
TableName=destination_table,
1034+
GlobalSecondaryIndexUpdates=gsi_data,
1035+
)
1036+
break
1037+
except dynamo.exceptions.LimitExceededException:
1038+
logging.info(
1039+
"Limit exceeded, retrying updating throughput of"
1040+
"GlobalSecondaryIndexes in " + destination_table + ".."
1041+
)
1042+
time.sleep(sleep_interval)
1043+
except dynamo.exceptions.ProvisionedThroughputExceededException:
1044+
logging.info(
1045+
"Control plane limit exceeded, retrying updating throughput of"
1046+
"GlobalSecondaryIndexes in " + destination_table + ".."
1047+
)
1048+
time.sleep(sleep_interval)
9721049

9731050
# wait for table to become active
9741051
wait_for_active_table(dynamo, destination_table, "active")

0 commit comments

Comments
 (0)