Skip to content

Commit 28ba66b

Browse files
committed
feat(batch-get-single-table.request): fetch until no UnprocessedItems are returned
BREAKING CHANGE: provide the keys to fetch as Partial<T>
1 parent 85c0d71 commit 28ba66b

File tree

6 files changed

+330
-145
lines changed

6 files changed

+330
-145
lines changed
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
import { DynamoDB } from 'aws-sdk'
2+
import { of } from 'rxjs'
3+
import { DynamoRx } from '../dynamo-rx'
4+
import { batchGetItemsFetchAll, combineBatchGetResponses, hasUnprocessedKeys } from './batch-get-utils'
5+
6+
describe('batch-get utils', () => {
7+
8+
describe('hasUnprocessedKeys', () => {
9+
it('should return bool according to given object', () => {
10+
expect(hasUnprocessedKeys({})).toBeFalsy()
11+
expect(hasUnprocessedKeys({ Responses: {} })).toBeFalsy()
12+
expect(hasUnprocessedKeys({ UnprocessedKeys: {} })).toBeFalsy()
13+
expect(hasUnprocessedKeys({ UnprocessedKeys: { 'aTableName': { Keys: [] } } })).toBeFalsy()
14+
expect(hasUnprocessedKeys({ UnprocessedKeys: { 'aTableName': { Keys: [{ id: { S: 'id' } }] } } })).toBeTruthy()
15+
})
16+
})
17+
18+
describe('combineBatchGetResponses', () => {
19+
const resp1: DynamoDB.BatchGetItemOutput = {
20+
Responses: {
21+
'tableA': [
22+
{ id: { S: 'id-a1' } },
23+
],
24+
'tableB': [
25+
{ id: { S: 'id-b' } },
26+
],
27+
},
28+
UnprocessedKeys: {
29+
'tableA:': { Keys: [{ id: { S: 'id-a2' } }] },
30+
'tableC:': { Keys: [{ id: { S: 'id-c' } }] },
31+
'tableD:': { Keys: [{ id: { S: 'id-d' } }] },
32+
},
33+
}
34+
const resp2: DynamoDB.BatchGetItemOutput = {
35+
Responses: {
36+
'tableA': [
37+
{ id: { S: 'id-a2' } },
38+
],
39+
'tableC': [
40+
{ id: { S: 'id-c' } },
41+
],
42+
},
43+
UnprocessedKeys: {
44+
'tableD:': { Keys: [{ id: { S: 'id-d' } }] },
45+
},
46+
}
47+
const expectedOutput: DynamoDB.BatchGetItemOutput = {
48+
Responses: {
49+
'tableA': [
50+
{ id: { S: 'id-a1' } },
51+
{ id: { S: 'id-a2' } },
52+
],
53+
'tableB': [
54+
{ id: { S: 'id-b' } },
55+
],
56+
'tableC': [
57+
{ id: { S: 'id-c' } },
58+
],
59+
},
60+
UnprocessedKeys: {
61+
'tableD:': { Keys: [{ id: { S: 'id-d' } }] },
62+
},
63+
}
64+
it('should combine correctly', () => {
65+
expect(combineBatchGetResponses(resp1)(resp2)).toEqual(expectedOutput)
66+
})
67+
68+
})
69+
70+
describe('batchGetItemsFetchAll', () => {
71+
let batchGetItemsSpy: jasmine.Spy
72+
let dynamoRx: DynamoRx
73+
let backoffTimerMock: { next: jasmine.Spy }
74+
75+
const output1: DynamoDB.BatchGetItemOutput = {
76+
Responses: {
77+
'tableA': [{ id: { S: 'id-A' } }],
78+
},
79+
UnprocessedKeys: {
80+
'tableA': {
81+
Keys: [{ id: { S: 'id-A' } }],
82+
},
83+
},
84+
}
85+
const output2: DynamoDB.BatchGetItemOutput = {
86+
Responses: {
87+
'tableA': [{ id: { S: 'id-A' } }],
88+
},
89+
}
90+
91+
beforeEach(async () => {
92+
batchGetItemsSpy = jasmine.createSpy().and.returnValues(of(output1), of(output2))
93+
dynamoRx = <any>{ batchGetItems: batchGetItemsSpy }
94+
backoffTimerMock = { next: jasmine.createSpy().and.returnValue({ value: 0 }) }
95+
96+
await batchGetItemsFetchAll(
97+
dynamoRx,
98+
<any>{},
99+
<IterableIterator<number>><any>backoffTimerMock,
100+
0,
101+
).toPromise()
102+
})
103+
104+
105+
it('should use UnprocessedKeys for next request', () => {
106+
expect(batchGetItemsSpy).toHaveBeenCalledTimes(2)
107+
expect(batchGetItemsSpy.calls.mostRecent().args[0]).toBeDefined()
108+
expect(batchGetItemsSpy.calls.mostRecent().args[0].RequestItems).toBeDefined()
109+
expect(batchGetItemsSpy.calls.mostRecent().args[0].RequestItems).toEqual(output1.UnprocessedKeys)
110+
})
111+
112+
it('should backoff when UnprocessedItems', () => {
113+
expect(backoffTimerMock.next).toHaveBeenCalledTimes(1)
114+
})
115+
116+
})
117+
118+
})
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
import { DynamoDB } from 'aws-sdk'
2+
import { BatchGetRequestMap } from 'aws-sdk/clients/dynamodb'
3+
import { Observable, of } from 'rxjs'
4+
import { delay, map, mergeMap } from 'rxjs/operators'
5+
import { DynamoRx } from '../dynamo-rx'
6+
7+
export function batchGetItemsFetchAll(
8+
dynamoRx: DynamoRx,
9+
params: DynamoDB.BatchGetItemInput,
10+
backoffTimer: IterableIterator<number>,
11+
throttleTimeSlot: number,
12+
): Observable<DynamoDB.BatchGetItemOutput> {
13+
return dynamoRx.batchGetItems(params)
14+
.pipe(
15+
mergeMap(response => {
16+
if (hasUnprocessedKeys(response)) {
17+
return of(response.UnprocessedKeys)
18+
.pipe(
19+
delay(backoffTimer.next().value * throttleTimeSlot),
20+
mergeMap((UnprocessedKeys: DynamoDB.BatchGetRequestMap) => {
21+
const nextParams = { ...params, RequestItems: UnprocessedKeys }
22+
return batchGetItemsFetchAll(dynamoRx, nextParams, backoffTimer, throttleTimeSlot)
23+
}),
24+
map(combineBatchGetResponses(response)),
25+
)
26+
}
27+
return of(response)
28+
}),
29+
)
30+
}
31+
32+
export type ResponseWithUnprocessedKeys = DynamoDB.BatchGetItemOutput & { UnprocessedKeys: BatchGetRequestMap }
33+
34+
export function hasUnprocessedKeys(response: DynamoDB.BatchGetItemOutput): response is ResponseWithUnprocessedKeys {
35+
if (!response.UnprocessedKeys) {
36+
return false
37+
}
38+
return Object.values(response.UnprocessedKeys)
39+
.some(t => !!t && t.Keys && t.Keys.length > 0)
40+
}
41+
42+
/**
43+
* combines a first with a second response. ConsumedCapacity is always from the latter.
44+
* @param response1
45+
*/
46+
export function combineBatchGetResponses(response1: DynamoDB.BatchGetItemOutput) {
47+
return (response2: DynamoDB.BatchGetItemOutput): DynamoDB.BatchGetItemOutput => {
48+
const tableNames: string[] = Object.keys(response1.Responses || {})
49+
50+
Object.keys(response2.Responses || {})
51+
.filter(tn => !tableNames.includes(tn))
52+
.forEach(tn => tableNames.push(tn))
53+
54+
const Responses = tableNames
55+
.reduce((u, tableName) => ({
56+
...u,
57+
[tableName]: [
58+
...(response1.Responses && response1.Responses[tableName] || []),
59+
...(response2.Responses && response2.Responses[tableName] || []),
60+
],
61+
}), {})
62+
return {
63+
...response2,
64+
Responses,
65+
}
66+
}
67+
}

