Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 47 additions & 56 deletions integration/combination/test_connectors.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,31 @@
import logging
from time import sleep
from unittest import SkipTest
from unittest.case import skipIf

from parameterized import parameterized
from tenacity import retry, retry_if_exception, stop_after_attempt
from tenacity import (
after_log,
retry,
retry_if_exception,
stop_after_attempt,
wait_exponential,
wait_random,
)

from integration.config.service_names import SCHEDULE_EVENT
from integration.conftest import clean_bucket
from integration.helpers.base_test import S3_BUCKET_PREFIX, BaseTest, nonblocking
from integration.helpers.resource import current_region_does_not_support, generate_suffix

retry_once = retry(
stop=stop_after_attempt(2),
# unittest raises SkipTest for skipping tests
LOG = logging.getLogger(__name__)

retry_with_backoff = retry(
stop=stop_after_attempt(5),
wait=wait_exponential(multiplier=1, min=4, max=16) + wait_random(0, 1),
retry=retry_if_exception(lambda e: not isinstance(e, SkipTest)),
after=after_log(LOG, logging.WARNING),
reraise=True,
)


Expand All @@ -31,19 +43,18 @@ class TestConnectorsWithEventBus(BaseTest):
("combination/connector_function_to_eventbus_write",),
]
)
@retry_once
def test_connector_by_invoking_a_function_with_eventbus(self, template_file_path):
self.create_and_verify_stack(template_file_path)

lambda_function_name = self.get_physical_id_by_logical_id("TriggerFunction")
lambda_client = self.client_provider.lambda_client

request_params = {
"FunctionName": lambda_function_name,
"InvocationType": "RequestResponse",
"Payload": "{}",
}
response = lambda_client.invoke(**request_params)
self.verify_lambda_invocation(lambda_function_name)

@retry_with_backoff
def verify_lambda_invocation(self, lambda_function_name):
"""Verify Lambda function invocation with retry logic."""
response = self.client_provider.lambda_client.invoke(
FunctionName=lambda_function_name, InvocationType="RequestResponse", Payload="{}"
)
self.assertEqual(response.get("StatusCode"), 200)
self.assertEqual(response.get("FunctionError"), None)

Expand All @@ -57,6 +68,15 @@ def tearDown(self):
clean_bucket(bucket_name, self.client_provider.s3_client)
super().tearDown()

@retry_with_backoff
def verify_lambda_invocation(self, lambda_function_name):
"""Verify Lambda function invocation with retry logic."""
response = self.client_provider.lambda_client.invoke(
FunctionName=lambda_function_name, InvocationType="RequestResponse", Payload="{}"
)
self.assertEqual(response.get("StatusCode"), 200)
self.assertEqual(response.get("FunctionError"), None)

@parameterized.expand(
[
("combination/connector_appsync_api_to_lambda",),
Expand Down Expand Up @@ -88,46 +108,26 @@ def tearDown(self):
("combination/embedded_connector",),
]
)
@retry_once
def test_connector_by_invoking_a_function(self, template_file_path):
self.skip_using_service_detector(template_file_path)
self.create_and_verify_stack(template_file_path)

lambda_function_name = self.get_physical_id_by_logical_id("TriggerFunction")
lambda_client = self.client_provider.lambda_client

request_params = {
"FunctionName": lambda_function_name,
"InvocationType": "RequestResponse",
"Payload": "{}",
}
response = lambda_client.invoke(**request_params)
self.assertEqual(response.get("StatusCode"), 200)
self.assertEqual(response.get("FunctionError"), None)
self.verify_lambda_invocation(lambda_function_name)

@parameterized.expand(
[
("combination/connector_function_to_location_place_index",),
]
)
@retry_once
def test_connector_by_invoking_a_function_with_parameters(self, template_file_path):
parameters = []
parameters.append(self.generate_parameter("IndexName", f"PlaceIndex-{generate_suffix()}"))
self.skip_using_service_detector(template_file_path)
self.create_and_verify_stack(template_file_path, parameters)

lambda_function_name = self.get_physical_id_by_logical_id("TriggerFunction")
lambda_client = self.client_provider.lambda_client

request_params = {
"FunctionName": lambda_function_name,
"InvocationType": "RequestResponse",
"Payload": "{}",
}
response = lambda_client.invoke(**request_params)
self.assertEqual(response.get("StatusCode"), 200)
self.assertEqual(response.get("FunctionError"), None)
self.verify_lambda_invocation(lambda_function_name)

