|
47 | 47 | _AWS_SECRET_ARN: str = "aws.secretsmanager.secret.arn"
|
48 | 48 | _AWS_STATE_MACHINE_ARN: str = "aws.stepfunctions.state_machine.arn"
|
49 | 49 | _AWS_ACTIVITY_ARN: str = "aws.stepfunctions.activity.arn"
|
| 50 | +_AWS_SNS_TOPIC_ARN: str = "aws.sns.topic.arn" |
50 | 51 |
|
51 | 52 |
|
52 | 53 | # pylint: disable=too-many-public-methods,too-many-lines
|
@@ -86,7 +87,7 @@ def set_up_dependency_container(cls):
|
86 | 87 | cls._local_stack: LocalStackContainer = (
|
87 | 88 | LocalStackContainer(image="localstack/localstack:3.5.0")
|
88 | 89 | .with_name("localstack")
|
89 |
| - .with_services("s3", "sqs", "dynamodb", "kinesis", "secretsmanager", "iam", "stepfunctions") |
| 90 | + .with_services("s3", "sqs", "dynamodb", "kinesis", "secretsmanager", "iam", "stepfunctions", "sns") |
90 | 91 | .with_env("DEFAULT_REGION", "us-west-2")
|
91 | 92 | .with_kwargs(network=NETWORK_NAME, networking_config=local_stack_networking_config)
|
92 | 93 | )
|
@@ -752,6 +753,45 @@ def test_secretsmanager_fault(self):
|
752 | 753 | span_name="Secrets Manager.GetSecretValue",
|
753 | 754 | )
|
754 | 755 |
|
| 756 | + def test_sns_get_topic_attributes(self): |
| 757 | + self.do_test_requests( |
| 758 | + "sns/gettopicattributes/test-topic", |
| 759 | + "GET", |
| 760 | + 200, |
| 761 | + 0, |
| 762 | + 0, |
| 763 | + rpc_service="SNS", |
| 764 | + remote_service="AWS::SNS", |
| 765 | + remote_operation="GetTopicAttributes", |
| 766 | + remote_resource_type="AWS::SNS::Topic", |
| 767 | + remote_resource_identifier="test-topic", |
| 768 | + cloudformation_primary_identifier="arn:aws:sns:us-west-2:000000000000:test-topic", |
| 769 | + request_specific_attributes={_AWS_SNS_TOPIC_ARN: "arn:aws:sns:us-west-2:000000000000:test-topic"}, |
| 770 | + span_name="SNS.GetTopicAttributes", |
| 771 | + ) |
| 772 | + |
| 773 | + # TODO: Add error case for sns - our test setup is not setting the http status code properly |
| 774 | + # for this resource |
| 775 | + |
| 776 | + def test_sns_fault(self): |
| 777 | + self.do_test_requests( |
| 778 | + "sns/fault", |
| 779 | + "GET", |
| 780 | + 500, |
| 781 | + 0, |
| 782 | + 1, |
| 783 | + rpc_service="SNS", |
| 784 | + remote_service="AWS::SNS", |
| 785 | + remote_operation="GetTopicAttributes", |
| 786 | + remote_resource_type="AWS::SNS::Topic", |
| 787 | + remote_resource_identifier="invalid-topic", |
| 788 | + cloudformation_primary_identifier="arn:aws:sns:us-west-2:000000000000:invalid-topic", |
| 789 | + request_specific_attributes={ |
| 790 | + _AWS_SNS_TOPIC_ARN: "arn:aws:sns:us-west-2:000000000000:invalid-topic", |
| 791 | + }, |
| 792 | + span_name="SNS.GetTopicAttributes", |
| 793 | + ) |
| 794 | + |
755 | 795 | def test_stepfunctions_describe_state_machine(self):
|
756 | 796 | self.do_test_requests(
|
757 | 797 | "stepfunctions/describestatemachine/my-state-machine",
|
|
0 commit comments