src/dynamo/batchget/batch-get.request.ts

Lines changed: 11 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { DynamoDB } from 'aws-sdk'
2-
import { Observable, of } from 'rxjs'
3-
import { delay, map, mergeMap } from 'rxjs/operators'
2+
import { Observable } from 'rxjs'
3+
import { map } from 'rxjs/operators'
44
import { metadataForClass } from '../../decorator/metadata/metadata-helper'
55
import { randomExponentialBackoffTimer } from '../../helper'
66
import { createToKeyFn, fromDb } from '../../mapper'
@@ -13,6 +13,7 @@ import { REGEX_TABLE_NAME } from '../request/regex'
1313
import { SessionValidityEnsurer } from '../session-validity-ensurer.type'
1414
import { TableNameResolver } from '../table-name-resolver.type'
1515
import { BatchGetFullResponse } from './batch-get-full.response'
16+
import { batchGetItemsFetchAll } from './batch-get-utils'
1617
import { BatchGetResponse } from './batch-get.response'
1718

1819
const MAX_REQUEST_ITEM_COUNT = 100
@@ -34,6 +35,10 @@ export class BatchGetRequest {
3435
}
3536
}
3637

38+
private fetch(backoffTimer = randomExponentialBackoffTimer, throttleTimeSlot = DEFAULT_TIME_SLOT) {
39+
return batchGetItemsFetchAll(this.dynamoRx, { ...this.params }, backoffTimer(), throttleTimeSlot)
40+
}
41+
3742
/**
3843
* @param {ModelConstructor<T>} modelClazz
3944
* @param {Partial<T>[]} keys a partial of T that contains Partition key and SortKey (if necessary). Throws if missing.
@@ -49,7 +54,7 @@ export class BatchGetRequest {
4954

5055
// check if modelClazz is really an @Model() decorated class
5156
const metadata = metadataForClass(modelClazz)
52-
if (metadata.modelOptions === null) { throw new Error('given ModelConstructor has no @Model decorator')}
57+
if (!metadata.modelOptions) { throw new Error('given ModelConstructor has no @Model decorator')}
5358

5459
// check if keys to add do not exceed max count
5560
if (this.itemCounter + keys.length > MAX_REQUEST_ITEM_COUNT) { throw new Error(`you can request at max ${MAX_REQUEST_ITEM_COUNT} items per request`)}
@@ -75,19 +80,6 @@ export class BatchGetRequest {
7580
return tableName
7681
}
7782

78-
/**
79-
* combines a first with a second response. ConsumedCapacity is always from the latter.
80-
* @param response1
81-
*/
82-
private combineResponses = (response1: DynamoDB.BatchGetItemOutput) => (response2: DynamoDB.BatchGetItemOutput): DynamoDB.BatchGetItemOutput => {
83-
const Responses = Object.entries(response1.Responses || {})
84-
.reduce((u, [tableName, items]) => ({ [tableName]: [...items, ...(response2.Responses ? response2.Responses[tableName] : [])] }), {})
85-
return {
86-
Responses,
87-
ConsumedCapacity: response2.ConsumedCapacity,
88-
}
89-
}
90-
9183
private mapResponse = (response: DynamoDB.BatchGetItemOutput): BatchGetFullResponse => {
9284
let Responses: BatchGetResponse = {}
9385

@@ -105,39 +97,19 @@ export class BatchGetRequest {
10597
}
10698
}
10799

