Skip to content

Commit f879fc1

Browse files
parquet preparation
1 parent ba7bed1 commit f879fc1

File tree

8 files changed

+463
-129
lines changed

8 files changed

+463
-129
lines changed

README.md

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,40 @@
22

33
![overview](./diagrams/overview.drawio.svg)
44

5-
## deploy
5+
## deploy options
6+
7+
The [config file](./cdk/bin/config.ts) controls the deplyoement options.
8+
9+
### Firehose
10+
11+
The formats `JSON` and `Parquet` can be choosen
12+
13+
```typescript
14+
export const config: Config = {
15+
...
16+
kinesisFormat: 'JSON',
17+
...
18+
}
19+
```
20+
21+
```typescript
22+
export const config: Config = {
23+
...
24+
kinesisFormat: 'PARQUET',
25+
...
26+
}
27+
```
28+
29+
30+
### Quicksight
31+
32+
```typescript
33+
export const config: Config = {
34+
...
35+
isQuicksight: true,
36+
...
37+
}
38+
```
639

740
`cd cdk`
841

cdk/bin/config.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
export interface Config {
22
isQuicksight: boolean;
3+
kinesisFormat: 'JSON' | 'PARQUET';
34
}
45

56
export const config: Config = {
67
isQuicksight: false,
8+
kinesisFormat: 'PARQUET',
79
}

cdk/lib/cdk-stack.ts

Lines changed: 49 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,9 @@ import {
1111
aws_iam as iam,
1212
aws_glue as glue,
1313
aws_athena as athena,
14-
aws_logs as logs,
1514
StackProps,
1615
} from 'aws-cdk-lib'
1716

18-
import { LambdaFunctionProcessor as LambdaFunctionProcessorAlpha, DeliveryStream as DeliveryStreamAlpha } from '@aws-cdk/aws-kinesisfirehose-alpha'
19-
import * as destinationsAlpha from '@aws-cdk/aws-kinesisfirehose-destinations-alpha'
2017
import * as glueAlpha from '@aws-cdk/aws-glue-alpha'
2118
import { CfnDeliveryStream } from 'aws-cdk-lib/aws-kinesisfirehose'
2219
import { SavedQueries } from './saved-queries/saved-queries'
@@ -25,6 +22,9 @@ import { QuicksightRole } from './quicksight/quicksight-role'
2522
import { DdbExport } from './ddb-export/ddb-export'
2623
import { DdbExportStepFunction } from './ddb-export/ddb-export-step-function'
2724
import { Config } from '../bin/config'
25+
import { FirehoseJson } from './firehose/firehose-json'
26+
import { FirehoseParquet } from './firehose/firehose-parquet'
27+
import { th } from '@faker-js/faker'
2828

2929

3030
export interface CdkStackProps extends StackProps {
@@ -79,31 +79,48 @@ export class CdkStack extends Stack {
7979
autoDeleteObjects: true,
8080
})
8181

