Skip to content

Commit 2a3a38a

Browse files
committed
add a test to validate sqs —> pipes —> sns integration
1 parent a30e81b commit 2a3a38a

File tree

1 file changed

+75
-0
lines changed

1 file changed

+75
-0
lines changed

tests/test_pipes.py

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
import pytest
2+
import time
3+
import boto3
4+
import json
5+
import requests
6+
import localstack.sdk.aws
7+
import localstack.sdk.chaos
8+
from localstack.sdk.models import FaultRule
9+
from localstack.sdk.chaos.managers import fault_configuration
10+
11+
LOCALSTACK_ENDPOINT = "http://localhost.localstack.cloud:4566"
12+
QUEUE_NAME = "QuizSubmissionQueue"
13+
SENDER_EMAIL = "[email protected]"
14+
15+
class TestLocalStackClient:
16+
client = localstack.sdk.chaos.ChaosClient()
17+
18+
@pytest.fixture(scope="module")
19+
def sqs_client(self):
20+
return boto3.client("sqs", endpoint_url=LOCALSTACK_ENDPOINT)
21+
22+
def test_pipes_sqs_sns_integration(self, sqs_client):
23+
outage_rule = FaultRule(region="us-east-1", service="lambda")
24+
25+
with fault_configuration(fault_rules=[outage_rule]):
26+
print("Lambda service outage initiated.")
27+
28+
# Retrieve SQS queue URL
29+
response = sqs_client.get_queue_url(QueueName=QUEUE_NAME)
30+
queue_url = response["QueueUrl"]
31+
print(f"SQS Queue URL: {queue_url}")
32+
33+
# Send a message to the SQS queue
34+
message_body = {"test": "message"}
35+
response = sqs_client.send_message(
36+
QueueUrl=queue_url,
37+
MessageBody=json.dumps(message_body),
38+
)
39+
assert response["ResponseMetadata"]["HTTPStatusCode"] == 200
40+
print(f"Message sent to SQS queue {QUEUE_NAME}: {message_body}")
41+
42+
print("Waiting for system to process message during Lambda outage...")
43+
time.sleep(15)
44+
45+
print("Outage resolved, checking SES for notifications...")
46+
47+
ses_url = f"{LOCALSTACK_ENDPOINT}/_aws/ses"
48+
response = requests.get(ses_url)
49+
response.raise_for_status()
50+
messages_data = response.json()
51+
52+
email_found = False
53+
expected_subject = "SNS-Subscriber-Endpoint"
54+
expected_body_text_part_contains = "QuizSubmissionQueue"
55+
SENDER_EMAIL = "[email protected]"
56+
57+
messages = messages_data.get("messages", [])
58+
59+
for message in messages:
60+
if message["Source"] == SENDER_EMAIL:
61+
email_found = True
62+
assert message["Subject"] == expected_subject, f"Subject mismatch. Expected: {expected_subject}, Found: {message['Subject']}"
63+
assert "Body" in message, "Message body missing."
64+
65+
body = message["Body"]
66+
assert "text_part" in body, "Text part missing in body."
67+
text_part = body["text_part"]
68+
assert expected_body_text_part_contains in text_part, f"Expected content not found in text part: {expected_body_text_part_contains}"
69+
70+
print(f"Email found with subject '{expected_subject}' and matching body content.")
71+
break
72+
73+
assert email_found, f"No email found sent from {SENDER_EMAIL} with subject '{expected_subject}'."
74+
75+
print("Test completed successfully.")

0 commit comments

Comments
 (0)