Skip to content

Commit d04f381

Browse files
prep
1 parent df3cb59 commit d04f381

8 files changed

+311
-77
lines changed

cdk/lib/ddb-export/createTable.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
CREATE EXTERNAL TABLE ddb_exported_table (
1+
CREATE EXTERNAL TABLE table_name (
22
Item struct<pk:struct<S:string>,
33
person:struct<M:struct<
44
jobArea:struct<S:string>,
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
import {
2+
AthenaClient,
3+
CreateNamedQueryCommand,
4+
GetNamedQueryCommand,
5+
ListNamedQueriesCommand,
6+
UpdateNamedQueryCommand,
7+
} from '@aws-sdk/client-athena'
8+
9+
// Configuration read once from process.env when the module is loaded.
// Each value is undefined if the corresponding variable is not set.

// Region passed to the AthenaClient constructor.
const region = process.env.REGION
10+
// Workgroup in which named queries are listed and created.
const athenaWorkgroupName = process.env.ATHENA_WORKGROUP_NAME
11+
// Value substituted for the 'table_name' placeholder in the query string.
const athenaTableName = process.env.ATHENA_TABLE_NAME
12+
// Value substituted for the 'db_name' placeholder; also used as the named query's Database.
const glueDatabaseName = process.env.GLUE_DATABASE_NAME
13+
// Query string template handed to createOrUpdateAthenaSavedQuery by the handler.
const athenaQueryStringReadTable = process.env.ATHENA_QUERY_STRING_READ_TABLE
14+
15+
/**
 * Lambda entry point: creates or updates the saved Athena query named
 * 'sfn-ddb-export-read-table' from the ATHENA_QUERY_STRING_READ_TABLE template.
 *
 * @throws Error when ATHENA_QUERY_STRING_READ_TABLE is unset or empty, so a
 *   misconfigured deployment fails before any Athena call is made instead of
 *   surfacing later as a TypeError on `undefined`.
 */
exports.handler = async (): Promise<void> => {
  if (!athenaQueryStringReadTable) {
    throw new Error('Missing required environment variable ATHENA_QUERY_STRING_READ_TABLE')
  }

  await createOrUpdateAthenaSavedQuery('sfn-ddb-export-read-table', athenaQueryStringReadTable)
}
20+
21+
/**
 * Looks up the id of the named query called `queryName` in the configured workgroup.
 *
 * ListNamedQueries returns results page by page, so every page is followed via
 * NextToken until a match is found or the listing is exhausted.
 *
 * @returns The matching NamedQueryId, or undefined when no query has that name.
 */
const findNamedQueryIdByName = async (client: AthenaClient, queryName: string): Promise<string | undefined> => {
  let nextToken: string | undefined

  do {
    const responseListNamedQueriesCommand = await client.send(
      new ListNamedQueriesCommand({
        WorkGroup: athenaWorkgroupName,
        NextToken: nextToken,
      })
    )
    console.log('responseListNamedQueriesCommand', responseListNamedQueriesCommand)

    // NamedQueryIds is optional in the response; treat a missing list as empty.
    for (const namedQueryId of responseListNamedQueriesCommand.NamedQueryIds ?? []) {
      const responseGetNamedQueryCommand = await client.send(
        new GetNamedQueryCommand({
          NamedQueryId: namedQueryId,
        })
      )
      console.log('responseGetNamedQueryCommand', responseGetNamedQueryCommand)

      if (responseGetNamedQueryCommand.NamedQuery?.Name === queryName) {
        return namedQueryId
      }
    }

    nextToken = responseListNamedQueriesCommand.NextToken
  } while (nextToken)

  return undefined
}

/**
 * Creates the Athena named query `queryName` in the configured workgroup, or
 * updates its query string when a named query with that name already exists.
 *
 * Every occurrence of the placeholders `table_name` and `db_name` in
 * `queryString` is replaced with ATHENA_TABLE_NAME and GLUE_DATABASE_NAME
 * before the query is stored.
 *
 * @param queryName - Name used both to find an existing query and to store the new one.
 * @param queryString - Query template containing the placeholders.
 * @throws Error when ATHENA_TABLE_NAME or GLUE_DATABASE_NAME is not set; without
 *   this guard the literal text "undefined" would be written into the query.
 */
const createOrUpdateAthenaSavedQuery = async (queryName: string, queryString: string) => {
  if (!athenaTableName || !glueDatabaseName) {
    throw new Error('Missing required environment variable ATHENA_TABLE_NAME or GLUE_DATABASE_NAME')
  }

  const client = new AthenaClient({ region: region })

  const existingQueryId = await findNamedQueryIdByName(client, queryName)

  // split/join replaces all occurrences and, unlike String.replace, does not
  // interpret `$` sequences in the replacement value.
  const updatedQueryString = queryString
    .split('table_name')
    .join(athenaTableName)
    .split('db_name')
    .join(glueDatabaseName)

  console.log('updatedQueryString', updatedQueryString)

  if (existingQueryId === undefined) {
    const responseCreateNamedQueryCommand = await client.send(
      new CreateNamedQueryCommand({
        Name: queryName,
        Database: glueDatabaseName,
        Description: 'DynamoDB Export Query',
        QueryString: updatedQueryString,
        WorkGroup: athenaWorkgroupName,
      })
    )
    console.log('responseCreateNamedQueryCommand', responseCreateNamedQueryCommand)
  } else {
    const responseUpdateNamedQueryCommand = await client.send(
      new UpdateNamedQueryCommand({
        NamedQueryId: existingQueryId,
        QueryString: updatedQueryString,
        Name: queryName,
      })
    )
    console.log('responseUpdateNamedQueryCommand', responseUpdateNamedQueryCommand)
  }
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
import {
2+
AthenaClient,
3+
StartQueryExecutionCommand,
4+
} from '@aws-sdk/client-athena'
5+
6+
// Configuration read once from process.env when the module is loaded.
// Each value is undefined if the corresponding variable is not set.

// Region passed to the AthenaClient constructor.
const region = process.env.REGION
7+
// Database used as the query execution context.
const glueDatabaseName = process.env.GLUE_DATABASE_NAME
8+
// Workgroup in which the query execution is started.
const athenaWorkgroupName = process.env.ATHENA_WORKGROUP_NAME
9+
// Query template; its 'ddb-export-id' placeholder is replaced with the export id at runtime.
const athenaQueryStringCreateTable = process.env.ATHENA_QUERY_STRING_CREATE_TABLE
10+
11+
/**
 * Lambda entry point: starts the Athena query that creates the table for the
 * export identified by `event.exportId`.
 *
 * @param event - Invocation payload; must carry a non-empty string `exportId`.
 * @throws Error when `exportId` is missing, empty, or not a string. Without this
 *   check the literal text "undefined" would be substituted into the query.
 */
exports.handler = async (event: { exportId?: unknown }): Promise<void> => {
  const exportId = event?.exportId
  if (typeof exportId !== 'string' || exportId === '') {
    throw new Error('Event must contain a non-empty string "exportId"')
  }

  await createAthenaTable(exportId)
}
16+
17+
/**
 * Starts an Athena query execution that runs the ATHENA_QUERY_STRING_CREATE_TABLE
 * template with every `ddb-export-id` placeholder replaced by `exportId`.
 *
 * Only the submission of the query is awaited; this function does not poll for
 * the query's completion.
 *
 * @param exportId - Export id inserted into the query template.
 * @throws Error when ATHENA_QUERY_STRING_CREATE_TABLE is unset or empty.
 */
const createAthenaTable = async (exportId: string) => {
  if (!athenaQueryStringCreateTable) {
    throw new Error('Missing required environment variable ATHENA_QUERY_STRING_CREATE_TABLE')
  }

  const client = new AthenaClient({ region: region })

  // split/join replaces all occurrences and, unlike String.replace, does not
  // interpret `$` sequences in the replacement value.
  const updatedAthenaQueryStringCreateTable = athenaQueryStringCreateTable.split('ddb-export-id').join(exportId)

  const responseStartQueryExecutionCommand = await client.send(
    new StartQueryExecutionCommand({
      QueryString: updatedAthenaQueryStringCreateTable,
      WorkGroup: athenaWorkgroupName,
      QueryExecutionContext: {
        Database: glueDatabaseName,
      },
    })
  )
  console.log('responseStartQueryExecutionCommand', responseStartQueryExecutionCommand)
}

cdk/lib/ddb-export/ddb-export-step-function.lambda-function-create-query.ts

Lines changed: 0 additions & 37 deletions
This file was deleted.

cdk/lib/ddb-export/ddb-export-step-function.ts

Lines changed: 160 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -41,19 +41,19 @@ export class DdbExportStepFunction extends Construct {
4141
actions: ['dynamodb:ExportTableToPointInTime'],
4242
resources: [props.table.tableArn],
4343
})
44-
);
44+
)
4545
lambdaStartExport.addToRolePolicy(
4646
new iam.PolicyStatement({
4747
actions: ['s3:PutObject'],
4848
resources: [`${props.bucket.bucketArn}/*`],
4949
})
50-
);
50+
)
5151
lambdaStartExport.addToRolePolicy(
5252
new iam.PolicyStatement({
5353
actions: ['kms:Decrypt'],
5454
resources: [props.bucket.encryptionKey!.keyArn],
5555
})
56-
);
56+
)
5757

