1+ import logging
12from time import sleep
23from unittest import SkipTest
34from unittest .case import skipIf
45
56from parameterized import parameterized
6- from tenacity import retry , retry_if_exception , stop_after_attempt
7+ from tenacity import (
8+ after_log ,
9+ retry ,
10+ retry_if_exception ,
11+ stop_after_attempt ,
12+ wait_exponential ,
13+ wait_random ,
14+ )
715
816from integration .config .service_names import SCHEDULE_EVENT
917from integration .conftest import clean_bucket
1018from integration .helpers .base_test import S3_BUCKET_PREFIX , BaseTest , nonblocking
1119from integration .helpers .resource import current_region_does_not_support , generate_suffix
1220
13- retry_once = retry (
14- stop = stop_after_attempt (2 ),
15- # unittest raises SkipTest for skipping tests
21+ LOG = logging .getLogger (__name__ )
22+
23+ retry_with_backoff = retry (
24+ stop = stop_after_attempt (5 ),
25+ wait = wait_exponential (multiplier = 1 , min = 4 , max = 16 ) + wait_random (0 , 1 ),
1626 retry = retry_if_exception (lambda e : not isinstance (e , SkipTest )),
27+ after = after_log (LOG , logging .WARNING ),
28+ reraise = True ,
1729)
1830
1931
@@ -31,19 +43,20 @@ class TestConnectorsWithEventBus(BaseTest):
3143 ("combination/connector_function_to_eventbus_write" ,),
3244 ]
3345 )
34- @retry_once
3546 def test_connector_by_invoking_a_function_with_eventbus (self , template_file_path ):
3647 self .create_and_verify_stack (template_file_path )
3748
3849 lambda_function_name = self .get_physical_id_by_logical_id ("TriggerFunction" )
39- lambda_client = self .client_provider .lambda_client
40-
41- request_params = {
42- "FunctionName" : lambda_function_name ,
43- "InvocationType" : "RequestResponse" ,
44- "Payload" : "{}" ,
45- }
46- response = lambda_client .invoke (** request_params )
50+ self .verify_lambda_invocation (lambda_function_name )
51+
52+ @retry_with_backoff
53+ def verify_lambda_invocation (self , lambda_function_name ):
54+ """Verify Lambda function invocation with retry logic."""
55+ response = self .client_provider .lambda_client .invoke (
56+ FunctionName = lambda_function_name ,
57+ InvocationType = "RequestResponse" ,
58+ Payload = "{}"
59+ )
4760 self .assertEqual (response .get ("StatusCode" ), 200 )
4861 self .assertEqual (response .get ("FunctionError" ), None )
4962
@@ -57,6 +70,17 @@ def tearDown(self):
5770 clean_bucket (bucket_name , self .client_provider .s3_client )
5871 super ().tearDown ()
5972
73+ @retry_with_backoff
74+ def verify_lambda_invocation (self , lambda_function_name ):
75+ """Verify Lambda function invocation with retry logic."""
76+ response = self .client_provider .lambda_client .invoke (
77+ FunctionName = lambda_function_name ,
78+ InvocationType = "RequestResponse" ,
79+ Payload = "{}"
80+ )
81+ self .assertEqual (response .get ("StatusCode" ), 200 )
82+ self .assertEqual (response .get ("FunctionError" ), None )
83+
6084 @parameterized .expand (
6185 [
6286 ("combination/connector_appsync_api_to_lambda" ,),
@@ -88,46 +112,26 @@ def tearDown(self):
88112 ("combination/embedded_connector" ,),
89113 ]
90114 )
91- @retry_once
92115 def test_connector_by_invoking_a_function (self , template_file_path ):
93116 self .skip_using_service_detector (template_file_path )
94117 self .create_and_verify_stack (template_file_path )
95118
96119 lambda_function_name = self .get_physical_id_by_logical_id ("TriggerFunction" )
97- lambda_client = self .client_provider .lambda_client
98-
99- request_params = {
100- "FunctionName" : lambda_function_name ,
101- "InvocationType" : "RequestResponse" ,
102- "Payload" : "{}" ,
103- }
104- response = lambda_client .invoke (** request_params )
105- self .assertEqual (response .get ("StatusCode" ), 200 )
106- self .assertEqual (response .get ("FunctionError" ), None )
120+ self .verify_lambda_invocation (lambda_function_name )
107121
108122 @parameterized .expand (
109123 [
110124 ("combination/connector_function_to_location_place_index" ,),
111125 ]
112126 )
113- @retry_once
114127 def test_connector_by_invoking_a_function_with_parameters (self , template_file_path ):
115128 parameters = []
116129 parameters .append (self .generate_parameter ("IndexName" , f"PlaceIndex-{ generate_suffix ()} " ))
117130 self .skip_using_service_detector (template_file_path )
118131 self .create_and_verify_stack (template_file_path , parameters )
119132
120133 lambda_function_name = self .get_physical_id_by_logical_id ("TriggerFunction" )
121- lambda_client = self .client_provider .lambda_client
122-
123- request_params = {
124- "FunctionName" : lambda_function_name ,
125- "InvocationType" : "RequestResponse" ,
126- "Payload" : "{}" ,
127- }
128- response = lambda_client .invoke (** request_params )
129- self .assertEqual (response .get ("StatusCode" ), 200 )
130- self .assertEqual (response .get ("FunctionError" ), None )
134+ self .verify_lambda_invocation (lambda_function_name )
131135
132136 @parameterized .expand (
133137 [
@@ -144,17 +148,17 @@ def test_connector_by_invoking_a_function_with_parameters(self, template_file_pa
144148 ("combination/connector_sfn_to_eb_custom_write" ,),
145149 ]
146150 )
147- @retry_once
148151 def test_connector_by_sync_execute_an_state_machine (self , template_file_path ):
149152 self .skip_using_service_detector (template_file_path )
150153 self .create_and_verify_stack (template_file_path )
151154
152155 state_machine_arn = self .get_physical_id_by_logical_id ("TriggerStateMachine" )
153- sfn_client = self .client_provider . sfn_client
156+ self .verify_sync_step_function_execution ( state_machine_arn )
154157
155- response = sfn_client .start_sync_execution (
156- stateMachineArn = state_machine_arn ,
157- )
158+ @retry_with_backoff
159+ def verify_sync_step_function_execution (self , state_machine_arn ):
160+ """Verify synchronous Step Function execution with retry logic."""
161+ response = self .client_provider .sfn_client .start_sync_execution (stateMachineArn = state_machine_arn )
158162 # Without permission, it will be "FAILED"
159163 self .assertEqual (response .get ("status" ), "SUCCEEDED" )
160164
@@ -163,17 +167,18 @@ def test_connector_by_sync_execute_an_state_machine(self, template_file_path):
163167 ("combination/connector_sfn_to_sfn_sync" ,),
164168 ]
165169 )
166- @retry_once
167170 def test_connector_by_async_execute_an_state_machine (self , template_file_path ):
168171 self .skip_using_service_detector (template_file_path )
169172 self .create_and_verify_stack (template_file_path )
170173
171174 state_machine_arn = self .get_physical_id_by_logical_id ("TriggerStateMachine" )
172- sfn_client = self .client_provider . sfn_client
175+ self .verify_async_step_function_execution ( state_machine_arn )
173176
174- response = sfn_client .start_execution (
175- stateMachineArn = state_machine_arn ,
176- )
177+ @retry_with_backoff
178+ def verify_async_step_function_execution (self , state_machine_arn ):
179+ """Verify asynchronous Step Function execution with retry logic."""
180+ sfn_client = self .client_provider .sfn_client
181+ response = sfn_client .start_execution (stateMachineArn = state_machine_arn )
177182 execution_arn = response ["executionArn" ]
178183
179184 status = None
@@ -196,7 +201,6 @@ def test_connector_by_async_execute_an_state_machine(self, template_file_path):
196201 ("combination/connector_bucket_to_function_write" ,),
197202 ]
198203 )
199- @retry_once
200204 def test_connector_by_execute_a_s3_bucket (self , template_file_path ):
201205 self .skip_using_service_detector (template_file_path )
202206 bucket_name = S3_BUCKET_PREFIX + "connector" + generate_suffix ()
@@ -205,13 +209,4 @@ def test_connector_by_execute_a_s3_bucket(self, template_file_path):
205209 )
206210
207211 lambda_function_name = self .get_physical_id_by_logical_id ("TriggerFunction" )
208- lambda_client = self .client_provider .lambda_client
209-
210- request_params = {
211- "FunctionName" : lambda_function_name ,
212- "InvocationType" : "RequestResponse" ,
213- "Payload" : "{}" ,
214- }
215- response = lambda_client .invoke (** request_params )
216- self .assertEqual (response .get ("StatusCode" ), 200 )
217- self .assertEqual (response .get ("FunctionError" ), None )
212+ self .verify_lambda_invocation (lambda_function_name )
0 commit comments