|
| 1 | +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. |
| 2 | +# SPDX-License-Identifier: Apache-2.0 |
| 3 | +import atexit |
| 4 | +import os |
| 5 | +import tempfile |
| 6 | +from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer |
| 7 | +from threading import Thread |
| 8 | + |
| 9 | +import boto3 |
| 10 | +import requests |
| 11 | +from botocore.client import BaseClient |
| 12 | +from botocore.config import Config |
| 13 | +from typing_extensions import override |
| 14 | + |
| 15 | +_PORT: int = 8080 |
| 16 | +_ERROR: str = "error" |
| 17 | +_FAULT: str = "fault" |
| 18 | + |
| 19 | +_AWS_SDK_S3_ENDPOINT: str = os.environ.get("AWS_SDK_S3_ENDPOINT") |
| 20 | +_AWS_SDK_ENDPOINT: str = os.environ.get("AWS_SDK_ENDPOINT") |
| 21 | +_AWS_REGION: str = os.environ.get("AWS_REGION") |
| 22 | +_ERROR_ENDPOINT: str = "http://error.test:8080" |
| 23 | +_FAULT_ENDPOINT: str = "http://fault.test:8080" |
| 24 | +os.environ.setdefault("AWS_ACCESS_KEY_ID", "testcontainers-localstack") |
| 25 | +os.environ.setdefault("AWS_SECRET_ACCESS_KEY", "testcontainers-localstack") |
| 26 | +_NO_RETRY_CONFIG: Config = Config(retries={"max_attempts": 0}, connect_timeout=3, read_timeout=3) |
| 27 | + |
| 28 | + |
| 29 | +# pylint: disable=broad-exception-caught |
| 30 | +class RequestHandler(BaseHTTPRequestHandler): |
| 31 | + main_status: int = 200 |
| 32 | + |
| 33 | + @override |
| 34 | + # pylint: disable=invalid-name |
| 35 | + def do_GET(self): |
| 36 | + if self.in_path("s3"): |
| 37 | + self._handle_s3_request() |
| 38 | + if self.in_path("ddb"): |
| 39 | + self._handle_ddb_request() |
| 40 | + if self.in_path("sqs"): |
| 41 | + self._handle_sqs_request() |
| 42 | + if self.in_path("kinesis"): |
| 43 | + self._handle_kinesis_request() |
| 44 | + |
| 45 | + self._end_request(self.main_status) |
| 46 | + |
| 47 | + # pylint: disable=invalid-name |
| 48 | + def do_POST(self): |
| 49 | + if self.in_path("sqserror"): |
| 50 | + self.send_response(self.main_status) |
| 51 | + self.send_header("Content-type", "text/xml") |
| 52 | + self.end_headers() |
| 53 | + |
| 54 | + xml_response = """<?xml version="1.0"?> |
| 55 | + <ErrorResponse> |
| 56 | + <Error> |
| 57 | + <Type>Sender</Type> |
| 58 | + <Code>InvalidAction</Code> |
| 59 | + <Message>The action or operation requested is invalid.</Message> |
| 60 | + <Detail/> |
| 61 | + </Error> |
| 62 | + </ErrorResponse>""" |
| 63 | + |
| 64 | + self.wfile.write(xml_response.encode()) |
| 65 | + else: |
| 66 | + self._end_request(self.main_status) |
| 67 | + |
| 68 | + # pylint: disable=invalid-name |
| 69 | + def do_PUT(self): |
| 70 | + self._end_request(self.main_status) |
| 71 | + |
| 72 | + def in_path(self, sub_path: str) -> bool: |
| 73 | + return sub_path in self.path |
| 74 | + |
| 75 | + def _handle_s3_request(self) -> None: |
| 76 | + s3_client: BaseClient = boto3.client("s3", endpoint_url=_AWS_SDK_S3_ENDPOINT, region_name=_AWS_REGION) |
| 77 | + if self.in_path(_ERROR): |
| 78 | + error_client: BaseClient = boto3.client("s3", endpoint_url=_ERROR_ENDPOINT, region_name=_AWS_REGION) |
| 79 | + set_main_status(400) |
| 80 | + try: |
| 81 | + error_client.create_bucket(Bucket="-") |
| 82 | + except Exception as exception: |
| 83 | + print("Expected exception occurred", exception) |
| 84 | + elif self.in_path(_FAULT): |
| 85 | + set_main_status(500) |
| 86 | + try: |
| 87 | + fault_client: BaseClient = boto3.client( |
| 88 | + "s3", endpoint_url=_FAULT_ENDPOINT, region_name=_AWS_REGION, config=_NO_RETRY_CONFIG |
| 89 | + ) |
| 90 | + fault_client.create_bucket(Bucket="valid-bucket-name") |
| 91 | + except Exception as exception: |
| 92 | + print("Expected exception occurred", exception) |
| 93 | + elif self.in_path("createbucket/create-bucket"): |
| 94 | + set_main_status(200) |
| 95 | + s3_client.create_bucket( |
| 96 | + Bucket="test-bucket-name", CreateBucketConfiguration={"LocationConstraint": _AWS_REGION} |
| 97 | + ) |
| 98 | + elif self.in_path("createobject/put-object/some-object"): |
| 99 | + set_main_status(200) |
| 100 | + with tempfile.NamedTemporaryFile(delete=True) as temp_file: |
| 101 | + temp_file_name: str = temp_file.name |
| 102 | + temp_file.write(b"This is temp file for S3 upload") |
| 103 | + temp_file.flush() |
| 104 | + s3_client.upload_file(temp_file_name, "test-put-object-bucket-name", "test_object") |
| 105 | + elif self.in_path("getobject/get-object/some-object"): |
| 106 | + set_main_status(200) |
| 107 | + s3_client.get_object(Bucket="test-get-object-bucket-name", Key="test_object") |
| 108 | + else: |
| 109 | + set_main_status(404) |
| 110 | + |
| 111 | + def _handle_ddb_request(self) -> None: |
| 112 | + ddb_client = boto3.client("dynamodb", endpoint_url=_AWS_SDK_ENDPOINT, region_name=_AWS_REGION) |
| 113 | + if self.in_path(_ERROR): |
| 114 | + set_main_status(400) |
| 115 | + error_client = boto3.client("dynamodb", endpoint_url=_ERROR_ENDPOINT, region_name=_AWS_REGION) |
| 116 | + item: dict = {"id": {"S": "1"}} |
| 117 | + try: |
| 118 | + error_client.put_item(TableName="invalid_table", Item=item) |
| 119 | + except Exception as exception: |
| 120 | + print("Expected exception occurred", exception) |
| 121 | + elif self.in_path(_FAULT): |
| 122 | + set_main_status(500) |
| 123 | + item: dict = {"id": {"S": "1"}} |
| 124 | + try: |
| 125 | + fault_client = boto3.client( |
| 126 | + "dynamodb", endpoint_url=_FAULT_ENDPOINT, region_name=_AWS_REGION, config=_NO_RETRY_CONFIG |
| 127 | + ) |
| 128 | + fault_client.put_item(TableName="invalid_table", Item=item) |
| 129 | + except Exception as exception: |
| 130 | + print("Expected exception occurred", exception) |
| 131 | + elif self.in_path("createtable/some-table"): |
| 132 | + set_main_status(200) |
| 133 | + ddb_client.create_table( |
| 134 | + TableName="test_table", |
| 135 | + KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}], |
| 136 | + AttributeDefinitions=[ |
| 137 | + {"AttributeName": "id", "AttributeType": "S"}, |
| 138 | + ], |
| 139 | + BillingMode="PAY_PER_REQUEST", |
| 140 | + ) |
| 141 | + elif self.in_path("putitem/putitem-table/key"): |
| 142 | + set_main_status(200) |
| 143 | + item: dict = {"id": {"S": "1"}} |
| 144 | + ddb_client.put_item(TableName="put_test_table", Item=item) |
| 145 | + else: |
| 146 | + set_main_status(404) |
| 147 | + |
| 148 | + def _handle_sqs_request(self) -> None: |
| 149 | + sqs_client = boto3.client("sqs", endpoint_url=_AWS_SDK_ENDPOINT, region_name=_AWS_REGION) |
| 150 | + if self.in_path(_ERROR): |
| 151 | + set_main_status(400) |
| 152 | + try: |
| 153 | + error_client = boto3.client("sqs", endpoint_url=_ERROR_ENDPOINT + "/sqserror", region_name=_AWS_REGION) |
| 154 | + error_client.send_message(QueueUrl="http://error.test:8080/000000000000/sqserror", MessageBody=_ERROR) |
| 155 | + except Exception as exception: |
| 156 | + print("Expected exception occurred", exception) |
| 157 | + elif self.in_path(_FAULT): |
| 158 | + set_main_status(500) |
| 159 | + try: |
| 160 | + fault_client = boto3.client( |
| 161 | + "sqs", endpoint_url=_FAULT_ENDPOINT, region_name=_AWS_REGION, config=_NO_RETRY_CONFIG |
| 162 | + ) |
| 163 | + fault_client.create_queue(QueueName="invalid_test") |
| 164 | + except Exception as exception: |
| 165 | + print("Expected exception occurred", exception) |
| 166 | + elif self.in_path("createqueue/some-queue"): |
| 167 | + set_main_status(200) |
| 168 | + sqs_client.create_queue(QueueName="test_queue") |
| 169 | + elif self.in_path("publishqueue/some-queue"): |
| 170 | + set_main_status(200) |
| 171 | + sqs_client.send_message( |
| 172 | + QueueUrl="http://localstack:4566/000000000000/test_put_get_queue", MessageBody="test_message" |
| 173 | + ) |
| 174 | + elif self.in_path("consumequeue/some-queue"): |
| 175 | + set_main_status(200) |
| 176 | + sqs_client.receive_message( |
| 177 | + QueueUrl="http://localstack:4566/000000000000/test_put_get_queue", MaxNumberOfMessages=1 |
| 178 | + ) |
| 179 | + else: |
| 180 | + set_main_status(404) |
| 181 | + |
| 182 | + def _handle_kinesis_request(self) -> None: |
| 183 | + kinesis_client = boto3.client("kinesis", endpoint_url=_AWS_SDK_ENDPOINT, region_name=_AWS_REGION) |
| 184 | + if self.in_path(_ERROR): |
| 185 | + set_main_status(400) |
| 186 | + try: |
| 187 | + error_client = boto3.client("kinesis", endpoint_url=_ERROR_ENDPOINT, region_name=_AWS_REGION) |
| 188 | + error_client.put_record(StreamName="invalid_stream", Data=b"test", PartitionKey="partition_key") |
| 189 | + except Exception as exception: |
| 190 | + print("Expected exception occurred", exception) |
| 191 | + elif self.in_path(_FAULT): |
| 192 | + set_main_status(500) |
| 193 | + try: |
| 194 | + fault_client = boto3.client( |
| 195 | + "kinesis", endpoint_url=_FAULT_ENDPOINT, region_name=_AWS_REGION, config=_NO_RETRY_CONFIG |
| 196 | + ) |
| 197 | + fault_client.put_record(StreamName="test_stream", Data=b"test", PartitionKey="partition_key") |
| 198 | + except Exception as exception: |
| 199 | + print("Expected exception occurred", exception) |
| 200 | + elif self.in_path("putrecord/my-stream"): |
| 201 | + set_main_status(200) |
| 202 | + kinesis_client.put_record(StreamName="test_stream", Data=b"test", PartitionKey="partition_key") |
| 203 | + else: |
| 204 | + set_main_status(404) |
| 205 | + |
| 206 | + def _end_request(self, status_code: int): |
| 207 | + self.send_response_only(status_code) |
| 208 | + self.end_headers() |
| 209 | + |
| 210 | + |
| 211 | +def set_main_status(status: int) -> None: |
| 212 | + RequestHandler.main_status = status |
| 213 | + |
| 214 | + |
| 215 | +def prepare_aws_server() -> None: |
| 216 | + requests.Request(method="POST", url="http://localhost:4566/_localstack/state/reset") |
| 217 | + try: |
| 218 | + # Set up S3 so tests can access buckets and retrieve a file. |
| 219 | + s3_client: BaseClient = boto3.client("s3", endpoint_url=_AWS_SDK_S3_ENDPOINT, region_name=_AWS_REGION) |
| 220 | + s3_client.create_bucket( |
| 221 | + Bucket="test-put-object-bucket-name", CreateBucketConfiguration={"LocationConstraint": _AWS_REGION} |
| 222 | + ) |
| 223 | + s3_client.create_bucket( |
| 224 | + Bucket="test-get-object-bucket-name", CreateBucketConfiguration={"LocationConstraint": _AWS_REGION} |
| 225 | + ) |
| 226 | + with tempfile.NamedTemporaryFile(delete=True) as temp_file: |
| 227 | + temp_file_name: str = temp_file.name |
| 228 | + temp_file.write(b"This is temp file for S3 upload") |
| 229 | + temp_file.flush() |
| 230 | + s3_client.upload_file(temp_file_name, "test-get-object-bucket-name", "test_object") |
| 231 | + |
| 232 | + # Set up DDB so tests can access a table. |
| 233 | + ddb_client: BaseClient = boto3.client("dynamodb", endpoint_url=_AWS_SDK_ENDPOINT, region_name=_AWS_REGION) |
| 234 | + ddb_client.create_table( |
| 235 | + TableName="put_test_table", |
| 236 | + KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}], |
| 237 | + AttributeDefinitions=[ |
| 238 | + {"AttributeName": "id", "AttributeType": "S"}, |
| 239 | + ], |
| 240 | + BillingMode="PAY_PER_REQUEST", |
| 241 | + ) |
| 242 | + |
| 243 | + # Set up SQS so tests can access a queue. |
| 244 | + sqs_client: BaseClient = boto3.client("sqs", endpoint_url=_AWS_SDK_ENDPOINT, region_name=_AWS_REGION) |
| 245 | + sqs_client.create_queue(QueueName="test_put_get_queue") |
| 246 | + |
| 247 | + # Set up Kinesis so tests can access a stream. |
| 248 | + kinesis_client: BaseClient = boto3.client("kinesis", endpoint_url=_AWS_SDK_ENDPOINT, region_name=_AWS_REGION) |
| 249 | + kinesis_client.create_stream(StreamName="test_stream", ShardCount=1) |
| 250 | + except Exception as exception: |
| 251 | + print("Unexpected exception occurred", exception) |
| 252 | + |
| 253 | + |
| 254 | +def main() -> None: |
| 255 | + prepare_aws_server() |
| 256 | + server_address: tuple[str, int] = ("0.0.0.0", _PORT) |
| 257 | + request_handler_class: type = RequestHandler |
| 258 | + requests_server: ThreadingHTTPServer = ThreadingHTTPServer(server_address, request_handler_class) |
| 259 | + atexit.register(requests_server.shutdown) |
| 260 | + server_thread: Thread = Thread(target=requests_server.serve_forever) |
| 261 | + server_thread.start() |
| 262 | + print("Ready") |
| 263 | + server_thread.join() |
| 264 | + |
| 265 | + |
| 266 | +if __name__ == "__main__": |
| 267 | + main() |
0 commit comments