diff --git a/functions/createPartitions.js b/functions/createPartitions.js index b72c5d9..7f086de 100644 --- a/functions/createPartitions.js +++ b/functions/createPartitions.js @@ -8,7 +8,13 @@ const database = process.env.DATABASE; // creates partitions for the hour after the current hour exports.handler = async (event, context, callback) => { - var nextHour = new Date(Date.now() + 60 * 60 * 1000); + + // format of dth is '2021-01-01T00' + var input = 'dth' in event ? `${event.dth}:00:00Z` : Date.now() + 60 * 60 * 1000; + var nextHour = new Date(input); + if (isNaN(nextHour)) + throw new Error('invalid dth') + var year = nextHour.getUTCFullYear(); var month = (nextHour.getUTCMonth() + 1).toString().padStart(2, '0'); var day = nextHour.getUTCDate().toString().padStart(2, '0'); diff --git a/functions/transformMissingGzDataDaily.js b/functions/transformMissingGzDataDaily.js new file mode 100644 index 0000000..807ba1c --- /dev/null +++ b/functions/transformMissingGzDataDaily.js @@ -0,0 +1,50 @@ +const util = require('./util'); + +// AWS Glue Data Catalog database and tables +const sourceTable = process.env.SOURCE_TABLE; +const targetTable = process.env.TARGET_TABLE; +const database = process.env.DATABASE; + +async function insertMissingGzData(database, sourceTable, targetTable, year, month, day) { + + const insertStatement = ` + -- Insert missing Gzip Data on ${year}-${month}-${day} + INSERT INTO ${database}.${targetTable} + WITH gz AS ( + SELECT * + FROM ${database}.${sourceTable} + WHERE year = '${year}' AND month = '${month}' AND day = '${day}' + ), parquet AS ( + SELECT concat(year, '-', month, '-', day, 'T', hour) dth, request_id + FROM ${database}.${targetTable} + WHERE year = '${year}' AND month = '${month}' AND day = '${day}' + ) + SELECT + gz.* + FROM gz LEFT JOIN parquet + ON concat(gz.year, '-', gz.month, '-', gz.day, 'T', gz.hour) = parquet.dth + AND gz.request_id = parquet.request_id + WHERE parquet.request_id IS NULL`; + + await util.runQuery(insertStatement); +} + +// get the partitions of yesterday or use `dt` in event +exports.handler = async (event, context, callback) => { + if ( 'dt' in event ) { + var yesterday = new Date(`${event.dt}T00:00:00Z`) + if (isNaN(yesterday)) + throw new Error('invalid dt') + } else { + var yesterday = new Date(); + yesterday.setDate(yesterday.getDate() - 1); + } + + const year = yesterday.getUTCFullYear(); + const month = (yesterday.getUTCMonth() + 1).toString().padStart(2, '0'); + const day = yesterday.getUTCDate().toString().padStart(2, '0'); + + console.log('Insert Missing Data in Gzip Files on ', { year, month, day }); + + await insertMissingGzData(database, sourceTable, targetTable, year, month, day); +} diff --git a/template.yaml b/template.yaml index c0d90d0..2baa854 100644 --- a/template.yaml +++ b/template.yaml @@ -40,6 +40,55 @@ Parameters: Prefix of parquet files that are created in Apache Hive like style by the CTAS query. Including the trailing slash. Resources: + TransformMissingGzDataDailyFn: + Type: AWS::Serverless::Function + Properties: + CodeUri: functions/ + Handler: transformMissingGzDataDaily.handler + Runtime: nodejs12.x + Timeout: 900 + Policies: + - Version: '2012-10-17' + Statement: + - Effect: Allow + Action: + - athena:StartQueryExecution + - athena:GetQueryExecution + Resource: '*' + - Effect: Allow + Action: + - s3:ListBucket + - s3:GetBucketLocation + Resource: !Sub "arn:${AWS::Partition}:s3:::${ResourcePrefix}-${AWS::AccountId}-cf-access-logs" + - Effect: Allow + Action: + - s3:PutObject + - s3:GetObject + Resource: !Sub "arn:${AWS::Partition}:s3:::${ResourcePrefix}-${AWS::AccountId}-cf-access-logs/*" + - Effect: Allow + Action: + - glue:CreatePartition + - glue:GetDatabase + - glue:GetTable + - glue:BatchCreatePartition + - glue:GetPartition + - glue:GetPartitions + - glue:CreateTable + - glue:DeleteTable + - glue:DeletePartition + Resource: '*' + Environment: + Variables: + SOURCE_TABLE: !Ref PartitionedGzTable + TARGET_TABLE: !Ref PartitionedParquetTable + DATABASE: !Ref CfLogsDatabase + ATHENA_QUERY_RESULTS_LOCATION: !Sub "s3://${ResourcePrefix}-${AWS::AccountId}-cf-access-logs/athena-query-results" + Events: + HourlyEvt: + Type: Schedule + Properties: + Schedule: cron(10 6 * * ? *) + TransformPartFn: Type: AWS::Serverless::Function Properties: @@ -512,4 +561,4 @@ Resources: - { database: !Ref CfLogsDatabase, partitioned_gz_table: !Ref PartitionedGzTable, partitioned_parquet_table: !Ref PartitionedParquetTable } - - ' */' \ No newline at end of file + - ' */'