82-
const processor = new lambda.NodejsFunction(this, 'lambda-function-processor', {
83-
functionName: `${name}-firehose-converter`,
84-
timeout: Duration.minutes(2),
85-
bundling: {
86-
sourceMap: true,
82+
const glueSecurityConfiguration = new glueAlpha.SecurityConfiguration(this, 'glue security options', {
83+
securityConfigurationName: `${name}-security-options`,
84+
s3Encryption: {
85+
mode: glueAlpha.S3EncryptionMode.KMS,
86+
kmsKey: kmsKey,
8787
},
8888
})
8989

90-
const lambdaProcessor = new LambdaFunctionProcessorAlpha(processor, {
91-
retries: 5,
90+
const glueDb = new glueAlpha.Database(this, 'glue db', {
91+
databaseName: `${name}-db`,
9292
})
9393

94-
const ddbChangesPrefix = 'ddb-changes';
94+
let ddbChangesPrefix;
95+
96+
switch (props?.config.kinesisFormat) {
97+
case 'JSON':
98+
ddbChangesPrefix = 'ddb-changes-json';
99+
new FirehoseJson(this, 'firehoseJson', {
100+
name: name,
101+
kmsKey: kmsKey,
102+
firehoseBucket: firehoseBucket,
103+
ddbChangesPrefix: ddbChangesPrefix,
104+
stream: stream,
105+
})
106+
break;
107+
case 'PARQUET':
108+
ddbChangesPrefix = 'ddb-changes-parquet';
109+
new FirehoseParquet(this, 'firehoseJson', {
110+
name: name,
111+
kmsKey: kmsKey,
112+
firehoseBucket: firehoseBucket,
113+
ddbChangesPrefix: ddbChangesPrefix,
114+
stream: stream,
115+
glueSecurityConfiguration: glueSecurityConfiguration,
116+
glueDb: glueDb,
117+
table: table,
118+
})
119+
break;
120+
default: throw new Error('kinesisFormat not supported');
121+
}
122+
95123

96-
// json format
97-
const s3Destination = new destinationsAlpha.S3Bucket(firehoseBucket, {
98-
encryptionKey: kmsKey,
99-
bufferingInterval: Duration.seconds(60),
100-
processor: lambdaProcessor,
101-
dataOutputPrefix: `${ddbChangesPrefix}/`,
102-
logGroup: new logs.LogGroup(this, 'firehose-s3-log-group', {
103-
logGroupName: `${name}-firehose-s3-log-group`,
104-
removalPolicy: RemovalPolicy.DESTROY,
105-
}),
106-
})
107124

108125
// parquet format
109126
// const s3Destination = new destinationsAlpha.S3Bucket(firehoseBucket, {
@@ -113,11 +130,7 @@ export class CdkStack extends Stack {
113130
// bufferingSize: Size.mebibytes(64),
114131
// });
115132

116-
const firehoseDeliveryStream = new DeliveryStreamAlpha(this, 'Delivery Stream', {
117-
deliveryStreamName: `${name}-firehose`,
118-
sourceStream: stream,
119-
destinations: [s3Destination],
120-
})
133+
121134

122135
// // https://5k-team.trilogy.com/hc/en-us/articles/360015651640-Configuring-Firehose-with-CDK
123136
// // https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-kinesisfirehose-deliverystream.html
@@ -161,19 +174,6 @@ export class CdkStack extends Stack {
161174
assumedBy: new iam.ServicePrincipal('glue.amazonaws.com'),
162175
})
163176

164-
165-
const glueDb = new glueAlpha.Database(this, 'glue db', {
166-
databaseName: `${name}-db`,
167-
})
168-
169-
const glueSecurityOptions = new glueAlpha.SecurityConfiguration(this, 'glue security options', {
170-
securityConfigurationName: `${name}-security-options`,
171-
s3Encryption: {
172-
mode: glueAlpha.S3EncryptionMode.KMS,
173-
kmsKey: kmsKey,
174-
},
175-
})
176-
177177
const crawler = new glue.CfnCrawler(this, 'crawler', {
178178
name: `${name}-crawler`,
179179
role: roleCrawler.roleArn,
@@ -185,7 +185,7 @@ export class CdkStack extends Stack {
185185
],
186186
},
187187
databaseName: glueDb.databaseName,
188-
crawlerSecurityConfiguration: glueSecurityOptions.securityConfigurationName,
188+
crawlerSecurityConfiguration: glueSecurityConfiguration.securityConfigurationName,
189189
configuration: JSON.stringify({
190190
Version: 1.0,
191191
Grouping: { TableGroupingPolicy: 'CombineCompatibleSchemas' },
@@ -225,7 +225,7 @@ export class CdkStack extends Stack {
225225
actions: ['glue:GetSecurityConfiguration'],
226226
})
227227
)
228-
glueSecurityOptions.node.addDependency(roleCrawler)
228+
glueSecurityConfiguration.node.addDependency(roleCrawler)
229229
// crawler.node.addDependency(roleCrawler)
230230

231231
const athenaWorkgroup = new athena.CfnWorkGroup(this, 'analytics-athena-workgroup', {
@@ -251,11 +251,18 @@ export class CdkStack extends Stack {
251251
savedQueries.node.addDependency(athenaWorkgroup)
252252

253253
if (props?.config.isQuicksight) {
254+
let quicksightUsername;
255+
if (process.env.QUICKSIGHT_USERNAME) {
256+
quicksightUsername = process.env.QUICKSIGHT_USERNAME;
257+
} else {
258+
throw new Error('QUICKSIGHT_USERNAME environment variable is required');
259+
}
254260

255261
new Quicksight(this, 'quicksight', {
256262
bucket: firehoseBucket,
257263
name: name,
258264
prefix: ddbChangesPrefix,
265+
quicksightUsername: quicksightUsername,
259266
})
260267

261268
new QuicksightRole(this, 'quicksight-role', {

cdk/lib/firehose/firehose-json.ts

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
import { Construct } from 'constructs'
2+
import * as destinationsAlpha from '@aws-cdk/aws-kinesisfirehose-destinations-alpha'
3+
import {
4+
Duration,
5+
aws_kms as kms,
6+
aws_s3 as s3,
7+
aws_lambda_nodejs as lambda,
8+
aws_kinesis as kinesis,
9+
aws_logs as logs,
10+
RemovalPolicy,
11+
} from 'aws-cdk-lib';
12+
import {
13+
LambdaFunctionProcessor as LambdaFunctionProcessorAlpha,
14+
DeliveryStream as DeliveryStreamAlpha
15+
} from '@aws-cdk/aws-kinesisfirehose-alpha'
16+
17+
export interface FirehoseJsonProps {
18+
name: string
19+
kmsKey: kms.IKey
20+
firehoseBucket: s3.IBucket
21+
ddbChangesPrefix: string
22+
stream: kinesis.Stream
23+
}
24+
25+
export class FirehoseJson extends Construct {
26+
constructor(scope: Construct, id: string, props: FirehoseJsonProps) {
27+
super(scope, id)
28+
29+
const { kmsKey, firehoseBucket, name, ddbChangesPrefix, stream } = props
30+
31+
const processor = new lambda.NodejsFunction(this, 'lambda-function-processor', {
32+
functionName: `${name}-firehose-json-converter`,
33+
timeout: Duration.minutes(2),
34+
bundling: {
35+
sourceMap: true,
36+
},
37+
})
38+
39+
const lambdaProcessor = new LambdaFunctionProcessorAlpha(processor, {
40+
retries: 5,
41+
})
42+
43+
const s3Destination = new destinationsAlpha.S3Bucket(firehoseBucket, {
44+
encryptionKey: kmsKey,
45+
bufferingInterval: Duration.seconds(60),
46+
processor: lambdaProcessor,
47+
dataOutputPrefix: `${ddbChangesPrefix}/`,
48+
logGroup: new logs.LogGroup(this, 'firehose--json-s3-log-group', {
49+
logGroupName: `${name}-firehose-json-s3-log-group`,
50+
removalPolicy: RemovalPolicy.DESTROY,
51+
}),
52+
})
53+
54+
new DeliveryStreamAlpha(this, 'Delivery Stream', {
55+
deliveryStreamName: `${name}-firehose-json`,
56+
sourceStream: stream,
57+
destinations: [s3Destination],
58+
})
59+
60+
}
61+
}

cdk/lib/firehose/firehose-parquet.ts

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
import { Construct } from 'constructs'
2+
import * as destinationsAlpha from '@aws-cdk/aws-kinesisfirehose-destinations-alpha'
3+
import {
4+
Duration,
5+
aws_kms as kms,
6+
aws_iam as iam,
7+
aws_s3 as s3,
8+
aws_dynamodb as dynamodb,
9+
aws_glue as glue,
10+
aws_kinesis as kinesis,
11+
aws_logs as logs,
12+
RemovalPolicy,
13+
Stack,
14+
} from 'aws-cdk-lib';
15+
import {
16+
LambdaFunctionProcessor as LambdaFunctionProcessorAlpha,
17+
DeliveryStream as DeliveryStreamAlpha
18+
} from '@aws-cdk/aws-kinesisfirehose-alpha'
19+
import * as glueAlpha from '@aws-cdk/aws-glue-alpha'
20+
21+
22+
export interface FirehoseParquetProps {
23+
name: string
24+
kmsKey: kms.IKey
25+
firehoseBucket: s3.IBucket
26+
ddbChangesPrefix: string
27+
stream: kinesis.Stream
28+
glueSecurityConfiguration: glueAlpha.SecurityConfiguration
29+
glueDb: glueAlpha.Database
30+
table: dynamodb.ITable
31+
}
32+
33+
export class FirehoseParquet extends Construct {
34+
constructor(scope: Construct, id: string, props: FirehoseParquetProps) {
35+
super(scope, id)
36+
37+
const { kmsKey, firehoseBucket, name, ddbChangesPrefix, stream, glueSecurityConfiguration, glueDb, table } = props
38+
const roleName = `${name}-crawler-ddb-role`;
39+
const roleCrawlerddb = new iam.Role(this, 'roleCrawlerDdb', {
40+
roleName: roleName,
41+
assumedBy: new iam.ServicePrincipal('glue.amazonaws.com'),
42+
})
43+
44+
const crawlerName = `${name}-ddb-crawler`;
45+
const crawler = new glue.CfnCrawler(this, 'crawler-ddb', {
46+
name: crawlerName,
47+
role: roleCrawlerddb.roleArn,
48+
targets: {
49+
dynamoDbTargets: [
50+
{
51+
path: table.tableName,
52+
},
53+
],
54+
},
55+
databaseName: glueDb.databaseName,
56+
crawlerSecurityConfiguration: glueSecurityConfiguration.securityConfigurationName,
57+
configuration: JSON.stringify({
58+
Version: 1.0,
59+
Grouping: { TableGroupingPolicy: 'CombineCompatibleSchemas' },
60+
CrawlerOutput: {
61+
Partitions: { AddOrUpdateBehavior: 'InheritFromTable' },
62+
},
63+
}),
64+
})
65+
66+
const glueCrawlerLogArn = `arn:aws:logs:${Stack.of(this).region}:${Stack.of(this).account}:log-group:/aws-glue/crawlers-role/${roleName}-*:log-stream:${crawlerName}`;
67+
68+
const glueTableArn = `arn:aws:glue:${Stack.of(this).region}:${Stack.of(this).account}:table/${glueDb.databaseName}/*`
69+
70+
const glueCrawlerArn = `arn:aws:glue:${Stack.of(this).region}:${Stack.of(this).account}:crawler/${crawler.name}`
71+
roleCrawlerddb.addToPolicy(
72+
new iam.PolicyStatement({
73+
resources: [
74+
glueCrawlerLogArn,
75+
glueTableArn,
76+
glueDb.catalogArn,
77+
glueDb.databaseArn,
78+
kmsKey.keyArn,
79+
firehoseBucket.bucketArn,
80+
`${firehoseBucket.bucketArn}/*`,
81+
glueCrawlerArn,
82+
table.tableArn,
83+
],
84+
actions: [
85+
'logs:*',
86+
'glue:*',
87+
'kms:Decrypt',
88+
'S3:*',
89+
'dynamodb:DescribeTable',
90+
],
91+
})
92+
)
93+
roleCrawlerddb.addToPolicy(
94+
new iam.PolicyStatement({
95+
resources: ['*'],
96+
actions: ['glue:GetSecurityConfiguration'],
97+
})
98+
)
99+
glueSecurityConfiguration.node.addDependency(roleCrawlerddb)
100+
101+
102+
const s3Destination = new destinationsAlpha.S3Bucket(firehoseBucket, {
103+
encryptionKey: kmsKey,
104+
bufferingInterval: Duration.seconds(60),
105+
dataOutputPrefix: `${ddbChangesPrefix}/`,
106+
logGroup: new logs.LogGroup(this, 'firehose--parquet-s3-log-group', {
107+
logGroupName: `${name}-firehose-parquet-s3-log-group`,
108+
removalPolicy: RemovalPolicy.DESTROY,
109+
}),
110+
})
111+
112+
new DeliveryStreamAlpha(this, 'Delivery Stream', {
113+
deliveryStreamName: `${name}-firehose-parquet`,
114+
sourceStream: stream,
115+
destinations: [s3Destination],
116+
})
117+
118+
}
119+
}

0 commit comments

Comments
 (0)