1313# limitations under the License.
1414"""Kubernetes batch client."""
1515import collections
16+ from typing import Dict
1617from typing import List
1718import uuid
1819
2122
2223from clusterfuzz ._internal .base import utils
2324from clusterfuzz ._internal .base .tasks import task_utils
24- from clusterfuzz ._internal .batch .service import _get_specs_from_config
25- from clusterfuzz ._internal .batch .service import MAX_CONCURRENT_VMS_PER_JOB
25+ from clusterfuzz ._internal .config import local_config
26+ from clusterfuzz ._internal .datastore import data_types
27+ from clusterfuzz ._internal .datastore import ndb_utils
2628from clusterfuzz ._internal .metrics import logs
2729from clusterfuzz ._internal .remote_task import RemoteTask
2830from clusterfuzz ._internal .remote_task import RemoteTaskInterface
31+ from clusterfuzz ._internal .system import environment
32+
# See https://cloud.google.com/batch/quotas#job_limits
MAX_CONCURRENT_VMS_PER_JOB = 1000

# Per-(command, job_type) job configuration resolved from the batch config;
# hashable so it can key the grouping dict in the scheduler.
KubernetesJobConfig = collections.namedtuple(
    'KubernetesJobConfig',
    'job_type docker_image command disk_size_gb '
    'service_account_email clusterfuzz_release')
44+
45+
def _get_config_names(remote_tasks: List[RemoteTask]):
  """Gets the name of the configs for each batch task.

  Args:
    remote_tasks: Tasks to resolve configurations for.

  Returns:
    A dict indexed by (command, job_type) for efficient lookup. Each value is
    a (config_name, disk_size_gb, base_os_version) tuple. Tasks whose job
    does not exist in the datastore are logged and omitted.
  """
  job_names = {task.job_type for task in remote_tasks}
  query = data_types.Job.query(data_types.Job.name.IN(list(job_names)))
  jobs = ndb_utils.get_all_from_query(query)
  job_map = {job.name: job for job in jobs}
  # Cache OssFuzzProject lookups so each project is queried at most once,
  # instead of once per task sharing that project.
  oss_fuzz_projects: Dict = {}
  config_map = {}
  for task in remote_tasks:
    key = (task.command, task.job_type)
    if key in config_map:
      # Already resolved for this (command, job_type) pair.
      continue
    if task.job_type not in job_map:
      logs.error(f"{task.job_type} doesn't exist.")
      continue
    # Fuzz tasks tolerate preemption; other tasks must run to completion.
    if task.command == 'fuzz':
      suffix = '-PREEMPTIBLE-UNPRIVILEGED'
    else:
      suffix = '-NONPREEMPTIBLE-UNPRIVILEGED'
    job = job_map[task.job_type]
    platform = job.platform if not utils.is_oss_fuzz() else 'LINUX'
    disk_size_gb = environment.get_value(
        'DISK_SIZE_GB', env=job.get_environment())
    # Get the OS version from the job, this is the least specific version.
    base_os_version = job.base_os_version
    # If we are running in the oss-fuzz context, the project-specific config
    # is more specific and overrides the job-level one.
    if utils.is_oss_fuzz():
      if job.project not in oss_fuzz_projects:
        oss_fuzz_projects[job.project] = data_types.OssFuzzProject.query(
            data_types.OssFuzzProject.name == job.project).get()
      oss_fuzz_project = oss_fuzz_projects[job.project]
      if oss_fuzz_project and oss_fuzz_project.base_os_version:
        base_os_version = oss_fuzz_project.base_os_version
    config_map[key] = (f'{platform}{suffix}', disk_size_gb, base_os_version)

  return config_map