108-
private fetch(params: DynamoDB.BatchGetItemInput, backoffTimer: IterableIterator<number>, throttleTimeSlot: number): Observable<DynamoDB.BatchGetItemOutput> {
109-
return this.dynamoRx.batchGetItems(params)
110-
.pipe(
111-
mergeMap(response => {
112-
if (!!response.UnprocessedKeys && Object.entries(response.UnprocessedKeys).some(t => !!t && t.length > 0)) {
113-
return of(response.UnprocessedKeys)
114-
.pipe(
115-
delay(backoffTimer.next().value * throttleTimeSlot),
116-
mergeMap((UnprocessedKeys: DynamoDB.BatchGetRequestMap) => {
117-
const nextParams = { ...params, RequestItems: UnprocessedKeys }
118-
return this.fetch(nextParams, backoffTimer, throttleTimeSlot)
119-
}),
120-
map(this.combineResponses(response)),
121-
)
122-
}
123-
return of(response)
124-
}),
125-
)
126-
}
127-
128100
execNoMap(backoffTimer = randomExponentialBackoffTimer, throttleTimeSlot = DEFAULT_TIME_SLOT): Observable<DynamoDB.BatchGetItemOutput> {
129-
return this.fetch({ ...this.params }, backoffTimer(), throttleTimeSlot)
101+
return this.fetch(backoffTimer, throttleTimeSlot)
130102
}
131103

132104
execFullResponse(backoffTimer = randomExponentialBackoffTimer, throttleTimeSlot = DEFAULT_TIME_SLOT): Observable<BatchGetFullResponse> {
133-
return this.fetch({ ...this.params }, backoffTimer(), throttleTimeSlot)
105+
return this.fetch(backoffTimer, throttleTimeSlot)
134106
.pipe(
135107
map(this.mapResponse),
136108
)
137109
}
138110

139111
exec(backoffTimer = randomExponentialBackoffTimer, throttleTimeSlot = DEFAULT_TIME_SLOT): Observable<BatchGetResponse> {
140-
return this.fetch({ ...this.params }, backoffTimer(), throttleTimeSlot)
112+
return this.fetch(backoffTimer, throttleTimeSlot)
141113
.pipe(
142114
map(this.mapResponse),
143115
map(r => r.Responses),

src/dynamo/dynamo-store.spec.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ describe('dynamo store', () => {
8383
it('batchWrite', () => expect(store.batchWrite() instanceof BatchWriteSingleTableRequest).toBeTruthy())
8484
it('scan', () => expect(store.scan() instanceof ScanRequest).toBeTruthy())
8585
it('query', () => expect(store.query() instanceof QueryRequest).toBeTruthy())
86-
it('batchGetItem', () => expect(store.batchGetItem(['id']) instanceof BatchGetSingleTableRequest).toBeTruthy())
86+
it('batchGetItem', () => expect(store.batchGetItem([{id:'id'}]) instanceof BatchGetSingleTableRequest).toBeTruthy())
8787
})
8888

8989
describe('should enable custom requests', () => {

0 commit comments

Comments
 (0)