1414"""Cloud Batch helpers."""
1515import collections
1616import threading
17+ from typing import Dict
1718from typing import List
19+ from typing import Tuple
1820import uuid
1921
2022from google .cloud import batch_v1 as batch
2527from clusterfuzz ._internal .base .tasks import task_utils
2628from clusterfuzz ._internal .config import local_config
2729from clusterfuzz ._internal .datastore import data_types
30+ from clusterfuzz ._internal .datastore import ndb_utils
2831from clusterfuzz ._internal .metrics import logs
32+ from clusterfuzz ._internal .system import environment
2933
30- # TODO(metzman): Change to from . import credentials when we are done
31- # developing.
3234from . import credentials
3335
# Thread-local storage, used to cache per-thread state such as clients.
_local = threading.local()

# Batch jobs are not retried by default.
DEFAULT_RETRY_COUNT = 0

# Controls how many containers (ClusterFuzz tasks) can run on a single VM.
# THIS SHOULD BE 1 OR THERE WILL BE SECURITY PROBLEMS.
TASK_COUNT_PER_NODE = 1
5454 'subnetwork' ,
5555 'preemptible' ,
5656 'project' ,
57- 'gce_zone' ,
5857 'machine_type' ,
5958 'network' ,
6059 'gce_region' ,
6665
def _create_batch_client_new():
  """Creates a batch client."""
  # Only the credentials are needed; the project returned alongside them
  # is unused here.
  batch_credentials = credentials.get_default()[0]
  return batch.BatchServiceClient(credentials=batch_credentials)
7470
7571
@@ -108,9 +104,10 @@ def create_uworker_main_batch_job(module, job_type, input_download_url):
108104def create_uworker_main_batch_jobs (batch_tasks : List [BatchTask ]):
109105 """Creates batch jobs."""
110106 job_specs = collections .defaultdict (list )
107+ specs = _get_specs_from_config (batch_tasks )
111108 for batch_task in batch_tasks :
112109 logs .info (f'Scheduling { batch_task .command } , { batch_task .job_type } .' )
113- spec = _get_spec_from_config (batch_task .command , batch_task .job_type )
110+ spec = specs [ (batch_task .command , batch_task .job_type )]
114111 job_specs [spec ].append (batch_task .input_download_url )
115112
116113 logs .info ('Creating batch jobs.' )
@@ -119,7 +116,7 @@ def create_uworker_main_batch_jobs(batch_tasks: List[BatchTask]):
119116 logs .info ('Batching utask_mains.' )
120117 for spec , input_urls in job_specs .items ():
121118 for input_urls_portion in utils .batched (input_urls ,
122- MAX_CONCURRENT_VMS_PER_JOB ):
119+ MAX_CONCURRENT_VMS_PER_JOB - 1 ):
123120 jobs .append (_create_job (spec , input_urls_portion ))
124121
125122 return jobs
@@ -209,7 +206,6 @@ def _create_job(spec, input_urls):
209206 job = batch .Job ()
210207 job .task_groups = [task_group ]
211208 job .allocation_policy = _get_allocation_policy (spec )
212- job .labels = {'env' : 'testing' , 'type' : 'container' }
213209 job .logs_policy = batch .LogsPolicy ()
214210 job .logs_policy .destination = batch .LogsPolicy .Destination .CLOUD_LOGGING
215211 job .priority = spec .priority
@@ -239,86 +235,134 @@ def _get_batch_config():
239235 return local_config .BatchConfig ()
240236
241237
242- def _get_job (job_name ):
243- """Returns the Job entity named by |job_name|. This function was made to make
244- mocking easier."""
245- return data_types .Job .query (data_types .Job .name == job_name ).get ()
246-
247-
def is_no_privilege_workload(command, job_name):
  """Returns whether |command| on |job_name| runs as an unprivileged
  (remote) workload."""
  return is_remote_task(command, job_name)
