Commit 2856072

simeonvandersteen authored and bchew committed

batch write sleep interval and provisioned capacity limits on data only restore (#28)

* change provisioned capacity limits also on a --dataOnly restore
* add linear sleep to batch write
* Fix style errors
* Wait for restore to complete

1 parent 0dbf0af · commit 2856072

File tree: 2 files changed, +37 -23 lines changed

* dynamodump.py
* test/test.py

dynamodump.py: 36 additions & 23 deletions
```diff
@@ -16,6 +16,7 @@
 JSON_INDENT = 2
 AWS_SLEEP_INTERVAL = 10  # seconds
 LOCAL_SLEEP_INTERVAL = 1  # seconds
+BATCH_WRITE_SLEEP_INTERVAL = 0.15  # seconds
 MAX_BATCH_WRITE = 25  # DynamoDB limit
 SCHEMA_FILE = "schema.json"
 DATA_DIR = "data"
```
```diff
@@ -147,16 +148,18 @@ def mkdir_p(path):
 def batch_write(conn, sleep_interval, table_name, put_requests):
     request_items = {table_name: put_requests}
     i = 1
+    sleep = sleep_interval
     while True:
         response = conn.batch_write_item(request_items)
         unprocessed_items = response["UnprocessedItems"]
 
         if len(unprocessed_items) == 0:
             break
-
         if len(unprocessed_items) > 0 and i <= MAX_RETRY:
-            logging.debug(str(len(unprocessed_items)) + " unprocessed items, retrying.. [" + str(i) + "]")
+            logging.debug(str(len(unprocessed_items)) + " unprocessed items, retrying after %s seconds.. [%s/%s]" % (str(sleep), str(i), str(MAX_RETRY)))
             request_items = unprocessed_items
+            time.sleep(sleep)
+            sleep += sleep_interval
             i += 1
         else:
             logging.info("Max retries reached, failed to processed batch write: " + json.dumps(unprocessed_items,
```
```diff
@@ -321,26 +324,27 @@ def do_restore(conn, sleep_interval, source_table, destination_table, write_capa
     table_local_secondary_indexes = table.get("LocalSecondaryIndexes")
     table_global_secondary_indexes = table.get("GlobalSecondaryIndexes")
 
-    if not args.dataOnly:
-        # override table write capacity if specified, else use RESTORE_WRITE_CAPACITY if original write capacity is lower
-        if write_capacity is None:
-            if original_write_capacity < RESTORE_WRITE_CAPACITY:
-                write_capacity = RESTORE_WRITE_CAPACITY
-            else:
-                write_capacity = original_write_capacity
+    # override table write capacity if specified, else use RESTORE_WRITE_CAPACITY if original write capacity is lower
+    if write_capacity is None:
+        if original_write_capacity < RESTORE_WRITE_CAPACITY:
+            write_capacity = RESTORE_WRITE_CAPACITY
+        else:
+            write_capacity = original_write_capacity
+
+    # override GSI write capacities if specified, else use RESTORE_WRITE_CAPACITY if original write capacity is lower
+    original_gsi_write_capacities = []
+    if table_global_secondary_indexes is not None:
+        for gsi in table_global_secondary_indexes:
+            original_gsi_write_capacities.append(gsi["ProvisionedThroughput"]["WriteCapacityUnits"])
 
-        # override GSI write capacities if specified, else use RESTORE_WRITE_CAPACITY if original write capacity is lower
-        original_gsi_write_capacities = []
-        if table_global_secondary_indexes is not None:
-            for gsi in table_global_secondary_indexes:
-                original_gsi_write_capacities.append(gsi["ProvisionedThroughput"]["WriteCapacityUnits"])
+            if gsi["ProvisionedThroughput"]["WriteCapacityUnits"] < int(write_capacity):
+                gsi["ProvisionedThroughput"]["WriteCapacityUnits"] = int(write_capacity)
 
-                if gsi["ProvisionedThroughput"]["WriteCapacityUnits"] < int(write_capacity):
-                    gsi["ProvisionedThroughput"]["WriteCapacityUnits"] = int(write_capacity)
+    # temp provisioned throughput for restore
+    table_provisioned_throughput = {"ReadCapacityUnits": int(original_read_capacity),
+                                    "WriteCapacityUnits": int(write_capacity)}
 
-        # temp provisioned throughput for restore
-        table_provisioned_throughput = {"ReadCapacityUnits": int(original_read_capacity),
-                                        "WriteCapacityUnits": int(write_capacity)}
+    if not args.dataOnly:
 
         logging.info("Creating " + destination_table + " table with temp write capacity of " + str(write_capacity))
 
```
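Note what moved here: the capacity calculations now run unconditionally, so a --dataOnly restore gets the same write-capacity floor as a full restore. The effective value is the explicit --writeCapacity if given, otherwise the larger of the table's original capacity and RESTORE_WRITE_CAPACITY, and every GSI is raised to at least that value. A sketch of the table-level selection as a pure function; the names and the constant's value are illustrative:

```python
RESTORE_WRITE_CAPACITY = 25  # assumed value for illustration; defined in dynamodump.py

def effective_write_capacity(requested, original):
    """Write capacity a restore runs at (reverted again afterwards)."""
    if requested is not None:
        return int(requested)  # an explicit --writeCapacity always wins
    return max(int(original), RESTORE_WRITE_CAPACITY)

# effective_write_capacity(None, 5)   -> 25:  bumped up to the restore floor
# effective_write_capacity(None, 50)  -> 50:  already high enough, kept
# effective_write_capacity("100", 5)  -> 100: user override
```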
```diff
@@ -363,6 +367,11 @@ def do_restore(conn, sleep_interval, source_table, destination_table, write_capa
 
         # wait for table creation completion
         wait_for_active_table(conn, destination_table, "created")
+    else:
+        # update provisioned capacity
+        if int(write_capacity) > original_write_capacity:
+            update_provisioned_throughput(conn, destination_table, original_read_capacity, write_capacity,
+                                          False)
 
     if not args.schemaOnly:
         # read data files
```
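On a --dataOnly restore the destination table already exists, so instead of creating it the code now raises its throughput in place whenever the restore capacity exceeds the original. The body of update_provisioned_throughput sits outside this diff; a plausible minimal sketch, assuming the boto.dynamodb2.layer1 connection dynamodump uses and reading the final argument as a wait flag:

```python
def update_provisioned_throughput(conn, table_name, read_capacity, write_capacity, wait=True):
    """Issue UpdateTable with new table-level throughput (sketch, not the actual helper)."""
    conn.update_table(table_name,
                      provisioned_throughput={"ReadCapacityUnits": int(read_capacity),
                                              "WriteCapacityUnits": int(write_capacity)})
    if wait:
        # block until DynamoDB finishes applying the update
        wait_for_active_table(conn, table_name, "updated")
```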
```diff
@@ -384,16 +393,16 @@ def do_restore(conn, sleep_interval, source_table, destination_table, write_capa
                 # flush every MAX_BATCH_WRITE
                 if len(put_requests) == MAX_BATCH_WRITE:
                     logging.debug("Writing next " + str(MAX_BATCH_WRITE) + " items to " + destination_table + "..")
-                    batch_write(conn, sleep_interval, destination_table, put_requests)
+                    batch_write(conn, BATCH_WRITE_SLEEP_INTERVAL, destination_table, put_requests)
                     del put_requests[:]
 
         # flush remainder
         if len(put_requests) > 0:
-            batch_write(conn, sleep_interval, destination_table, put_requests)
+            batch_write(conn, BATCH_WRITE_SLEEP_INTERVAL, destination_table, put_requests)
 
-        if not args.dataOnly and not args.skipThroughputUpdate:
+        if not args.skipThroughputUpdate:
             # revert to original table write capacity if it has been modified
-            if write_capacity != original_write_capacity:
+            if int(write_capacity) != original_write_capacity:
                 update_provisioned_throughput(conn, destination_table, original_read_capacity, original_write_capacity,
                                               False)
 
```
```diff
@@ -424,11 +433,15 @@ def do_restore(conn, sleep_interval, source_table, destination_table, write_capa
                         "Control plane limit exceeded, retrying updating throughput of GlobalSecondaryIndexes in " + destination_table + "..")
                     time.sleep(sleep_interval)
 
+        # wait for table to become active
+        wait_for_active_table(conn, destination_table, "active")
+
         logging.info("Restore for " + source_table + " to " + destination_table + " table completed. Time taken: " + str(
             datetime.datetime.now().replace(microsecond=0) - start_time))
     else:
         logging.info("Empty schema of " + source_table + " table created. Time taken: " + str(datetime.datetime.now().replace(microsecond=0) - start_time))
 
+
 # parse args
 parser = argparse.ArgumentParser(description="Simple DynamoDB backup/restore/empty.")
 parser.add_argument("-m", "--mode", help="'backup' or 'restore' or 'empty'")
```

test/test.py: 1 addition & 0 deletions

```diff
@@ -37,5 +37,6 @@ def test_schema(self):
     def test_data(self):
         self.assertEqual(self.test_table_data, self.restored_test_table_data)
 
+
 if __name__ == '__main__':
     unittest.main()
```
