@@ -260,40 +260,41 @@ def do_backup(conn, table_name, read_capacity):
     f.write(json.dumps(table_desc, indent=JSON_INDENT))
     f.close()

-    original_read_capacity = table_desc["Table"]["ProvisionedThroughput"]["ReadCapacityUnits"]
-    original_write_capacity = table_desc["Table"]["ProvisionedThroughput"]["WriteCapacityUnits"]
+    if not args.schemaOnly:
+        original_read_capacity = table_desc["Table"]["ProvisionedThroughput"]["ReadCapacityUnits"]
+        original_write_capacity = table_desc["Table"]["ProvisionedThroughput"]["WriteCapacityUnits"]

-    # override table read capacity if specified
-    if read_capacity is not None and read_capacity != original_read_capacity:
-        update_provisioned_throughput(conn, table_name, read_capacity, original_write_capacity)
+        # override table read capacity if specified
+        if read_capacity is not None and read_capacity != original_read_capacity:
+            update_provisioned_throughput(conn, table_name, read_capacity, original_write_capacity)

-    # get table data
-    logging.info("Dumping table items for " + table_name)
-    mkdir_p(DUMP_PATH + "/" + table_name + "/" + DATA_DIR)
+        # get table data
+        logging.info("Dumping table items for " + table_name)
+        mkdir_p(DUMP_PATH + "/" + table_name + "/" + DATA_DIR)

-    i = 1
-    last_evaluated_key = None
+        i = 1
+        last_evaluated_key = None

-    while True:
-        scanned_table = conn.scan(table_name, exclusive_start_key=last_evaluated_key)
+        while True:
+            scanned_table = conn.scan(table_name, exclusive_start_key=last_evaluated_key)

-        f = open(DUMP_PATH + "/" + table_name + "/" + DATA_DIR + "/" + str(i).zfill(4) + ".json", "w+")
-        f.write(json.dumps(scanned_table, indent=JSON_INDENT))
-        f.close()
+            f = open(DUMP_PATH + "/" + table_name + "/" + DATA_DIR + "/" + str(i).zfill(4) + ".json", "w+")
+            f.write(json.dumps(scanned_table, indent=JSON_INDENT))
+            f.close()

-        i += 1
+            i += 1

-        try:
-            last_evaluated_key = scanned_table["LastEvaluatedKey"]
-        except KeyError:
-            break
+            try:
+                last_evaluated_key = scanned_table["LastEvaluatedKey"]
+            except KeyError:
+                break

-    # revert back to original table read capacity if specified
-    if read_capacity is not None and read_capacity != original_read_capacity:
-        update_provisioned_throughput(conn, table_name, original_read_capacity, original_write_capacity, False)
+        # revert back to original table read capacity if specified
+        if read_capacity is not None and read_capacity != original_read_capacity:
+            update_provisioned_throughput(conn, table_name, original_read_capacity, original_write_capacity, False)

-    logging.info("Backup for " + table_name + " table completed. Time taken: " + str(
-        datetime.datetime.now().replace(microsecond=0) - start_time))
+        logging.info("Backup for " + table_name + " table completed. Time taken: " + str(
+            datetime.datetime.now().replace(microsecond=0) - start_time))


 def do_restore(conn, sleep_interval, source_table, destination_table, write_capacity):
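The dump loop now gated by "if not args.schemaOnly:" above pages through the table with DynamoDB's scan pagination: each scan returns a page of Items plus a LastEvaluatedKey while more data remains, and that key is passed back as exclusive_start_key on the next call. A minimal standalone sketch of the same pattern, assuming a boto.dynamodb2.layer1 DynamoDBConnection-style client whose scan() returns a plain dict (the helper name is hypothetical):

def scan_all_pages(conn, table_name):
    # Page through a table the way do_backup does: feed the previous page's
    # LastEvaluatedKey back as exclusive_start_key until the key is absent.
    pages = []
    last_evaluated_key = None
    while True:
        page = conn.scan(table_name, exclusive_start_key=last_evaluated_key)
        pages.append(page)
        if "LastEvaluatedKey" not in page:
            break  # final page reached
        last_evaluated_key = page["LastEvaluatedKey"]
    return pages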
@@ -363,74 +364,78 @@ def do_restore(conn, sleep_interval, source_table, destination_table, write_capacity):
     # wait for table creation completion
     wait_for_active_table(conn, destination_table, "created")

-    # read data files
-    logging.info("Restoring data for " + destination_table + " table..")
-    data_file_list = os.listdir(dump_data_path + "/" + source_table + "/" + DATA_DIR + "/")
-    data_file_list.sort()
-
-    for data_file in data_file_list:
-        logging.info("Processing " + data_file + " of " + destination_table)
-        items = []
-        item_data = json.load(open(dump_data_path + "/" + source_table + "/" + DATA_DIR + "/" + data_file))
-        items.extend(item_data["Items"])
-
-        # batch write data
-        put_requests = []
-        while len(items) > 0:
-            put_requests.append({"PutRequest": {"Item": items.pop(0)}})
-
-            # flush every MAX_BATCH_WRITE
-            if len(put_requests) == MAX_BATCH_WRITE:
-                logging.debug("Writing next " + str(MAX_BATCH_WRITE) + " items to " + destination_table + "..")
+    if not args.schemaOnly:
+        # read data files
+        logging.info("Restoring data for " + destination_table + " table..")
+        data_file_list = os.listdir(dump_data_path + "/" + source_table + "/" + DATA_DIR + "/")
+        data_file_list.sort()
+
+        for data_file in data_file_list:
+            logging.info("Processing " + data_file + " of " + destination_table)
+            items = []
+            item_data = json.load(open(dump_data_path + "/" + source_table + "/" + DATA_DIR + "/" + data_file))
+            items.extend(item_data["Items"])
+
+            # batch write data
+            put_requests = []
+            while len(items) > 0:
+                put_requests.append({"PutRequest": {"Item": items.pop(0)}})
+
+                # flush every MAX_BATCH_WRITE
+                if len(put_requests) == MAX_BATCH_WRITE:
+                    logging.debug("Writing next " + str(MAX_BATCH_WRITE) + " items to " + destination_table + "..")
+                    batch_write(conn, sleep_interval, destination_table, put_requests)
+                    del put_requests[:]
+
+            # flush remainder
+            if len(put_requests) > 0:
                 batch_write(conn, sleep_interval, destination_table, put_requests)
-                del put_requests[:]
-
-        # flush remainder
-        if len(put_requests) > 0:
-            batch_write(conn, sleep_interval, destination_table, put_requests)
-
-    if not args.dataOnly and not args.skipThroughputUpdate:
-        # revert to original table write capacity if it has been modified
-        if write_capacity != original_write_capacity:
-            update_provisioned_throughput(conn, destination_table, original_read_capacity, original_write_capacity,
-                                          False)
-
-        # loop through each GSI to check if it has changed and update if necessary
-        if table_global_secondary_indexes is not None:
-            gsi_data = []
-            for gsi in table_global_secondary_indexes:
-                original_gsi_write_capacity = original_gsi_write_capacities.pop(0)
-                if original_gsi_write_capacity != gsi["ProvisionedThroughput"]["WriteCapacityUnits"]:
-                    gsi_data.append({"Update": {"IndexName": gsi["IndexName"],
-                                                "ProvisionedThroughput": {
-                                                    "ReadCapacityUnits": int(
-                                                        gsi["ProvisionedThroughput"]["ReadCapacityUnits"]),
-                                                    "WriteCapacityUnits": int(original_gsi_write_capacity)}}})
-
-            logging.info("Updating " + destination_table + " global secondary indexes write capacities as necessary..")
-            while True:
-                try:
-                    conn.update_table(destination_table, global_secondary_index_updates=gsi_data)
-                    break
-                except boto.exception.JSONResponseError as e:
-                    if e.body["__type"] == "com.amazonaws.dynamodb.v20120810#LimitExceededException":
-                        logging.info(
-                            "Limit exceeded, retrying updating throughput of GlobalSecondaryIndexes in " + destination_table + "..")
-                        time.sleep(sleep_interval)
-                    elif e.body["__type"] == "com.amazon.coral.availability#ThrottlingException":
-                        logging.info(
-                            "Control plane limit exceeded, retrying updating throughput of GlobalSecondaryIndexes in " + destination_table + "..")
-                        time.sleep(sleep_interval)
-
-    logging.info("Restore for " + source_table + " to " + destination_table + " table completed. Time taken: " + str(
-        datetime.datetime.now().replace(microsecond=0) - start_time))

+        if not args.dataOnly and not args.skipThroughputUpdate:
+            # revert to original table write capacity if it has been modified
+            if write_capacity != original_write_capacity:
+                update_provisioned_throughput(conn, destination_table, original_read_capacity, original_write_capacity,
+                                              False)
+
+            # loop through each GSI to check if it has changed and update if necessary
+            if table_global_secondary_indexes is not None:
+                gsi_data = []
+                for gsi in table_global_secondary_indexes:
+                    original_gsi_write_capacity = original_gsi_write_capacities.pop(0)
+                    if original_gsi_write_capacity != gsi["ProvisionedThroughput"]["WriteCapacityUnits"]:
+                        gsi_data.append({"Update": {"IndexName": gsi["IndexName"],
+                                                    "ProvisionedThroughput": {
+                                                        "ReadCapacityUnits": int(
+                                                            gsi["ProvisionedThroughput"]["ReadCapacityUnits"]),
+                                                        "WriteCapacityUnits": int(original_gsi_write_capacity)}}})
+
+                logging.info("Updating " + destination_table + " global secondary indexes write capacities as necessary..")
+                while True:
+                    try:
+                        conn.update_table(destination_table, global_secondary_index_updates=gsi_data)
+                        break
+                    except boto.exception.JSONResponseError as e:
+                        if e.body["__type"] == "com.amazonaws.dynamodb.v20120810#LimitExceededException":
+                            logging.info(
+                                "Limit exceeded, retrying updating throughput of GlobalSecondaryIndexes in " + destination_table + "..")
+                            time.sleep(sleep_interval)
+                        elif e.body["__type"] == "com.amazon.coral.availability#ThrottlingException":
+                            logging.info(
+                                "Control plane limit exceeded, retrying updating throughput of GlobalSecondaryIndexes in " + destination_table + "..")
+                            time.sleep(sleep_interval)
+
+        logging.info("Restore for " + source_table + " to " + destination_table + " table completed. Time taken: " + str(
+            datetime.datetime.now().replace(microsecond=0) - start_time))
+    else:
+        logging.info("Empty schema of " + source_table + " table created. Time taken: " + str(datetime.datetime.now().replace(microsecond=0) - start_time))

 # parse args
 parser = argparse.ArgumentParser(description="Simple DynamoDB backup/restore/empty.")
 parser.add_argument("-m", "--mode", help="'backup' or 'restore' or 'empty'")
 parser.add_argument("-r", "--region",
                     help="AWS region to use, e.g. 'us-west-1'. Use '" + LOCAL_REGION + "' for local DynamoDB testing.")
+parser.add_argument("-p", "--profile",
+                    help="AWS profile name to use. Allows you to choose an alternate profile from your AWS credentials file.")
 parser.add_argument("-s", "--srcTable",
                     help="Source DynamoDB table name to backup or restore from, use 'tablename*' for wildcard prefix selection or '*' for all tables.")
 parser.add_argument("-d", "--destTable",
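The restore path in the hunk above drains each data file's items into PutRequest entries and flushes them through batch_write every MAX_BATCH_WRITE requests, with one final flush for the remainder. A standalone sketch of that chunking, assuming the script's batch_write(conn, sleep_interval, table_name, put_requests) helper; the default of 25 here is only DynamoDB's BatchWriteItem cap of 25 requests per call, while the script itself uses its MAX_BATCH_WRITE constant:

def write_in_batches(conn, sleep_interval, table_name, items, batch_size=25):
    # Mirror do_restore's batching: accumulate PutRequests, flush whenever the
    # batch is full, then flush whatever is left over at the end.
    put_requests = []
    for item in items:
        put_requests.append({"PutRequest": {"Item": item}})
        if len(put_requests) == batch_size:
            batch_write(conn, sleep_interval, table_name, put_requests)
            del put_requests[:]
    if put_requests:
        batch_write(conn, sleep_interval, table_name, put_requests)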
@@ -448,6 +453,8 @@ def do_restore(conn, sleep_interval, source_table, destination_table, write_capacity):
 parser.add_argument("--accessKey", help="Access key of local DynamoDB [required only for local]")
 parser.add_argument("--secretKey", help="Secret key of local DynamoDB [required only for local]")
 parser.add_argument("--log", help="Logging level - DEBUG|INFO|WARNING|ERROR|CRITICAL [optional]")
+parser.add_argument("--schemaOnly", action="store_true", default=False,
+                    help="Backup or restore the schema only. Do not export/import data. [optional]")
 parser.add_argument("--dataOnly", action="store_true", default=False,
                     help="Restore data only. Do not delete/recreate schema [optional for restore]")
 parser.add_argument("--skipThroughputUpdate", action="store_true", default=False,
@@ -460,15 +467,27 @@ def do_restore(conn, sleep_interval, source_table, destination_table, write_capacity):
     log_level = args.log.upper()
 logging.basicConfig(level=getattr(logging, log_level))

+
+# Check to make sure that --dataOnly and --schemaOnly weren't simultaneously specified
+if args.schemaOnly and args.dataOnly:
+    logging.info("Options --schemaOnly and --dataOnly are mutually exclusive.")
+    sys.exit(1)
+
+
 # instantiate connection
 if args.region == LOCAL_REGION:
     conn = DynamoDBConnection(aws_access_key_id=args.accessKey, aws_secret_access_key=args.secretKey, host=args.host,
                               port=int(args.port), is_secure=False)
     sleep_interval = LOCAL_SLEEP_INTERVAL
 else:
-    conn = boto.dynamodb2.connect_to_region(args.region, aws_access_key_id=args.accessKey,
-                                            aws_secret_access_key=args.secretKey)
-    sleep_interval = AWS_SLEEP_INTERVAL
+    if not args.profile:
+        conn = boto.dynamodb2.connect_to_region(args.region, aws_access_key_id=args.accessKey,
+                                                aws_secret_access_key=args.secretKey)
+        sleep_interval = AWS_SLEEP_INTERVAL
+    else:
+        conn = boto.dynamodb2.connect_to_region(args.region, profile_name=args.profile)
+        sleep_interval = AWS_SLEEP_INTERVAL
+

 # set prefix separator
 prefix_separator = DEFAULT_PREFIX_SEPARATOR
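A design note on the new flag pair: the commit enforces the --schemaOnly/--dataOnly exclusion with an explicit post-parse check (the "if args.schemaOnly and args.dataOnly:" block above). A sketch of an alternative, not what this commit does, is to let argparse reject the combination itself:

import argparse

# Alternative sketch only: argparse can enforce the exclusion at parse time
# instead of checking by hand after parse_args().
parser = argparse.ArgumentParser(description="Simple DynamoDB backup/restore/empty.")
group = parser.add_mutually_exclusive_group()
group.add_argument("--schemaOnly", action="store_true", default=False,
                   help="Backup or restore the schema only. Do not export/import data. [optional]")
group.add_argument("--dataOnly", action="store_true", default=False,
                   help="Restore data only. Do not delete/recreate schema [optional for restore]")
args = parser.parse_args()  # argparse exits with an error if both flags are given

Either approach works; the explicit check used in the commit keeps the error message and exit path under the script's own logging.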