250240
251241
def is_remote_task(command, job_name):
  """Returns True if a batch spec can be built for |command| and
  |job_name|, i.e. the task can run remotely on Batch."""
  probe = BatchTask(command, job_name, None)
  try:
    _get_specs_from_config([probe])
  except ValueError:
    # No batch config applies to this command/job combination.
    return False
  return True
258248
259249
260- def _get_config_name (command , job_name ):
261- """Returns the name of the config for |command| and |job_name|."""
262- job = _get_job (job_name )
263- # As of this writing, batch only supports LINUX.
264- if utils .is_oss_fuzz ():
265- # TODO(b/377885331): In OSS-Fuzz, the platform can't be used because, as of
266- # it includes the project name.
267- config_name = 'LINUX'
268- else :
269- config_name = job .platform
270- if command == 'fuzz' :
271- config_name += '-PREEMPTIBLE-UNPRIVILEGED'
272- else :
273- config_name += '-NONPREEMPTIBLE-UNPRIVILEGED'
274- return config_name
def _get_config_names(
    batch_tasks: List[BatchTask]) -> Dict[Tuple[str, str], Tuple]:
  """Gets the name of the config for each batch task.

  Returns a dict mapping (command, job_type) to a
  (config_name, disk_size_gb) tuple for efficient lookup. Tasks whose job
  does not exist in the datastore are logged and omitted from the result.
  """
  # Fetch all referenced Job entities in a single query instead of one
  # query per task.
  job_names = {task.job_type for task in batch_tasks}
  query = data_types.Job.query(data_types.Job.name.IN(list(job_names)))
  jobs = ndb_utils.get_all_from_query(query)
  job_map = {job.name: job for job in jobs}
  # As of this writing, batch only supports Linux. In OSS-Fuzz, the job
  # platform can't be used because it includes the project name, so LINUX
  # is used unconditionally there. Hoisted out of the loop: it is
  # invariant across tasks.
  is_oss_fuzz = utils.is_oss_fuzz()
  config_map = {}
  for task in batch_tasks:
    if task.job_type not in job_map:
      logs.error(f'{task.job_type} doesn\'t exist.')
      continue
    if task.command == 'fuzz':
      suffix = '-PREEMPTIBLE-UNPRIVILEGED'
    else:
      suffix = '-NONPREEMPTIBLE-UNPRIVILEGED'
    job = job_map[task.job_type]
    platform = job.platform if not is_oss_fuzz else 'LINUX'
    # Jobs may override the configured disk size through their
    # environment; None means "use the batch config's default".
    disk_size_gb = environment.get_value(
        'DISK_SIZE_GB', env=job.get_environment())
    config_map[(task.command, task.job_type)] = (f'{platform}{suffix}',
                                                 disk_size_gb)
  # TODO(metzman): Come up with a more systematic way for configs to
  # be overridden by jobs.
  return config_map
275276
276277
def _get_task_duration(command):
  """Returns the maximum run duration, in seconds, for |command|."""
  durations_by_command = tasks.TASK_LEASE_SECONDS_BY_COMMAND
  return durations_by_command.get(command, tasks.TASK_LEASE_SECONDS)
280281
281282
282- def _get_spec_from_config (command , job_name ):
# A batch subconfig name paired with its selection weight, used for
# weighted-random subconfig choice.
WeightedSubconfig = collections.namedtuple('WeightedSubconfig', 'name weight')
285+
286+
def _get_subconfig(batch_config, instance_spec):
  """Picks a subconfig for |instance_spec| by weighted random choice and
  returns its definition from |batch_config|."""
  # TODO(metzman): Make this pick one at random or based on conditions.
  all_subconfigs = batch_config.get('subconfigs', {})
  candidates = [
      WeightedSubconfig(entry['name'], entry['weight'])
      for entry in instance_spec['subconfigs']
  ]
  chosen = utils.random_weighted_choice(candidates)
  return all_subconfigs[chosen.name]
