2222from clusterfuzz ._internal .base import errors
2323from clusterfuzz ._internal .base import tasks
2424from clusterfuzz ._internal .base import utils
25+ from clusterfuzz ._internal .base .tasks import task_rate_limiting
2526from clusterfuzz ._internal .bot .tasks import blame_task
2627from clusterfuzz ._internal .bot .tasks import impact_task
2728from clusterfuzz ._internal .bot .tasks import task_types
@@ -190,12 +191,8 @@ def start_web_server_if_needed():
190191 logs .error ('Failed to start web server, skipping.' )
191192
192193
193- def run_command (task_name ,
194- task_argument ,
195- job_name ,
196- uworker_env ,
197- preprocess = False ):
198- """Run the command."""
194+ def run_command (task_name , task_argument , job_name , uworker_env ):
195+ """Runs the command."""
199196 task = COMMAND_MAP .get (task_name )
200197 if not task :
201198 logs .error ("Unknown command '%s'" % task_name )
@@ -211,23 +208,32 @@ def run_command(task_name,
211208 raise AlreadyRunningError
212209
213210 result = None
211+ rate_limiter = task_rate_limiting .TaskRateLimiter (task_name , task_argument ,
212+ job_name )
213+ if rate_limiter .is_rate_limited ():
214+ logs .error (f'Rate limited task: { task_name } { task_argument } { job_name } ' )
215+ if task_name == 'fuzz' :
216+ # Wait 10 seconds. We don't want to try again immediately because if we
217+ # tried to run a fuzz task then there is no other task to run.
218+ time .sleep (environment .get_value ('FAIL_WAIT' ))
219+ return None
214220 try :
215- if not preprocess :
216- result = task .execute (task_argument , job_name , uworker_env )
217- else :
218- result = task .preprocess (task_argument , job_name , uworker_env )
221+ result = task .execute (task_argument , job_name , uworker_env )
219222 except errors .InvalidTestcaseError :
220223 # It is difficult to try to handle the case where a test case is deleted
221224 # during processing. Rather than trying to catch by checking every point
222225 # where a test case is reloaded from the datastore, just abort the task.
223226 logs .warning ('Test case %s no longer exists.' % task_argument )
227+ rate_limiter .record_task (success = False )
224228 except BaseException :
225229 # On any other exceptions, update state to reflect error and re-raise.
226230 if should_update_task_status (task_name ):
227231 data_handler .update_task_status (task_state_name ,
228232 data_types .TaskState .ERROR )
229-
233+ rate_limiter . record_task ( success = False )
230234 raise
235+ else :
236+ rate_limiter .record_task (success = True )
231237
232238 # Task completed successfully.
233239 if should_update_task_status (task_name ):
@@ -254,12 +260,8 @@ def _get_task_id(task_name, task_argument, job_name):
254260# pylint: disable=too-many-nested-blocks
255261# TODO(mbarbella): Rewrite this function to avoid nesting issues.
256262@set_task_payload
257- def process_command_impl (task_name ,
258- task_argument ,
259- job_name ,
260- high_end ,
261- is_command_override ,
262- preprocess = False ):
263+ def process_command_impl (task_name , task_argument , job_name , high_end ,
264+ is_command_override ):
263265 """Implementation of process_command."""
264266 uworker_env = None
265267 environment .set_value ('TASK_NAME' , task_name )
@@ -320,8 +322,7 @@ def process_command_impl(task_name,
320322 logs .error ('Failed to fix platform and re-add task.' )
321323
322324 # Add a wait interval to avoid overflowing task creation.
323- failure_wait_interval = environment .get_value ('FAIL_WAIT' )
324- time .sleep (failure_wait_interval )
325+ time .sleep (environment .get_value ('FAIL_WAIT' ))
325326 return None
326327
327328 if task_name != 'fuzz' :
@@ -441,8 +442,7 @@ def process_command_impl(task_name,
441442 start_web_server_if_needed ()
442443
443444 try :
444- return run_command (task_name , task_argument , job_name , uworker_env ,
445- preprocess )
445+ return run_command (task_name , task_argument , job_name , uworker_env )
446446 finally :
447447 # Final clean up.
448448 cleanup_task_state ()
0 commit comments