|
| 1 | +/* @flow */ |
| 2 | +import { json, stats, warning, invariant } from './Global'; |
| 3 | +import CloudWatch from './CloudWatch'; |
| 4 | +import type { |
| 5 | + TableDescription, GetMetricStatisticsResponse, Dimension, |
| 6 | + TableConsumedCapacityDescription } from './FlowTypes'; |
| 7 | + |
| 8 | +export default class CapacityCalculator { |
| 9 | + _cw: CloudWatch; |
| 10 | + |
| 11 | + constructor(cloudWatch: CloudWatch) { |
| 12 | + invariant(typeof cloudWatch !== 'undefined', 'Parameter \'cloudWatch\' is not set'); |
| 13 | + this._cw = cloudWatch; |
| 14 | + } |
| 15 | + |
| 16 | + async describeTableConsumedCapacityAsync(params: TableDescription) |
| 17 | + : Promise<TableConsumedCapacityDescription> { |
| 18 | + let sw = stats |
| 19 | + .timer('DynamoDB.describeTableConsumedCapacityAsync') |
| 20 | + .start(); |
| 21 | + |
| 22 | + try { |
| 23 | + invariant(typeof params !== 'undefined', 'Parameter \'params\' is not set'); |
| 24 | + |
| 25 | + // Make all the requests concurrently |
| 26 | + let tableRead = this.getConsumedCapacityAsync(true, params.TableName, null); |
| 27 | + let tableWrite = this.getConsumedCapacityAsync(false, params.TableName, null); |
| 28 | + |
| 29 | + let gsiReads = (params.GlobalSecondaryIndexes || []) |
| 30 | + .map(gsi => this.getConsumedCapacityAsync(true, params.TableName, gsi.IndexName)); |
| 31 | + |
| 32 | + let gsiWrites = (params.GlobalSecondaryIndexes || []) |
| 33 | + .map(gsi => this.getConsumedCapacityAsync(false, params.TableName, gsi.IndexName)); |
| 34 | + |
| 35 | + // Await on the results |
| 36 | + let tableConsumedRead = await tableRead; |
| 37 | + let tableConsumedWrite = await tableWrite; |
| 38 | + let gsiConsumedReads = await Promise.all(gsiReads); |
| 39 | + let gsiConsumedWrites = await Promise.all(gsiWrites); |
| 40 | + |
| 41 | + // Format results |
| 42 | + let gsis = gsiConsumedReads.map((read, i) => { |
| 43 | + let write = gsiConsumedWrites[i]; |
| 44 | + return { |
| 45 | + // $FlowIgnore: The indexName is not null in this case |
| 46 | + IndexName: read.globalSecondaryIndexName, |
| 47 | + ConsumedThroughput: { |
| 48 | + ReadCapacityUnits: read.value, |
| 49 | + WriteCapacityUnits: write.value |
| 50 | + } |
| 51 | + }; |
| 52 | + }); |
| 53 | + |
| 54 | + return { |
| 55 | + TableName: params.TableName, |
| 56 | + ConsumedThroughput: { |
| 57 | + ReadCapacityUnits: tableConsumedRead.value, |
| 58 | + WriteCapacityUnits: tableConsumedWrite.value |
| 59 | + }, |
| 60 | + GlobalSecondaryIndexes: gsis |
| 61 | + }; |
| 62 | + } catch (ex) { |
| 63 | + warning(JSON.stringify({ |
| 64 | + class: 'CapacityCalculator', |
| 65 | + function: 'describeTableConsumedCapacityAsync', |
| 66 | + params, |
| 67 | + }, null, json.padding)); |
| 68 | + throw ex; |
| 69 | + } finally { |
| 70 | + sw.end(); |
| 71 | + } |
| 72 | + } |
| 73 | + |
| 74 | + async getConsumedCapacityAsync( |
| 75 | + isRead: boolean, tableName: string, globalSecondaryIndexName: ?string) { |
| 76 | + try { |
| 77 | + invariant(typeof isRead !== 'undefined', 'Parameter \'isRead\' is not set'); |
| 78 | + invariant(typeof tableName !== 'undefined', 'Parameter \'tableName\' is not set'); |
| 79 | + invariant(typeof globalSecondaryIndexName !== 'undefined', |
| 80 | + 'Parameter \'globalSecondaryIndexName\' is not set'); |
| 81 | + |
| 82 | + // These values determine how many minutes worth of metrics |
| 83 | + let durationMinutes = 5; |
| 84 | + let periodMinutes = 1; |
| 85 | + |
| 86 | + let EndTime = new Date(); |
| 87 | + let StartTime = new Date(); |
| 88 | + StartTime.setTime(EndTime - (60000 * durationMinutes)); |
| 89 | + let MetricName = isRead ? 'ConsumedReadCapacityUnits' : 'ConsumedWriteCapacityUnits'; |
| 90 | + let Dimensions = this.getDimensions(tableName, globalSecondaryIndexName); |
| 91 | + let params = { |
| 92 | + Namespace: 'AWS/DynamoDB', |
| 93 | + MetricName, |
| 94 | + Dimensions, |
| 95 | + StartTime, |
| 96 | + EndTime, |
| 97 | + Period: (periodMinutes * 60), |
| 98 | + Statistics: [ 'Average' ], |
| 99 | + Unit: 'Count' |
| 100 | + }; |
| 101 | + |
| 102 | + let statistics = await this._cw.getMetricStatisticsAsync(params); |
| 103 | + let value = this.getProjectedValue(statistics); |
| 104 | + return { |
| 105 | + tableName, |
| 106 | + globalSecondaryIndexName, |
| 107 | + value |
| 108 | + }; |
| 109 | + } catch (ex) { |
| 110 | + warning(JSON.stringify({ |
| 111 | + class: 'CapacityCalculator', |
| 112 | + function: 'getConsumedCapacityAsync', |
| 113 | + isRead, tableName, globalSecondaryIndexName, |
| 114 | + }, null, json.padding)); |
| 115 | + throw ex; |
| 116 | + } |
| 117 | + } |
| 118 | + |
| 119 | + getProjectedValue(data: GetMetricStatisticsResponse) { |
| 120 | + if (data.Datapoints.length === 0) { |
| 121 | + return 0; |
| 122 | + } |
| 123 | + |
| 124 | + // Default algorithm for projecting a good value for the current ConsumedThroughput is: |
| 125 | + // 1. Query 5 average readings each spanning a minute |
| 126 | + // 2. Select the Max value from those 5 readings |
| 127 | + let averages = data.Datapoints.map(dp => dp.Average); |
| 128 | + let value = Math.max(...averages); |
| 129 | + return value; |
| 130 | + } |
| 131 | + |
| 132 | + getDimensions(tableName: string, globalSecondaryIndexName: ?string): Dimension[] { |
| 133 | + if (globalSecondaryIndexName) { |
| 134 | + return [ |
| 135 | + { Name: 'TableName', Value: tableName}, |
| 136 | + { Name: 'GlobalSecondaryIndex', Value: globalSecondaryIndexName} |
| 137 | + ]; |
| 138 | + } |
| 139 | + |
| 140 | + return [ { Name: 'TableName', Value: tableName} ]; |
| 141 | + } |
| 142 | +} |
0 commit comments