@@ -5,7 +5,9 @@ const DynamoDBStreams = require('aws-sdk/clients/dynamodbstreams');
5
5
const DynamoDBStreamReadable = require ( '..' ) ;
6
6
7
7
const fromCallback = fun =>
8
- new Promise ( ( resolve , reject ) => fun ( ( err , data ) => ( err ? reject ( err ) : resolve ( data ) ) ) ) ;
8
+ new Promise ( ( resolve , reject ) => {
9
+ fun ( ( err , data ) => ( err ? reject ( err ) : resolve ( data ) ) ) ;
10
+ } ) ;
9
11
const wait = duration => fromCallback ( cb => setTimeout ( cb , duration ) ) ;
10
12
const batchWriteItem = ( dynamodb , tableName , items ) =>
11
13
fromCallback ( cb =>
@@ -152,25 +154,25 @@ test.serial('reads ongoing records', t => {
152
154
153
155
let count = 0 ;
154
156
return Promise . all ( [
155
- new Promise ( ( resolve , reject ) =>
157
+ new Promise ( ( resolve , reject ) => {
156
158
readable
157
- . on ( 'data' , function ( recordSet ) {
159
+ . on ( 'data' , function ( recordSet ) {
158
160
recordSet . forEach ( record => {
159
161
t . deepEqual ( record . dynamodb . Keys . Id , documents [ count ] . Item . Id ) ;
160
162
count = count + 1 ;
161
163
} ) ;
162
164
if ( count > documents . length ) t . fail ( 'should not read extra records' ) ;
163
165
if ( count === documents . length ) readable . close ( ) ;
164
166
} )
165
- . on ( 'end' , function ( ) {
167
+ . on ( 'end' , function ( ) {
166
168
t . deepEqual ( count , documents . length , `read ${ documents . length } records` ) ;
167
169
resolve ( ) ;
168
170
} )
169
- . on ( 'error' , function ( err ) {
171
+ . on ( 'error' , function ( err ) {
170
172
t . fail ( 'should not error' ) ;
171
173
reject ( err ) ;
172
- } )
173
- ) ,
174
+ } ) ;
175
+ } ) ,
174
176
wait ( 100 ) . then ( ( ) => batchWriteItem ( dynamodb , tableName , documents ) )
175
177
] ) ;
176
178
} ) ;
@@ -209,29 +211,29 @@ test.serial('reads latest records', async t => {
209
211
210
212
let count = 0 ;
211
213
return Promise . all ( [
212
- new Promise ( ( resolve , reject ) =>
214
+ new Promise ( ( resolve , reject ) => {
213
215
readable
214
- . on ( 'data' , function ( recordSet ) {
216
+ . on ( 'data' , function ( recordSet ) {
215
217
recordSet . forEach ( record => {
216
218
t . deepEqual ( record . dynamodb . Keys . Id , subsequentDocuments [ count ] . Item . Id ) ;
217
219
count = count + 1 ;
218
220
} ) ;
219
221
if ( count > subsequentDocuments . length ) t . fail ( 'should not read extra records' ) ;
220
222
if ( count === subsequentDocuments . length ) readable . close ( ) ;
221
223
} )
222
- . on ( 'end' , function ( ) {
224
+ . on ( 'end' , function ( ) {
223
225
t . deepEqual (
224
226
count ,
225
227
subsequentDocuments . length ,
226
228
`read ${ subsequentDocuments . length } records`
227
229
) ;
228
230
resolve ( ) ;
229
231
} )
230
- . on ( 'error' , function ( err ) {
232
+ . on ( 'error' , function ( err ) {
231
233
t . fail ( 'should not error' ) ;
232
234
reject ( err ) ;
233
- } )
234
- ) ,
235
+ } ) ;
236
+ } ) ,
235
237
wait ( 100 ) . then ( ( ) => batchWriteItem ( dynamodb , tableName , subsequentDocuments ) )
236
238
] ) ;
237
239
} ) ;
@@ -261,9 +263,9 @@ test.serial('emits checkpoints, obeys limits', t => {
261
263
let count = 0 ;
262
264
let checkpoints = 0 ;
263
265
return Promise . all ( [
264
- new Promise ( ( resolve , reject ) =>
266
+ new Promise ( ( resolve , reject ) => {
265
267
readable
266
- . on ( 'data' , function ( recordSet ) {
268
+ . on ( 'data' , function ( recordSet ) {
267
269
t . is ( recordSet . length , 1 , 'obeys requested limit' ) ;
268
270
recordSet . forEach ( record => {
269
271
t . deepEqual ( record . dynamodb . Keys . Id , documents [ count ] . Item . Id ) ;
@@ -272,19 +274,19 @@ test.serial('emits checkpoints, obeys limits', t => {
272
274
if ( count > documents . length ) t . fail ( 'should not read extra records' ) ;
273
275
if ( count === documents . length ) readable . close ( ) ;
274
276
} )
275
- . on ( 'checkpoint' , function ( sequenceNum ) {
277
+ . on ( 'checkpoint' , function ( sequenceNum ) {
276
278
if ( typeof sequenceNum !== 'string' ) t . fail ( 'invalid sequenceNum emitted' ) ;
277
279
checkpoints = checkpoints + 1 ;
278
280
} )
279
- . on ( 'end' , function ( ) {
281
+ . on ( 'end' , function ( ) {
280
282
t . deepEqual ( count , documents . length , `read ${ documents . length } records` ) ;
281
283
resolve ( ) ;
282
284
} )
283
- . on ( 'error' , function ( err ) {
285
+ . on ( 'error' , function ( err ) {
284
286
t . fail ( 'should not error' ) ;
285
287
reject ( err ) ;
286
- } )
287
- ) ,
288
+ } ) ;
289
+ } ) ,
288
290
batchWriteItem ( dynamodb , tableName , documents )
289
291
] ) ;
290
292
} ) ;
0 commit comments