
Commit dd2b1f6

Add transformMissingGzDataDaily job to handle late data
1 parent 6f2d477 commit dd2b1f6

File tree

functions/transformMissingGzDataDaily.js
template.yaml

2 files changed: +100 -1 lines changed
functions/transformMissingGzDataDaily.js

Lines changed: 50 additions & 0 deletions
@@ -0,0 +1,50 @@
const util = require('./util');

// AWS Glue Data Catalog database and tables
const sourceTable = process.env.SOURCE_TABLE;
const targetTable = process.env.TARGET_TABLE;
const database = process.env.DATABASE;

async function insertMissingGzData(database, sourceTable, targetTable, year, month, day) {

  const insertStatement = `
    -- Insert missing Gzip Data on ${year}-${month}-${day}
    INSERT INTO ${database}.${targetTable}
    WITH gz AS (
      SELECT *
      FROM ${database}.${sourceTable}
      WHERE year = '${year}' AND month = '${month}' AND day = '${day}'
    ), parquet AS (
      SELECT concat(year, '-', month, '-', day, 'T', hour) dth, request_id
      FROM ${database}.${targetTable}
      WHERE year = '${year}' AND month = '${month}' AND day = '${day}'
    )
    SELECT
      gz.*
    FROM gz LEFT JOIN parquet
      ON concat(gz.year, '-', gz.month, '-', gz.day, 'T', gz.hour) = parquet.dth
      AND gz.request_id = parquet.request_id
    WHERE parquet.request_id IS NULL`;

  await util.runQuery(insertStatement);
}

// get the partitions of yesterday or use `dt` in event
exports.handler = async (event, context, callback) => {
  if ('dt' in event) {
    var yesterday = new Date(`${event.dt}T00:00:00Z`)
    if (isNaN(yesterday))
      throw new Error('invalid dt')
  } else {
    var yesterday = new Date();
    yesterday.setDate(yesterday.getDate() - 1);
  }

  const year = yesterday.getUTCFullYear();
  const month = (yesterday.getUTCMonth() + 1).toString().padStart(2, '0');
  const day = yesterday.getUTCDate().toString().padStart(2, '0');

  console.log('Insert Missing Data in Gzip Files on ', { year, month, day });

  await insertMissingGzData(database, sourceTable, targetTable, year, month, day);
}
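
The handler delegates query execution to util.runQuery, which is required from ./util but is not part of this diff. A minimal sketch of what such a helper could look like, assuming the aws-sdk v2 Athena client bundled with the nodejs12.x runtime and the ATHENA_QUERY_RESULTS_LOCATION variable set in template.yaml; the polling interval and error handling are illustrative, not the repository's actual implementation:

// util.js -- illustrative sketch only, not the helper shipped in this repository
const AWS = require('aws-sdk');
const athena = new AWS.Athena();

// Submit a query to Athena and wait until it reaches a terminal state.
async function runQuery(queryString) {
  const { QueryExecutionId } = await athena.startQueryExecution({
    QueryString: queryString,
    ResultConfiguration: {
      OutputLocation: process.env.ATHENA_QUERY_RESULTS_LOCATION,
    },
  }).promise();

  for (;;) {
    const { QueryExecution } = await athena.getQueryExecution({ QueryExecutionId }).promise();
    const state = QueryExecution.Status.State;
    if (state === 'SUCCEEDED') return QueryExecutionId;
    if (state === 'FAILED' || state === 'CANCELLED') {
      throw new Error(`Athena query ${state}: ${QueryExecution.Status.StateChangeReason}`);
    }
    // Wait a second before polling again.
    await new Promise((resolve) => setTimeout(resolve, 1000));
  }
}

module.exports = { runQuery };

A helper along these lines needs exactly the athena:StartQueryExecution and athena:GetQueryExecution permissions, plus the S3 and Glue access, that template.yaml grants to the function below.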

template.yaml

Lines changed: 50 additions & 1 deletion
@@ -40,6 +40,55 @@ Parameters:
       Prefix of parquet files that are created in Apache Hive
       like style by the CTAS query. Including the trailing slash.
 Resources:
+  TransformMissingGzDataDailyFn:
+    Type: AWS::Serverless::Function
+    Properties:
+      CodeUri: functions/
+      Handler: transformMissingGzDataDaily.handler
+      Runtime: nodejs12.x
+      Timeout: 900
+      Policies:
+        - Version: '2012-10-17'
+          Statement:
+            - Effect: Allow
+              Action:
+                - athena:StartQueryExecution
+                - athena:GetQueryExecution
+              Resource: '*'
+            - Effect: Allow
+              Action:
+                - s3:ListBucket
+                - s3:GetBucketLocation
+              Resource: !Sub "arn:${AWS::Partition}:s3:::${ResourcePrefix}-${AWS::AccountId}-cf-access-logs"
+            - Effect: Allow
+              Action:
+                - s3:PutObject
+                - s3:GetObject
+              Resource: !Sub "arn:${AWS::Partition}:s3:::${ResourcePrefix}-${AWS::AccountId}-cf-access-logs/*"
+            - Effect: Allow
+              Action:
+                - glue:CreatePartition
+                - glue:GetDatabase
+                - glue:GetTable
+                - glue:BatchCreatePartition
+                - glue:GetPartition
+                - glue:GetPartitions
+                - glue:CreateTable
+                - glue:DeleteTable
+                - glue:DeletePartition
+              Resource: '*'
+      Environment:
+        Variables:
+          SOURCE_TABLE: !Ref PartitionedGzTable
+          TARGET_TABLE: !Ref PartitionedParquetTable
+          DATABASE: !Ref CfLogsDatabase
+          ATHENA_QUERY_RESULTS_LOCATION: !Sub "s3://${ResourcePrefix}-${AWS::AccountId}-cf-access-logs/athena-query-results"
+      Events:
+        HourlyEvt:
+          Type: Schedule
+          Properties:
+            Schedule: cron(10 6 * * ? *)
+
   TransformPartFn:
     Type: AWS::Serverless::Function
     Properties:
@@ -512,4 +561,4 @@ Resources:
         - { database: !Ref CfLogsDatabase,
             partitioned_gz_table: !Ref PartitionedGzTable,
             partitioned_parquet_table: !Ref PartitionedParquetTable }
-        - ' */'
+        - ' */'
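
Besides the daily schedule, the handler accepts an optional dt field in the event, so a specific day can be re-processed by hand when late data is discovered. A minimal sketch of such a manual run, assuming the source file is functions/transformMissingGzDataDaily.js and that AWS credentials plus the function's environment variables are available locally; the file name backfill.js and the example date are illustrative only:

// backfill.js -- illustrative sketch for re-processing a single day by hand
// Run inside functions/ with AWS credentials and the same environment variables
// the Lambda uses: SOURCE_TABLE, TARGET_TABLE, DATABASE, ATHENA_QUERY_RESULTS_LOCATION.
const { handler } = require('./transformMissingGzDataDaily');

// The 'dt' field selects the day to repair; without it the handler
// falls back to yesterday (UTC). The date below is only an example.
handler({ dt: '2021-03-15' })
  .then(() => console.log('backfill finished'))
  .catch((err) => { console.error(err); process.exitCode = 1; });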