@parameterized.expand(
[
Expand All @@ -144,17 +144,17 @@ def test_connector_by_invoking_a_function_with_parameters(self, template_file_pa
("combination/connector_sfn_to_eb_custom_write",),
]
)
@retry_once
def test_connector_by_sync_execute_an_state_machine(self, template_file_path):
self.skip_using_service_detector(template_file_path)
self.create_and_verify_stack(template_file_path)

state_machine_arn = self.get_physical_id_by_logical_id("TriggerStateMachine")
sfn_client = self.client_provider.sfn_client
self.verify_sync_step_function_execution(state_machine_arn)

response = sfn_client.start_sync_execution(
stateMachineArn=state_machine_arn,
)
@retry_with_backoff
def verify_sync_step_function_execution(self, state_machine_arn):
"""Verify synchronous Step Function execution with retry logic."""
response = self.client_provider.sfn_client.start_sync_execution(stateMachineArn=state_machine_arn)
# Without permission, it will be "FAILED"
self.assertEqual(response.get("status"), "SUCCEEDED")

Expand All @@ -163,17 +163,18 @@ def test_connector_by_sync_execute_an_state_machine(self, template_file_path):
("combination/connector_sfn_to_sfn_sync",),
]
)
@retry_once
def test_connector_by_async_execute_an_state_machine(self, template_file_path):
self.skip_using_service_detector(template_file_path)
self.create_and_verify_stack(template_file_path)

state_machine_arn = self.get_physical_id_by_logical_id("TriggerStateMachine")
sfn_client = self.client_provider.sfn_client
self.verify_async_step_function_execution(state_machine_arn)

response = sfn_client.start_execution(
stateMachineArn=state_machine_arn,
)
@retry_with_backoff
def verify_async_step_function_execution(self, state_machine_arn):
"""Verify asynchronous Step Function execution with retry logic."""
sfn_client = self.client_provider.sfn_client
response = sfn_client.start_execution(stateMachineArn=state_machine_arn)
execution_arn = response["executionArn"]

status = None
Expand All @@ -196,7 +197,6 @@ def test_connector_by_async_execute_an_state_machine(self, template_file_path):
("combination/connector_bucket_to_function_write",),
]
)
@retry_once
def test_connector_by_execute_a_s3_bucket(self, template_file_path):
self.skip_using_service_detector(template_file_path)
bucket_name = S3_BUCKET_PREFIX + "connector" + generate_suffix()
Expand All @@ -205,13 +205,4 @@ def test_connector_by_execute_a_s3_bucket(self, template_file_path):
)

lambda_function_name = self.get_physical_id_by_logical_id("TriggerFunction")
lambda_client = self.client_provider.lambda_client

request_params = {
"FunctionName": lambda_function_name,
"InvocationType": "RequestResponse",
"Payload": "{}",
}
response = lambda_client.invoke(**request_params)
self.assertEqual(response.get("StatusCode"), 200)
self.assertEqual(response.get("FunctionError"), None)
self.verify_lambda_invocation(lambda_function_name)
37 changes: 24 additions & 13 deletions integration/combination/test_connectors_event_rule_eb.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,30 @@
import logging
from unittest import SkipTest
from unittest.case import skipIf

from parameterized import parameterized
from tenacity import retry, retry_if_exception, stop_after_attempt
from tenacity import (
after_log,
retry,
retry_if_exception,
stop_after_attempt,
wait_exponential,
wait_random,
)

from integration.config.service_names import EVENT_RULE_WITH_EVENT_BUS
from integration.conftest import clean_bucket
from integration.helpers.base_test import BaseTest
from integration.helpers.resource import current_region_does_not_support

retry_once = retry(
stop=stop_after_attempt(2),
# unittest raises SkipTest for skipping tests
LOG = logging.getLogger(__name__)

retry_with_backoff = retry(
stop=stop_after_attempt(5),
wait=wait_exponential(multiplier=1, min=4, max=16) + wait_random(0, 1),
retry=retry_if_exception(lambda e: not isinstance(e, SkipTest)),
after=after_log(LOG, logging.WARNING),
reraise=True,
)


Expand All @@ -35,19 +47,18 @@ def tearDown(self):
("combination/connector_event_rule_to_eb_custom_write",),
]
)
@retry_once
def test_connector_event_rule_eb_by_invoking_a_function(self, template_file_path):
self.skip_using_service_detector(template_file_path)
self.create_and_verify_stack(template_file_path)

lambda_function_name = self.get_physical_id_by_logical_id("TriggerFunction")
lambda_client = self.client_provider.lambda_client

request_params = {
"FunctionName": lambda_function_name,
"InvocationType": "RequestResponse",
"Payload": "{}",
}
response = lambda_client.invoke(**request_params)
self.verify_lambda_invocation(lambda_function_name)

@retry_with_backoff
def verify_lambda_invocation(self, lambda_function_name):
"""Verify Lambda function invocation with retry logic."""
response = self.client_provider.lambda_client.invoke(
FunctionName=lambda_function_name, InvocationType="RequestResponse", Payload="{}"
)
self.assertEqual(response.get("StatusCode"), 200)
self.assertEqual(response.get("FunctionError"), None)