diff --git a/integration/combination/test_connectors.py b/integration/combination/test_connectors.py index c4965dd48c..79d02e0e43 100644 --- a/integration/combination/test_connectors.py +++ b/integration/combination/test_connectors.py @@ -1,19 +1,31 @@ +import logging from time import sleep from unittest import SkipTest from unittest.case import skipIf from parameterized import parameterized -from tenacity import retry, retry_if_exception, stop_after_attempt +from tenacity import ( + after_log, + retry, + retry_if_exception, + stop_after_attempt, + wait_exponential, + wait_random, +) from integration.config.service_names import SCHEDULE_EVENT from integration.conftest import clean_bucket from integration.helpers.base_test import S3_BUCKET_PREFIX, BaseTest, nonblocking from integration.helpers.resource import current_region_does_not_support, generate_suffix -retry_once = retry( - stop=stop_after_attempt(2), - # unittest raises SkipTest for skipping tests +LOG = logging.getLogger(__name__) + +retry_with_backoff = retry( + stop=stop_after_attempt(5), + wait=wait_exponential(multiplier=1, min=4, max=16) + wait_random(0, 1), retry=retry_if_exception(lambda e: not isinstance(e, SkipTest)), + after=after_log(LOG, logging.WARNING), + reraise=True, ) @@ -31,19 +43,18 @@ class TestConnectorsWithEventBus(BaseTest): ("combination/connector_function_to_eventbus_write",), ] ) - @retry_once def test_connector_by_invoking_a_function_with_eventbus(self, template_file_path): self.create_and_verify_stack(template_file_path) lambda_function_name = self.get_physical_id_by_logical_id("TriggerFunction") - lambda_client = self.client_provider.lambda_client - - request_params = { - "FunctionName": lambda_function_name, - "InvocationType": "RequestResponse", - "Payload": "{}", - } - response = lambda_client.invoke(**request_params) + self.verify_lambda_invocation(lambda_function_name) + + @retry_with_backoff + def verify_lambda_invocation(self, lambda_function_name): + """Verify Lambda function invocation with retry logic.""" + response = self.client_provider.lambda_client.invoke( + FunctionName=lambda_function_name, InvocationType="RequestResponse", Payload="{}" + ) self.assertEqual(response.get("StatusCode"), 200) self.assertEqual(response.get("FunctionError"), None) @@ -57,6 +68,15 @@ def tearDown(self): clean_bucket(bucket_name, self.client_provider.s3_client) super().tearDown() + @retry_with_backoff + def verify_lambda_invocation(self, lambda_function_name): + """Verify Lambda function invocation with retry logic.""" + response = self.client_provider.lambda_client.invoke( + FunctionName=lambda_function_name, InvocationType="RequestResponse", Payload="{}" + ) + self.assertEqual(response.get("StatusCode"), 200) + self.assertEqual(response.get("FunctionError"), None) + @parameterized.expand( [ ("combination/connector_appsync_api_to_lambda",), @@ -88,29 +108,18 @@ def tearDown(self): ("combination/embedded_connector",), ] ) - @retry_once def test_connector_by_invoking_a_function(self, template_file_path): self.skip_using_service_detector(template_file_path) self.create_and_verify_stack(template_file_path) lambda_function_name = self.get_physical_id_by_logical_id("TriggerFunction") - lambda_client = self.client_provider.lambda_client - - request_params = { - "FunctionName": lambda_function_name, - "InvocationType": "RequestResponse", - "Payload": "{}", - } - response = lambda_client.invoke(**request_params) - self.assertEqual(response.get("StatusCode"), 200) - self.assertEqual(response.get("FunctionError"), None) + self.verify_lambda_invocation(lambda_function_name) @parameterized.expand( [ ("combination/connector_function_to_location_place_index",), ] ) - @retry_once def test_connector_by_invoking_a_function_with_parameters(self, template_file_path): parameters = [] parameters.append(self.generate_parameter("IndexName", f"PlaceIndex-{generate_suffix()}")) @@ -118,16 +127,7 @@ def test_connector_by_invoking_a_function_with_parameters(self, template_file_pa self.create_and_verify_stack(template_file_path, parameters) lambda_function_name = self.get_physical_id_by_logical_id("TriggerFunction") - lambda_client = self.client_provider.lambda_client - - request_params = { - "FunctionName": lambda_function_name, - "InvocationType": "RequestResponse", - "Payload": "{}", - } - response = lambda_client.invoke(**request_params) - self.assertEqual(response.get("StatusCode"), 200) - self.assertEqual(response.get("FunctionError"), None) + self.verify_lambda_invocation(lambda_function_name) @parameterized.expand( [ @@ -144,17 +144,17 @@ def test_connector_by_invoking_a_function_with_parameters(self, template_file_pa ("combination/connector_sfn_to_eb_custom_write",), ] ) - @retry_once def test_connector_by_sync_execute_an_state_machine(self, template_file_path): self.skip_using_service_detector(template_file_path) self.create_and_verify_stack(template_file_path) state_machine_arn = self.get_physical_id_by_logical_id("TriggerStateMachine") - sfn_client = self.client_provider.sfn_client + self.verify_sync_step_function_execution(state_machine_arn) - response = sfn_client.start_sync_execution( - stateMachineArn=state_machine_arn, - ) + @retry_with_backoff + def verify_sync_step_function_execution(self, state_machine_arn): + """Verify synchronous Step Function execution with retry logic.""" + response = self.client_provider.sfn_client.start_sync_execution(stateMachineArn=state_machine_arn) # Without permission, it will be "FAILED" self.assertEqual(response.get("status"), "SUCCEEDED") @@ -163,17 +163,18 @@ def test_connector_by_sync_execute_an_state_machine(self, template_file_path): ("combination/connector_sfn_to_sfn_sync",), ] ) - @retry_once def test_connector_by_async_execute_an_state_machine(self, template_file_path): self.skip_using_service_detector(template_file_path) self.create_and_verify_stack(template_file_path) state_machine_arn = self.get_physical_id_by_logical_id("TriggerStateMachine") - sfn_client = self.client_provider.sfn_client + self.verify_async_step_function_execution(state_machine_arn) - response = sfn_client.start_execution( - stateMachineArn=state_machine_arn, - ) + @retry_with_backoff + def verify_async_step_function_execution(self, state_machine_arn): + """Verify asynchronous Step Function execution with retry logic.""" + sfn_client = self.client_provider.sfn_client + response = sfn_client.start_execution(stateMachineArn=state_machine_arn) execution_arn = response["executionArn"] status = None @@ -196,7 +197,6 @@ def test_connector_by_async_execute_an_state_machine(self, template_file_path): ("combination/connector_bucket_to_function_write",), ] ) - @retry_once def test_connector_by_execute_a_s3_bucket(self, template_file_path): self.skip_using_service_detector(template_file_path) bucket_name = S3_BUCKET_PREFIX + "connector" + generate_suffix() @@ -205,13 +205,4 @@ def test_connector_by_execute_a_s3_bucket(self, template_file_path): ) lambda_function_name = self.get_physical_id_by_logical_id("TriggerFunction") - lambda_client = self.client_provider.lambda_client - - request_params = { - "FunctionName": lambda_function_name, - "InvocationType": "RequestResponse", - "Payload": "{}", - } - response = lambda_client.invoke(**request_params) - self.assertEqual(response.get("StatusCode"), 200) - self.assertEqual(response.get("FunctionError"), None) + self.verify_lambda_invocation(lambda_function_name) diff --git a/integration/combination/test_connectors_event_rule_eb.py b/integration/combination/test_connectors_event_rule_eb.py index 206b767e52..ce60d88bff 100644 --- a/integration/combination/test_connectors_event_rule_eb.py +++ b/integration/combination/test_connectors_event_rule_eb.py @@ -1,18 +1,30 @@ +import logging from unittest import SkipTest from unittest.case import skipIf from parameterized import parameterized -from tenacity import retry, retry_if_exception, stop_after_attempt +from tenacity import ( + after_log, + retry, + retry_if_exception, + stop_after_attempt, + wait_exponential, + wait_random, +) from integration.config.service_names import EVENT_RULE_WITH_EVENT_BUS from integration.conftest import clean_bucket from integration.helpers.base_test import BaseTest from integration.helpers.resource import current_region_does_not_support -retry_once = retry( - stop=stop_after_attempt(2), - # unittest raises SkipTest for skipping tests +LOG = logging.getLogger(__name__) + +retry_with_backoff = retry( + stop=stop_after_attempt(5), + wait=wait_exponential(multiplier=1, min=4, max=16) + wait_random(0, 1), retry=retry_if_exception(lambda e: not isinstance(e, SkipTest)), + after=after_log(LOG, logging.WARNING), + reraise=True, ) @@ -35,19 +47,18 @@ def tearDown(self): ("combination/connector_event_rule_to_eb_custom_write",), ] ) - @retry_once def test_connector_event_rule_eb_by_invoking_a_function(self, template_file_path): self.skip_using_service_detector(template_file_path) self.create_and_verify_stack(template_file_path) lambda_function_name = self.get_physical_id_by_logical_id("TriggerFunction") - lambda_client = self.client_provider.lambda_client - - request_params = { - "FunctionName": lambda_function_name, - "InvocationType": "RequestResponse", - "Payload": "{}", - } - response = lambda_client.invoke(**request_params) + self.verify_lambda_invocation(lambda_function_name) + + @retry_with_backoff + def verify_lambda_invocation(self, lambda_function_name): + """Verify Lambda function invocation with retry logic.""" + response = self.client_provider.lambda_client.invoke( + FunctionName=lambda_function_name, InvocationType="RequestResponse", Payload="{}" + ) self.assertEqual(response.get("StatusCode"), 200) self.assertEqual(response.get("FunctionError"), None)