@@ -20,6 +20,7 @@ function DynamoBackup(options) {
2020 this . bucket = options . bucket ;
2121 this . stopOnFailure = options . stopOnFailure || false ;
2222 this . base64Binary = options . base64Binary || false ;
23+ this . saveDataPipelineFormat = options . saveDataPipelineFormat || false ;
2324 this . awsAccessKey = options . awsAccessKey || process . env . AWS_ACCESS_KEY_ID ;
2425 this . awsSecretKey = options . awsSecretKey || process . env . AWS_SECRET_ACCESS_KEY ;
2526 this . awsRegion = options . awsRegion || process . env . AWS_DEFAULT_REGION || 'us-east-1' ;
@@ -49,22 +50,22 @@ DynamoBackup.prototype.backupTable = function (tableName, backupPath, callback)
4950 }
5051
5152 var upload = new Uploader ( {
52- accessKey : self . awsAccessKey ,
53- secretKey : self . awsSecretKey ,
54- region : self . awsRegion ,
55- bucket : self . bucket ,
53+ accessKey : self . awsAccessKey ,
54+ secretKey : self . awsSecretKey ,
55+ region : self . awsRegion ,
56+ bucket : self . bucket ,
5657 objectName : path . join ( backupPath , tableName + '.json' ) ,
57- stream : stream ,
58- debug : self . debug
58+ stream : stream ,
59+ debug : self . debug
5960 } ) ;
6061
6162 var startTime = moment . utc ( ) ;
6263 self . emit ( 'start-backup' , tableName , startTime ) ;
63- upload . send ( function ( err ) {
64+ upload . send ( function ( err ) {
6465 if ( err ) {
6566 self . emit ( 'error' , {
6667 table : tableName ,
67- err : err
68+ err : err
6869 } ) ;
6970 }
7071 var endTime = moment . utc ( ) ;
@@ -75,36 +76,41 @@ DynamoBackup.prototype.backupTable = function (tableName, backupPath, callback)
7576
7677 self . _copyTable (
7778 tableName ,
78- function ( items ) {
79- items . forEach ( function ( item ) {
79+ function ( items ) {
80+ items . forEach ( function ( item ) {
8081 if ( self . base64Binary ) {
8182 _ . each ( item , function ( value , key ) {
8283 if ( value && value . B ) {
8384 value . B = new Buffer ( value . B ) . toString ( 'base64' ) ;
8485 }
8586 } ) ;
8687 }
87- stream . append ( JSON . stringify ( item ) ) ;
88+
89+ if ( self . saveDataPipelineFormat ) {
90+ stream . append ( self . _formatForDataPipeline ( item ) ) ;
91+ } else {
92+ stream . append ( JSON . stringify ( item ) ) ;
93+ }
8894 stream . append ( '\n' ) ;
8995 } ) ;
9096 } ,
91- function ( err ) {
97+ function ( err ) {
9298 stream . end ( ) ;
93- if ( err ) {
99+ if ( err ) {
94100 self . emit ( 'error' , {
95101 table : tableName ,
96- err : err
102+ err : err
97103 } ) ;
98104 }
99105 }
100- ) ;
106+ ) ;
101107}
102108
103109DynamoBackup . prototype . backupAllTables = function ( callback ) {
104110 var self = this ;
105111 var backupPath = self . _getBackupPath ( ) ;
106112
107- self . listTables ( function ( err , tables ) {
113+ self . listTables ( function ( err , tables ) {
108114 if ( err ) {
109115 return callback ( err ) ;
110116 }
@@ -113,8 +119,8 @@ DynamoBackup.prototype.backupAllTables = function (callback) {
113119 tables = _ . intersection ( tables , includedTables ) ;
114120
115121 async . each ( tables ,
116- function ( tableName , done ) {
117- self . backupTable ( tableName , backupPath , function ( err ) {
122+ function ( tableName , done ) {
123+ self . backupTable ( tableName , backupPath , function ( err ) {
118124 if ( err ) {
119125 if ( self . stopOnFailure ) {
120126 return done ( err ) ;
@@ -124,11 +130,11 @@ DynamoBackup.prototype.backupAllTables = function (callback) {
124130 } )
125131 } ,
126132 callback
127- ) ;
133+ ) ;
128134 } ) ;
129135}
130136
131- DynamoBackup . prototype . _getBackupPath = function ( ) {
137+ DynamoBackup . prototype . _getBackupPath = function ( ) {
132138 var self = this ;
133139 var now = moment . utc ( ) ;
134140 return self . backupPath || ( 'DynamoDB-backup-' + now . format ( 'YYYY-MM-DD-HH-mm-ss' ) ) ;
@@ -137,13 +143,13 @@ DynamoBackup.prototype._getBackupPath = function() {
137143DynamoBackup . prototype . _copyTable = function ( tableName , itemsReceived , callback ) {
138144 var self = this ;
139145 var ddb = new AWS . DynamoDB ( ) ;
140- ddb . describeTable ( { TableName : tableName } , function ( err , data ) {
146+ ddb . describeTable ( { TableName : tableName } , function ( err , data ) {
141147 if ( err ) {
142148 return callback ( err ) ;
143149 }
144150
145151 var readPercentage = self . readPercentage ;
146- var limit = Math . max ( ( data . Table . ProvisionedThroughput . ReadCapacityUnits * readPercentage ) | 0 , 1 ) ;
152+ var limit = Math . max ( ( data . Table . ProvisionedThroughput . ReadCapacityUnits * readPercentage ) | 0 , 1 ) ;
147153
148154 self . _streamItems ( tableName , null , limit , itemsReceived , callback ) ;
149155 } ) ;
@@ -160,12 +166,12 @@ DynamoBackup.prototype._streamItems = function fetchItems(tableName, startKey, l
160166 if ( startKey ) {
161167 params . ExclusiveStartKey = startKey ;
162168 }
163- ddb . scan ( params , function ( err , data ) {
169+ ddb . scan ( params , function ( err , data ) {
164170 if ( err ) {
165171 return callback ( err ) ;
166172 }
167173
168- if ( data . Items . length > 0 ) {
174+ if ( data . Items . length > 0 ) {
169175 itemsReceived ( data . Items ) ;
170176 }
171177
@@ -176,14 +182,14 @@ DynamoBackup.prototype._streamItems = function fetchItems(tableName, startKey, l
176182 } ) ;
177183}
178184
179- DynamoBackup . prototype . _fetchTables = function ( lastTable , tables , callback ) {
185+ DynamoBackup . prototype . _fetchTables = function ( lastTable , tables , callback ) {
180186 var self = this ;
181187 var ddb = new AWS . DynamoDB ( ) ;
182188 var params = { } ;
183189 if ( lastTable ) {
184190 params . ExclusiveStartTableName = lastTable ;
185191 }
186- ddb . listTables ( params , function ( err , data ) {
192+ ddb . listTables ( params , function ( err , data ) {
187193 if ( err ) {
188194 return callback ( err , null ) ;
189195 }
@@ -196,4 +202,45 @@ DynamoBackup.prototype._fetchTables = function(lastTable, tables, callback) {
196202 } ) ;
197203} ;
198204
199- module . exports = DynamoBackup ;
205+ /**
206+ * AWS Data Pipeline import requires that each key in the Attribute list
207+ * be lower-cased and for sets start with a lower-case character followed
208+ * by an 'S'.
209+ *
210+ * Go through each attribute and create a new entry with the correct case
211+ */
212+ DynamoBackup . prototype . _formatForDataPipeline = function ( item ) {
213+ var self = this ;
214+ _ . each ( item , function ( value , key ) {
215+ //value will be of the form: {S: 'xxx'}. Convert the key
216+ _ . each ( value , function ( v , k ) {
217+ var dataPipelineValueKey = self . _getDataPipelineAttributeValueKey ( k ) ;
218+ value [ dataPipelineValueKey ] = v ;
219+ value [ k ] = undefined ;
220+ } ) ;
221+ } ) ;
222+ return JSON . stringify ( item ) ;
223+ } ;
224+
225+ DynamoBackup . prototype . _getDataPipelineAttributeValueKey = function ( type ) {
226+ switch ( type ) {
227+ case 'S' :
228+ case 'N' :
229+ case 'B' :
230+ case 'M' :
231+ case 'L' :
232+ case 'NULL' :
233+ case 'BOOL' :
234+ return type . toLowerCase ( ) ;
235+ case 'SS' :
236+ return 'sS' ;
237+ case 'NS' :
238+ return 'nS' ;
239+ case 'BS' :
240+ return 'bS' ;
241+ default :
242+ throw new Error ( 'Unknown AttributeValue key: ' + type ) ;
243+ }
244+ }
245+
246+ module . exports = DynamoBackup ;
0 commit comments