Skip to content

Commit 0213cc6

Browse files
Merge pull request #122 from shiftcode/#95-batch-write-request
#95 batch write request
2 parents 74630c1 + 3a8e23f commit 0213cc6

File tree

12 files changed

+318
-28
lines changed

12 files changed

+318
-28
lines changed
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
export const BATCH_GET_MAX_REQUEST_ITEM_COUNT = 100
2+
export const BATCH_GET_DEFAULT_TIME_SLOT = 1000

src/dynamo/batchget/batch-get.request.spec.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,8 @@ describe('batch get', () => {
7171
})
7272

7373
it('should throw when more than 100 items are added', () => {
74-
const items55: Array<Partial<SimpleWithPartitionKeyModel>> = new Array(55).map((x, i) => ({ id: `id-${i}` }))
75-
const items60: Array<Partial<Organization>> = new Array(60).map((x, i) => ({
74+
const items55: Array<Partial<SimpleWithPartitionKeyModel>> = [...new Array(55)].map((x, i) => ({ id: `id-${i}` }))
75+
const items60: Array<Partial<Organization>> = [...new Array(60)].map((x, i) => ({
7676
id: `id-${i}`,
7777
createdAtDate: new Date(),
7878
}))

src/dynamo/batchget/batch-get.request.ts

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,10 @@ import { DynamoRx } from '../dynamo-rx'
1010
import { getTableName } from '../get-table-name.function'
1111
import { BatchGetFullResponse } from './batch-get-full.response'
1212
import { batchGetItemsFetchAll } from './batch-get-utils'
13+
import { BATCH_GET_DEFAULT_TIME_SLOT, BATCH_GET_MAX_REQUEST_ITEM_COUNT } from './batch-get.const'
1314
import { BatchGetResponse } from './batch-get.response'
1415

15-
const MAX_REQUEST_ITEM_COUNT = 100
16-
const DEFAULT_TIME_SLOT = 1000
16+
1717

1818
export class BatchGetRequest {
1919
readonly params: DynamoDB.BatchGetItemInput
@@ -46,7 +46,7 @@ export class BatchGetRequest {
4646
this.tables.set(tableName, modelClazz)
4747

4848
// check if keys to add do not exceed max count
49-
if (this.itemCounter + keys.length > MAX_REQUEST_ITEM_COUNT) { throw new Error(`you can request at max ${MAX_REQUEST_ITEM_COUNT} items per request`)}
49+
if (this.itemCounter + keys.length > BATCH_GET_MAX_REQUEST_ITEM_COUNT) { throw new Error(`you can request at max ${BATCH_GET_MAX_REQUEST_ITEM_COUNT} items per request`)}
5050

5151
this.params.RequestItems[tableName] = {
5252
Keys: keys.map(createToKeyFn(modelClazz)),
@@ -58,26 +58,26 @@ export class BatchGetRequest {
5858
return this
5959
}
6060

61-
execNoMap(backoffTimer = randomExponentialBackoffTimer, throttleTimeSlot = DEFAULT_TIME_SLOT): Observable<DynamoDB.BatchGetItemOutput> {
61+
execNoMap(backoffTimer = randomExponentialBackoffTimer, throttleTimeSlot = BATCH_GET_DEFAULT_TIME_SLOT): Observable<DynamoDB.BatchGetItemOutput> {
6262
return this.fetch(backoffTimer, throttleTimeSlot)
6363
}
6464

65-
execFullResponse(backoffTimer = randomExponentialBackoffTimer, throttleTimeSlot = DEFAULT_TIME_SLOT): Observable<BatchGetFullResponse> {
65+
execFullResponse(backoffTimer = randomExponentialBackoffTimer, throttleTimeSlot = BATCH_GET_DEFAULT_TIME_SLOT): Observable<BatchGetFullResponse> {
6666
return this.fetch(backoffTimer, throttleTimeSlot)
6767
.pipe(
6868
map(this.mapResponse),
6969
)
7070
}
7171

72-
exec(backoffTimer = randomExponentialBackoffTimer, throttleTimeSlot = DEFAULT_TIME_SLOT): Observable<BatchGetResponse> {
72+
exec(backoffTimer = randomExponentialBackoffTimer, throttleTimeSlot = BATCH_GET_DEFAULT_TIME_SLOT): Observable<BatchGetResponse> {
7373
return this.fetch(backoffTimer, throttleTimeSlot)
7474
.pipe(
7575
map(this.mapResponse),
7676
map(r => r.Responses),
7777
)
7878
}
7979

80-
private fetch(backoffTimer = randomExponentialBackoffTimer, throttleTimeSlot = DEFAULT_TIME_SLOT) {
80+
private fetch(backoffTimer = randomExponentialBackoffTimer, throttleTimeSlot = BATCH_GET_DEFAULT_TIME_SLOT) {
8181
return batchGetItemsFetchAll(this.dynamoRx, { ...this.params }, backoffTimer(), throttleTimeSlot)
8282
}
8383

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
import { DynamoDB } from 'aws-sdk'
2+
import { of } from 'rxjs'
3+
import { DynamoRx } from '../dynamo-rx'
4+
import { batchWriteItemsWriteAll, hasUnprocessedItems } from './batch-write-utils'
5+
6+
describe('batch-write-utils', () => {
7+
describe('batchWriteItemsWriteAll', () => {
8+
let batchWriteItemSpy: jasmine.Spy
9+
let dynamoRx: DynamoRx
10+
let backoffTimerMock: { next: jasmine.Spy }
11+
12+
const output1: DynamoDB.BatchWriteItemOutput = {
13+
UnprocessedItems: {
14+
tableA: [
15+
{
16+
PutRequest: { Item: { id: { S: 'id-A' } } },
17+
},
18+
],
19+
},
20+
}
21+
const output2: DynamoDB.BatchWriteItemOutput = {}
22+
23+
beforeEach(async () => {
24+
batchWriteItemSpy = jasmine.createSpy().and.returnValues(of(output1), of(output2))
25+
dynamoRx = <any>{ batchWriteItem: batchWriteItemSpy }
26+
backoffTimerMock = { next: jasmine.createSpy().and.returnValue({ value: 0 }) }
27+
28+
await batchWriteItemsWriteAll(dynamoRx, <any>{}, <IterableIterator<number>>(<any>backoffTimerMock), 0).toPromise()
29+
})
30+
31+
it('should use UnprocessedKeys for next request', () => {
32+
expect(batchWriteItemSpy).toHaveBeenCalledTimes(2)
33+
expect(batchWriteItemSpy.calls.mostRecent().args[0]).toBeDefined()
34+
expect(batchWriteItemSpy.calls.mostRecent().args[0].RequestItems).toBeDefined()
35+
expect(batchWriteItemSpy.calls.mostRecent().args[0].RequestItems).toEqual(output1.UnprocessedItems)
36+
})
37+
38+
it('should backoff when UnprocessedItems', () => {
39+
expect(backoffTimerMock.next).toHaveBeenCalledTimes(1)
40+
})
41+
})
42+
43+
describe('hasUnprocessedItems', () => {
44+
it('should return false when no items', () => {
45+
expect(hasUnprocessedItems({})).toBe(false)
46+
expect(hasUnprocessedItems({ UnprocessedItems: {} })).toBe(false)
47+
expect(hasUnprocessedItems({ UnprocessedItems: { myTable: [] } })).toBe(false)
48+
})
49+
50+
it('should return true when items', () => {
51+
expect(hasUnprocessedItems({ UnprocessedItems: { myTable: [{ PutRequest: { Item: { id: { S: 'myId' } } } }] } }))
52+
})
53+
})
54+
})

src/dynamo/request/batchwritesingletable/batch-write-utils.ts renamed to src/dynamo/batchwrite/batch-write-utils.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import * as DynamoDB from 'aws-sdk/clients/dynamodb'
22
import { Observable, of } from 'rxjs'
33
import { delay, mergeMap } from 'rxjs/operators'
4-
import { DynamoRx } from '../../dynamo-rx'
4+
import { DynamoRx } from '../dynamo-rx'
55

66

77
export function batchWriteItemsWriteAll(
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
export const BATCH_WRITE_MAX_REQUEST_ITEM_COUNT = 25
2+
export const BATCH_WRITE_DEFAULT_TIME_SLOT = 1000
Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
import { of } from 'rxjs'
2+
import { ComplexModel, SimpleWithPartitionKeyModel } from '../../../test/models'
3+
import { getTableName } from '../get-table-name.function'
4+
import { BatchWriteRequest } from './batch-write.request'
5+
6+
describe('batchWriteRequest', () => {
7+
let req: BatchWriteRequest
8+
9+
describe('constructor', () => {
10+
it('should initialize params', () => {
11+
req = new BatchWriteRequest()
12+
expect(req.params.RequestItems).toBeDefined()
13+
expect(req.params.RequestItems).toEqual({})
14+
})
15+
})
16+
17+
describe('returnConsumedCapacity', () => {
18+
it('should set params', () => {
19+
req = new BatchWriteRequest().returnConsumedCapacity('TOTAL')
20+
expect(req.params.ReturnConsumedCapacity).toBe('TOTAL')
21+
})
22+
})
23+
24+
describe('returnItemCollectionMetrics', () => {
25+
it('should set params', () => {
26+
req = new BatchWriteRequest().returnItemCollectionMetrics('SIZE')
27+
expect(req.params.ReturnItemCollectionMetrics).toBe('SIZE')
28+
})
29+
})
30+
31+
describe('delete', () => {
32+
const now = new Date()
33+
34+
it('should set params', () => {
35+
req = new BatchWriteRequest()
36+
.delete(SimpleWithPartitionKeyModel, [{ id: 'myId1' }])
37+
.delete(ComplexModel, [{ id: 'myId2', creationDate: now }])
38+
39+
expect(Object.keys(req.params.RequestItems)).toEqual([
40+
getTableName(SimpleWithPartitionKeyModel),
41+
getTableName(ComplexModel),
42+
])
43+
44+
const simpleReqItems = req.params.RequestItems[getTableName(SimpleWithPartitionKeyModel)]
45+
expect(simpleReqItems).toBeDefined()
46+
expect(simpleReqItems.length).toBe(1)
47+
expect(simpleReqItems[0].DeleteRequest).toEqual({
48+
Key: { id: { S: 'myId1' } },
49+
})
50+
51+
const complexReqItems = req.params.RequestItems[getTableName(ComplexModel)]
52+
expect(complexReqItems).toBeDefined()
53+
expect(complexReqItems.length).toBe(1)
54+
expect(complexReqItems[0].DeleteRequest).toEqual({
55+
Key: { id: { S: 'myId2' }, creationDate: { S: now.toISOString() } },
56+
})
57+
})
58+
59+
it('should throw when too many items', () => {
60+
const items: Array<Partial<SimpleWithPartitionKeyModel>> = [...new Array(30)].map((_, ix) => ({
61+
id: `mydId-${ix}`,
62+
}))
63+
expect(() => new BatchWriteRequest().delete(SimpleWithPartitionKeyModel, items)).toThrow()
64+
})
65+
})
66+
67+
describe('put', () => {
68+
const now = new Date()
69+
70+
it('should set params', () => {
71+
req = new BatchWriteRequest()
72+
.put(SimpleWithPartitionKeyModel, [{ id: 'myId1' }])
73+
.put(ComplexModel, [{ id: 'myId2', creationDate: now }])
74+
75+
const simpleReqItems = req.params.RequestItems[getTableName(SimpleWithPartitionKeyModel)]
76+
expect(simpleReqItems).toBeDefined()
77+
expect(simpleReqItems.length).toBe(1)
78+
expect(simpleReqItems[0].PutRequest).toEqual({
79+
Item: { id: { S: 'myId1' } },
80+
})
81+
82+
const complexReqItems = req.params.RequestItems[getTableName(ComplexModel)]
83+
expect(complexReqItems).toBeDefined()
84+
expect(complexReqItems.length).toBe(1)
85+
expect(complexReqItems[0].PutRequest).toEqual({
86+
Item: { id: { S: 'myId2' }, creationDate: { S: now.toISOString() } },
87+
})
88+
})
89+
90+
it('should throw when too many items', () => {
91+
const items: SimpleWithPartitionKeyModel[] = [...new Array(30)].map((_, ix) => ({ id: `mydId-${ix}`, age: ix }))
92+
expect(() => new BatchWriteRequest().put(SimpleWithPartitionKeyModel, items)).toThrow()
93+
})
94+
})
95+
96+
describe('put and delete mixed', () => {
97+
const now = new Date()
98+
const tnSimple = getTableName(SimpleWithPartitionKeyModel)
99+
const tnComplex = getTableName(ComplexModel)
100+
101+
const simpleItem: SimpleWithPartitionKeyModel = { id: 'myId', age: 25 }
102+
const simpleItems: SimpleWithPartitionKeyModel[] = [...new Array(16)].map((_, ix) => ({
103+
id: `mydId-${ix}`,
104+
age: ix,
105+
}))
106+
const complexItems: ComplexModel[] = [...new Array(16)].map(
107+
(_, ix) => <ComplexModel>{ id: `myId-${ix}`, creationDate: now },
108+
)
109+
110+
beforeEach(() => (req = new BatchWriteRequest()))
111+
112+
it('should add correct request items', () => {
113+
req.put(ComplexModel, complexItems).delete(SimpleWithPartitionKeyModel, [simpleItem])
114+
expect(req.params.RequestItems[tnComplex].length).toBe(16)
115+
expect(req.params.RequestItems[tnComplex][5].PutRequest).toBeDefined()
116+
117+
expect(req.params.RequestItems[tnSimple].length).toBe(1)
118+
expect(req.params.RequestItems[tnSimple][0].DeleteRequest).toBeDefined()
119+
})
120+
121+
it('should throw when too many (1)', () => {
122+
expect(() => req.put(ComplexModel, complexItems).delete(SimpleWithPartitionKeyModel, simpleItems)).toThrow()
123+
})
124+
it('should throw when too many (2)', () => {
125+
expect(() => req.delete(ComplexModel, complexItems).put(SimpleWithPartitionKeyModel, simpleItems)).toThrow()
126+
})
127+
})
128+
129+
describe('exec functions', () => {
130+
let batchWriteItemSpy: jasmine.Spy
131+
132+
beforeEach(() => {
133+
const output = {
134+
myResponse: true,
135+
}
136+
batchWriteItemSpy = jasmine.createSpy().and.returnValue(of(output))
137+
const dynamoRx = <any>{ batchWriteItem: batchWriteItemSpy }
138+
req = new BatchWriteRequest()
139+
Object.assign(req, { dynamoRx })
140+
})
141+
142+
it('exec should return void', async () => {
143+
expect(await req.exec().toPromise()).toBeUndefined()
144+
})
145+
146+
it('execFullResponse should return the full response', async () => {
147+
expect(await req.execFullResponse().toPromise()).toEqual({ myResponse: true })
148+
})
149+
})
150+
})
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
import { DynamoDB } from 'aws-sdk'
2+
import { Observable } from 'rxjs'
3+
import { map } from 'rxjs/operators'
4+
import { randomExponentialBackoffTimer } from '../../helper'
5+
import { createToKeyFn, toDb } from '../../mapper'
6+
import { ModelConstructor } from '../../model'
7+
import { DynamoRx } from '../dynamo-rx'
8+
import { getTableName } from '../get-table-name.function'
9+
import { batchWriteItemsWriteAll } from './batch-write-utils'
10+
import { BATCH_WRITE_DEFAULT_TIME_SLOT, BATCH_WRITE_MAX_REQUEST_ITEM_COUNT } from './batch-write.const'
11+
12+
export class BatchWriteRequest {
13+
readonly params: DynamoDB.BatchWriteItemInput
14+
private readonly dynamoRx: DynamoRx
15+
private itemCount = 0
16+
17+
constructor() {
18+
this.dynamoRx = new DynamoRx()
19+
this.params = {
20+
RequestItems: {},
21+
}
22+
}
23+
24+
returnConsumedCapacity(value: DynamoDB.ReturnConsumedCapacity): BatchWriteRequest {
25+
this.params.ReturnConsumedCapacity = value
26+
return this
27+
}
28+
29+
returnItemCollectionMetrics(value: DynamoDB.ReturnItemCollectionMetrics): BatchWriteRequest {
30+
this.params.ReturnItemCollectionMetrics = value
31+
return this
32+
}
33+
34+
delete<T>(modelClazz: ModelConstructor<T>, items: Array<Partial<T>>): BatchWriteRequest {
35+
this.requestItems(modelClazz, items.map(this.createDeleteRequest(modelClazz)))
36+
return this
37+
}
38+
39+
put<T>(modelClazz: ModelConstructor<T>, items: T[]): BatchWriteRequest {
40+
this.requestItems(modelClazz, items.map(this.createPutRequest(modelClazz)))
41+
return this
42+
}
43+
44+
/**
45+
*
46+
* @param backoffTimer generator for how much timeSlots should be waited before requesting next batch. only used when capacity was exceeded. default randomExponentialBackoffTimer
47+
* @param throttleTimeSlot defines how long one timeSlot is for throttling, default 1 second
48+
*/
49+
exec(backoffTimer = randomExponentialBackoffTimer, throttleTimeSlot = BATCH_WRITE_DEFAULT_TIME_SLOT): Observable<void> {
50+
return this.write(backoffTimer, throttleTimeSlot).pipe(
51+
map(() => {
52+
return
53+
}),
54+
)
55+
}
56+
57+
execFullResponse(backoffTimer = randomExponentialBackoffTimer, throttleTimeSlot = BATCH_WRITE_DEFAULT_TIME_SLOT): Observable<DynamoDB.BatchWriteItemOutput> {
58+
return this.write(backoffTimer, throttleTimeSlot)
59+
}
60+
61+
private requestItems(modelClazz: ModelConstructor<any>, items: DynamoDB.WriteRequests) {
62+
if (this.itemCount + items.length > BATCH_WRITE_MAX_REQUEST_ITEM_COUNT) {
63+
throw new Error(`batch write takes at max ${BATCH_WRITE_MAX_REQUEST_ITEM_COUNT} items`)
64+
}
65+
const tableName = getTableName(modelClazz)
66+
this.params.RequestItems[tableName] = this.params.RequestItems[tableName] || []
67+
this.params.RequestItems[tableName].push(...items)
68+
this.itemCount += items.length
69+
}
70+
71+
private write(backoffTimer: () => IterableIterator<number>, throttleTimeSlot: number) {
72+
return batchWriteItemsWriteAll(this.dynamoRx, { ...this.params }, backoffTimer(), throttleTimeSlot)
73+
}
74+
75+
private createDeleteRequest = <T>(modelClazz: ModelConstructor<T>) => {
76+
const toKey = createToKeyFn(modelClazz)
77+
return (item: Partial<T>): DynamoDB.WriteRequest => ({ DeleteRequest: { Key: toKey(item) } })
78+
}
79+
80+
private createPutRequest = <T>(modelClazz: ModelConstructor<T>) => {
81+
return (item: T): DynamoDB.WriteRequest => ({ PutRequest: { Item: toDb(item, modelClazz) } })
82+
}
83+
}

src/dynamo/batchwrite/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
export * from './batch-write.request'

src/dynamo/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
export * from './batchget'
2+
export * from './batchwrite'
23
export * from './expression'
34
export * from './request'
45
export * from './dynamo-rx'

0 commit comments

Comments
 (0)