81+
82+
def _get_k8s_job_configs(remote_tasks: List[RemoteTask]) -> Dict:
  """Gets the configured specifications for a batch workload.

  Args:
    remote_tasks: Tasks to build job configurations for.

  Returns:
    A dict mapping (command, job_type) to KubernetesJobConfig. Tasks whose
    job could not be resolved by _get_config_names are skipped (they were
    already logged there).

  Raises:
    ValueError: If the batch config has no mapping for a resolved config
      name.
  """
  if not remote_tasks:
    return {}
  batch_config = local_config.BatchConfig()
  config_map = _get_config_names(remote_tasks)
  configs = {}
  for task in remote_tasks:
    key = (task.command, task.job_type)
    if key in configs:
      # Don't repeat work for no reason.
      continue
    if key not in config_map:
      # _get_config_names skips (and logs) jobs that don't exist; indexing
      # config_map unconditionally would raise KeyError here.
      continue
    config_name, disk_size_gb, base_os_version = config_map[key]
    instance_spec = batch_config.get('mapping').get(config_name)
    if instance_spec is None:
      raise ValueError(f'No mapping for {config_name}')
    # Decide which docker image to use.
    versioned_images_map = instance_spec.get('versioned_docker_images')
    if (base_os_version and versioned_images_map and
        base_os_version in versioned_images_map):
      # New path: Use the versioned image if specified and available.
      docker_image_uri = versioned_images_map[base_os_version]
    else:
      # Fallback/legacy path: Use the original docker_image key.
      docker_image_uri = instance_spec['docker_image']
    # Job-level disk size wins; fall back to the instance spec's default.
    disk_size_gb = disk_size_gb or instance_spec['disk_size_gb']
    clusterfuzz_release = instance_spec.get('clusterfuzz_release', 'prod')
    configs[key] = KubernetesJobConfig(
        job_type=task.job_type,
        docker_image=docker_image_uri,
        command=task.command,
        disk_size_gb=disk_size_gb,
        service_account_email=instance_spec['service_account_email'],
        clusterfuzz_release=clusterfuzz_release,
    )

  return configs
