Skip to content

Commit 9110137

Browse files
committed
[feature/PI-618-bulk_etl_e2e] add trigger tests
1 parent be34c91 commit 9110137

File tree

5 files changed

+197
-152
lines changed

5 files changed

+197
-152
lines changed

src/etl/sds/tests/etl_test_utils/ask_s3.py

Lines changed: 47 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,9 @@
44

55
from botocore.exceptions import ClientError
66
from etl.clear_state_inputs import EMPTY_JSON_DATA, EMPTY_LDIF_DATA
7-
from etl_utils.constants import CHANGELOG_NUMBER, WorkerKey
7+
from etl_utils.constants import CHANGELOG_NUMBER, ETL_STATE_LOCK, WorkerKey
88
from event.json import json_loads
9+
from mypy_boto3_dynamodb import DynamoDBClient
910
from mypy_boto3_s3 import S3Client
1011

1112
from etl.sds.tests.etl_test_utils.etl_state import EtlConfig
@@ -46,9 +47,31 @@ def ask_s3_prefix(
4647
return result
4748

4849

49-
def was_trigger_key_deleted(s3_client, bucket, etl_config: EtlConfig):
50+
def was_trigger_key_deleted(s3_client, etl_config: EtlConfig):
    """Report whether the initial trigger object is gone from the ETL bucket.

    Args:
        s3_client: S3 client forwarded to ``ask_s3``.
        etl_config: Supplies ``bucket`` and ``initial_trigger_key``.

    Returns:
        The negation of what ``ask_s3`` answers for the trigger key
        (presumably truthy while the object still exists -- confirm against
        ``ask_s3``).
    """
    trigger_key_answer = ask_s3(
        s3_client=s3_client,
        bucket=etl_config.bucket,
        key=etl_config.initial_trigger_key,
    )
    return not trigger_key_answer
56+
57+
58+
def was_queue_history_created(
    s3_client, etl_config: EtlConfig, expected_content: bytes
):
    """Ask S3 whether a queue-history object holding ``expected_content`` exists.

    Args:
        s3_client: S3 client forwarded to ``ask_s3_prefix``.
        etl_config: Supplies ``bucket``, ``queue_history_key_prefix`` and
            ``etl_type``.
        expected_content: Bytes the stored history object must equal.

    Returns:
        Whatever ``ask_s3_prefix`` answers for the queue-history prefix.
    """
    # NOTE(review): get_etl_config builds the history prefixes with a trailing
    # "/", so this f-string yields a double slash -- confirm that matches the
    # keys the ETL actually writes.
    history_prefix = f"{etl_config.queue_history_key_prefix}/{etl_config.etl_type}"
    return ask_s3_prefix(
        s3_client=s3_client,
        # The bucket must be named explicitly, as for every other S3 question.
        bucket=etl_config.bucket,
        # Queue history lives under the *queue* prefix, not the state-machine one.
        key_prefix=history_prefix,
        question=lambda x: x == expected_content,
    )
66+
67+
68+
def was_state_machine_history_created(
    s3_client, etl_config: EtlConfig, expected_content: bytes
):
    """Ask S3 whether a state-machine-history object holding ``expected_content`` exists.

    Args:
        s3_client: S3 client forwarded to ``ask_s3_prefix``.
        etl_config: Supplies ``bucket``, ``state_machine_history_key_prefix``
            and ``etl_type``.
        expected_content: Bytes the stored history object must equal.

    Returns:
        Whatever ``ask_s3_prefix`` answers for the state-machine-history prefix.
    """
    # NOTE(review): get_etl_config builds the history prefixes with a trailing
    # "/", so this f-string yields a double slash -- confirm that matches the
    # keys the ETL actually writes.
    history_prefix = (
        f"{etl_config.state_machine_history_key_prefix}/{etl_config.etl_type}"
    )
    return ask_s3_prefix(
        s3_client=s3_client,
        # The bucket must be named explicitly, as for every other S3 question.
        bucket=etl_config.bucket,
        # State-machine history lives under its own prefix, not the queue one.
        key_prefix=history_prefix,
        question=lambda x: x == expected_content,
    )
5376

5477

@@ -61,27 +84,40 @@ def was_changelog_number_updated(s3_client, bucket, new_changelog_number):
6184
)
6285

