22 changes: 22 additions & 0 deletions sdlf-utils/data_lake_testing_pipeline/README.md
@@ -0,0 +1,22 @@
# data_lake_testing_pipeline
Testing a data lake

**Installation**
Create a DynamoDB table named 'datalake-test-config' and load src/datalake-test-config.csv into it (add your own tests to this CSV).
Install the step functions in src/state_machines/\*.json and the lambda functions in src/lambdas/\*.py. You will need to ensure that appropriate IAM roles and permissions are set up. A minimal table-creation sketch is shown below.
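As a rough starting point, the config table can be created with boto3. This is a minimal sketch, assuming `test_id` (string) is the partition key, which matches the column layout of src/datalake-test-config.csv; adjust region and billing mode to your environment.

```python
import boto3

# Sketch: create the DynamoDB table the testing pipeline reads its tests from.
# Assumes 'test_id' (string) as the partition key.
ddb = boto3.client("dynamodb", region_name="us-west-2")
ddb.create_table(
    TableName="datalake-test-config",
    AttributeDefinitions=[{"AttributeName": "test_id", "AttributeType": "S"}],
    KeySchema=[{"AttributeName": "test_id", "KeyType": "HASH"}],
    BillingMode="PAY_PER_REQUEST",
)
```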

**Execute**
src/state_machines/DataLakeTestController.json is the controlling step function. Trigger it by any method you like, for example (a minimal sketch of starting it manually follows the list):
- plug it into your serverless data lake framework,
- schedule it with a CloudWatch Events / EventBridge rule,
- trigger it via AWS Config, or
- add it as a stage in your code pipeline.
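As a minimal sketch, an execution can also be started manually with boto3. The state machine ARN below is a placeholder for your deployed DataLakeTestController, and starting at test_id "1" assumes your first test matches the sample CSV.

```python
import json
import boto3

# Sketch: start the controller state machine manually, beginning at test_id "1"
# (the first row of the sample CSV). Replace the ARN with that of your
# deployed DataLakeTestController.
sfn = boto3.client("stepfunctions", region_name="us-west-2")
sfn.start_execution(
    stateMachineArn="arn:aws:states:us-west-2:123456789012:stateMachine:DataLakeTestController",
    input=json.dumps({"test_id": "1"}),
)
```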

**Architecture**
The high-level architecture of the data lake testing framework is shown in the diagram below.

![Architecture](docs/DataLakeTestingArchitecture.jpg)

**Public References**
https://www.allthingsdistributed.com/2020/11/how-the-seahawks-are-using-an-aws-data-lake-to-improve-their-game.html
https://www.forbes.com/sites/amazonwebservices/2020/11/30/how-the-seahawks-are-using-a-data-lake-to-improve-their-game/?sh=790ff357b7b3
@@ -0,0 +1 @@
print("Hello World")
@@ -0,0 +1,12 @@
import json

def validate_job(event, context):
    # TODO implement
    response = {}
    print("Glue job validated.")
    return {
        'statusCode': 200,
        'body': json.dumps('Hello from Lambda!'),
        'status': "SUCCEEDED"
    }
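The validation above is still a stub (`# TODO implement`). One possible direction, sketched here, is to check the state of the Glue job run with boto3; the `JobName` and `JobRunId` event keys are assumptions about how the calling state machine would pass them in, not part of the original code.

```python
import json
import boto3

# Hypothetical sketch of a fuller validation: look up the status of the Glue
# job run that the testing state machine just executed. The event keys
# 'JobName' and 'JobRunId' are assumed inputs.
glue = boto3.client("glue", region_name="us-west-2")

def validate_job(event, context):
    run = glue.get_job_run(JobName=event["JobName"], RunId=event["JobRunId"])
    state = run["JobRun"]["JobRunState"]
    return {
        'statusCode': 200,
        'body': json.dumps("Glue job run state: " + state),
        'status': "SUCCEEDED" if state == "SUCCEEDED" else "FAILED"
    }
```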

42 changes: 42 additions & 0 deletions sdlf-utils/data_lake_testing_pipeline/src/lambdas/nextTest.py
@@ -0,0 +1,42 @@
import json
import boto3
import datetime

### Step in the testing framework's step function to update the end timestamp
### and move the testing on to the next test.

session = boto3.Session(region_name='us-west-2')
s3 = session.resource("s3")
ddb = session.client("dynamodb")

CONFIG_TABLE = 'datalake-test-config'

def lambda_handler(event, context):

    testid = event.get('test_id')
    print("testid:" + str(testid))
    if testid is None:
        testid = '0'
    else:
        first_item = ddb.scan(TableName=CONFIG_TABLE,
                              ScanFilter={
                                  'test_id': {'AttributeValueList': [{'S': testid}],
                                              'ComparisonOperator': 'EQ'}
                              })

        if first_item is None or 0 >= len(first_item.get('Items')):
            print("No test found")
        else:
            item = first_item.get('Items')[0]
            is_active = item.get('active').get('N')
            ## Update endedAt only if the test was active
            if "1" == str(is_active):
                item['endedAt'] = {'S': str(datetime.datetime.now())}
                ddb.put_item(TableName=CONFIG_TABLE, Item=item)

    ## Increment to the next test.
    testid = str(int(testid) + 1)
    rt = {"test_id": testid}

    return rt
48 changes: 48 additions & 0 deletions sdlf-utils/data_lake_testing_pipeline/src/lambdas/pickTest.py
@@ -0,0 +1,48 @@
import json
import boto3
import datetime

### Test picker to pick an active test, identify the ARN/test params and hand
### it to the step function for execution.

session = boto3.Session(region_name='us-west-2')
s3 = session.resource("s3")
ddb = session.client("dynamodb")

CONFIG_TABLE = 'datalake-test-config'

def lambda_handler(event, context):

    testid = event.get('test_id')
    print("testid:" + str(testid))
    if testid is None:
        testid = '0'

    first_item = ddb.scan(TableName=CONFIG_TABLE,
                          ScanFilter={
                              'test_id': {'AttributeValueList': [{'S': testid}],
                                          'ComparisonOperator': 'EQ'}
                          })

    ### If no test is found, the testing ends there.
    if first_item is None or 0 >= len(first_item.get('Items')):
        return {'job': 'null', 'test_id': testid}

    item = first_item.get('Items')[0]
    is_active = item.get('active').get('N')

    if "1" == str(is_active):
        item['startedAt'] = {'S': str(datetime.datetime.now())}
        ddb.put_item(TableName=CONFIG_TABLE, Item=item)
        rt = {'job': item.get('job').get('S'),
              'TaskExecution': item.get('job_arn').get('S').split("/")[-1],
              'TaskValidation': item.get('validation_lambda_arn').get('S').split(":")[-1],
              'test_id': testid,
              'params': item.get('job_input').get('M')
              }
    else:
        ## If the test is not active, keep moving to the next test.
        rt = {'job': 'skip', 'test_id': testid}

    return rt
@@ -0,0 +1,5 @@
Sample code for testing Redshift with the framework.

redshift_core.py - library functions to connect to Redshift using the pg8000 PostgreSQL driver. Store credentials in a Secrets Manager secret named 'aws-ps-redshift'. Expects a database named 'dev' to pre-exist. A short usage sketch follows this list.
redshift_adhoc.py - unit test to run ad hoc Redshift queries
redshift_counter.py - unit test to validate record counts
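A minimal usage sketch of these helpers, assuming the 'dev' database and 'aws-ps-redshift' secret described above are already in place (the access_log query is taken from the sample config CSV):

```python
import redshift_core as rs

# Sketch: open a connection via the Secrets Manager secret and run a query.
db_con = rs.connect_redshift('dev', 'aws-ps-redshift')
rows = db_con.run("select count(*) from access_log;")
print(rows[0][0])
```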
@@ -0,0 +1,9 @@
import time

import redshift_core as rs

def adhoc_query(event, context):
    st = time.time()
    db_con = rs.connect_redshift('dev', 'aws-ps-redshift')
    output = db_con.run(event['query'])
    print("output:" + str(output))
    ed = time.time()
    print("Query {} executed in {} ms.".format(event['query'], ((ed - st) * 1000)))
@@ -0,0 +1,30 @@
import pg8000  ## install from the pg8000 package
import boto3
import json

region_name = 'us-west-2'

## Get the credentials for the Redshift cluster from Secrets Manager.
def get_secret(secret_name):
    # Create a Secrets Manager client
    session = boto3.session.Session()
    client = session.client(
        service_name='secretsmanager',
        region_name=region_name
    )

    get_secret_value_response = client.get_secret_value(
        SecretId=secret_name
    )
    return get_secret_value_response


def connect_redshift(dbname, secret_name):
    secret = get_secret(secret_name)
    credentials = json.loads(secret['SecretString'])
    conn_obj = pg8000.connect(database=dbname,
                              host=credentials['host'],
                              user=credentials['username'],
                              password=credentials['password'],
                              port=credentials['port'])
    return conn_obj
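connect_redshift() expects the secret's SecretString to be a JSON document containing host, username, password, and port. A minimal sketch of creating such a secret with boto3 (all values below are placeholders):

```python
import json
import boto3

# Sketch: create the Secrets Manager secret that connect_redshift() reads.
# All values are placeholders; use your cluster's endpoint and credentials.
sm = boto3.client("secretsmanager", region_name="us-west-2")
sm.create_secret(
    Name="aws-ps-redshift",
    SecretString=json.dumps({
        "host": "my-cluster.example.us-west-2.redshift.amazonaws.com",
        "username": "awsuser",
        "password": "REPLACE_ME",
        "port": 5439,
    }),
)
```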
@@ -0,0 +1,52 @@
import redshift_core as rs
import time
import sys

## todo: move the json object to s3 instead
def validation_query():
    return [
        {
            "query": "select season, \
                count(*) Cnt, \
                count(distinct sg.player_id) Players \
                from pff.nfl_grade_season_grade sg \
                join pff.nfl_players p on p.id=sg.player_id \
                group by Season \
                order by 1;",
            "expected": [
                {"low": 2000, "high": 2020},
                {"low": 2000, "high": 40000},
                {"low": 200, "high": 2020}
            ]
        },
        {
            "query": "select count(vp.game_id) from \
                pff.nfl_video_special vp \
                left join pff.nfl_team_play tp on tp.game_id =vp.game_id \
                and tp.play_id =vp.play_id \
                where tp.play_id is null;",
            "expected": [
                {"low": 10, "high": 4000}
            ]
        }
    ]


def count_records(event, context):
    for qnum, qry in enumerate(validation_query(), start=1):
        started_at = time.time()
        db_con = rs.connect_redshift('dev', 'aws-ps-redshift')
        output = db_con.run(qry['query'])
        print("output:" + str(output))
        query_result = output[0]
        print("query_result:" + str(query_result))

        for i, o in enumerate(query_result):
            print("column {} expected {} at {}".format(o, str(qry['expected'][i]), i))
            if qry['expected'][i]['low'] <= int(o) <= qry['expected'][i]['high']:
                pass
            else:
                return {"validation_test": "failed"}

    return {"validation_test": "succeeded"}

@@ -0,0 +1,6 @@
"test_id (S)","active (N)","endedAt (S)","job (S)","job_arn (S)","job_input (M)","lastRunStatus (S)","platform (M)","startedAt (S)","validation_lambda_arn (S)","validation_rules_location (S)"
"1","2","2020-11-24 07:46:58.879735","glue","arn:aws:glue:us-west-2::job/TestDataLakeGlue","{ }","","{ ""DPU"" : { ""S"" : ""2"" } }","2020-11-23 22:42:48.259908","arn:aws:lambda:us-west-2::function:glueJobValidation","None"
"2","2","2020-11-24 07:46:59.239987","glue","arn:aws:glue:us-west-2::job/TestDataLakeGlue","{ }","","{ ""DPU"" : { ""S"" : ""2"" } }","2020-11-24 06:39:42.603032","arn:aws:lambda:us-west-2::function:glueJobValidation","None"
"3","2","2020-11-24 07:46:59.560120","glue","arn:aws:glue:us-west-2::job/TestDataLakeGlue","{ }","","{ ""DPU"" : { ""S"" : ""5"" } }","2020-11-24 06:40:50.184025","arn:aws:lambda:us-west-2::function:glueJobValidation","None"
"4","1","2020-11-24 07:47:03.190621","emr","j-26C4FG269MBCH","{ }","","{ ""nodes"" : { ""S"" : ""5"" } }","2020-11-24 07:46:59.740828","arn:aws:lambda:us-west-2::function:glueJobValidation","None"
"5","2","2020-11-24 07:47:03.580137","redshift","aws-ps-redshift","{ ""expectedMax"" : { ""N"" : ""1200"" }, ""expectedMin"" : { ""N"" : ""10"" }, ""query"" : { ""S"" : ""select count(*) from access_log;"" } }","","{ ""nodes"" : { ""S"" : ""5"" } }","2020-11-24 07:43:41.499999","arn:aws:lambda:us-west-2::function:aws-ps-redshift-util-counter-query-lambda","None"
@@ -0,0 +1,99 @@
{
  "Comment": "An example of the Amazon States Language using a choice state.",
  "StartAt": "Pick Test",
  "States": {
    "Pick Test": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:::function:pickTest",
      "Next": "Choice State"
    },
    "Next Test": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:::function:nextTest",
      "Next": "Pick Test"
    },
    "Choice State": {
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.job",
          "StringEquals": "glue",
          "Next": "Glue Step Function"
        },
        {
          "Variable": "$.job",
          "StringEquals": "emr",
          "Next": "EMR Step Function"
        },
        {
          "Variable": "$.job",
          "StringEquals": "redshift",
          "Next": "Redshift Step Function"
        },
        {
          "Variable": "$.job",
          "StringEquals": "athena",
          "Next": "Athena Step Function"
        },
        {
          "Variable": "$.job",
          "StringEquals": "skip",
          "Next": "Next Test"
        }
      ],
      "Default": "Default State"
    },
    "Glue Step Function": {
      "Type": "Task",
      "Resource": "arn:aws:states:::states:startExecution.sync:2",
      "Parameters": {
        "Input": {
          "JobName.$": "$.TaskExecution",
          "TaskValidation.$": "$.TaskValidation"
        },
        "StateMachineArn": "arn:aws:states:us-west-2:983401557995:stateMachine:TestingGlueJob"
      },
      "ResultPath": "$.output",
      "Next": "Next Test"
    },
    "Athena Step Function": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:::function:finishtest",
      "Parameters": {
        "JobName.$": "$.TaskExecution"
      },
      "ResultPath": "$.output",
      "Next": "Next Test"
    },
    "EMR Step Function": {
      "Type": "Task",
      "Resource": "arn:aws:states:::states:startExecution.sync:2",
      "Parameters": {
        "Input": {
          "ClusterId.$": "$.TaskExecution"
        },
        "StateMachineArn": "arn:aws:states:us-west-2:983401557995:stateMachine:TestingEMRJob"
      },
      "ResultPath": "$.output",
      "Next": "Next Test"
    },
    "Redshift Step Function": {
      "Type": "Task",
      "Resource": "arn:aws:states:::states:startExecution.sync:2",
      "Parameters": {
        "Input": {
          "JobName.$": "$.TaskExecution",
          "TaskValidation.$": "$.TaskValidation",
          "secret.$": "$.TaskExecution",
          "params.$": "$.params"
        },
        "StateMachineArn": "arn:aws:states:us-west-2:983401557995:stateMachine:TestDataLakeRedshift"
      },
      "ResultPath": "$.output",
      "Next": "Next Test"
    },
    "Default State": {
      "Type": "Succeed"
    }
  }
}
@@ -0,0 +1,43 @@
{
  "Comment": "A generic template to execute a Lambda function that executes a redshift job",
  "StartAt": "Trigger Redshift Job",
  "States": {
    "Trigger Redshift Job": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-west-2:983401557995:function:aws-ps-redshift-util-adhoc-query-lambda",
      "Parameters": {
        "secret.$": "$.secret",
        "params.$": "$.params"
      },
      "Next": "Check For Duplicates"
    },
    "Check For Duplicates": {
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.status",
          "StringEquals": "FAILED",
          "Next": "Failed"
        },
        {
          "Variable": "$.status",
          "StringEquals": "SUCCEEDED",
          "Next": "Complete"
        }
      ],
      "Default": "Failed"
    },
    "Failed": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sns:publish",
      "Parameters": {
        "TopicArn.$": "$.input.sns_topic_arn",
        "Message": "Data Lake Job failed. Check AWS Console for details"
      },
      "End": true
    },
    "Complete": {
      "Type": "Succeed"
    }
  }
}