Support historical and delayed logs import #16

Open · wants to merge 3 commits into mainline
functions/createPartitions.js (7 additions, 1 deletion)
@@ -8,7 +8,13 @@ const database = process.env.DATABASE;

// creates partitions for the hour after the current hour
exports.handler = async (event, context, callback) => {
var nextHour = new Date(Date.now() + 60 * 60 * 1000);

// format of dth is '2021-01-01T00'
var input = 'dth' in event ? `${event.dth}:00:00Z` : Date.now() + 60 * 60 * 1000;
var nextHour = new Date(input);
if (isNaN(nextHour))
throw new Error('invalid dth')

var year = nextHour.getUTCFullYear();
var month = (nextHour.getUTCMonth() + 1).toString().padStart(2, '0');
var day = nextHour.getUTCDate().toString().padStart(2, '0');
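For reference, a minimal standalone sketch (a hypothetical helper, not part of the diff) of how the `dth` override resolves to UTC partition components, assuming the 'YYYY-MM-DDTHH' format noted in the comment above:

// Sketch: resolve the partition hour from an optional `dth` override ('2021-01-01T00'),
// falling back to the hour after the current time, as the handler above does.
function resolvePartitionHour(event = {}) {
  const input = 'dth' in event ? `${event.dth}:00:00Z` : Date.now() + 60 * 60 * 1000;
  const nextHour = new Date(input);
  if (isNaN(nextHour.getTime())) throw new Error('invalid dth');
  return {
    year: String(nextHour.getUTCFullYear()),
    month: String(nextHour.getUTCMonth() + 1).padStart(2, '0'),
    day: String(nextHour.getUTCDate()).padStart(2, '0'),
    hour: String(nextHour.getUTCHours()).padStart(2, '0'),
  };
}

// resolvePartitionHour({ dth: '2021-01-01T00' })
// -> { year: '2021', month: '01', day: '01', hour: '00' }

Passing a past hour this way is presumably what allows partitions to be created retroactively before a historical import.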
functions/transformMissingGzDataDaily.js (50 additions, 0 deletions)
@@ -0,0 +1,50 @@
const util = require('./util');

// AWS Glue Data Catalog database and tables
const sourceTable = process.env.SOURCE_TABLE;
const targetTable = process.env.TARGET_TABLE;
const database = process.env.DATABASE;

async function insertMissingGzData(database, sourceTable, targetTable, year, month, day) {

const insertStatement = `
-- Insert missing Gzip Data on ${year}-${month}-${day}
INSERT INTO ${database}.${targetTable}
WITH gz AS (
SELECT *
FROM ${database}.${sourceTable}
WHERE year = '${year}' AND month = '${month}' AND day = '${day}'
), parquet AS (
SELECT concat(year, '-', month, '-', day, 'T', hour) dth, request_id
FROM ${database}.${targetTable}
WHERE year = '${year}' AND month = '${month}' AND day = '${day}'
)
SELECT
gz.*
FROM gz LEFT JOIN parquet
ON concat(gz.year, '-', gz.month, '-', gz.day, 'T', gz.hour) = parquet.dth
AND gz.request_id = parquet.request_id
WHERE parquet.request_id IS NULL`;

await util.runQuery(insertStatement);
}

// backfill yesterday's partitions, or those of the date passed as `dt` in the event (format '2021-01-01')
exports.handler = async (event, context, callback) => {
let yesterday;
if ('dt' in event) {
yesterday = new Date(`${event.dt}T00:00:00Z`);
if (isNaN(yesterday))
throw new Error('invalid dt');
} else {
yesterday = new Date();
yesterday.setUTCDate(yesterday.getUTCDate() - 1);
}

const year = yesterday.getUTCFullYear();
const month = (yesterday.getUTCMonth() + 1).toString().padStart(2, '0');
const day = yesterday.getUTCDate().toString().padStart(2, '0');

console.log('Insert missing gzip data on', { year, month, day });

await insertMissingGzData(database, sourceTable, targetTable, year, month, day);
}
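The INSERT statement above is an anti-join: it takes every row of the gzip source table for the given day, left-joins it to the Parquet target table on the hour string ('dth') and request_id, and keeps only rows with no match (parquet.request_id IS NULL), i.e. rows that have not yet been transformed. That is what lets late-arriving gzip log files be backfilled after the regular hourly transform has already run. Below is a hedged sketch of running the backfill by hand for one day, assuming AWS credentials and the function's environment variables (DATABASE, SOURCE_TABLE, TARGET_TABLE and the Athena query-results location used by util.runQuery) are set in the shell:

// Sketch: invoke the handler locally for a specific day (path assumes repo root).
const { handler } = require('./functions/transformMissingGzDataDaily');

handler({ dt: '2021-01-01' })
  .then(() => console.log('backfill query submitted'))
  .catch(err => { console.error(err); process.exit(1); });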
template.yaml (50 additions, 1 deletion)
@@ -40,6 +40,55 @@ Parameters:
Prefix of parquet files that are created in Apache Hive
like style by the CTAS query. Including the trailing slash.
Resources:
TransformMissingGzDataDailyFn:
Type: AWS::Serverless::Function
Properties:
CodeUri: functions/
Handler: transformMissingGzDataDaily.handler
Runtime: nodejs12.x
Timeout: 900
Policies:
- Version: '2012-10-17'
Statement:
- Effect: Allow
Action:
- athena:StartQueryExecution
- athena:GetQueryExecution
Resource: '*'
- Effect: Allow
Action:
- s3:ListBucket
- s3:GetBucketLocation
Resource: !Sub "arn:${AWS::Partition}:s3:::${ResourcePrefix}-${AWS::AccountId}-cf-access-logs"
- Effect: Allow
Action:
- s3:PutObject
- s3:GetObject
Resource: !Sub "arn:${AWS::Partition}:s3:::${ResourcePrefix}-${AWS::AccountId}-cf-access-logs/*"
- Effect: Allow
Action:
- glue:CreatePartition
- glue:GetDatabase
- glue:GetTable
- glue:BatchCreatePartition
- glue:GetPartition
- glue:GetPartitions
- glue:CreateTable
- glue:DeleteTable
- glue:DeletePartition
Resource: '*'
Environment:
Variables:
SOURCE_TABLE: !Ref PartitionedGzTable
TARGET_TABLE: !Ref PartitionedParquetTable
DATABASE: !Ref CfLogsDatabase
ATHENA_QUERY_RESULTS_LOCATION: !Sub "s3://${ResourcePrefix}-${AWS::AccountId}-cf-access-logs/athena-query-results"
Events:
DailyEvt:
Type: Schedule
Properties:
Schedule: cron(10 6 * * ? *)

TransformPartFn:
Type: AWS::Serverless::Function
Properties:
@@ -512,4 +561,4 @@ Resources:
- { database: !Ref CfLogsDatabase,
partitioned_gz_table: !Ref PartitionedGzTable,
partitioned_parquet_table: !Ref PartitionedParquetTable }
- ' */'
- ' */'
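Taken together, these pieces support the historical and delayed import the PR title describes: createPartitions now accepts a `dth` override so partitions for a past hour can be created on demand, and TransformMissingGzDataDailyFn, scheduled daily at 06:10 UTC by the cron expression above, re-inserts any gzip rows still missing from Parquet and can also be invoked with an explicit `dt`. A hedged sketch of driving both remotely with the AWS SDK for JavaScript; the FunctionName values are placeholders for whatever physical names the deployed stack produced:

// Sketch: import one historical day end-to-end. FunctionName values are
// placeholders; look up the real physical names of the deployed functions.
const AWS = require('aws-sdk');
const lambda = new AWS.Lambda();

async function importDay(dt) { // dt like '2021-01-01'
  // 1) make sure the partition for each hour of that day exists
  for (let h = 0; h < 24; h++) {
    const dth = `${dt}T${String(h).padStart(2, '0')}`;
    await lambda.invoke({
      FunctionName: 'CreatePartFn-placeholder',
      Payload: JSON.stringify({ dth }),
    }).promise();
  }
  // 2) insert rows present in the gzip table but missing from Parquet
  await lambda.invoke({
    FunctionName: 'TransformMissingGzDataDailyFn-placeholder',
    Payload: JSON.stringify({ dt }),
  }).promise();
}

importDay('2021-01-01').catch(console.error);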