diff --git a/sdlf-utils/data_lake_testing_pipeline/.Rhistory b/sdlf-utils/data_lake_testing_pipeline/.Rhistory
new file mode 100644
index 00000000..e69de29b
diff --git a/sdlf-utils/data_lake_testing_pipeline/README.md b/sdlf-utils/data_lake_testing_pipeline/README.md
new file mode 100644
index 00000000..3ba10ebe
--- /dev/null
+++ b/sdlf-utils/data_lake_testing_pipeline/README.md
@@ -0,0 +1,22 @@
+# data_lake_testing_pipeline
+Testing a data lake
+
+**Installation**
+Create a DynamoDB table named 'datalake-test-config' and load src/sample-datalake-test-config.csv into it (add your own tests to this CSV).
+Deploy the Step Functions state machines in src/state_machines/\*.json and the Lambda functions in src/lambdas/\*.py. Make sure the appropriate IAM roles and permissions are in place.
+
+**Execute**
+src/state_machines/DataLakeTestController.json is the controlling step function. Trigger it by whatever method fits your environment, for example:
+- plug it into your Serverless Data Lake Framework deployment, or
+- schedule it with an Amazon CloudWatch Events (EventBridge) rule, or
+- trigger it via AWS Config, or
+- add it as a stage in your CodePipeline.
+
+**Architecture**
+The high-level architecture of the data lake testing framework is shown in the diagram below.
+
+![Architecture](docs/DataLakeTestingArchitecture.jpg)
+
+**Public References**
+https://www.allthingsdistributed.com/2020/11/how-the-seahawks-are-using-an-aws-data-lake-to-improve-their-game.html
+https://www.forbes.com/sites/amazonwebservices/2020/11/30/how-the-seahawks-are-using-a-data-lake-to-improve-their-game/?sh=790ff357b7b3
diff --git a/sdlf-utils/data_lake_testing_pipeline/docs/DataLakeTestingArchitecture.jpg b/sdlf-utils/data_lake_testing_pipeline/docs/DataLakeTestingArchitecture.jpg
new file mode 100644
index 00000000..7f3b73e6
Binary files /dev/null and b/sdlf-utils/data_lake_testing_pipeline/docs/DataLakeTestingArchitecture.jpg differ
diff --git a/sdlf-utils/data_lake_testing_pipeline/src/lambdas/TestDataLakeGlueJob.py b/sdlf-utils/data_lake_testing_pipeline/src/lambdas/TestDataLakeGlueJob.py
new file mode 100644
index 00000000..ad35e5ae
--- /dev/null
+++ b/sdlf-utils/data_lake_testing_pipeline/src/lambdas/TestDataLakeGlueJob.py
@@ -0,0 +1 @@
+print("Hello World")
diff --git a/sdlf-utils/data_lake_testing_pipeline/src/lambdas/glue_job_validation.py b/sdlf-utils/data_lake_testing_pipeline/src/lambdas/glue_job_validation.py
new file mode 100644
index 00000000..3c667297
--- /dev/null
+++ b/sdlf-utils/data_lake_testing_pipeline/src/lambdas/glue_job_validation.py
@@ -0,0 +1,12 @@
+import json
+
+def validate_job(event, context):
+    # TODO: implement real validation of the Glue job output
+    response = {}
+    print("Glue job validated.")
+    return {
+        'statusCode': 200,
+        'body': json.dumps('Hello from Lambda!'),
+        'status' : "SUCCEEDED"
+    }
+
diff --git a/sdlf-utils/data_lake_testing_pipeline/src/lambdas/nextTest.py b/sdlf-utils/data_lake_testing_pipeline/src/lambdas/nextTest.py
new file mode 100644
index 00000000..23e2791a
--- /dev/null
+++ b/sdlf-utils/data_lake_testing_pipeline/src/lambdas/nextTest.py
@@ -0,0 +1,42 @@
+import json
+import boto3
+import datetime
+
+### Step in the testing framework's step function to update the end timestamp
+### of the current test and move on to the next test.
+
+session = boto3.Session(region_name = 'us-west-2')
+s3 = session.resource("s3")
+ddb = session.client("dynamodb")
+
+CONFIG_TABLE = 'datalake-test-config'
+
+def lambda_handler(event, context):
+
+    testid = event.get('test_id')
+    print("testid:" + str(testid))
+    if testid is None:
+        testid = '0'
+    else:
+        first_item = ddb.scan(TableName=CONFIG_TABLE,
+                              ScanFilter = {
+                                  'test_id' : {'AttributeValueList':[{'S':testid}],
+                                  'ComparisonOperator':'EQ'}
+                              }
+                              )
+
+        if first_item is None or 0 >= len(first_item.get('Items')):
+            print("No test found")
+        else:
+            item = first_item.get('Items')[0]
+            is_active = item.get('active').get('N')
+            ## Update endedAt only if the test was active
+            if "1" == str(is_active):
+                item['endedAt'] = {'S':str(datetime.datetime.now())}
+                ddb.put_item(TableName=CONFIG_TABLE, Item=item)
+
+    ## Increment to the next test.
+    testid = str(int(testid) + 1)
+    rt = {"test_id": testid}
+
+    return rt
diff --git a/sdlf-utils/data_lake_testing_pipeline/src/lambdas/pickTest.py b/sdlf-utils/data_lake_testing_pipeline/src/lambdas/pickTest.py
new file mode 100644
index 00000000..cbeff371
--- /dev/null
+++ b/sdlf-utils/data_lake_testing_pipeline/src/lambdas/pickTest.py
@@ -0,0 +1,48 @@
+import json
+import boto3
+import datetime
+
+### Test picker: picks an active test, identifies the ARN/test params and hands
+### them to the step function for execution.
+
+session = boto3.Session(region_name = 'us-west-2')
+s3 = session.resource("s3")
+ddb = session.client("dynamodb")
+
+CONFIG_TABLE = 'datalake-test-config'
+
+def lambda_handler(event, context):
+
+    testid = event.get('test_id')
+    print("testid:" + str(testid))
+    if testid is None:
+        testid = '0'
+
+    first_item = ddb.scan(TableName=CONFIG_TABLE,
+                          ScanFilter = {
+                              'test_id' : {'AttributeValueList':[{'S':testid}],
+                              'ComparisonOperator':'EQ'}
+                          }
+                          )
+
+    ### If no test is found, the testing ends there.
+    if first_item is None or 0 >= len(first_item.get('Items')):
+        return { 'job': 'null', 'test_id': testid}
+
+    item = first_item.get('Items')[0]
+    is_active = item.get('active').get('N')
+
+    if "1" == str(is_active):
+        item['startedAt'] = {'S':str(datetime.datetime.now())}
+        ddb.put_item(TableName=CONFIG_TABLE, Item=item)
+        rt = { 'job' : item.get('job').get('S'),
+               'TaskExecution': item.get('job_arn').get('S').split("/")[-1],
+               'TaskValidation': item.get('validation_lambda_arn').get('S').split(":")[-1],
+               'test_id': testid,
+               'params': item.get('job_input').get('M')
+             }
+    else:
+        ## If the test is not active, keep moving to the next test.
+        rt = { 'job' : 'skip', 'test_id' : testid }
+
+    return rt
diff --git a/sdlf-utils/data_lake_testing_pipeline/src/lambdas/redshift_utils/README.md b/sdlf-utils/data_lake_testing_pipeline/src/lambdas/redshift_utils/README.md
new file mode 100644
index 00000000..1662aff6
--- /dev/null
+++ b/sdlf-utils/data_lake_testing_pipeline/src/lambdas/redshift_utils/README.md
@@ -0,0 +1,5 @@
+Sample code for testing Amazon Redshift with the framework.
+
+redshift_core.py - library functions to connect to Redshift using the pg8000 driver. Store credentials in a secret named 'aws-ps-redshift'. Expects a database named 'dev' to pre-exist.
+redshift_adhoc.py - unit test that runs ad hoc Redshift queries
+redshift_counter.py - unit test that validates record counts
diff --git a/sdlf-utils/data_lake_testing_pipeline/src/lambdas/redshift_utils/redshift_adhoc.py b/sdlf-utils/data_lake_testing_pipeline/src/lambdas/redshift_utils/redshift_adhoc.py
new file mode 100644
index 00000000..3883ecd4
--- /dev/null
+++ b/sdlf-utils/data_lake_testing_pipeline/src/lambdas/redshift_utils/redshift_adhoc.py
@@ -0,0 +1,10 @@
+import time
+import redshift_core as rs
+
+def adhoc_query(event, context):
+    st = time.time()
+    db_con = rs.connect_redshift('dev', 'aws-ps-redshift')
+    output = db_con.run(event['query'])
+    print("output:" + str(output))
+    ed = time.time()
+    print("Query {} executed in {} ms.".format(event['query'], (ed - st) * 1000))
diff --git a/sdlf-utils/data_lake_testing_pipeline/src/lambdas/redshift_utils/redshift_core.py b/sdlf-utils/data_lake_testing_pipeline/src/lambdas/redshift_utils/redshift_core.py
new file mode 100644
index 00000000..13437237
--- /dev/null
+++ b/sdlf-utils/data_lake_testing_pipeline/src/lambdas/redshift_utils/redshift_core.py
@@ -0,0 +1,30 @@
+import pg8000  ## pure-Python PostgreSQL driver (package: pg8000)
+import boto3
+import json
+
+region_name = 'us-west-2'
+
+## Get the credentials for the Redshift cluster from Secrets Manager.
+def get_secret(secret_name):
+    # Create a Secrets Manager client
+    session = boto3.session.Session()
+    client = session.client(
+        service_name='secretsmanager',
+        region_name=region_name
+    )
+
+    get_secret_value_response = client.get_secret_value(
+        SecretId=secret_name
+    )
+    return get_secret_value_response
+
+
+def connect_redshift(dbname, secret_name):
+    secret = get_secret(secret_name)
+    credentials = json.loads(secret['SecretString'])
+    conn_obj = pg8000.connect(database=dbname,
+                              host=credentials['host'],
+                              user=credentials['username'],
+                              password=credentials['password'],
+                              port=credentials['port'])
+    return conn_obj
diff --git a/sdlf-utils/data_lake_testing_pipeline/src/lambdas/redshift_utils/redshift_counter.py b/sdlf-utils/data_lake_testing_pipeline/src/lambdas/redshift_utils/redshift_counter.py
new file mode 100644
index 00000000..c727c100
--- /dev/null
+++ b/sdlf-utils/data_lake_testing_pipeline/src/lambdas/redshift_utils/redshift_counter.py
@@ -0,0 +1,52 @@
+import redshift_core as rs
+import time
+import sys
+
+## todo: move the json object to s3 instead
+def validation_query():
+    return [
+        {
+            "query": "select season, \
+                count(*) Cnt, \
+                count(distinct sg.player_id) Players \
+                from pff.nfl_grade_season_grade sg \
+                join pff.nfl_players p on p.id=sg.player_id \
+                group by Season \
+                order by 1;",
+            "expected":[
+                {"low":2000,"high":2020},
+                {"low":2000,"high":40000},
+                {"low":200,"high":2020}
+            ]
+        },
+        {
+            "query": "select count(vp.game_id) from \
+                pff.nfl_video_special vp \
+                left join pff.nfl_team_play tp on tp.game_id =vp.game_id \
+                and tp.play_id =vp.play_id \
+                where tp.play_id is null;",
+            "expected":[
+                {"low":10,"high":4000}
+            ]
+        }
+    ]
+
+
+def count_records(event, context):
+    for qnum, qry in enumerate(validation_query(), start=1):
+        started_at = time.time()
+        db_con = rs.connect_redshift('dev', 'aws-ps-redshift')
+        output = db_con.run(qry['query'])
+        print("output:" + str(output))
+        query_result = output[0]
+        print("query_result:" + str(query_result))
+
+        for i, o in enumerate(query_result):
+            print("column {} expected {} at {}".format(o, str(qry['expected'][i]), i))
+            if qry['expected'][i]['low'] <= int(o) <= qry['expected'][i]['high']:
+                pass
+            else:
+                return {"validation_test":"failed"}
+
+    return {"validation_test":"succeeded"}
+
{"validation_test":"succeeded"} + diff --git a/sdlf-utils/data_lake_testing_pipeline/src/sample-datalake-test-config.csv b/sdlf-utils/data_lake_testing_pipeline/src/sample-datalake-test-config.csv new file mode 100644 index 00000000..e1e5a80c --- /dev/null +++ b/sdlf-utils/data_lake_testing_pipeline/src/sample-datalake-test-config.csv @@ -0,0 +1,6 @@ +"test_id (S)","active (N)","endedAt (S)","job (S)","job_arn (S)","job_input (M)","lastRunStatus (S)","platform (M)","startedAt (S)","validation_lambda_arn (S)","validation_rules_location (S)" +"1","2","2020-11-24 07:46:58.879735","glue","arn:aws:glue:us-west-2::job/TestDataLakeGlue","{ }","","{ ""DPU"" : { ""S"" : ""2"" } }","2020-11-23 22:42:48.259908","arn:aws:lambda:us-west-2::function:glueJobValidation","None" +"2","2","2020-11-24 07:46:59.239987","glue","arn:aws:glue:us-west-2::job/TestDataLakeGlue","{ }","","{ ""DPU"" : { ""S"" : ""2"" } }","2020-11-24 06:39:42.603032","arn:aws:lambda:us-west-2::function:glueJobValidation","None" +"3","2","2020-11-24 07:46:59.560120","glue","arn:aws:glue:us-west-2::job/TestDataLakeGlue","{ }","","{ ""DPU"" : { ""S"" : ""5"" } }","2020-11-24 06:40:50.184025","arn:aws:lambda:us-west-2::function:glueJobValidation","None" +"4","1","2020-11-24 07:47:03.190621","emr","j-26C4FG269MBCH","{ }","","{ ""nodes"" : { ""S"" : ""5"" } }","2020-11-24 07:46:59.740828","arn:aws:lambda:us-west-2::function:glueJobValidation","None" +"5","2","2020-11-24 07:47:03.580137","redshift","aws-ps-redshift","{ ""expectedMax"" : { ""N"" : ""1200"" }, ""expectedMin"" : { ""N"" : ""10"" }, ""query"" : { ""S"" : ""select count(*) from access_log;"" } }","","{ ""nodes"" : { ""S"" : ""5"" } }","2020-11-24 07:43:41.499999","arn:aws:lambda:us-west-2::function:aws-ps-redshift-util-counter-query-lambda","None" diff --git a/sdlf-utils/data_lake_testing_pipeline/src/state_machines/DataLakeTestController.json b/sdlf-utils/data_lake_testing_pipeline/src/state_machines/DataLakeTestController.json new file mode 100644 index 00000000..539ffc23 --- /dev/null +++ b/sdlf-utils/data_lake_testing_pipeline/src/state_machines/DataLakeTestController.json @@ -0,0 +1,99 @@ +{ + "Comment": "An example of the Amazon States Language using a choice state.", + "StartAt": "Pick Test", + "States": { + "Pick Test": { + "Type": "Task", + "Resource": "arn:aws:lambda:::function:pickTest", + "Next": "Choice State" + }, + "Next Test": { + "Type": "Task", + "Resource": "arn:aws:lambda:::function:nextTest", + "Next": "Pick Test" + }, + "Choice State": { + "Type": "Choice", + "Choices": [ + { + "Variable": "$.job", + "StringEquals": "glue", + "Next": "Glue Step Function" + }, + { + "Variable": "$.job", + "StringEquals": "emr", + "Next": "EMR Step Function" + }, + { + "Variable": "$.job", + "StringEquals": "redshift", + "Next": "Redshift Step Function" + }, + { + "Variable": "$.job", + "StringEquals": "athena", + "Next": "Athena Step Function" + }, + { + "Variable": "$.job", + "StringEquals": "skip", + "Next": "Next Test" + } + ], + "Default": "Default State" + }, + "Glue Step Function": { + "Type": "Task", + "Resource": "arn:aws:states:::states:startExecution.sync:2", + "Parameters": { + "Input": { + "JobName.$": "$.TaskExecution", + "TaskValidation.$": "$.TaskValidation" + }, + "StateMachineArn": "arn:aws:states:us-west-2:983401557995:stateMachine:TestingGlueJob" + }, + "ResultPath": "$.output", + "Next": "Next Test" + }, + "Athena Step Function": { + "Type": "Task", + "Resource": "arn:aws:lambda:::function:finishtest", + "Parameters": { + "JobName.$": "$.TaskExecution" + 
+      },
+      "ResultPath": "$.output",
+      "Next": "Next Test"
+    },
+    "EMR Step Function": {
+      "Type": "Task",
+      "Resource": "arn:aws:states:::states:startExecution.sync:2",
+      "Parameters": {
+        "Input": {
+          "ClusterId.$": "$.TaskExecution"
+        },
+        "StateMachineArn": "arn:aws:states:us-west-2:983401557995:stateMachine:TestingEMRJob"
+      },
+      "ResultPath": "$.output",
+      "Next": "Next Test"
+    },
+    "Redshift Step Function": {
+      "Type": "Task",
+      "Resource": "arn:aws:states:::states:startExecution.sync:2",
+      "Parameters": {
+        "Input": {
+          "JobName.$": "$.TaskExecution",
+          "TaskValidation.$": "$.TaskValidation",
+          "secret.$": "$.TaskExecution",
+          "params.$": "$.params"
+        },
+        "StateMachineArn": "arn:aws:states:us-west-2:983401557995:stateMachine:TestDataLakeRedshift"
+      },
+      "ResultPath": "$.output",
+      "Next": "Next Test"
+    },
+    "Default State": {
+      "Type": "Succeed"
+    }
+  }
+}
diff --git a/sdlf-utils/data_lake_testing_pipeline/src/state_machines/TestDataLakeRedshift.json b/sdlf-utils/data_lake_testing_pipeline/src/state_machines/TestDataLakeRedshift.json
new file mode 100644
index 00000000..663f49b3
--- /dev/null
+++ b/sdlf-utils/data_lake_testing_pipeline/src/state_machines/TestDataLakeRedshift.json
@@ -0,0 +1,43 @@
+{
+  "Comment": "A generic template to execute a Lambda function that runs a Redshift job",
+  "StartAt": "Trigger Redshift Job",
+  "States": {
+    "Trigger Redshift Job": {
+      "Type": "Task",
+      "Resource": "arn:aws:lambda:us-west-2:983401557995:function:aws-ps-redshift-util-adhoc-query-lambda",
+      "Parameters": {
+        "secret.$": "$.secret",
+        "params.$": "$.params"
+      },
+      "Next": "Check For Duplicates"
+    },
+    "Check For Duplicates": {
+      "Type": "Choice",
+      "Choices": [
+        {
+          "Variable": "$.status",
+          "StringEquals": "FAILED",
+          "Next": "Failed"
+        },
+        {
+          "Variable": "$.status",
+          "StringEquals": "SUCCEEDED",
+          "Next": "Complete"
+        }
+      ],
+      "Default": "Failed"
+    },
+    "Failed": {
+      "Type": "Task",
+      "Resource": "arn:aws:states:::sns:publish",
+      "Parameters": {
+        "TopicArn.$": "$.input.sns_topic_arn",
+        "Message": "Data Lake Job failed. Check AWS Console for details"
+      },
+      "End": true
+    },
+    "Complete": {
+      "Type": "Succeed"
+    }
+  }
+}
diff --git a/sdlf-utils/data_lake_testing_pipeline/src/state_machines/TestingEMRJob.json b/sdlf-utils/data_lake_testing_pipeline/src/state_machines/TestingEMRJob.json
new file mode 100644
index 00000000..5f17cd8d
--- /dev/null
+++ b/sdlf-utils/data_lake_testing_pipeline/src/state_machines/TestingEMRJob.json
@@ -0,0 +1,42 @@
+{
+  "Comment": "A generic template to execute a Lambda function that runs an EMR job",
+  "StartAt": "Trigger EMR Job",
+  "States": {
+    "Trigger EMR Job": {
+      "Type": "Task",
+      "Resource": "arn:aws:lambda:us-west-2:983401557995:function:TestDataLakeEMR",
+      "Parameters": {
+        "ClusterId.$": "$.ClusterId"
+      },
+      "Next": "Hive Query on Parquet"
+    },
+    "Hive Query on Parquet": {
+      "Type": "Choice",
+      "Choices": [
+        {
+          "Variable": "$.status",
+          "StringEquals": "FAILED",
+          "Next": "Failed"
+        },
+        {
+          "Variable": "$.status",
+          "StringEquals": "SUCCEEDED",
+          "Next": "Complete"
+        }
+      ],
+      "Default": "Failed"
+    },
+    "Failed": {
+      "Type": "Task",
+      "Resource": "arn:aws:states:::sns:publish",
+      "Parameters": {
+        "TopicArn.$": "$.input.sns_topic_arn",
+        "Message": "Data Lake Job failed. Check AWS Console for details"
+      },
+      "End": true
+    },
+    "Complete": {
+      "Type": "Succeed"
+    }
+  }
+}
diff --git a/sdlf-utils/data_lake_testing_pipeline/src/state_machines/TestingGlueJob.json b/sdlf-utils/data_lake_testing_pipeline/src/state_machines/TestingGlueJob.json
new file mode 100644
index 00000000..304fd82a
--- /dev/null
+++ b/sdlf-utils/data_lake_testing_pipeline/src/state_machines/TestingGlueJob.json
@@ -0,0 +1,54 @@
+{
+  "Comment": "A generic template to run a Glue job and validate the result with a Lambda function",
+  "StartAt": "TriggerGlueJob",
+  "States": {
+    "TriggerGlueJob": {
+      "Type": "Task",
+      "Resource": "arn:aws:states:::glue:startJobRun.sync",
+      "Parameters": {
+        "JobName.$": "$.JobName"
+      },
+      "Next": "ValidateTask",
+      "ResultPath": null
+    },
+    "ValidateTask": {
+      "Type": "Task",
+      "Resource": "arn:aws:states:::lambda:invoke",
+      "Parameters": {
+        "FunctionName.$": "$.TaskValidation"
+      },
+      "Next": "StatusCheck"
+    },
+    "StatusCheck": {
+      "Type": "Choice",
+      "Choices": [
+        {
+          "Variable": "$.Payload.status",
+          "StringEquals": "FAILED",
+          "Next": "FailedNotification"
+        },
+        {
+          "Variable": "$.Payload.status",
+          "StringEquals": "SUCCEEDED",
+          "Next": "Complete"
+        }
+      ],
+      "Default": "FailedNotification"
+    },
+    "FailedNotification": {
+      "Type": "Task",
+      "Resource": "arn:aws:states:::sns:publish",
+      "Parameters": {
+        "TopicArn.$": "$.input.sns_topic_arn",
+        "Message": "Data Lake Job failed. Check AWS Console for details"
+      },
+      "Next": "Failed"
+    },
+    "Complete": {
+      "Type": "Succeed"
+    },
+    "Failed": {
+      "Type": "Succeed"
+    }
+  }
+}
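
A note on the README's installation step: creating the 'datalake-test-config' table and loading the sample CSV can be scripted. The sketch below is a minimal, hypothetical helper rather than part of the patch; it assumes test_id (a string) is the table's only key attribute, which is what pickTest.py and nextTest.py filter on, and that it runs from the data_lake_testing_pipeline directory.

```python
"""Hypothetical setup helper (not part of the patch): create the
'datalake-test-config' table and load src/sample-datalake-test-config.csv.
Assumes test_id (string) is the only key attribute."""
import csv
import json
import boto3

ddb = boto3.client("dynamodb", region_name="us-west-2")
TABLE = "datalake-test-config"


def create_table():
    # On-demand capacity; test_id is assumed to be the partition key.
    ddb.create_table(
        TableName=TABLE,
        AttributeDefinitions=[{"AttributeName": "test_id", "AttributeType": "S"}],
        KeySchema=[{"AttributeName": "test_id", "KeyType": "HASH"}],
        BillingMode="PAY_PER_REQUEST",
    )
    ddb.get_waiter("table_exists").wait(TableName=TABLE)


def load_csv(path="src/sample-datalake-test-config.csv"):
    # Headers look like "test_id (S)": split them into attribute name and DynamoDB type.
    with open(path, newline="") as fh:
        for row in csv.DictReader(fh):
            item = {}
            for header, value in row.items():
                if not value:  # skip empty cells such as lastRunStatus
                    continue
                name, dtype = header.rsplit(" (", 1)
                dtype = dtype.rstrip(")")
                # M columns are stored in the CSV as DynamoDB-JSON strings.
                item[name] = {dtype: json.loads(value) if dtype == "M" else value}
            ddb.put_item(TableName=TABLE, Item=item)


if __name__ == "__main__":
    create_table()
    load_csv()
```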
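
For the README's execute step, one lightweight option is to start the controller directly or schedule it with a CloudWatch Events (EventBridge) rule. The sketch below is an assumption-laden example: the state machine ARN and the events-invoke IAM role are placeholders, and the starting input of {"test_id": "1"} simply matches the sample CSV, whose tests are numbered from 1; adjust it to your own config table.

```python
"""Hypothetical trigger sketch: run DataLakeTestController once, or schedule it
daily with a CloudWatch Events (EventBridge) rule. ARNs and the events role are
placeholders; adjust the starting test_id to match your config table."""
import json
import boto3

REGION = "us-west-2"
STATE_MACHINE_ARN = "arn:aws:states:us-west-2:123456789012:stateMachine:DataLakeTestController"  # placeholder
EVENTS_ROLE_ARN = "arn:aws:iam::123456789012:role/events-start-stepfunctions"  # placeholder


def run_once():
    sfn = boto3.client("stepfunctions", region_name=REGION)
    sfn.start_execution(
        stateMachineArn=STATE_MACHINE_ARN,
        input=json.dumps({"test_id": "1"}),  # sample CSV numbers tests from 1
    )


def schedule_daily():
    events = boto3.client("events", region_name=REGION)
    events.put_rule(Name="datalake-test-nightly", ScheduleExpression="rate(1 day)")
    events.put_targets(
        Rule="datalake-test-nightly",
        Targets=[{
            "Id": "controller",
            "Arn": STATE_MACHINE_ARN,
            "RoleArn": EVENTS_ROLE_ARN,  # role allowed to call states:StartExecution
            "Input": json.dumps({"test_id": "1"}),
        }],
    )


if __name__ == "__main__":
    run_once()
```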
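
Finally, redshift_core.py expects a Secrets Manager secret named 'aws-ps-redshift' whose SecretString is a JSON document with host, username, password and port keys. A one-off sketch for creating that secret is below; every value shown is a placeholder.

```python
"""Hypothetical one-off helper: store the Redshift credentials that
redshift_core.connect_redshift() reads from Secrets Manager. The secret name and
key names come from redshift_core.py; all values below are placeholders."""
import json
import boto3

sm = boto3.client("secretsmanager", region_name="us-west-2")

sm.create_secret(
    Name="aws-ps-redshift",
    SecretString=json.dumps({
        "host": "my-cluster.abc123example.us-west-2.redshift.amazonaws.com",  # placeholder endpoint
        "username": "awsuser",    # placeholder
        "password": "CHANGE_ME",  # placeholder
        "port": 5439,
    }),
)
```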