297+
298+
def _get_specs_from_config(batch_tasks) -> Dict:
  """Gets the configured specifications for a batch workload.

  Returns a dict mapping each (command, job_type) pair in |batch_tasks|
  to its BatchWorkloadSpec.

  Raises:
    ValueError: If no batch config applies to a task (unknown job or no
      mapping for the derived config name).
  """
  if not batch_tasks:
    return {}
  batch_config = _get_batch_config()
  config_map = _get_config_names(batch_tasks)
  # Invariant across tasks; hoisted out of the loop.
  project_name = batch_config.get('project')
  specs = {}
  subconfig_map = {}
  for task in batch_tasks:
    key = (task.command, task.job_type)
    if key in specs:
      # Don't repeat work for no reason.
      continue
    if key not in config_map:
      # _get_config_names omits tasks whose job doesn't exist. Raise
      # ValueError (rather than letting a KeyError escape) so callers
      # such as is_remote_task can handle it.
      raise ValueError(f'No config for {task.command}, {task.job_type}')
    config_name, disk_size_gb = config_map[key]

    instance_spec = batch_config.get('mapping').get(config_name)
    if instance_spec is None:
      raise ValueError(f'No mapping for {config_name}')
    clusterfuzz_release = instance_spec.get('clusterfuzz_release', 'prod')
    # Lower numbers are a lower priority, meaning less likely to run. From:
    # https://cloud.google.com/batch/docs/reference/rest/v1/projects.locations.jobs
    priority = 0 if task.command == 'fuzz' else 1
    max_run_duration = f'{_get_task_duration(task.command)}s'
    # Reuse one subconfig per config name: this saves us time and reduces
    # fragmentation, e.g. every linux fuzz task run in this call will run
    # in the same zone.
    if config_name not in subconfig_map:
      subconfig_map[config_name] = _get_subconfig(batch_config, instance_spec)
    subconfig = subconfig_map[config_name]

    should_retry = instance_spec.get('retry', False)
    if should_retry and task.command == 'corpus_pruning':
      should_retry = False  # It is naturally retried the next day.

    # A job-level DISK_SIZE_GB override (from _get_config_names) wins over
    # the batch config's default.
    disk_size_gb = disk_size_gb or instance_spec['disk_size_gb']
    spec = BatchWorkloadSpec(
        docker_image=instance_spec['docker_image'],
        disk_size_gb=disk_size_gb,
        disk_type=instance_spec['disk_type'],
        user_data=instance_spec['user_data'],
        service_account_email=instance_spec['service_account_email'],
        preemptible=instance_spec['preemptible'],
        machine_type=instance_spec['machine_type'],
        gce_region=subconfig['region'],
        network=subconfig['network'],
        subnetwork=subconfig['subnetwork'],
        project=project_name,
        clusterfuzz_release=clusterfuzz_release,
        priority=priority,
        max_run_duration=max_run_duration,
        retry=should_retry,
    )
    specs[key] = spec
  return specs
353+
354+
def count_queued_or_scheduled_tasks(project: str,
                                    region: str) -> Tuple[int, int]:
  """Counts the number of queued and scheduled tasks.

  Returns a (queued, scheduled) tuple of task counts for |project| in
  |region|.
  """
  parent = f'projects/{project}/locations/{region}'
  jobs_filter = 'Status.State="SCHEDULED" OR Status.State="QUEUED"'
  request = batch.types.ListJobsRequest(parent=parent, filter=jobs_filter)
  queued_count = 0
  scheduled_count = 0
  for job in _batch_client().list_jobs(request=request):
    state = job.status.state
    if state == batch.JobStatus.State.SCHEDULED:
      scheduled_count += job.task_groups[0].task_count
    elif state == batch.JobStatus.State.QUEUED:
      queued_count += job.task_groups[0].task_count
  return (queued_count, scheduled_count)
0 commit comments