|
2 | 2 | import time |
3 | 3 | import boto3 |
4 | 4 | import requests |
| 5 | +import os |
| 6 | +import json |
| 7 | +import botocore |
5 | 8 |
|
6 | | -# Replace with your LocalStack endpoint |
7 | | -LOCALSTACK_ENDPOINT = "http://localhost.localstack.cloud:4566" |
8 | | -CHAOS_ENDPOINT = f"{LOCALSTACK_ENDPOINT}/_localstack/chaos/faults" |
| 9 | +# --- Configuration Constants --- |
| 10 | +LOCALSTACK_ENDPOINT_URL = os.environ.get("LOCALSTACK_ENDPOINT_URL", "http://localhost:4566") |
| 11 | +CHAOS_ENDPOINT = f"{LOCALSTACK_ENDPOINT_URL}/_localstack/chaos/faults" |
9 | 12 |
|
10 | | -# Replace with your LocalStack DynamoDB table name |
11 | 13 | DYNAMODB_TABLE_NAME = "Products" |
12 | | - |
13 | | -# Replace with your Lambda function names |
14 | | -LAMBDA_FUNCTIONS = ["add-product", "get-product", "process-product-events"] |
15 | | - |
16 | | -@pytest.fixture(scope="module") |
| 14 | +SERVICE_REGION = "us-east-1" |
| 15 | + |
| 16 | +PRIMARY_API_ID = "12345" |
| 17 | +API_GATEWAY_PORT = 4566 |
| 18 | +ADD_PRODUCT_URL = f"http://{PRIMARY_API_ID}.execute-api.localhost.localstack.cloud:{API_GATEWAY_PORT}/dev/productApi" |
| 19 | + |
| 20 | +# Base product data (static parts) - we'll create full data with unique IDs in the test |
| 21 | +BASE_PRODUCT_DATA = { |
| 22 | + "name": "Pytest Widget", |
| 23 | + "price": "25.99", # Slightly different price for easier differentiation if needed |
| 24 | + "description": "A widget specifically for Pytest scenarios.", |
| 25 | +} |
| 26 | + |
| 27 | +# Timings |
| 28 | +DYNAMODB_OUTAGE_REACTION_WAIT = 10 |
| 29 | +SERVICE_RECOVERY_WAIT = 30 |
| 30 | + |
| 31 | +# --- Helper Functions for Chaos (remain the same as your working version) --- |
| 32 | + |
| 33 | +def manage_chaos(service_name, region_name, induce=True, timeout=10): |
| 34 | + """Induces or clears chaos for a specific service and region.""" |
| 35 | + fault_payload = [{"service": service_name, "region": region_name}] |
| 36 | + action_str = "Inducing" if induce else "Clearing" |
| 37 | + |
| 38 | + print(f"\n--- {action_str} chaos for service '{service_name}' in region '{region_name}' ---") |
| 39 | + |
| 40 | + try: |
| 41 | + if induce: |
| 42 | + response = requests.post(CHAOS_ENDPOINT, json=fault_payload, timeout=timeout) |
| 43 | + else: |
| 44 | + response = requests.delete(CHAOS_ENDPOINT, json=fault_payload, timeout=timeout) |
| 45 | + |
| 46 | + response.raise_for_status() |
| 47 | + |
| 48 | + if induce: |
| 49 | + active_faults_after_induce = response.json() |
| 50 | + print(f" Chaos injection successful. Active faults reported: {active_faults_after_induce}") |
| 51 | + return active_faults_after_induce |
| 52 | + else: |
| 53 | + print(f" Chaos clear request successful. Status: {response.status_code}, Response: {response.text[:100]}") |
| 54 | + return [] |
| 55 | + |
| 56 | + except requests.exceptions.RequestException as e: |
| 57 | + pytest.fail(f"Failed to {action_str.lower()} chaos for {service_name} in {region_name}: {e}") |
| 58 | + except json.JSONDecodeError as e: |
| 59 | + current_response_text = response.text if 'response' in locals() else "Response object not available or no text." |
| 60 | + pytest.fail(f"Failed to parse JSON response when {action_str.lower()} chaos: {e}. Response text: {current_response_text}") |
| 61 | + return None |
| 62 | + |
| 63 | +def check_active_faults(expected_to_be_present_or_absent, present=True, timeout=5): |
| 64 | + action_str = "present" if present else "absent" |
| 65 | + print(f"--- Checking chaos faults (expecting specific faults to be {action_str}: {expected_to_be_present_or_absent}) ---") |
| 66 | + try: |
| 67 | + response = requests.get(CHAOS_ENDPOINT, timeout=timeout) |
| 68 | + response.raise_for_status() |
| 69 | + active_faults = response.json() |
| 70 | + print(f" Active faults reported by Chaos API: {active_faults}") |
| 71 | + |
| 72 | + normalize_fault = lambda d: tuple(sorted(d.items())) |
| 73 | + normalized_active_set = {normalize_fault(f) for f in active_faults} |
| 74 | + |
| 75 | + if not expected_to_be_present_or_absent and not present: |
| 76 | + assert not active_faults, f"Expected no active faults, but got: {active_faults}" |
| 77 | + print(" Verified no active faults are present.") |
| 78 | + return |
| 79 | + |
| 80 | + normalized_expected_set = {normalize_fault(f) for f in expected_to_be_present_or_absent} |
| 81 | + |
| 82 | + if present: |
| 83 | + assert normalized_expected_set.issubset(normalized_active_set), \ |
| 84 | + f"Expected faults {expected_to_be_present_or_absent} to be active, but active set is {active_faults}" |
| 85 | + print(f" Verified expected faults are active.") |
| 86 | + else: |
| 87 | + assert not normalized_expected_set.intersection(normalized_active_set), \ |
| 88 | + f"Expected faults {expected_to_be_present_or_absent} to be cleared, but some were found in active set: {active_faults}" |
| 89 | + print(f" Verified expected faults are cleared (not present).") |
| 90 | + |
| 91 | + except requests.exceptions.RequestException as e: |
| 92 | + pytest.fail(f"Failed to GET active faults: {e}") |
| 93 | + except json.JSONDecodeError as e: |
| 94 | + current_response_text = response.text if 'response' in locals() else "Response object not available or no text." |
| 95 | + pytest.fail(f"Failed to parse JSON response from GET /faults: {e}. Response text: {current_response_text}") |
| 96 | + |
| 97 | +# --- Pytest Fixtures (remain the same) --- |
| 98 | + |
| 99 | +@pytest.fixture(scope="session") |
17 | 100 | def dynamodb_resource(): |
18 | | - return boto3.resource("dynamodb", endpoint_url=LOCALSTACK_ENDPOINT) |
| 101 | + return boto3.resource("dynamodb", endpoint_url=LOCALSTACK_ENDPOINT_URL, region_name=SERVICE_REGION) |
19 | 102 |
|
20 | | -@pytest.fixture(scope="module") |
| 103 | +@pytest.fixture(scope="session") |
21 | 104 | def lambda_client(): |
22 | | - return boto3.client("lambda", endpoint_url=LOCALSTACK_ENDPOINT) |
| 105 | + return boto3.client("lambda", endpoint_url=LOCALSTACK_ENDPOINT_URL, region_name=SERVICE_REGION) |
| 106 | + |
| 107 | +# --- Prerequisite Tests (remain the same) --- |
23 | 108 |
|
24 | 109 | def test_dynamodb_table_exists(dynamodb_resource): |
25 | | - tables = dynamodb_resource.tables.all() |
26 | | - table_names = [table.name for table in tables] |
27 | | - assert DYNAMODB_TABLE_NAME in table_names |
28 | | - |
29 | | -def test_lambda_functions_exist(lambda_client): |
30 | | - functions = lambda_client.list_functions()["Functions"] |
31 | | - function_names = [func["FunctionName"] for func in functions] |
32 | | - assert all(func_name in function_names for func_name in LAMBDA_FUNCTIONS) |
33 | | - |
34 | | -def initiate_dynamodb_outage(): |
35 | | - outage_payload = [{"service": "dynamodb", "region": "us-east-1"}] |
36 | | - response = requests.post(CHAOS_ENDPOINT, json=outage_payload) |
37 | | - assert response.ok |
38 | | - return outage_payload |
39 | | - |
40 | | -def check_outage_status(expected_status): |
41 | | - outage_status = requests.get(CHAOS_ENDPOINT).json() |
42 | | - assert outage_status == expected_status |
43 | | - |
44 | | -def stop_dynamodb_outage(): |
45 | | - response = requests.post(CHAOS_ENDPOINT, json=[]) |
46 | | - assert response.ok |
47 | | - check_outage_status([]) |
48 | | - |
49 | | -def test_dynamodb_outage(dynamodb_resource): |
50 | | - # Initiate DynamoDB outage |
51 | | - outage_payload = initiate_dynamodb_outage() |
52 | | - |
53 | | - # Make a request to DynamoDB and assert an error |
54 | | - url = "http://12345.execute-api.localhost.localstack.cloud:4566/dev/productApi" |
| 110 | + print("\n--- Prerequisite: Checking DynamoDB table existence ---") |
| 111 | + try: |
| 112 | + table = dynamodb_resource.Table(DYNAMODB_TABLE_NAME) |
| 113 | + table.load() |
| 114 | + print(f" DynamoDB table '{DYNAMODB_TABLE_NAME}' found in region {dynamodb_resource.meta.client.meta.region_name}.") |
| 115 | + except Exception as e: |
| 116 | + pytest.fail(f"DynamoDB table '{DYNAMODB_TABLE_NAME}' not found or not accessible in region {SERVICE_REGION}: {e}") |
| 117 | + |
| 118 | +def test_add_product_lambda_exists(lambda_client): |
| 119 | + print("\n--- Prerequisite: Checking 'add-product' Lambda function existence ---") |
| 120 | + try: |
| 121 | + lambda_client.get_function(FunctionName="add-product") |
| 122 | + print(f" Lambda function 'add-product' found in region {lambda_client.meta.region_name}.") |
| 123 | + except Exception as e: |
| 124 | + pytest.fail(f"Lambda function 'add-product' not found in region {SERVICE_REGION}: {e}") |
| 125 | + |
| 126 | +# --- Main Test Function for DynamoDB Outage --- |
| 127 | + |
| 128 | +def test_dynamodb_outage_impacts_add_product(dynamodb_resource): |
55 | 129 | headers = {"Content-Type": "application/json"} |
56 | | - data = { |
57 | | - "id": "prod-1002", |
58 | | - "name": "Super Widget", |
59 | | - "price": "29.99", |
60 | | - "description": "A versatile widget that can be used for a variety of purposes. Durable, reliable, and affordable.", |
| 130 | + expected_plain_text_success_message = "Product added/updated successfully." |
| 131 | + expected_outage_message = "A DynamoDB error occurred. Message sent to queue." |
| 132 | + |
| 133 | + # --- MODIFICATION: Define unique data for each stage --- |
| 134 | + ts = int(time.time()) |
| 135 | + normal_product_id = f"prod-normal-{ts}" |
| 136 | + normal_data = { |
| 137 | + "id": normal_product_id, |
| 138 | + "name": f"Normal Widget {ts}", |
| 139 | + "price": BASE_PRODUCT_DATA["price"], |
| 140 | + "description": f"{BASE_PRODUCT_DATA['description']} (Normal Operation)" |
61 | 141 | } |
62 | 142 |
|
63 | | - response = requests.post(url, headers=headers, json=data) |
64 | | - assert "error" in response.text |
65 | | - |
66 | | - # Check if outage is running |
67 | | - check_outage_status(outage_payload) |
68 | | - |
69 | | - # Stop the outage |
70 | | - stop_dynamodb_outage() |
71 | | - |
72 | | - # Wait for a few seconds |
73 | | - # Adding a better retry mechanism is left as an exercise |
74 | | - time.sleep(60) |
| 143 | + outage_attempt_product_id = f"prod-outage-{ts}" |
| 144 | + outage_data = { |
| 145 | + "id": outage_attempt_product_id, |
| 146 | + "name": f"Outage Attempt Widget {ts}", |
| 147 | + "price": "0.00", # e.g., different price for outage attempt |
| 148 | + "description": f"{BASE_PRODUCT_DATA['description']} (During Outage Attempt)" |
| 149 | + } |
75 | 150 |
|
76 | | - # Query if there are items in DynamoDB table |
77 | | - table = dynamodb_resource.Table(DYNAMODB_TABLE_NAME) |
78 | | - response = table.scan() |
79 | | - items = response["Items"] |
| 151 | + restored_product_id = f"prod-restored-{ts + 1}" # Ensure slightly different timestamp if needed |
| 152 | + restored_data = { |
| 153 | + "id": restored_product_id, |
| 154 | + "name": f"Restored Widget {ts+1}", |
| 155 | + "price": "26.99", # Different price again |
| 156 | + "description": f"{BASE_PRODUCT_DATA['description']} (Post Recovery)" |
| 157 | + } |
| 158 | + # --- END MODIFICATION --- |
| 159 | + |
| 160 | + |
| 161 | + # 1. Verify product can be added when DynamoDB is healthy |
| 162 | + print("\n--- Test Step 1: Verify normal operation (add product) ---") |
| 163 | + response_normal = None |
| 164 | + try: |
| 165 | + print(f" Attempting to add product: {normal_data} to {ADD_PRODUCT_URL}") |
| 166 | + response_normal = requests.post(ADD_PRODUCT_URL, headers=headers, json=normal_data, timeout=10) |
| 167 | + print(f" Response status (normal): {response_normal.status_code}, Text: '{response_normal.text}'") |
| 168 | + response_normal.raise_for_status() |
| 169 | + assert expected_plain_text_success_message in response_normal.text |
| 170 | + print(f" Successfully added product before outage.") |
| 171 | + table = dynamodb_resource.Table(DYNAMODB_TABLE_NAME) |
| 172 | + item = table.get_item(Key={'id': normal_product_id}).get('Item') |
| 173 | + assert item is not None and item['name'] == normal_data['name'] |
| 174 | + print(f" Verified product '{normal_product_id}' in DynamoDB.") |
| 175 | + except requests.exceptions.HTTPError as http_err: |
| 176 | + pytest.fail(f"HTTP error during normal operation: {http_err} - Response: {http_err.response.text if http_err.response else 'N/A'}") |
| 177 | + except requests.exceptions.RequestException as e: |
| 178 | + pytest.fail(f"Network/Request error during normal operation: {e}") |
| 179 | + except Exception as e: |
| 180 | + response_text = response_normal.text if response_normal else "Response object not available" |
| 181 | + pytest.fail(f"Error during normal operation verification: {e} (Response text was: '{response_text}')") |
| 182 | + |
| 183 | + # 2. Induce DynamoDB outage in SERVICE_REGION |
| 184 | + faults_to_induce = [{"service": "dynamodb", "region": SERVICE_REGION}] |
| 185 | + manage_chaos(service_name="dynamodb", region_name=SERVICE_REGION, induce=True) |
| 186 | + check_active_faults(expected_to_be_present_or_absent=faults_to_induce, present=True) |
| 187 | + print(f" Waiting {DYNAMODB_OUTAGE_REACTION_WAIT}s for outage to take effect...") |
| 188 | + time.sleep(DYNAMODB_OUTAGE_REACTION_WAIT) |
| 189 | + |
| 190 | + # 3. Verify adding a product is gracefully handled (returns 200 with specific message) |
| 191 | + print("\n--- Test Step 3: Verify add product is gracefully handled during DynamoDB outage ---") |
| 192 | + response_outage = None |
| 193 | + try: |
| 194 | + print(f" Attempting to add product during outage: {outage_data}") |
| 195 | + response_outage = requests.post(ADD_PRODUCT_URL, headers=headers, json=outage_data, timeout=10) |
| 196 | + print(f" Response status (outage): {response_outage.status_code}, Text: '{response_outage.text}'") |
| 197 | + |
| 198 | + assert response_outage.status_code == 200, \ |
| 199 | + f"Expected status code 200 during graceful handling, got {response_outage.status_code}." |
| 200 | + assert expected_outage_message in response_outage.text, \ |
| 201 | + f"Expected outage message '{expected_outage_message}' not found. Got: '{response_outage.text}'" |
| 202 | + print(f" Received expected graceful handling message during outage.") |
| 203 | + |
| 204 | + table = dynamodb_resource.Table(DYNAMODB_TABLE_NAME) |
| 205 | + print(f" Attempting to verify product '{outage_attempt_product_id}' is NOT in DynamoDB (expecting GetItem to fail)...") |
| 206 | + try: |
| 207 | + item_response_during_outage = table.get_item(Key={'id': outage_attempt_product_id}) |
| 208 | + item_during_outage = item_response_during_outage.get('Item') |
| 209 | + if item_during_outage is not None: |
| 210 | + pytest.fail(f"Product '{outage_attempt_product_id}' WAS FOUND in DynamoDB during outage, which is unexpected.") |
| 211 | + print(f" GetItem for '{outage_attempt_product_id}' succeeded but returned no item (good, item not found).") |
| 212 | + except botocore.exceptions.ClientError as ce: |
| 213 | + error_code = ce.response.get('Error', {}).get('Code') |
| 214 | + assert "ServiceUnavailable" in str(ce) or "ProvisionedThroughputExceededException" in str(ce) or error_code == "ServiceUnavailable" , \ |
| 215 | + f"Expected ServiceUnavailable from DynamoDB due to chaos, but got: {ce}" |
| 216 | + print(f" Correctly received ClientError ({type(ce).__name__}: {error_code}) when trying to GetItem during outage: {ce}") |
| 217 | + print(f" This confirms product '{outage_attempt_product_id}' could not be read (and thus likely not written) from DynamoDB during outage.") |
| 218 | + |
| 219 | + except requests.exceptions.RequestException as e: |
| 220 | + pytest.fail(f"Request to API Gateway failed during outage test, unexpected if Lambda handles gracefully: {e}") |
| 221 | + except Exception as e: |
| 222 | + response_text = response_outage.text if response_outage else "Response object not available" |
| 223 | + pytest.fail(f"Unexpected generic error during outage product addition test step: {e} (API Response text: '{response_text}')") |
| 224 | + |
| 225 | + # 4. Clear DynamoDB outage |
| 226 | + manage_chaos(service_name="dynamodb", region_name=SERVICE_REGION, induce=False) |
| 227 | + check_active_faults(expected_to_be_present_or_absent=faults_to_induce, present=False) |
| 228 | + print(f" Waiting {SERVICE_RECOVERY_WAIT}s for DynamoDB to recover...") |
| 229 | + time.sleep(SERVICE_RECOVERY_WAIT) |
| 230 | + |
| 231 | + # 5. Verify product can be added again after outage is cleared |
| 232 | + print("\n--- Test Step 5: Verify normal operation restored (add product) ---") |
| 233 | + response_restored = None |
| 234 | + try: |
| 235 | + print(f" Attempting to add product after outage cleared: {restored_data}") |
| 236 | + response_restored = requests.post(ADD_PRODUCT_URL, headers=headers, json=restored_data, timeout=10) |
| 237 | + print(f" Response status (restored): {response_restored.status_code}, Text: '{response_restored.text}'") |
| 238 | + response_restored.raise_for_status() |
| 239 | + assert expected_plain_text_success_message in response_restored.text |
| 240 | + print(f" Successfully added product after outage cleared.") |
| 241 | + table = dynamodb_resource.Table(DYNAMODB_TABLE_NAME) |
| 242 | + item_restored = table.get_item(Key={'id': restored_product_id}).get('Item') |
| 243 | + assert item_restored is not None and item_restored['name'] == restored_data['name'] |
| 244 | + print(f" Verified product '{restored_product_id}' in DynamoDB after recovery.") |
| 245 | + except requests.exceptions.HTTPError as http_err: |
| 246 | + pytest.fail(f"HTTP error during post-recovery: {http_err} - Response: {http_err.response.text if http_err.response else 'N/A'}") |
| 247 | + except requests.exceptions.RequestException as e: |
| 248 | + pytest.fail(f"Network/Request error during post-recovery: {e}") |
| 249 | + except Exception as e: |
| 250 | + response_text = response_restored.text if response_restored else "Response object not available" |
| 251 | + pytest.fail(f"Error during post-recovery: {e} (Response text: '{response_text}')") |
| 252 | + |
| 253 | + print("\n--- DynamoDB Outage Test Completed Successfully ---") |
0 commit comments