2121from google .cloud import batch_v1 as batch
2222
2323from clusterfuzz ._internal .base import retry
24+ from clusterfuzz ._internal .base import tasks
2425from clusterfuzz ._internal .base import utils
2526from clusterfuzz ._internal .base .tasks import task_utils
2627from clusterfuzz ._internal .config import local_config
3334
3435_local = threading .local ()
3536
36- MAX_DURATION = f'{ 60 * 60 * 6 } s'
3737RETRY_COUNT = 0
3838
3939TASK_BUNCH_SIZE = 20
4646MAX_CONCURRENT_VMS_PER_JOB = 1000
4747
4848BatchWorkloadSpec = collections .namedtuple ('BatchWorkloadSpec' , [
49- 'clusterfuzz_release' , 'disk_size_gb' , 'disk_type' , 'docker_image' ,
50- 'user_data' , 'service_account_email' , 'subnetwork' , 'preemptible' ,
51- 'project' , 'gce_zone' , 'machine_type' , 'network' , 'gce_region'
49+ 'clusterfuzz_release' ,
50+ 'disk_size_gb' ,
51+ 'disk_type' ,
52+ 'docker_image' ,
53+ 'user_data' ,
54+ 'service_account_email' ,
55+ 'subnetwork' ,
56+ 'preemptible' ,
57+ 'project' ,
58+ 'gce_zone' ,
59+ 'machine_type' ,
60+ 'network' ,
61+ 'gce_region' ,
62+ 'max_run_duration' ,
5263])
5364
5465
@@ -158,7 +169,7 @@ def _get_task_spec(batch_workload_spec):
158169 task_spec = batch .TaskSpec ()
159170 task_spec .runnables = [runnable ]
160171 task_spec .max_retry_count = RETRY_COUNT
161- task_spec .max_run_duration = MAX_DURATION
172+ task_spec .max_run_duration = batch_workload_spec . max_duration
162173 return task_spec
163174
164175
@@ -219,8 +230,7 @@ def _create_job(spec, input_urls):
219230 create_request .job_id = job_name
220231 # The job's parent is the region in which the job will run
221232 project_id = spec .project
222- create_request .parent = (
223- f'projects/{ project_id } /locations/{ spec .gce_region } ' )
233+ create_request .parent = f'projects/{ project_id } /locations/{ spec .gce_region } '
224234 job_result = _send_create_job_request (create_request )
225235 logs .info (f'Created batch job id={ job_name } .' , spec = spec )
226236 return job_result
@@ -274,6 +284,11 @@ def _get_config_name(command, job_name):
274284 return config_name
275285
276286
287+ def _get_task_duration (command ):
288+ return tasks .TASK_LEASE_SECONDS_BY_COMMAND .get (command ,
289+ tasks .TASK_LEASE_SECONDS )
290+
291+
277292def _get_spec_from_config (command , job_name ):
278293 """Gets the configured specifications for a batch workload."""
279294 config_name = _get_config_name (command , job_name )
@@ -285,6 +300,7 @@ def _get_spec_from_config(command, job_name):
285300 docker_image = instance_spec ['docker_image' ]
286301 user_data = instance_spec ['user_data' ]
287302 clusterfuzz_release = instance_spec .get ('clusterfuzz_release' , 'prod' )
303+ max_run_duration = f'{ _get_task_duration (command )} s'
288304 spec = BatchWorkloadSpec (
289305 clusterfuzz_release = clusterfuzz_release ,
290306 docker_image = docker_image ,
@@ -298,5 +314,6 @@ def _get_spec_from_config(command, job_name):
298314 network = instance_spec ['network' ],
299315 subnetwork = instance_spec ['subnetwork' ],
300316 preemptible = instance_spec ['preemptible' ],
301- machine_type = instance_spec ['machine_type' ])
317+ machine_type = instance_spec ['machine_type' ],
318+ max_run_duration = max_run_duration )
302319 return spec
0 commit comments