|
12 | 12 | import requests |
13 | 13 | from botocore.client import BaseClient |
14 | 14 | from botocore.config import Config |
| 15 | +from botocore.exceptions import ClientError |
15 | 16 | from typing_extensions import Tuple, override |
16 | 17 |
|
17 | 18 | _PORT: int = 8080 |
@@ -47,6 +48,8 @@ def do_GET(self): |
47 | 48 | self._handle_bedrock_request() |
48 | 49 | if self.in_path("secretsmanager"): |
49 | 50 | self._handle_secretsmanager_request() |
| 51 | + if self.in_path("stepfunctions"): |
| 52 | + self._handle_stepfunctions_request() |
50 | 53 |
|
51 | 54 | self._end_request(self.main_status) |
52 | 55 |
|
@@ -329,6 +332,37 @@ def _handle_secretsmanager_request(self) -> None: |
329 | 332 | else: |
330 | 333 | set_main_status(404) |
331 | 334 |
|
| 335 | + def _handle_stepfunctions_request(self) -> None: |
| 336 | + sfn_client = boto3.client("stepfunctions", endpoint_url=_AWS_SDK_ENDPOINT, region_name=_AWS_REGION) |
| 337 | + if self.in_path(_ERROR): |
| 338 | + set_main_status(400) |
| 339 | + try: |
| 340 | + error_client = boto3.client("stepfunctions", endpoint_url=_ERROR_ENDPOINT, region_name=_AWS_REGION) |
| 341 | + error_client.describe_state_machine(stateMachineArn="arn:aws:states:us-west-2:000000000000:stateMachine:unExistStateMachine") |
| 342 | + except Exception as exception: |
| 343 | + print("Expected exception occurred", exception) |
| 344 | + elif self.in_path(_FAULT): |
| 345 | + set_main_status(500) |
| 346 | + try: |
| 347 | + fault_client = boto3.client("stepfunctions", endpoint_url=_FAULT_ENDPOINT, region_name=_AWS_REGION) |
| 348 | + fault_client.meta.events.register( |
| 349 | + "before-call.stepfunctions.ListStateMachineVersions", |
| 350 | + lambda **kwargs: inject_500_error("ListStateMachineVersions", **kwargs), |
| 351 | + ) |
| 352 | + fault_client.list_state_machine_versions( |
| 353 | + stateMachineArn="arn:aws:states:us-west-2:000000000000:stateMachine:invalid-state-machine", |
| 354 | + ) |
| 355 | + except Exception as exception: |
| 356 | + print("Expected exception occurred", exception) |
| 357 | + elif self.in_path("describestatemachine/my-state-machine"): |
| 358 | + set_main_status(200) |
| 359 | + sfn_client.describe_state_machine(stateMachineArn="arn:aws:states:us-west-2:000000000000:stateMachine:testStateMachine") |
| 360 | + elif self.in_path("describeactivity/my-activity"): |
| 361 | + set_main_status(200) |
| 362 | + sfn_client.describe_activity(activityArn="arn:aws:states:us-west-2:000000000000:activity:testActivity") |
| 363 | + else: |
| 364 | + set_main_status(404) |
| 365 | + |
332 | 366 | def _end_request(self, status_code: int): |
333 | 367 | self.send_response_only(status_code) |
334 | 368 | self.end_headers() |
@@ -383,6 +417,60 @@ def prepare_aws_server() -> None: |
383 | 417 | Name="testSecret", SecretString="secretValue", Description="This is a test secret" |
384 | 418 | ) |
385 | 419 |
|
| 420 | + # Set up Step Functions so tests can access a state machine and activity. |
| 421 | + sfn_client: BaseClient = boto3.client("stepfunctions", endpoint_url=_AWS_SDK_ENDPOINT, region_name=_AWS_REGION) |
| 422 | + sfn_response = sfn_client.list_state_machines() |
| 423 | + state_machine_name = "testStateMachine" |
| 424 | + activity_name = "testActivity" |
| 425 | + state_machine = next((st for st in sfn_response["stateMachines"] if st["name"] == state_machine_name), None) |
| 426 | + if not state_machine: |
| 427 | + # create state machine needs an iam role so we create it here |
| 428 | + iam_client: BaseClient = boto3.client("iam", endpoint_url=_AWS_SDK_ENDPOINT, region_name=_AWS_REGION) |
| 429 | + iam_role_name = "testRole" |
| 430 | + iam_role_arn = None |
| 431 | + trust_policy = { |
| 432 | + "Version": "2012-10-17", |
| 433 | + "Statement": [ |
| 434 | + { |
| 435 | + "Effect": "Allow", |
| 436 | + "Principal": { |
| 437 | + "Service": "states.amazonaws.com" |
| 438 | + }, |
| 439 | + "Action": "sts:AssumeRole" |
| 440 | + } |
| 441 | + ] |
| 442 | + } |
| 443 | + try: |
| 444 | + iam_response = iam_client.create_role( |
| 445 | + RoleName=iam_role_name, AssumeRolePolicyDocument=json.dumps(trust_policy) |
| 446 | + ) |
| 447 | + iam_client.attach_role_policy( |
| 448 | + RoleName=iam_role_name, |
| 449 | + PolicyArn="arn:aws:iam::aws:policy/AWSStepFunctionsFullAccess" |
| 450 | + ) |
| 451 | + print(f"IAM Role '{iam_role_name}' create successfully.") |
| 452 | + iam_role_arn = iam_response["Role"]["Arn"] |
| 453 | + sfn_defintion = { |
| 454 | + "Comment": "A simple sequential workflow", |
| 455 | + "StartAt": "FirstState", |
| 456 | + "States": { |
| 457 | + "FirstState": { |
| 458 | + "Type": "Pass", |
| 459 | + "Result": "Hello, World!", |
| 460 | + "End": True |
| 461 | + } |
| 462 | + } |
| 463 | + } |
| 464 | + definition_string = json.dumps(sfn_defintion) |
| 465 | + sfn_client.create_state_machine( |
| 466 | + name=state_machine_name, |
| 467 | + definition=definition_string, |
| 468 | + roleArn=iam_role_arn |
| 469 | + ) |
| 470 | + sfn_client.create_activity(name=activity_name) |
| 471 | + except Exception as exception: |
| 472 | + print("Something went wrong with Step Functions setup", exception) |
| 473 | + |
386 | 474 | except Exception as exception: |
387 | 475 | print("Unexpected exception occurred", exception) |
388 | 476 |
|
@@ -412,6 +500,16 @@ def inject_200_success(**kwargs): |
412 | 500 | return http_response, response_body |
413 | 501 |
|
414 | 502 |
|
| 503 | +def inject_500_error(api_name: str, **kwargs): |
| 504 | + raise ClientError( |
| 505 | + { |
| 506 | + "Error": {"Code": "InternalServerError", "Message": "Internal Server Error"}, |
| 507 | + "ResponseMetadata": {"HTTPStatusCode": 500, "RequestId": "mock-request-id"}, |
| 508 | + }, |
| 509 | + api_name, |
| 510 | + ) |
| 511 | + |
| 512 | + |
415 | 513 | def main() -> None: |
416 | 514 | prepare_aws_server() |
417 | 515 | server_address: Tuple[str, int] = ("0.0.0.0", _PORT) |
|
0 commit comments