Commit db24d50

feat: connectors (#241)
1 parent 6a9027b commit db24d50

175 files changed: 17,517 additions, 24 deletions

MANIFEST.in

Lines changed: 1 addition & 0 deletions
@@ -4,6 +4,7 @@ include requirements/dev.txt
 recursive-include samtranslator/validator/sam_schema *.json
 include samtranslator/policy_templates_data/policy_templates.json
 include samtranslator/policy_templates_data/schema.json
+include samtranslator/model/connector_profiles/profiles.json
 include README.md
 include THIRD_PARTY_LICENSES
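The new profiles.json ships inside the samtranslator package, so it must be listed in MANIFEST.in to be included in source distributions. As a minimal sketch (not code from this commit; only the path comes from the MANIFEST.in entry above, and the helper name is made up), such a packaged JSON file could be read at runtime like this:

import json
from pathlib import Path

import samtranslator

# Resolve profiles.json relative to the installed samtranslator package.
_PROFILES_PATH = Path(samtranslator.__file__).parent / "model" / "connector_profiles" / "profiles.json"


def load_connector_profiles() -> dict:
    # Illustrative helper only; the commit's actual loading code may differ.
    with open(_PROFILES_PATH, encoding="utf-8") as f:
        return json.load(f)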

Lines changed: 146 additions & 0 deletions
from time import sleep

from parameterized import parameterized

from integration.conftest import clean_bucket
from integration.helpers.base_test import S3_BUCKET_PREFIX, BaseTest
from integration.helpers.resource import generate_suffix


class TestConnectors(BaseTest):
    @parameterized.expand(
        [
            ("combination/connector_function_to_function",),
            ("combination/connector_restapi_to_function",),
            ("combination/connector_httpapi_to_function",),
            ("combination/connector_function_to_bucket_read",),
            ("combination/connector_function_to_bucket_write",),
            ("combination/connector_function_to_table_read",),
            ("combination/connector_function_to_table_write",),
            ("combination/connector_function_to_sfn_read",),
            ("combination/connector_function_to_sfn_write",),
            ("combination/connector_function_to_queue_write",),
            ("combination/connector_function_to_queue_read",),
            ("combination/connector_function_to_topic_write",),
            ("combination/connector_function_to_eventbus_write",),
            ("combination/connector_topic_to_queue_write",),
            ("combination/connector_event_rule_to_sqs_write",),
            ("combination/connector_event_rule_to_sns_write",),
            ("combination/connector_event_rule_to_sfn_write",),
            ("combination/connector_event_rule_to_eb_default_write",),
            ("combination/connector_event_rule_to_eb_custom_write",),
            ("combination/connector_event_rule_to_lambda_write",),
            ("combination/connector_sqs_to_function",),
            ("combination/connector_sns_to_function_write",),
            ("combination/connector_table_to_function_read",),
        ]
    )
    def test_connector_by_invoking_a_function(self, template_file_path):
        self.skip_using_service_detector(template_file_path)
        self.create_and_verify_stack(template_file_path)

        lambda_function_name = self.get_physical_id_by_logical_id("TriggerFunction")
        lambda_client = self.client_provider.lambda_client
        s3_client = self.client_provider.s3_client

        request_params = {
            "FunctionName": lambda_function_name,
            "InvocationType": "RequestResponse",
            "Payload": "{}",
        }
        response = lambda_client.invoke(**request_params)
        self.assertEqual(response.get("StatusCode"), 200)
        self.assertEqual(response.get("FunctionError"), None)

        # Some tests create items in the S3 bucket, which leaves the stack in DELETE_FAILED state;
        # manually empty the bucket so the stack can be deleted successfully.
        bucket_name = self.get_physical_id_by_type("AWS::S3::Bucket")
        if bucket_name:
            clean_bucket(bucket_name, s3_client)

    @parameterized.expand(
        [
            ("combination/connector_sfn_to_table_read",),
            ("combination/connector_sfn_to_table_write",),
            ("combination/connector_sfn_to_sqs_write",),
            ("combination/connector_sfn_to_sns_write",),
            ("combination/connector_sfn_to_function_write",),
            ("combination/connector_sfn_to_bucket_write",),
            ("combination/connector_sfn_to_bucket_read",),
            ("combination/connector_sfn_to_sfn_async",),
            ("combination/connector_sfn_to_eb_default_write",),
            ("combination/connector_sfn_to_eb_custom_write",),
        ]
    )
    def test_connector_by_sync_execute_an_state_machine(self, template_file_path):
        self.skip_using_service_detector(template_file_path)
        self.create_and_verify_stack(template_file_path)

        state_machine_arn = self.get_physical_id_by_logical_id("TriggerStateMachine")
        sfn_client = self.client_provider.sfn_client

        response = sfn_client.start_sync_execution(
            stateMachineArn=state_machine_arn,
        )
        # Without permission, the status will be "FAILED"
        self.assertEqual(response.get("status"), "SUCCEEDED")

    @parameterized.expand(
        [
            ("combination/connector_sfn_to_sfn_sync",),
        ]
    )
    def test_connector_by_async_execute_an_state_machine(self, template_file_path):
        self.skip_using_service_detector(template_file_path)
        self.create_and_verify_stack(template_file_path)

        state_machine_arn = self.get_physical_id_by_logical_id("TriggerStateMachine")
        sfn_client = self.client_provider.sfn_client

        response = sfn_client.start_execution(
            stateMachineArn=state_machine_arn,
        )
        execution_arn = response["executionArn"]

        status = None
        wait_tries = 5
        while wait_tries > 0:
            response = sfn_client.describe_execution(executionArn=execution_arn)
            status = response["status"]
            if status == "RUNNING":
                wait_tries -= 1
                sleep(5)
                continue
            else:
                break

        # Without permission, the status will be "FAILED"
        self.assertEqual(status, "SUCCEEDED")

    @parameterized.expand(
        [
            ("combination/connector_bucket_to_function_write",),
        ]
    )
    def test_connector_by_execute_a_s3_bucket(self, template_file_path):
        self.skip_using_service_detector(template_file_path)
        bucket_name = S3_BUCKET_PREFIX + "connector" + generate_suffix()
        self.create_and_verify_stack(
            template_file_path, [{"ParameterKey": "BucketName", "ParameterValue": bucket_name}]
        )

        lambda_function_name = self.get_physical_id_by_logical_id("TriggerFunction")
        lambda_client = self.client_provider.lambda_client
        s3_client = self.client_provider.s3_client

        request_params = {
            "FunctionName": lambda_function_name,
            "InvocationType": "RequestResponse",
            "Payload": "{}",
        }
        response = lambda_client.invoke(**request_params)
        self.assertEqual(response.get("StatusCode"), 200)
        self.assertEqual(response.get("FunctionError"), None)

        # Some tests create items in the S3 bucket, which leaves the stack in DELETE_FAILED state;
        # manually empty the bucket so the stack can be deleted successfully.
        bucket_name = self.get_physical_id_by_type("AWS::S3::Bucket")
        clean_bucket(bucket_name, s3_client)
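As a usage note (an assumption, not part of this diff): the repository's integration tests are normally run with pytest against a live AWS account, so the connector tests above could be selected programmatically like this:

# Illustrative only: select and run the connector integration tests via pytest's Python API.
# Assumes AWS credentials and the repo's integration-test prerequisites are already configured.
import sys

import pytest

if __name__ == "__main__":
    sys.exit(pytest.main(["-v", "-k", "TestConnectors"]))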

integration/conftest.py

Lines changed: 2 additions & 2 deletions
@@ -35,7 +35,7 @@ def _get_all_buckets():
     return s3.buckets.all()


-def _clean_bucket(s3_bucket_name, s3_client):
+def clean_bucket(s3_bucket_name, s3_client):
     """
     Empties and deletes the bucket used for the tests
     """
@@ -60,7 +60,7 @@ def clean_all_integ_buckets():
     s3_client = ClientProvider().s3_client
     for bucket in buckets:
         if bucket.name.startswith(S3_BUCKET_PREFIX):
-            _clean_bucket(bucket.name, s3_client)
+            clean_bucket(bucket.name, s3_client)


 @pytest.fixture()

integration/helpers/base_test.py

Lines changed: 25 additions & 1 deletion
@@ -1,3 +1,4 @@
+import json
 import logging
 import os
 import requests
@@ -12,7 +13,12 @@
 from integration.helpers.deployer.utils.retry import retry_with_exponential_backoff_and_jitter
 from integration.helpers.exception import StatusCodeError
 from integration.helpers.request_utils import RequestUtils
-from integration.helpers.resource import generate_suffix, create_bucket, verify_stack_resources
+from integration.helpers.resource import (
+    current_region_does_not_support,
+    detect_services,
+    generate_suffix,
+    verify_stack_resources,
+)
 from integration.helpers.s3_uploader import S3Uploader
 from integration.helpers.yaml_utils import dump_yaml, load_yaml
 from integration.helpers.resource import read_test_config_file
@@ -151,6 +157,24 @@ def create_and_verify_stack(self, file_path, parameters=None, s3_uploader=None):
         self.expected_resource_path = str(Path(self.expected_dir, folder, file_name + ".json"))
         self.verify_stack()

+    def skip_using_service_detector(self, file_path):
+        """
+        Skips the test if current_region_does_not_support() is true
+        for the services detected in the template.
+        """
+        folder, file_name = file_path.split("/")
+
+        input_file_path = str(Path(self.template_dir, folder, file_name + ".yaml"))
+        expected_resource_path = str(Path(self.expected_dir, folder, file_name + ".json"))
+
+        template_dict = yaml_parse(open(input_file_path).read())
+        cfn_resource_types = {item["ResourceType"] for item in json.load(open(expected_resource_path))}
+
+        detected_services = detect_services(template_dict, cfn_resource_types)
+
+        if current_region_does_not_support(detected_services):
+            self.skipTest(f"Some/All of {detected_services} are not supported in this testing region")
+
     def update_stack(self, parameters=None, file_path=None):
         """
         Updates the Cloud Formation stack
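The connector tests shown earlier call this helper before creating the stack. A minimal hypothetical usage (the test class and template name are made up; only the BaseTest methods come from this commit):

from integration.helpers.base_test import BaseTest


class TestMyConnector(BaseTest):
    def test_connector(self):
        template = "combination/connector_function_to_queue_write"
        # Skip early if the current test region lacks any service the template needs.
        self.skip_using_service_detector(template)
        self.create_and_verify_stack(template)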

integration/helpers/resource.py

Lines changed: 54 additions & 0 deletions
@@ -2,7 +2,9 @@
 import re
 import random
 import string  # pylint: disable=deprecated-module
+from typing import Any, Callable, Dict, List, Set

+from integration.config.service_names import DYNAMO_DB, HTTP_API, S3_EVENTS, SQS, STATE_MACHINE_INLINE_DEFINITION
 from integration.helpers.yaml_utils import load_yaml

 try:
@@ -183,6 +185,58 @@ def current_region_does_not_support(services):
     return bool(set(services).intersection(set(region_exclude_services["regions"][region])))


+def _resource_using_inline_statemachine_definition(resource: Dict[str, Any]) -> bool:
+    resource_type = resource.get("Type")
+    properties = resource.get("Properties", {})
+    if resource_type == "AWS::StepFunctions::StateMachine" and properties.get("DefinitionString"):
+        return True
+    if resource_type == "AWS::Serverless::StateMachine" and properties.get("Definition"):
+        return True
+    return False
+
+
+def _resource_using_s3_events(resource: Dict[str, Any]) -> bool:
+    resource_type = resource.get("Type")
+    properties = resource.get("Properties", {})
+    return resource_type == "AWS::S3::Bucket" and properties.get("NotificationConfiguration")
+
+
+SERVICE_DETECTORS: Dict[str, Callable[[Dict[str, Any], Set[str]], bool]] = {
+    HTTP_API: lambda template_dict, cfn_resource_types: "AWS::ApiGatewayV2::Api" in cfn_resource_types,
+    SQS: lambda template_dict, cfn_resource_types: "AWS::SQS::Queue" in cfn_resource_types,
+    DYNAMO_DB: lambda template_dict, cfn_resource_types: "AWS::DynamoDB::Table" in cfn_resource_types,
+    STATE_MACHINE_INLINE_DEFINITION: lambda template_dict, cfn_resource_types: any(
+        _resource_using_inline_statemachine_definition(resource)
+        for resource in template_dict.get("Resources", {}).values()
+    ),
+    S3_EVENTS: lambda template_dict, cfn_resource_types: any(
+        _resource_using_s3_events(resource) for resource in template_dict.get("Resources", {}).values()
+    ),
+}
+
+
+def detect_services(template_dict: Dict[str, Any], cfn_resource_types: Set[str]) -> List[str]:
+    """
+    Detect which services are used in the template.
+
+    TODO: Only used for connector integration testing for now.
+    This cannot detect all services; add more detectors as needed.
+
+    Parameters
+    ----------
+    template_dict : Dict[str, Any]
+        The template dict
+    cfn_resource_types : Set[str]
+        The transformed CFN resource types to be created
+
+    Returns
+    -------
+    List[str]
+        List of services from integration/config/service_names.py
+    """
+    return [service for service, detector in SERVICE_DETECTORS.items() if detector(template_dict, cfn_resource_types)]
+
+
 def current_region_not_included(services):
     """
     Opposite of current_region_does_not_support.
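To illustrate how the detector table is consumed (a hypothetical example, not from the commit; it assumes the integration-test region configuration that current_region_does_not_support() reads is already in place):

from integration.helpers.resource import current_region_does_not_support, detect_services

# A made-up template with an SQS queue and an inline-definition Serverless state machine.
template_dict = {
    "Resources": {
        "MyQueue": {"Type": "AWS::SQS::Queue"},
        "MyStateMachine": {
            "Type": "AWS::Serverless::StateMachine",
            "Properties": {"Definition": {"StartAt": "Done", "States": {"Done": {"Type": "Succeed"}}}},
        },
    }
}
# Resource types after the SAM transform, as listed in the expected .json file for a test.
cfn_resource_types = {"AWS::SQS::Queue", "AWS::StepFunctions::StateMachine", "AWS::IAM::Role"}

services = detect_services(template_dict, cfn_resource_types)
# Returns the matching constants from integration/config/service_names.py,
# here the ones for SQS and inline state machine definitions.
if current_region_does_not_support(services):
    print("This template's services are excluded in the current test region; the test would be skipped.")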
Lines changed: 34 additions & 0 deletions
[
  {
    "LogicalResourceId": "TriggerFunctionRole",
    "ResourceType": "AWS::IAM::Role"
  },
  {
    "LogicalResourceId": "TriggerFunction",
    "ResourceType": "AWS::Lambda::Function"
  },
  {
    "LogicalResourceId": "InvokedFunctionRole",
    "ResourceType": "AWS::IAM::Role"
  },
  {
    "LogicalResourceId": "InvokedFunction",
    "ResourceType": "AWS::Lambda::Function"
  },
  {
    "LogicalResourceId": "TriggerBucket",
    "ResourceType": "AWS::S3::Bucket"
  },
  {
    "LogicalResourceId": "VerificationQueue",
    "ResourceType": "AWS::SQS::Queue"
  },
  {
    "LogicalResourceId": "MyConnectorWriteLambdaPermission",
    "ResourceType": "AWS::Lambda::Permission"
  },
  {
    "LogicalResourceId": "ConnectorNotTestedPolicy",
    "ResourceType": "AWS::IAM::ManagedPolicy"
  }
]
Lines changed: 38 additions & 0 deletions
[
  {
    "LogicalResourceId": "ConnectorNotBeingTestedQueuePolicy",
    "ResourceType": "AWS::SQS::QueuePolicy"
  },
  {
    "LogicalResourceId": "CustomEventBus",
    "ResourceType": "AWS::Events::EventBus"
  },
  {
    "LogicalResourceId": "EBRole",
    "ResourceType": "AWS::IAM::Role"
  },
  {
    "LogicalResourceId": "EventRule",
    "ResourceType": "AWS::Events::Rule"
  },
  {
    "LogicalResourceId": "MyConnectorPolicy",
    "ResourceType": "AWS::IAM::ManagedPolicy"
  },
  {
    "LogicalResourceId": "TriggerFunction",
    "ResourceType": "AWS::Lambda::Function"
  },
  {
    "LogicalResourceId": "TriggerFunctionRole",
    "ResourceType": "AWS::IAM::Role"
  },
  {
    "LogicalResourceId": "VerificationEventRule",
    "ResourceType": "AWS::Events::Rule"
  },
  {
    "LogicalResourceId": "VerificationQueue",
    "ResourceType": "AWS::SQS::Queue"
  }
]
