1
1
/* @flow */
2
- import { json , stats , warning , invariant } from './Global' ;
3
- import CloudWatch from './CloudWatch' ;
4
- import type { TableConsumedCapacityDescription } from './FlowTypes' ;
5
- import type {
6
- TableDescription ,
7
- GetMetricStatisticsResponse ,
8
- Dimension ,
9
- } from 'aws-sdk-promise' ;
2
+ import { invariant } from './Global' ;
3
+ import CapacityCalculatorBase from './capacity/CapacityCalculatorBase' ;
4
+ import type { GetMetricStatisticsResponse } from 'aws-sdk-promise' ;
5
+ import type { StatisticSettings } from './flow/FlowTypes' ;
10
6
11
- export default class CapacityCalculator {
12
- _cw : CloudWatch ;
7
+ export default class CapacityCalculator extends CapacityCalculatorBase {
13
8
14
- constructor ( cloudWatch : CloudWatch ) {
15
- invariant ( typeof cloudWatch !== 'undefined' , 'Parameter \'cloudWatch\' is not set' ) ;
16
- this . _cw = cloudWatch ;
9
+ // Get the region
10
+ getCloudWatchRegion ( ) {
11
+ return 'us-east-1' ;
17
12
}
18
13
19
- async describeTableConsumedCapacityAsync ( params : TableDescription )
20
- : Promise < TableConsumedCapacityDescription > {
21
- let sw = stats
22
- . timer ( 'DynamoDB.describeTableConsumedCapacityAsync' )
23
- . start ( ) ;
24
-
25
- try {
26
- invariant ( typeof params !== 'undefined' , 'Parameter \'params\' is not set' ) ;
27
-
28
- // Make all the requests concurrently
29
- let tableRead = this . getConsumedCapacityAsync ( true , params . TableName , null ) ;
30
- let tableWrite = this . getConsumedCapacityAsync ( false , params . TableName , null ) ;
31
-
32
- let gsiReads = ( params . GlobalSecondaryIndexes || [ ] )
33
- . map ( gsi => this . getConsumedCapacityAsync ( true , params . TableName , gsi . IndexName ) ) ;
34
-
35
- let gsiWrites = ( params . GlobalSecondaryIndexes || [ ] )
36
- . map ( gsi => this . getConsumedCapacityAsync ( false , params . TableName , gsi . IndexName ) ) ;
37
-
38
- // Await on the results
39
- let tableConsumedRead = await tableRead ;
40
- let tableConsumedWrite = await tableWrite ;
41
- let gsiConsumedReads = await Promise . all ( gsiReads ) ;
42
- let gsiConsumedWrites = await Promise . all ( gsiWrites ) ;
43
-
44
- // Format results
45
- let gsis = gsiConsumedReads . map ( ( read , i ) => {
46
- let write = gsiConsumedWrites [ i ] ;
47
- return {
48
- // $FlowIgnore: The indexName is not null in this case
49
- IndexName : read . globalSecondaryIndexName ,
50
- ConsumedThroughput : {
51
- ReadCapacityUnits : read . value ,
52
- WriteCapacityUnits : write . value
53
- }
54
- } ;
55
- } ) ;
56
-
57
- return {
58
- TableName : params . TableName ,
59
- ConsumedThroughput : {
60
- ReadCapacityUnits : tableConsumedRead . value ,
61
- WriteCapacityUnits : tableConsumedWrite . value
62
- } ,
63
- GlobalSecondaryIndexes : gsis
64
- } ;
65
- } catch ( ex ) {
66
- warning ( JSON . stringify ( {
67
- class : 'CapacityCalculator' ,
68
- function : 'describeTableConsumedCapacityAsync' ,
69
- params,
70
- } , null , json . padding ) ) ;
71
- throw ex ;
72
- } finally {
73
- sw . end ( ) ;
74
- }
75
- }
76
-
77
- async getConsumedCapacityAsync (
78
- isRead : boolean , tableName : string , globalSecondaryIndexName : ?string ) {
79
- try {
80
- invariant ( typeof isRead !== 'undefined' , 'Parameter \'isRead\' is not set' ) ;
81
- invariant ( typeof tableName !== 'undefined' , 'Parameter \'tableName\' is not set' ) ;
82
- invariant ( typeof globalSecondaryIndexName !== 'undefined' ,
83
- 'Parameter \'globalSecondaryIndexName\' is not set' ) ;
84
-
85
- // These values determine how many minutes worth of metrics
86
- let durationMinutes = 5 ;
87
- let periodMinutes = 1 ;
88
-
89
- let EndTime = new Date ( ) ;
90
- let StartTime = new Date ( ) ;
91
- StartTime . setTime ( EndTime - ( 60000 * durationMinutes ) ) ;
92
- let MetricName = isRead ? 'ConsumedReadCapacityUnits' : 'ConsumedWriteCapacityUnits' ;
93
- let Dimensions = this . getDimensions ( tableName , globalSecondaryIndexName ) ;
94
- let params = {
95
- Namespace : 'AWS/DynamoDB' ,
96
- MetricName,
97
- Dimensions,
98
- StartTime,
99
- EndTime,
100
- Period : ( periodMinutes * 60 ) ,
101
- Statistics : [ 'Average' ] ,
102
- Unit : 'Count'
103
- } ;
104
-
105
- let statistics = await this . _cw . getMetricStatisticsAsync ( params ) ;
106
- let value = this . getProjectedValue ( statistics ) ;
107
- return {
108
- tableName,
109
- globalSecondaryIndexName,
110
- value
111
- } ;
112
- } catch ( ex ) {
113
- warning ( JSON . stringify ( {
114
- class : 'CapacityCalculator' ,
115
- function : 'getConsumedCapacityAsync' ,
116
- isRead, tableName, globalSecondaryIndexName,
117
- } , null , json . padding ) ) ;
118
- throw ex ;
119
- }
14
+ getStatisticSettings ( ) : StatisticSettings {
15
+ return {
16
+ count : 5 ,
17
+ spanMinutes : 1 ,
18
+ type : 'Average' ,
19
+ } ;
120
20
}
121
21
22
+ // Gets the projected capacity value based on the cloudwatch datapoints
122
23
getProjectedValue ( data : GetMetricStatisticsResponse ) {
24
+ invariant ( data != null , 'Parameter \'data\' is not set' ) ;
25
+
123
26
if ( data . Datapoints . length === 0 ) {
124
27
return 0 ;
125
28
}
@@ -128,18 +31,6 @@ export default class CapacityCalculator {
128
31
// 1. Query 5 average readings each spanning a minute
129
32
// 2. Select the Max value from those 5 readings
130
33
let averages = data . Datapoints . map ( dp => dp . Average ) ;
131
- let value = Math . max ( ...averages ) ;
132
- return value ;
133
- }
134
-
135
- getDimensions ( tableName : string , globalSecondaryIndexName : ?string ) : Dimension [ ] {
136
- if ( globalSecondaryIndexName ) {
137
- return [
138
- { Name : 'TableName' , Value : tableName } ,
139
- { Name : 'GlobalSecondaryIndex' , Value : globalSecondaryIndexName }
140
- ] ;
141
- }
142
-
143
- return [ { Name : 'TableName' , Value : tableName } ] ;
34
+ return Math . max ( ...averages ) ;
144
35
}
145
36
}
0 commit comments