6386

64-
def etl_state_is_clear(s3_client, bucket) -> bool:
65-
66-
extract_is_empty = ask_s3(
87+
def extract_is_empty(s3_client, bucket) -> bool:
    """Ask S3 whether the extract worker's object equals ``EMPTY_LDIF_DATA``.

    Args:
        s3_client: S3 client forwarded to ``ask_s3``.
        bucket: Name of the bucket holding the ``WorkerKey.EXTRACT`` object.

    Returns:
        The answer from ``ask_s3`` for the extract key.
    """

    def holds_empty_ldif(content):
        return content == EMPTY_LDIF_DATA

    return ask_s3(
        s3_client=s3_client,
        bucket=bucket,
        key=WorkerKey.EXTRACT,
        question=holds_empty_ldif,
    )
72-
transform_is_empty = ask_s3(
94+
95+
96+
def transform_is_empty(s3_client, bucket) -> bool:
    """Ask S3 whether the transform worker's object equals ``EMPTY_JSON_DATA``.

    Args:
        s3_client: S3 client forwarded to ``ask_s3``.
        bucket: Name of the bucket holding the ``WorkerKey.TRANSFORM`` object.

    Returns:
        The answer from ``ask_s3`` for the transform key.
    """

    def holds_empty_json(content):
        return content == EMPTY_JSON_DATA

    return ask_s3(
        s3_client=s3_client,
        bucket=bucket,
        key=WorkerKey.TRANSFORM,
        question=holds_empty_json,
    )
78-
load_is_empty = ask_s3(
103+
104+
105+
def load_is_empty(s3_client, bucket) -> bool:
    """Ask S3 whether the load worker's object equals ``EMPTY_JSON_DATA``.

    Args:
        s3_client: S3 client forwarded to ``ask_s3``.
        bucket: Name of the bucket holding the ``WorkerKey.LOAD`` object.

    Returns:
        The answer from ``ask_s3`` for the load key.
    """

    def holds_empty_json(content):
        return content == EMPTY_JSON_DATA

    return ask_s3(
        s3_client=s3_client,
        bucket=bucket,
        key=WorkerKey.LOAD,
        question=holds_empty_json,
    )
84-
print( # noqa
85-
f"{{extract, transform, load}} is empty == {{{extract_is_empty, transform_is_empty, load_is_empty}}}"
112+
113+
114+
def was_etl_state_lock_removed(s3_client, bucket) -> bool:
    """Report whether the ETL state lock object is gone from the bucket.

    Args:
        s3_client: S3 client forwarded to ``ask_s3``.
        bucket: Name of the bucket expected to hold ``ETL_STATE_LOCK``.

    Returns:
        True once ``ask_s3`` no longer finds the lock key.
    """
    # "Removed" is the negation of the ask_s3 answer, mirroring how
    # was_trigger_key_deleted phrases the same "is the object gone?" question.
    return not ask_s3(
        s3_client=s3_client,
        key=ETL_STATE_LOCK,
        bucket=bucket,
    )
87-
return extract_is_empty and transform_is_empty and load_is_empty
120+
121+
122+
def database_isnt_empty(db_client: "DynamoDBClient", table_name: str):
    """Check that a one-item COUNT scan of the table reports exactly one item.

    Args:
        db_client: Client exposing ``scan``; called with ``TableName``,
            ``Limit=1`` and ``Select="COUNT"``.
        table_name: Name of the table to scan.

    Returns:
        True when the scan response's ``"Count"`` equals 1.
    """
    scan_response = db_client.scan(TableName=table_name, Limit=1, Select="COUNT")
    item_count = scan_response["Count"]
    return item_count == 1

src/etl/sds/tests/etl_test_utils/etl_state.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,13 @@
11
from collections import deque
22
from dataclasses import dataclass
33

4-
from etl_utils.constants import ETL_QUEUE_HISTORY, ETL_STATE_MACHINE_HISTORY, WorkerKey
4+
from etl_utils.constants import (
5+
CHANGELOG_NUMBER,
6+
ETL_QUEUE_HISTORY,
7+
ETL_STATE_LOCK,
8+
ETL_STATE_MACHINE_HISTORY,
9+
WorkerKey,
10+
)
511
from etl_utils.io import pkl_dumps_lz4
612
from mypy_boto3_s3 import S3Client
713
from sds.epr.bulk_create.bulk_load_fanout import FANOUT
@@ -18,16 +24,18 @@ class EtlConfig:
1824
initial_trigger_key: str
1925
queue_history_key_prefix: str
2026
state_machine_history_key_prefix: str
27+
etl_type: str
2128
table_name: str
2229

2330

24-
def get_etl_config(input_filename: str) -> EtlConfig:
31+
def get_etl_config(input_filename: str, etl_type: str = "") -> EtlConfig:
    """Assemble the ``EtlConfig`` used by the ETL tests.

    Args:
        input_filename: File name appended to the bulk trigger prefix to form
            ``initial_trigger_key``.
        etl_type: Stored verbatim on the config; defaults to an empty string.

    Returns:
        An ``EtlConfig`` whose bucket, trigger prefix and table name are read
        via ``read_terraform_output`` and whose history prefixes come from
        the ``etl_utils.constants`` values with a trailing ``/``.
    """
    trigger_prefix = read_terraform_output("sds_etl.value.bulk_trigger_prefix")
    bucket_name = read_terraform_output("sds_etl.value.bucket")
    trigger_key = f"{trigger_prefix}/{input_filename}"
    queue_prefix = f"{ETL_QUEUE_HISTORY}/"
    state_machine_prefix = f"{ETL_STATE_MACHINE_HISTORY}/"
    dynamodb_table = read_terraform_output("dynamodb_table_name.value")
    return EtlConfig(
        bucket=bucket_name,
        initial_trigger_key=trigger_key,
        queue_history_key_prefix=queue_prefix,
        state_machine_history_key_prefix=state_machine_prefix,
        etl_type=etl_type,
        table_name=dynamodb_table,
    )
3341

@@ -74,3 +82,6 @@ def clear_etl_state(s3_client: S3Client, etl_config: EtlConfig):
7482
bucket=etl_config.bucket,
7583
key_prefix=etl_config.state_machine_history_key_prefix,
7684
)
85+
86+
s3_client.delete_object(Bucket=etl_config.bucket, Key=CHANGELOG_NUMBER)
87+
s3_client.delete_object(Bucket=etl_config.bucket, Key=ETL_STATE_LOCK)

src/etl/sds/trigger/bulk/tests/_test_bulk_trigger.py

Lines changed: 0 additions & 122 deletions
This file was deleted.
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
import time
2+
from functools import partial
3+
4+
import boto3
5+
import pytest
6+
from event.aws.client import dynamodb_client
7+
8+
from etl.sds.tests.etl_test_utils.ask_s3 import (
9+
database_isnt_empty as _database_isnt_empty,
10+
)
11+
from etl.sds.tests.etl_test_utils.ask_s3 import extract_is_empty as _extract_is_empty
12+
from etl.sds.tests.etl_test_utils.ask_s3 import load_is_empty as _load_is_empty
13+
from etl.sds.tests.etl_test_utils.ask_s3 import (
14+
transform_is_empty as _transform_is_empty,
15+
)
16+
from etl.sds.tests.etl_test_utils.ask_s3 import (
17+
was_changelog_number_updated as _was_changelog_number_updated,
18+
)
19+
from etl.sds.tests.etl_test_utils.ask_s3 import (
20+
was_etl_state_lock_removed as _was_etl_state_lock_removed,
21+
)
22+
from etl.sds.tests.etl_test_utils.ask_s3 import (
23+
was_queue_history_created as _was_queue_history_created,
24+
)
25+
from etl.sds.tests.etl_test_utils.ask_s3 import (
26+
was_state_machine_history_created as _was_state_machine_history_created,
27+
)
28+
from etl.sds.tests.etl_test_utils.ask_s3 import (
29+
was_trigger_key_deleted as _was_trigger_key_deleted,
30+
)
31+
from etl.sds.tests.etl_test_utils.etl_state import clear_etl_state, get_etl_config
32+
from etl.sds.worker.bulk.tests.test_bulk_e2e import PATH_TO_STAGE_DATA
33+
34+
EXPECTED_CHANGELOG_NUMBER = 123
35+
36+
37+
def message(x):
    """Print a progress marker for the test run.

    Args:
        x: Value handed straight to ``print``.
    """
    print(x)  # noqa
39+
40+
41+
def _wait_until(question, poll_seconds=5):
    """Poll ``question`` until it returns a truthy value.

    There is no deadline here; the ``pytest.mark.timeout`` on the calling test
    is what bounds the total wait.
    """
    while not question():
        time.sleep(poll_seconds)


# NOTE(review): every unmet check sleeps 5s, so a 20s timeout allows roughly
# four polls across *all* stages below -- confirm that is long enough.
@pytest.mark.timeout(20)
@pytest.mark.integration
def test_bulk_trigger():
    """Drop a bulk LDIF at the trigger key and sign off each ETL lifecycle stage.

    The test clears the ETL state, uploads the stage-0 extract input to the
    trigger key, then waits in order for: trigger key deletion, queue and
    state-machine history creation, changelog number update, the three worker
    inputs returning to their empty state, a non-empty database, and finally
    removal of the state lock.
    """
    # Prerequisites
    with open(PATH_TO_STAGE_DATA / "0.extract_input.ldif") as f:
        input_data = f.read().encode()

    etl_config = get_etl_config(f"{EXPECTED_CHANGELOG_NUMBER}.ldif", etl_type="bulk")
    db_client = dynamodb_client()
    s3_client = boto3.client("s3")
    clear_etl_state(s3_client=s3_client, etl_config=etl_config)

    # Define questions
    was_trigger_key_deleted = partial(
        _was_trigger_key_deleted, s3_client=s3_client, etl_config=etl_config
    )
    was_queue_history_created = partial(
        _was_queue_history_created,
        s3_client=s3_client,
        etl_config=etl_config,
        expected_content=input_data,
    )
    was_state_machine_history_created = partial(
        _was_state_machine_history_created,
        s3_client=s3_client,
        etl_config=etl_config,
        expected_content=input_data,
    )
    was_changelog_number_updated = partial(
        _was_changelog_number_updated,
        s3_client=s3_client,
        bucket=etl_config.bucket,
        # The question takes the expected number as a required argument.
        new_changelog_number=EXPECTED_CHANGELOG_NUMBER,
    )
    extract_is_empty = partial(
        _extract_is_empty, s3_client=s3_client, bucket=etl_config.bucket
    )
    transform_is_empty = partial(
        _transform_is_empty, s3_client=s3_client, bucket=etl_config.bucket
    )
    load_is_empty = partial(
        _load_is_empty, s3_client=s3_client, bucket=etl_config.bucket
    )
    was_state_lock_removed = partial(
        _was_etl_state_lock_removed, s3_client=s3_client, bucket=etl_config.bucket
    )
    database_isnt_empty = partial(
        _database_isnt_empty, db_client=db_client, table_name=etl_config.table_name
    )

    # Trigger the bulk load
    s3_client.put_object(
        Bucket=etl_config.bucket, Key=etl_config.initial_trigger_key, Body=input_data
    )

    # Sign-off through the expected lifecycle of the bulk ETL
    _wait_until(was_trigger_key_deleted)
    message("Trigger key deleted")

    _wait_until(was_queue_history_created)
    message("Queue history created")

    _wait_until(was_state_machine_history_created)
    message("State machine history created")

    _wait_until(was_changelog_number_updated)
    message("Changelog number updated")

    _wait_until(extract_is_empty)
    message("Extract's input data is now in empty state")

    _wait_until(transform_is_empty)
    message("Transform's input data is now in empty state")

    _wait_until(load_is_empty)
    message("Load's input data is now in empty state")

    assert database_isnt_empty()
    message("Database isn't empty")

    # Announce the removal only after the wait has actually finished.
    _wait_until(was_state_lock_removed)
    message("State lock has been removed")

0 commit comments

Comments
 (0)