|
| 1 | +/* @flow */ |
| 2 | +import Provisioner from './Provisioner'; |
| 3 | +import Stats from './utils/Stats'; |
| 4 | +import CostEstimation from './utils/CostEstimation'; |
| 5 | +import Throughput from './utils/Throughput'; |
| 6 | +import CapacityCalculator from './CapacityCalculator'; |
| 7 | +import { json, stats, log, invariant } from './Global'; |
| 8 | +import type { UpdateTableRequest } from 'aws-sdk-promise'; |
| 9 | + |
| 10 | +export default class App { |
| 11 | + _provisioner: Provisioner; |
| 12 | + _capacityCalculator: CapacityCalculator; |
| 13 | + |
| 14 | + constructor() { |
| 15 | + this._provisioner = new Provisioner(); |
| 16 | + this._capacityCalculator = new CapacityCalculator(); |
| 17 | + } |
| 18 | + |
| 19 | + async runAsync(event: any, context: any): Promise { |
| 20 | + invariant(event != null, 'The argument \'event\' was null'); |
| 21 | + invariant(context != null, 'The argument \'context\' was null'); |
| 22 | + |
| 23 | + let sw = stats.timer('Index.handler').start(); |
| 24 | + |
| 25 | + // In local mode the json padding can be overridden |
| 26 | + if (event.json && event.json.padding) { |
| 27 | + json.padding = event.json.padding; |
| 28 | + } |
| 29 | + |
| 30 | + log('Getting table names'); |
| 31 | + let tableNames = await this._provisioner.getTableNamesAsync(); |
| 32 | + |
| 33 | + log('Getting table details'); |
| 34 | + let tableDetails = await this._getTableDetailsAsync(tableNames); |
| 35 | + |
| 36 | + log('Getting required table update requests'); |
| 37 | + let tableUpdateRequests = this._getTableUpdateRequests(tableDetails); |
| 38 | + |
| 39 | + if (tableUpdateRequests.length > 0) { |
| 40 | + log('Updating tables'); |
| 41 | + await this._updateTablesAsync(tableUpdateRequests); |
| 42 | + log('Updated tables'); |
| 43 | + } else { |
| 44 | + log('No table updates required'); |
| 45 | + } |
| 46 | + |
| 47 | + sw.end(); |
| 48 | + this._logMetrics(tableDetails); |
| 49 | + |
| 50 | + // Return an empty response |
| 51 | + let response = null; |
| 52 | + if (context) { |
| 53 | + context.succeed(response); |
| 54 | + } else { |
| 55 | + return response; |
| 56 | + } |
| 57 | + } |
| 58 | + |
| 59 | + async _getTableDetailsAsync(tableNames: string[]): Promise<Object[]> { |
| 60 | + invariant(tableNames instanceof Array, 'The argument \'tableNames\' was not an array'); |
| 61 | + |
| 62 | + let tasks = tableNames.map(name => this._getTableDetailAsync(name)); |
| 63 | + return await Promise.all(tasks); |
| 64 | + } |
| 65 | + |
| 66 | + async _getTableDetailAsync(tableName: string): Promise<Object> { |
| 67 | + invariant(typeof tableName === 'string', 'The argument \'tableName\' was not a string'); |
| 68 | + |
| 69 | + log('Getting table description', tableName); |
| 70 | + let describeTableResponse = await this._provisioner.db |
| 71 | + .describeTableAsync({TableName: tableName}); |
| 72 | + |
| 73 | + let tableDescription = describeTableResponse.Table; |
| 74 | + |
| 75 | + log('Getting table consumed capacity description', tableName); |
| 76 | + let consumedCapacityTableDescription = await this._capacityCalculator |
| 77 | + .describeTableConsumedCapacityAsync(tableDescription, 1); |
| 78 | + |
| 79 | + log('Getting table update request', tableName); |
| 80 | + let tableUpdateRequest = await this._provisioner.getTableUpdateAsync(tableDescription, |
| 81 | + consumedCapacityTableDescription); |
| 82 | + |
| 83 | + // Log the monthlyEstimatedCost |
| 84 | + let totalTableProvisionedThroughput = Throughput |
| 85 | + .getTotalTableProvisionedThroughput(tableDescription); |
| 86 | + |
| 87 | + let monthlyEstimatedCost = CostEstimation |
| 88 | + .getMonthlyEstimatedTableCost(totalTableProvisionedThroughput); |
| 89 | + |
| 90 | + stats |
| 91 | + .counter('DynamoDB.monthlyEstimatedCost') |
| 92 | + .inc(monthlyEstimatedCost); |
| 93 | + |
| 94 | + let result = { |
| 95 | + tableName, |
| 96 | + tableDescription, |
| 97 | + consumedCapacityTableDescription, |
| 98 | + tableUpdateRequest, |
| 99 | + totalTableProvisionedThroughput, |
| 100 | + monthlyEstimatedCost, |
| 101 | + }; |
| 102 | + |
| 103 | + return result; |
| 104 | + } |
| 105 | + |
| 106 | + async _updateTablesAsync(tableUpdateRequests: UpdateTableRequest[]): Promise { |
| 107 | + invariant(tableUpdateRequests instanceof Array, |
| 108 | + 'The argument \'tableUpdateRequests\' was not an array'); |
| 109 | + |
| 110 | + // If we are updating more than 10 tables in a single run |
| 111 | + // then we must wait until each one has been completed to |
| 112 | + // ensure we do not hit the AWS limit of 10 concurrent updates |
| 113 | + let isRateLimitedUpdatingRequired = tableUpdateRequests.length > 10; |
| 114 | + await Promise.all(tableUpdateRequests.map( |
| 115 | + async req => this._updateTableAsync(req, isRateLimitedUpdatingRequired) |
| 116 | + )); |
| 117 | + } |
| 118 | + |
| 119 | + async _updateTableAsync(tableUpdateRequest: UpdateTableRequest, |
| 120 | + isRateLimitedUpdatingRequired: boolean): Promise { |
| 121 | + invariant(tableUpdateRequest != null, 'The argument \'tableUpdateRequest\' was null'); |
| 122 | + invariant(typeof isRateLimitedUpdatingRequired === 'boolean', |
| 123 | + 'The argument \'isRateLimitedUpdatingRequired\' was not a boolean'); |
| 124 | + |
| 125 | + log('Updating table', tableUpdateRequest.TableName); |
| 126 | + await this._provisioner.db |
| 127 | + .updateTableWithRateLimitAsync(tableUpdateRequest, isRateLimitedUpdatingRequired); |
| 128 | + |
| 129 | + log('Updated table', tableUpdateRequest.TableName); |
| 130 | + } |
| 131 | + |
| 132 | + _getTableUpdateRequests(tableDetails: Object[]): UpdateTableRequest[] { |
| 133 | + invariant(tableDetails instanceof Array, |
| 134 | + 'The argument \'tableDetails\' was not an array'); |
| 135 | + |
| 136 | + return tableDetails |
| 137 | + .filter(({tableUpdateRequest}) => { return tableUpdateRequest != null; }) |
| 138 | + .map(({tableUpdateRequest}) => tableUpdateRequest); |
| 139 | + } |
| 140 | + |
| 141 | + _logMetrics(tableDetails: Object[]) { |
| 142 | + invariant(tableDetails instanceof Array, |
| 143 | + 'The argument \'tableDetails\' was not an array'); |
| 144 | + |
| 145 | + // Log stats |
| 146 | + let st = new Stats(stats); |
| 147 | + let stJSON = st.toJSON(); |
| 148 | + st.reset(); |
| 149 | + |
| 150 | + // Log readable info |
| 151 | + let updateRequests = tableDetails.map(i => i.tableUpdateRequest).filter(i => i !== null); |
| 152 | + let totalMonthlyEstimatedCost = tableDetails |
| 153 | + .reduce((prev, curr) => prev + curr.monthlyEstimatedCost, 0); |
| 154 | + let totalProvisionedThroughput = tableDetails.reduce((prev, curr) => { |
| 155 | + return { |
| 156 | + ReadCapacityUnits: prev.ReadCapacityUnits + |
| 157 | + curr.totalTableProvisionedThroughput.ReadCapacityUnits, |
| 158 | + WriteCapacityUnits: prev.WriteCapacityUnits + |
| 159 | + curr.totalTableProvisionedThroughput.WriteCapacityUnits, |
| 160 | + }; |
| 161 | + }, {ReadCapacityUnits: 0, WriteCapacityUnits: 0}); |
| 162 | + |
| 163 | + log(JSON.stringify({ |
| 164 | + 'Index.handler': { |
| 165 | + mean: stJSON['Index.handler'].histogram.mean |
| 166 | + }, |
| 167 | + 'DynamoDB.listTablesAsync': { |
| 168 | + mean: stJSON['DynamoDB.listTablesAsync'].histogram.mean, |
| 169 | + }, |
| 170 | + 'DynamoDB.describeTableAsync': { |
| 171 | + mean: stJSON['DynamoDB.describeTableAsync'].histogram.mean, |
| 172 | + }, |
| 173 | + 'DynamoDB.describeTableConsumedCapacityAsync': { |
| 174 | + mean: stJSON['DynamoDB.describeTableConsumedCapacityAsync'] |
| 175 | + .histogram.mean, |
| 176 | + }, |
| 177 | + 'CloudWatch.getMetricStatisticsAsync': { |
| 178 | + mean: stJSON['CloudWatch.getMetricStatisticsAsync'].histogram.mean, |
| 179 | + }, |
| 180 | + TableUpdates: { |
| 181 | + count: updateRequests.length, |
| 182 | + }, |
| 183 | + TotalProvisionedThroughput: totalProvisionedThroughput, |
| 184 | + TotalMonthlyEstimatedCost: totalMonthlyEstimatedCost, |
| 185 | + }, null, json.padding)); |
| 186 | + } |
| 187 | +} |
0 commit comments