diff --git a/README.md b/README.md
index 08c26c4..c4e7dbc 100644
--- a/README.md
+++ b/README.md
@@ -20,6 +20,8 @@ Can be run as a command line script or as an npm module.
     -b, --bucket              S3 bucket to store backups
     -s, --stop-on-failure     specify the reporter to use
     -r, --read-percentage     specific the percentage of Dynamo read capacity to use while backing up. default .25 (25%)
+    -c, --read-capacity       change read capacity during backup. default 0 (no change)
+    -P, --parallel            number of parallel reads. This multiplies the read capacity used; useful to exceed the 1 MB/request limit
     -x, --excluded-tables     exclude these tables from backup
     -i, --included-tables     only backup these tables
     -p, --backup-path         backup path to store table dumps in. default is DynamoDB-backup-YYYY-MM-DD-HH-mm-ss
@@ -146,8 +148,8 @@ __Arguments__
 
 * `tableName` - name of the table to backup
 * `backupPath` - (optional) the path to use for the backup.
-    The iterator is passed a `callback(err)` which must be called once it has 
-    completed. If no error has occurred, the `callback` should be run without 
+    The iterator is passed a `callback(err)` which must be called once it has
+    completed. If no error has occurred, the `callback` should be run without
     arguments or with an explicit `null` argument.
 * `callback(err)` - A callback which is called when the table has finished backing up, or an error occurs
@@ -157,13 +159,13 @@ __Arguments__
 
 ## Restore S3 backups back to Dynamo.
 
-`dynamo-restore-from-s3` is a utility that restores backups in S3 back to dynamo. It streams data down from S3 and throttles the download speed to match the rate of batch writes to Dynamo. 
+`dynamo-restore-from-s3` is a utility that restores backups in S3 back to dynamo. It streams data down from S3 and throttles the download speed to match the rate of batch writes to Dynamo.
 
 It is suitable for restoring large tables without needing to write to disk or use a large amount of memory. Use it on an AWS EC2 instance for best results and to minimise network latency, this should yield restore speeds of around 15min per GB.
 
 Use `--overwrite` if the table already exists. Otherwise it will attempt to generate table on the fly.
 
-Can be run as a command line script or as an npm module. 
+Can be run as a command line script or as an npm module.
 
 # Command line usage
@@ -193,10 +195,10 @@ Can be run as a command line script or as an npm module.
 ```
   # Restore over existing table (cmd.exe).
-  > node ./bin/dynamo-restore-from-s3 -t acme-customers -s s3://my-backups/acme-customers.json --overwrite 
+  > node ./bin/dynamo-restore-from-s3 -t acme-customers -s s3://my-backups/acme-customers.json --overwrite
 
   # Restore over existing table (shell).
-  $ ./bin/dynamo-restore-from-s3 -t acme-customers -s s3://my-backups/acme-customers.json --overwrite 
+  $ ./bin/dynamo-restore-from-s3 -t acme-customers -s s3://my-backups/acme-customers.json --overwrite
 
   # Restore over existing table, 1000 concurrent requests. Stop if any batch fails 1000 times.
   $ ./bin/dynamo-restore-from-s3 -t acme-customers -c 1000 -s s3://my-backups/acme-customers.json --overwrite -sf
@@ -204,23 +206,23 @@ Can be run as a command line script or as an npm module.
   # Restore over existing table, 1000 concurrent requests. When finished, set read capacity to 50 and write capacity to 10 (both needed).
   $ ./bin/dynamo-restore-from-s3 -t acme-customers -c 1000 -s s3://my-backups/acme-customers.json --overwrite --readcapacity 50 --writecapacity 10
 
-  # Auto-generate table (determine PK from backup). 
+  # Auto-generate table (determine PK from backup).
   $ ./bin/dynamo-restore-from-s3 -t acme-customers -s s3://my-backups/acme-customers.json
 
   # Auto-generate table with partition and sort key.
-  $ ./bin/dynamo-restore-from-s3 -t acme-orders -s s3://my-backups/acme-orders.json -pk customerId -sk createDate 
+  $ ./bin/dynamo-restore-from-s3 -t acme-orders -s s3://my-backups/acme-orders.json -pk customerId -sk createDate
 
   # Auto-generate table, defined PK. Concurrency 2000 (~ 2GB backup).
-  $ ./bin/dynamo-restore-from-s3 -t acme-orders -pk orderId -c 2000 -s s3://my-backups/acme-orders.json 
+  $ ./bin/dynamo-restore-from-s3 -t acme-orders -pk orderId -c 2000 -s s3://my-backups/acme-orders.json
 
   # Auto-generate table. 2000 write units during restore. When finished set 50 write units and 100 write units (both needed).
   $ ./bin/dynamo-restore-from-s3 -t acme-orders -c 2000 -s s3://my-backups/acme-orders.json --readcapacity 100 --writecapacity 50
 
   # Auto-generate table. Concurrency 50 (10 MB backup or less).
-  $ ./bin/dynamo-restore-from-s3 -t acme-orders -c 50 -s s3://my-backups/acme-orders.json 
+  $ ./bin/dynamo-restore-from-s3 -t acme-orders -c 50 -s s3://my-backups/acme-orders.json
 
   # Auto-generate table. Concurrency 50. Stop process if any batch fails 50 times.
-  $ ./bin/dynamo-restore-from-s3 -t acme-orders -c 50 -sf -s s3://my-backups/acme-orders.json 
+  $ ./bin/dynamo-restore-from-s3 -t acme-orders -c 50 -sf -s s3://my-backups/acme-orders.json
 
 ```
@@ -313,7 +315,7 @@ __Example__
 ```
 restore.on('send-batch', function(batches, requests, streamMeta) {
     console.log('Batch Sent');
-    console.log('Num cached batches: ', batches); 
+    console.log('Num cached batches: ', batches);
     console.log('Num requests in flight: ', requests);
     console.log('Stream metadata:, JSON.stringify(streamMeta));
 });
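The two new backup flags map directly onto new `DynamoBackup` constructor options. A rough sketch of driving them through the module API rather than the CLI is below; it is only illustrative — the require path and bucket name are assumptions, while the option names (`readCapacity`, `parallel`) and the `error` event payload match the code changed in this diff.

```
// Illustrative only -- not part of this patch.
// Option names match lib/dynamo-backup.js; the require path and bucket are assumed.
var DynamoBackup = require('./lib/dynamo-backup');

var backup = new DynamoBackup({
    bucket: 'my-backups',        // assumed S3 bucket name
    readPercentage: 0.5,         // use up to 50% of provisioned read capacity
    readCapacity: 100,           // new: temporarily raise the table's read capacity during backup
    parallel: 4,                 // new: scan each table with 4 parallel segments
    stopOnFailure: true
});

backup.on('error', function (data) {
    console.error('Error backing up ' + data.tableName + ': ' + data.error.message);
});

backup.backupAllTables(function () {
    console.log('Finished backing up DynamoDB');
});
```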
diff --git a/bin/dynamo-backup-to-s3 b/bin/dynamo-backup-to-s3
index d06b72b..295f5cf 100755
--- a/bin/dynamo-backup-to-s3
+++ b/bin/dynamo-backup-to-s3
@@ -15,12 +15,18 @@ function parseBool(val) {
     return val == 'true';
 }
 
+function parseLast(val, _) {
+    return parseInt(val);
+}
+
 program
     .version(JSON.parse(fs.readFileSync(__dirname + '/../package.json', 'utf8')).version)
     .usage('[options]')
     .option('-b, --bucket ', 'S3 bucket to store backups')
     .option('-s, --stop-on-failure', 'specify the reporter to use', parseBool, true)
     .option('-r, --read-percentage ', 'specific the percentage of Dynamo read capacity to use while backing up. default .25 (25%)', parseFloat, .25)
+    .option('-c, --read-capacity ', 'change read capacity during backup. default 0 (no change)', parseLast, 0)
+    .option('-P, --parallel ', 'number of parallel reads. This multiplies the read capacity used; useful to exceed the 1 MB/read limit', parseInt)
     .option('-x, --excluded-tables ', 'exclude these tables from backup', list)
     .option('-i, --included-tables ', 'only backup these tables', list)
     .option('-p, --backup-path ', 'backup path to store table dumps in. default is DynamoDB-backup-YYYY-MM-DD-HH-mm-ss')
@@ -44,6 +50,8 @@ var dynamoBackup = new DynamoBackup({
     excludedTables: program.excludedTables,
     includedTables: program.includedTables,
     readPercentage: program.readPercentage,
+    readCapacity: program.readCapacity,
+    parallel: program.parallel,
     stopOnFailure: program.stopOnFailure,
     base64Binary: program.base64EncodeBinary,
     saveDataPipelineFormat: program.saveDataPipelineFormat
@@ -51,7 +59,7 @@ var dynamoBackup = new DynamoBackup({
 
 dynamoBackup.on('error', function(data) {
     console.log('Error backing up ' + data.tableName);
-    console.log(data.error);
+    console.log(data.error.message);
     if (program.stopOnFailure) {
         process.exit(-1);
     }
@@ -71,4 +79,4 @@ dynamoBackup.on('end-backup', function(tableName) {
 dynamoBackup.backupAllTables(function() {
     console.log('Finished backing up DynamoDB');
     process.exit(0);
-});
\ No newline at end of file
+});
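To make the interplay of the new flags concrete: `_copyTable` in lib/dynamo-backup.js (below) derives a per-request scan `Limit` from the read capacity and `--read-percentage`, and `--parallel` runs that many segmented scans at once, so the capacity consumed scales with the segment count. A worked sketch with made-up numbers:

```
// Made-up numbers for illustration: -c 100 -P 4 -r 0.25
var readCapacity = 100, parallel = 4, readPercentage = 0.25;

// same calculation as _copyTable below: floor(capacity * percentage), minimum 1
var limitPerSegment = Math.max((readCapacity * readPercentage) | 0, 1); // 25 items per Scan request

// each of the 4 segments scans with that Limit concurrently, so roughly
// 4x the items (and read units) of a single sequential scan
console.log(limitPerSegment, limitPerSegment * parallel); // 25 100
```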
diff --git a/lib/dynamo-backup.js b/lib/dynamo-backup.js
index a81e469..9bd9569 100644
--- a/lib/dynamo-backup.js
+++ b/lib/dynamo-backup.js
@@ -17,6 +17,8 @@ function DynamoBackup(options) {
     this.excludedTables = options.excludedTables || [];
     this.includedTables = options.includedTables;
     this.readPercentage = options.readPercentage || .25;
+    this.readCapacity = options.readCapacity || 0;
+    this.parallel = options.parallel || 1;
     this.backupPath = options.backupPath;
     this.bucket = options.bucket;
     this.stopOnFailure = options.stopOnFailure || false;
@@ -85,36 +87,52 @@ DynamoBackup.prototype.backupTable = function (tableName, backupPath, callback) {
         return callback(err);
     });
 
-    self._copyTable(
-        tableName,
-        function (items) {
-            items.forEach(function (item) {
-                if (self.base64Binary) {
-                    _.each(item, function (value, key) {
-                        if (value && value.B) {
-                            value.B = new Buffer(value.B).toString('base64');
-                        }
-                    });
-                }
-
-                if (self.saveDataPipelineFormat) {
-                    stream.append(self._formatForDataPipeline(item));
-                } else {
-                    stream.append(JSON.stringify(item));
-                }
-                stream.append('\n');
-            });
-        },
-        function (err) {
-            stream.end();
-            if (err) {
-                self.emit('error', {
-                    table: tableName,
-                    err: err
+    // writes each item to stream
+    var itemsHandler = function (items) {
+        items.forEach(function (item) {
+            if (self.base64Binary) {
+                _.each(item, function (value, key) {
+                    if (value && value.B) {
+                        value.B = new Buffer(value.B).toString('base64');
+                    }
                 });
             }
+
+            if (self.saveDataPipelineFormat) {
+                stream.append(self._formatForDataPipeline(item));
+            } else {
+                stream.append(JSON.stringify(item));
+            }
+            stream.append('\n');
+        });
+    };
+
+    var endStream = function (err) {
+        stream.end();
+        if (err) {
+            self.emit('error', {
+                tableName: tableName,
+                error: err
+            });
         }
+    };
+
+    if (self.readCapacity === 0) {
+        self._copyTable(tableName, null, self.parallel, itemsHandler, endStream);
+    } else {
+        self._updateTable(
+            tableName,
+            self.readCapacity,
+            function (oldReadCapacity, oldWriteCapacity, newReadCapacity) {
+                self._copyTable(tableName, newReadCapacity, self.parallel, itemsHandler, function(err) {
+                    self._restoreTable(tableName, oldReadCapacity, oldWriteCapacity, function(err2) {
+                        endStream(err || err2);
+                    });
+                });
+            },
+            endStream );
+    }
 };
 
 DynamoBackup.prototype.backupAllTables = function (callback) {
@@ -141,7 +159,7 @@ DynamoBackup.prototype.backupAllTables = function (callback) {
                 })
             },
             callback
-        );    
+        );
     });
 };
@@ -151,22 +169,129 @@ DynamoBackup.prototype._getBackupPath = function () {
     return self.backupPath || ('DynamoDB-backup-' + now.format('YYYY-MM-DD-HH-mm-ss'));
 };
 
-DynamoBackup.prototype._copyTable = function (tableName, itemsReceived, callback) {
+// starts a _streamItems() loop, which calls itemsReceived each time, then callback when finished or failed
+DynamoBackup.prototype._copyTable = function (tableName, readCapacity, parallelReads, itemsReceived, callback) {
     var self = this;
     var ddb = new AWS.DynamoDB();
 
+    // DynamoDB is "eventually consistent". If we just called ddb.updateTable, then ddb.describeTable might return the
+    // old values for readCapacity. To get around this, allow readCapacity to be passed as an argument.
+
+    var startStreams = function(readCapacity) {
+        var readPercentage = self.readPercentage;
+        var limit = Math.max((readCapacity * readPercentage) | 0, 1);
+
+        if (parallelReads > 1) {
+            var asyncTracker = function(err, data) {;};
+            async.map(
+                _.range(parallelReads),
+                function(i, cb) {
+                    self._streamItems(tableName, null, limit, i, parallelReads, itemsReceived, cb);
+                },
+                callback
+            );
+        } else {
+            self._streamItems(tableName, null, limit, null, null, itemsReceived, callback);
+        }
+    };
+
+    if (readCapacity) {
+        startStreams(readCapacity);
+    } else {
+        ddb.describeTable({ TableName: tableName }, function (err, data) {
+            if (err) {
+                return callback(err);
+            }
+
+            startStreams(data.Table.ProvisionedThroughput.ReadCapacityUnits);
+        });
+    }
+};
+
+// calls successCallback on success with (oldReadCapacity, oldWriteCapacity, newReadCapacity)
+// on failure, tries to restoreTable, and calls errorCallback with error
+DynamoBackup.prototype._updateTable = function (tableName, newReadCapacity, successCallback, errorCallback) {
+    var self = this;
+    var ddb = new AWS.DynamoDB();
+
     ddb.describeTable({ TableName: tableName }, function (err, data) {
         if (err) {
-            return callback(err);
+            return errorCallback(err);
         }
 
-        var readPercentage = self.readPercentage;
-        var limit = Math.max((data.Table.ProvisionedThroughput.ReadCapacityUnits * readPercentage) | 0, 1);
+        var oldReadCapacity = data.Table.ProvisionedThroughput.ReadCapacityUnits,
+            oldWriteCapacity = data.Table.ProvisionedThroughput.WriteCapacityUnits;
 
-        self._streamItems(tableName, null, limit, itemsReceived, callback);
-    });
-};
+        if (newReadCapacity == oldReadCapacity) {
+            return successCallback();
+        }
+
+        var params = {
+            TableName: tableName,
+            ProvisionedThroughput: {
+                ReadCapacityUnits: newReadCapacity,
+                WriteCapacityUnits: oldWriteCapacity
+            }
+        };
+
+        var tryRestore = function(err) {
+            self._restoreTable(tableName, oldReadCapacity, oldWriteCapacity, function(err2) {
+                errorCallback(err || err2);
+            });
+        }
+
+        ddb.updateTable(params, function (err, data) {
+            if (err) {
+                return tryRestore(err);
+            }
+
+            ddb.waitFor('tableExists', { TableName: tableName }, function(err, data2) {
+                if (err) {
+                    return tryRestore(err);
+                }
+                successCallback(oldReadCapacity, oldWriteCapacity, newReadCapacity); // assume the capacity change applied
+                // rather than issuing another describeTable call
+            });
+        });
+    })
+}
 
-DynamoBackup.prototype._streamItems = function fetchItems(tableName, startKey, limit, itemsReceived, callback) {
+DynamoBackup.prototype._restoreTable = function (tableName, oldReadCapacity, oldWriteCapacity, callback) {
+    var self = this;
+    var ddb = new AWS.DynamoDB();
+
+    if (oldReadCapacity == null) {
+        return callback();
+    }
+
+    var params = {
+        TableName: tableName,
+        ProvisionedThroughput: {
+            ReadCapacityUnits: oldReadCapacity,
+            WriteCapacityUnits: oldWriteCapacity
+        }
+    };
+
+    // it takes a moment for dynamoDB to realize we're done reading
+    var counter = 0;
+    var update = function() {
+        ddb.updateTable(params, function (err, data) {
+            if (err) {
+                if (counter > 15) { // 5 minute timeout
+                    return callback(err);
+                }
+                if (err.code == 'ResourceInUseException') {
+                    counter++;
+                    return setTimeout(update, 20000);
+                }
+                return callback(err);
+            }
+            callback();
+        });
+    };
+    setTimeout(update, 2000);
+}
+
+DynamoBackup.prototype._streamItems = function fetchItems(tableName, startKey, limit, segment, totalSegments, itemsReceived, callback) {
     var self = this;
     var ddb = new AWS.DynamoDB();
     var params = {
@@ -177,6 +302,10 @@ DynamoBackup.prototype._streamItems = function fetchItems(tableName, startKey, l
     if (startKey) {
         params.ExclusiveStartKey = startKey;
     }
+    if (segment != null) {
+        params.Segment = segment;
+        params.TotalSegments = totalSegments;
+    }
     ddb.scan(params, function (err, data) {
         if (err) {
             return callback(err);
         }
@@ -189,7 +318,7 @@ DynamoBackup.prototype._streamItems = function fetchItems(tableName, startKey, l
         if (!data.LastEvaluatedKey || _.keys(data.LastEvaluatedKey).length === 0) {
             return callback();
         }
-        self._streamItems(tableName, data.LastEvaluatedKey, limit, itemsReceived, callback);
+        self._streamItems(tableName, data.LastEvaluatedKey, limit, segment, totalSegments, itemsReceived, callback);
     });
 };
@@ -217,7 +346,7 @@ DynamoBackup.prototype._fetchTables = function (lastTable, tables, callback) {
  * AWS Data Pipeline import requires that each key in the Attribute list
  * be lower-cased and for sets start with a lower-case character followed
  * by an 'S'.
- * 
+ *
  * Go through each attribute and create a new entry with the correct case
  */
 DynamoBackup.prototype._formatForDataPipeline = function (item) {
@@ -259,4 +388,4 @@ DynamoBackup.prototype._getDataPipelineAttributeValueKey = function (type) {
     }
 };
 
-module.exports = DynamoBackup;
\ No newline at end of file
+module.exports = DynamoBackup;
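The `Segment`/`TotalSegments` parameters threaded through `_streamItems` are DynamoDB's standard parallel-scan mechanism: each worker scans a disjoint slice of the table and pages through it independently. A minimal standalone sketch of that pattern, using the same aws-sdk v2 calls as the code above; the table name and segment count are placeholders.

```
// Standalone parallel-scan sketch; 'acme-customers' and the segment count are placeholders.
var AWS = require('aws-sdk');
var ddb = new AWS.DynamoDB();

function scanSegment(segment, totalSegments, startKey, onItems, done) {
    var params = { TableName: 'acme-customers', Limit: 25, Segment: segment, TotalSegments: totalSegments };
    if (startKey) {
        params.ExclusiveStartKey = startKey;
    }
    ddb.scan(params, function (err, data) {
        if (err) return done(err);
        onItems(data.Items);
        if (!data.LastEvaluatedKey) return done();   // this segment is exhausted
        scanSegment(segment, totalSegments, data.LastEvaluatedKey, onItems, done);
    });
}

// run 4 disjoint segments concurrently
for (var i = 0; i < 4; i++) {
    scanSegment(i, 4, null, function (items) {
        console.log('got ' + items.length + ' items');
    }, function (err) {
        if (err) console.error(err);
    });
}
```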
diff --git a/lib/dynamo-restore.js b/lib/dynamo-restore.js
index b341fb8..602b5b8 100755
--- a/lib/dynamo-restore.js
+++ b/lib/dynamo-restore.js
@@ -2,20 +2,21 @@
  * lib/dynamo-restore.js
  *
  * By Steven de Salas
- * 
+ *
  * AWS Restore to DynamoDB. Streams an S3 backup to a new DynamoDB table.
  *
  */
 var URL = require('url');
 var util = require('util');
 var AWS = require('aws-sdk');
+var S3S = require('s3-streams');
 var events = require('events');
 var readline = require('readline');
 
 var DYNAMO_CHUNK_SIZE = 25;
 
 function DynamoRestore(options) {
     options = options || {};
-    options.concurrency = options.concurrency || 200;
+    options.concurrency = options.concurrency ? options.concurrency * DYNAMO_CHUNK_SIZE : 200;
     options.minConcurrency = 1;
     options.maxConcurrency = options.concurrency;
     options.readcapacity = options.readcapacity || 5;
@@ -35,7 +36,7 @@ function DynamoRestore(options) {
     this.dynamodb = new AWS.DynamoDB();
 }
 
-// Stick to prototypal inheritance. While this can be done differently 
+// Stick to prototypal inheritance. While this can be done differently
 // in ES6 we'd be making package unusable for older engines (0.10.x->0.12.x)
 util.inherits(DynamoRestore, events.EventEmitter);
 
@@ -134,9 +135,9 @@ DynamoRestore.prototype._startDownload = function() {
             else this.emit('error', util.format('Error downloading file from s3: %s', error));
             return;
         }
-        var downloadStream = s3.getObject(params).createReadStream();
+        var downloadStream = S3S.ReadStream(s3, params);
         downloadStream.pause();
-        // All good, start downloading 
+        // All good, start downloading
         this.emit('start-download', meta);
         this.readline = readline.createInterface({
             terminal: false,
@@ -180,7 +181,7 @@ DynamoRestore.prototype._processLine = function(line) {
     // Writing to Dynamo is usually slower than reading from S3,
     // and we want to avoid clogging up memory or writing to disk.
     // The list of batches waiting for DynamoDB to process would
-    // quickly get out of hand here, so an easy way around this is to 
+    // quickly get out of hand here, so an easy way around this is to
     // stop reading from S3 when the number of requests in flight goes past a
     // certain size, and then continue reading when the number is reduced.
     if (this.requests.length >= this.options.concurrency) {
@@ -210,7 +211,7 @@ DynamoRestore.prototype._extractSchema = function(template) {
     if (likelyCandidates.length === 0) {
         return this.emit('error', 'Fatal Error. Cannot determine --partitionkey from backup, please supply it manually.');
     } else {
-        // Pick the shortest one 
+        // Pick the shortest one
         partitionkey = likelyCandidates.sort(function(a, b) {
             return b.length - a.length;
         }).pop();
@@ -360,4 +361,4 @@ DynamoRestore.prototype._finishBatches = function() {
     setTimeout(this._finishBatches.bind(this), 200);
 };
 
-module.exports = DynamoRestore;
\ No newline at end of file
+module.exports = DynamoRestore;
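The restore path now reads the backup through `s3-streams` instead of `s3.getObject().createReadStream()`. For context, a minimal sketch of that download path in isolation, pairing an `S3S.ReadStream` with Node's `readline` the same way `_startDownload` does; the bucket and key are placeholders.

```
// Minimal sketch of the streaming download used by _startDownload; bucket/key are placeholders.
var AWS = require('aws-sdk');
var S3S = require('s3-streams');
var readline = require('readline');

var s3 = new AWS.S3();
var downloadStream = S3S.ReadStream(s3, { Bucket: 'my-backups', Key: 'acme-customers.json' });

readline.createInterface({ terminal: false, input: downloadStream })
    .on('line', function (line) {
        var item = JSON.parse(line); // one DynamoDB item per line, as written by the backup
        // ... queue the item for a BatchWriteItem request here ...
    })
    .on('close', function () {
        console.log('Download complete');
    });
```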
diff --git a/package.json b/package.json
index bc46c90..df82f6a 100644
--- a/package.json
+++ b/package.json
@@ -28,7 +28,8 @@
     "lodash": "^3.10.1",
     "moment": "^2.10.6",
     "moment-range": "^2.0.3",
-    "s3-streaming-upload": "^0.2.1"
+    "s3-streaming-upload": "^0.2.1",
+    "s3-streams": "^0.3.0"
   },
   "engines": {
     "node": ">=0.10.0"