5858
const lambdaCheckExportState = new lambdaNodejs.NodejsFunction(this, 'lambda-function-check-export-state', {
5959
functionName: `${props.name}-ddb-check-export-state`,
@@ -87,15 +87,136 @@ export class DdbExportStepFunction extends Construct {
8787
errors: ['InProgressError'],
8888
})
8989

90-
const athenaDropTable = new stepfunctions_tasks.AthenaStartQueryExecution(this, 'drop table', {
91-
queryString: `DROP TABLE IF EXISTS \`${props.glueDb.databaseName}.ddb_exported_table\`;`,
90+
const athenaTableName = 'ddb_exported_table'
91+
92+
const athenaDropTable = new stepfunctions_tasks.AthenaStartQueryExecution(this, 'drop Athena table', {
93+
queryString: `DROP TABLE IF EXISTS \`${props.glueDb.databaseName}.${athenaTableName}\`;`,
9294
workGroup: props.athenaWorkgroup.name,
93-
});
95+
queryExecutionContext: {
96+
databaseName: props.glueDb.databaseName,
97+
},
98+
integrationPattern: stepfunctions.IntegrationPattern.RUN_JOB,
99+
resultPath: '$.TaskResult',
100+
})
101+
102+
/**
 * Loads a SQL template that sits next to this source file and fills in its
 * synth-time placeholders.
 *
 * Every `s3Location` is replaced with the export data prefix in props.bucket,
 * and every `table_name` with athenaTableName. The `ddb-export-id` path segment
 * is emitted literally and is not substituted here.
 *
 * @param file - File name of the template, resolved relative to __dirname.
 * @returns The template text with both placeholders substituted.
 */
const getSqlString = (file: string): string => {
  const exportDataLocation = `s3://${props.bucket.bucketName}/ddb-exports/AWSDynamoDB/ddb-export-id/data/`
  const template = readFileSync(join(__dirname, file), 'utf-8')

  return template.replace(/s3Location/g, exportDataLocation).replace(/table_name/g, athenaTableName)
}
109+
110+
const queryStringCreateTable = getSqlString('createTable.sql')
94111

95-
const athenaCreateTable = new stepfunctions.Pass(this, 'create table', {});
96-
const sfnTaskCreateSelectQuery = new stepfunctions.Pass(this, 'create SELECT query', {});
112+
// const athenaCreateTable = new stepfunctions_tasks.AthenaStartQueryExecution(this, 'create table', {
113+
// queryString: queryStringCreateTable,
97114

98-
const definition = sfnTaskStartExport.next(sfnTaskCheckExportState).next(athenaDropTable).next(athenaCreateTable).next(sfnTaskCreateSelectQuery);
115+
// workGroup: props.athenaWorkgroup.name,
116+
// queryExecutionContext: {
117+
// databaseName: props.glueDb.databaseName,
118+
// },
119+
// resultPath: '$.TaskResult',
120+
// })
121+
122+
const lambdaCreateAthenaTable = new lambdaNodejs.NodejsFunction(this, 'lambda-function-create-athena-table', {
123+
functionName: `${props.name}-create-athena-table`,
124+
timeout: Duration.minutes(2),
125+
environment: {
126+
REGION: Stack.of(this).region,
127+
GLUE_DATABASE_NAME: props.glueDb.databaseName,
128+
ATHENA_WORKGROUP_NAME: props.athenaWorkgroup.name,
129+
ATHENA_QUERY_STRING_CREATE_TABLE: queryStringCreateTable,
130+
},
131+
})
132+
lambdaCreateAthenaTable.addToRolePolicy(
133+
new iam.PolicyStatement({
134+
actions: ['athena:StartQueryExecution'],
135+
resources: [`arn:aws:athena:${Stack.of(this).region}:${Stack.of(this).account}:workgroup/${props.athenaWorkgroup.name}`],
136+
})
137+
)
138+
// Least-privilege replacement for the former `s3:*` / `kms:*` on `*` grants.
// Read access to the export bucket that the table's S3 location points at.
lambdaCreateAthenaTable.addToRolePolicy(
  new iam.PolicyStatement({
    actions: ['s3:GetBucketLocation', 's3:GetObject', 's3:ListBucket'],
    resources: [props.bucket.bucketArn, `${props.bucket.bucketArn}/*`],
  })
)
// Read/write access to the bucket that receives Athena query results.
// NOTE(review): assumes the workgroup's output location is props.athenaResultBucket — confirm.
lambdaCreateAthenaTable.addToRolePolicy(
  new iam.PolicyStatement({
    actions: [
      's3:GetBucketLocation',
      's3:GetObject',
      's3:ListBucket',
      's3:ListBucketMultipartUploads',
      's3:ListMultipartUploadParts',
      's3:AbortMultipartUpload',
      's3:PutObject',
    ],
    resources: [props.athenaResultBucket.bucketArn, `${props.athenaResultBucket.bucketArn}/*`],
  })
)
// Decrypt objects in the export bucket.
lambdaCreateAthenaTable.addToRolePolicy(
  new iam.PolicyStatement({
    actions: ['kms:Decrypt'],
    resources: [props.bucket.encryptionKey!.keyArn],
  })
)
// Read and write encrypted query results.
lambdaCreateAthenaTable.addToRolePolicy(
  new iam.PolicyStatement({
    actions: ['kms:Decrypt', 'kms:GenerateDataKey'],
    resources: [props.athenaResultBucket.encryptionKey!.keyArn],
  })
)
154+
lambdaCreateAthenaTable.addToRolePolicy(
155+
new iam.PolicyStatement({
156+
actions: [
157+
'glue:BatchCreatePartition',
158+
'glue:BatchDeletePartition',
159+
'glue:BatchDeleteTable',
160+
'glue:BatchGetPartition',
161+
'glue:CreateDatabase',
162+
'glue:CreatePartition',
163+
'glue:CreateTable',
164+
'glue:DeleteDatabase',
165+
'glue:DeletePartition',
166+
'glue:DeleteTable',
167+
'glue:GetDatabase',
168+
'glue:GetDatabases',
169+
'glue:GetPartition',
170+
'glue:GetPartitions',
171+
'glue:GetTable',
172+
'glue:GetTables',
173+
'glue:UpdateDatabase',
174+
'glue:UpdatePartition',
175+
'glue:UpdateTable'
176+
],
177+
resources: [
178+
`arn:aws:glue:${Stack.of(this).region}:${Stack.of(this).account}:catalog`, // remove?
179+
`arn:aws:glue:${Stack.of(this).region}:${Stack.of(this).account}:database/default`, // remove?
180+
props.glueDb.databaseArn,
181+
`arn:aws:glue:${Stack.of(this).region}:${Stack.of(this).account}:table/${props.glueDb.databaseName}/${athenaTableName}`],
182+
})
183+
);
184+
185+
const sfnTaskCreateAthenaTable = new stepfunctions_tasks.LambdaInvoke(this, 'create Athena table', {
186+
lambdaFunction: lambdaCreateAthenaTable,
187+
})
188+
189+
const queryStringReadTable = getSqlString('readTable.sql')
190+
191+
const lambdaCreateAthenaQuery = new lambdaNodejs.NodejsFunction(this, 'lambda-function-create-athena-query', {
192+
functionName: `${props.name}-create-athena-query`,
193+
timeout: Duration.minutes(2),
194+
environment: {
195+
REGION: Stack.of(this).region,
196+
ATHENA_WORKGROUP_NAME: props.athenaWorkgroup.name,
197+
ATHENA_TABLE_NAME: athenaTableName,
198+
GLUE_DATABASE_NAME: props.glueDb.databaseName,
199+
ATHENA_QUERY_STRING_READ_TABLE: queryStringReadTable,
200+
},
201+
})
202+
lambdaCreateAthenaQuery.addToRolePolicy(
203+
new iam.PolicyStatement({
204+
actions: ['athena:CreateNamedQuery', 'athena:ListNamedQueries', 'athena:GetNamedQuery', 'athena:UpdateNamedQuery'],
205+
resources: [`arn:aws:athena:${Stack.of(this).region}:${Stack.of(this).account}:workgroup/${props.athenaWorkgroup.name}`],
206+
})
207+
)
208+
lambdaCreateAthenaQuery.addToRolePolicy(
209+
new iam.PolicyStatement({
210+
actions: ['kms:Decrypt', 'kms:GenerateDataKey'],
211+
resources: [props.athenaResultBucket.encryptionKey!.keyArn],
212+
})
213+
)
214+
215+
const sfnTaskCreateAthenaQuery = new stepfunctions_tasks.LambdaInvoke(this, 'create Athena query', {
216+
lambdaFunction: lambdaCreateAthenaQuery,
217+
})
218+
219+
const definition = sfnTaskStartExport.next(sfnTaskCheckExportState).next(athenaDropTable).next(sfnTaskCreateAthenaTable).next(sfnTaskCreateAthenaQuery)
99220

100221
const sfn = new stepfunctions.StateMachine(this, 'ddb-export-state-machine', {
101222
stateMachineName: `${props.name}-ddb-export-state-machine`,
@@ -105,17 +226,40 @@ export class DdbExportStepFunction extends Construct {
105226
sfn.addToRolePolicy(
106227
new iam.PolicyStatement({
107228
actions: ['s3:*'],
108-
resources: [props.athenaResultBucket.bucketArn,
109-
`${props.athenaResultBucket.bucketArn}/*`],
229+
resources: [props.athenaResultBucket.bucketArn, `${props.athenaResultBucket.bucketArn}/*`],
110230
})
111-
);
231+
)
112232
sfn.addToRolePolicy(
113233
new iam.PolicyStatement({
114-
actions: ['kms:Decrypt'],
234+
actions: ['kms:Decrypt', 'kms:GenerateDataKey'],
115235
resources: [props.athenaResultBucket.encryptionKey!.keyArn],
116236
})
117-
);
118-
119-
237+
)
238+
// sfn.addToRolePolicy(
239+
// new iam.PolicyStatement({
240+
// actions: [
241+
// 'glue:BatchCreatePartition',
242+
// 'glue:BatchDeletePartition',
243+
// 'glue:BatchDeleteTable',
244+
// 'glue:BatchGetPartition',
245+
// 'glue:CreateDatabase',
246+
// 'glue:CreatePartition',
247+
// 'glue:CreateTable',
248+
// 'glue:DeleteDatabase',
249+
// 'glue:DeletePartition',
250+
// 'glue:DeleteTable',
251+
// 'glue:GetDatabase',
252+
// 'glue:GetDatabases',
253+
// 'glue:GetPartition',
254+
// 'glue:GetPartitions',
255+
// 'glue:GetTable',
256+
// 'glue:GetTables',
257+
// 'glue:UpdateDatabase',
258+
// 'glue:UpdatePartition',
259+
// 'glue:UpdateTable'
260+
// ],
261+
// resources: [props.glueDb.databaseArn, `arn:aws:glue:${Stack.of(this).region}:${Stack.of(this).account}:table/${props.glueDb.databaseName}/${athenaTableName}`],
262+
// })
263+
// )
120264
}
121265
}

0 commit comments

Comments
 (0)