122+
123+
124+ def _create_job_body (config : KubernetesJobConfig , input_url : str ) -> dict :
125+ """Creates the body of a Kubernetes job."""
126+
127+ job_name = config .job_type + '-' + str (uuid .uuid4 ()).split ('-' , maxsplit = 1 )[0 ]
128+ # Default job spec for non-kata containers (needs to be defined).
129+ return {
130+ 'apiVersion' : 'batch/v1' ,
131+ 'kind' : 'Job' ,
132+ 'metadata' : {
133+ 'name' : job_name
134+ },
135+ 'spec' : {
136+ 'template' : {
137+ 'spec' : {
138+ 'hostNetwork' :
139+ True ,
140+ 'containers' : [{
141+ 'name' :
142+ 'clusterfuzz-worker' ,
143+ 'image' :
144+ config .docker_image ,
145+ 'imagePullPolicy' :
146+ 'IfNotPresent' ,
147+ 'env' : [
148+ {
149+ 'name' : 'HOST_UID' ,
150+ 'value' : '1337'
151+ },
152+ {
153+ 'name' : 'CLUSTERFUZZ_RELEASE' ,
154+ 'value' : config .clusterfuzz_release
155+ },
156+ {
157+ 'name' : 'UNTRUSTED_WORKER' ,
158+ 'value' : 'False'
159+ },
160+ {
161+ 'name' : 'UWORKER' ,
162+ 'value' : 'True'
163+ },
164+ {
165+ 'name' : 'USE_GCLOUD_STORAGE_RSYNC' ,
166+ 'value' : '1'
167+ },
168+ {
169+ 'name' : 'UWORKER_INPUT_DOWNLOAD_URL' ,
170+ 'value' : input_url
171+ },
172+ ],
173+ 'securityContext' : {
174+ 'privileged' : True ,
175+ 'capabilities' : {
176+ 'add' : ['ALL' ]
177+ }
178+ },
179+ 'volumeMounts' : [{
180+ 'mountPath' : '/dev/shm' ,
181+ 'name' : 'dshm'
182+ }]
183+ }],
184+ 'volumes' : [{
185+ 'name' : 'dshm' ,
186+ 'emptyDir' : {
187+ 'medium' : 'Memory' ,
188+ 'sizeLimit' : '1.9Gi'
189+ }
190+ }],
191+ 'restartPolicy' :
192+ 'Never'
193+ }
194+ },
195+ 'backoffLimit' : 0
196+ }
197+ }
29198
30199
31200class KubernetesService (RemoteTaskInterface ):
@@ -36,110 +205,73 @@ def __init__(self):
36205 self ._core_api = k8s_client .CoreV1Api ()
37206 self ._batch_api = k8s_client .BatchV1Api ()
38207
39- def _create_job_client_wrapper (self , container_image : str , job_spec : dict ,
40- input_urls : List [str ]) -> str :
41- """Creates a Kubernetes job using the internal client."""
42- job_body = job_spec
208+ def create_job (self , config : KubernetesJobConfig , input_url : str ) -> str :
209+ """Creates a Kubernetes job.
43210
44- # See https://github.com/kubernetes-client/python/blob/master/kubernetes/
45- # docs/V1Job.md
46- job_name = job_body ['metadata' ]['name' ] + '-' + str (uuid .uuid4 ()).split (
47- '-' , maxsplit = 1 )[0 ]
48- job_body ['metadata' ]['name' ] = job_name
49- container = job_body ['spec' ]['template' ]['spec' ]['containers' ][0 ]
50- if 'env' not in container :
51- container ['env' ] = []
52- container ['image' ] = container_image
53- container ['env' ].extend ([{
54- 'name' : f'UWORKER_INPUT_DOWNLOAD_URL_{ i } ' ,
55- 'value' : url
56- } for i , url in enumerate (input_urls )])
211+ Args:
57212
58- self ._batch_api .create_namespaced_job (body = job_body , namespace = 'default' )
59- return job_name
213+ config: The Kubernetes job configuration.
60214
61- def create_job ( self , remote_task : RemoteTaskInterface , input_urls : List [ str ],
62- docker_image : str ) -> str :
63- """Creates a Kubernetes job.
215+ input_url: The URL to be passed as an environment variable to the
216+
217+ job\' s container .
64218
65- Args:
66- remote_task: The remote task specification.
67- input_urls: A list of URLs to be passed as environment variables to the
68- job's container.
69- docker_image: The Docker image to use for the job.
70219 Returns:
220+
71221 The name of the created Kubernetes job.
222+
72223 """
73- # Default job spec for non-kata containers (needs to be defined).
74- job_spec = {
75- 'apiVersion' : 'batch/v1' ,
76- 'kind' : 'Job' ,
77- 'metadata' : {
78- 'name' :
79- getattr (remote_task , 'job_type' ,
80- 'clusterfuzz-job' ) # Use job_type as base name
81- },
82- 'spec' : {
83- 'template' : {
84- 'spec' : {
85- 'containers' : [{
86- 'name' : 'clusterfuzz-worker' ,
87- 'imagePullPolicy' : 'IfNotPresent' ,
88- 'command' : ['echo' , 'hello world' ] # Default command
89- }],
90- 'restartPolicy' :
91- 'Never'
92- }
93- },
94- 'backoffLimit' : 0
95- }
96- }
97- return self ._create_job_client_wrapper (docker_image , job_spec , input_urls )
224+
225+ job_body = _create_job_body (config , input_url )
226+ self ._batch_api .create_namespaced_job (body = job_body , namespace = 'default' )
227+ return job_body ['metadata' ]['name' ]
98228
99229 def create_uworker_main_batch_job (self , module : str , job_type : str ,
100230 input_download_url : str ):
101231 """Creates a single batch job for a uworker main task."""
232+
102233 command = task_utils .get_command_from_module (module )
103234 batch_tasks = [RemoteTask (command , job_type , input_download_url )]
104235 result = self .create_uworker_main_batch_jobs (batch_tasks )
236+
105237 if result is None :
106238 return result
107239 return result [0 ]
108240
109241 def create_uworker_main_batch_jobs (self , remote_tasks : List [RemoteTask ]):
110242 """Creates a batch job for a list of uworker main tasks.
111-
243+
112244 This method groups the tasks by their workload specification and creates a
113245 separate batch job for each group. This allows tasks with similar
114246 requirements to be processed together, which can improve efficiency.
115247 """
116248 job_specs = collections .defaultdict (list )
117- specs = _get_specs_from_config (remote_tasks )
249+ configs = _get_k8s_job_configs (remote_tasks )
118250 for remote_task in remote_tasks :
119251 logs .info (f'Scheduling { remote_task .command } , { remote_task .job_type } .' )
120- spec = specs [(remote_task .command , remote_task .job_type )]
121- job_specs [spec ].append (remote_task .input_download_url )
122-
252+ config = configs [(remote_task .command , remote_task .job_type )]
253+ job_specs [config ].append (remote_task .input_download_url )
123254 logs .info ('Creating batch jobs.' )
124255 jobs = []
125-
126256 logs .info ('Batching utask_mains.' )
127- for spec , input_urls in job_specs .items ():
128- for input_urls_portion in utils .batched (input_urls ,
129- MAX_CONCURRENT_VMS_PER_JOB - 1 ):
130- jobs .append (
131- self .create_job (spec , input_urls_portion , spec .docker_image ))
257+ for config , input_urls in job_specs .items ():
258+ # TODO(javanlacerda): Batch multiple tasks into a single job.
259+ for input_url in input_urls :
260+ jobs .append (self .create_job (config , input_url ))
132261
133262 return jobs
134263
135264 def create_kata_container_job (self , container_image : str ,
136265 input_urls : List [str ]) -> str :
137266 """Creates a Kubernetes job that runs in a Kata container."""
267+ job_name = 'clusterfuzz-kata-job-' + str (uuid .uuid4 ()).split (
268+ '-' , maxsplit = 1 )[0 ]
269+
138270 job_spec = {
139271 'apiVersion' : 'batch/v1' ,
140272 'kind' : 'Job' ,
141273 'metadata' : {
142- 'name' : 'clusterfuzz-kata-job'
274+ 'name' : job_name
143275 },
144276 'spec' : {
145277 'template' : {
@@ -154,8 +286,12 @@ def create_kata_container_job(self, container_image: str,
154286 'dnsPolicy' :
155287 'ClusterFirstWithHostNet' ,
156288 'containers' : [{
157- 'name' : 'clusterfuzz-worker' ,
158- 'imagePullPolicy' : 'IfNotPresent' ,
289+ 'name' :
290+ 'clusterfuzz-worker' ,
291+ 'image' :
292+ container_image ,
293+ 'imagePullPolicy' :
294+ 'IfNotPresent' ,
159295 'lifecycle' : {
160296 'postStart' : {
161297 'exec' : {
@@ -182,7 +318,11 @@ def create_kata_container_job(self, container_image: str,
182318 'cpu' : '1' ,
183319 'memory' : '3.75Gi'
184320 }
185- }
321+ },
322+ 'env' : [{
323+ 'name' : f'UWORKER_INPUT_DOWNLOAD_URL_{ i } ' ,
324+ 'value' : url
325+ } for i , url in enumerate (input_urls )]
186326 }],
187327 'restartPolicy' :
188328 'Never' ,
@@ -201,5 +341,6 @@ def create_kata_container_job(self, container_image: str,
201341 'backoffLimit' : 0
202342 }
203343 }
204- return self ._create_job_client_wrapper (container_image , job_spec ,
205- input_urls )
344+
345+ self ._batch_api .create_namespaced_job (body = job_spec , namespace = 'default' )
346+ return job_name
0 commit comments