Skip to content

Commit 9502f0f

Browse files
committed
Refactor RemoteTask and fix Kubernetes E2E test
- Update RemoteTask interface to include create_uworker_main_batch_jobs.
- Refactor KubernetesService and GcpBatchService to match new interface.
- Fix TypeError in k8s_service_e2e_test.py by adding @classmethod to tearDownClass.
- Move and update tests for batch and k8s services.
1 parent 594713a commit 9502f0f

File tree

11 files changed

+562
-155
lines changed

11 files changed

+562
-155
lines changed

src/clusterfuzz/_internal/batch/service.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -259,7 +259,7 @@ def _get_subconfig(batch_config, instance_spec):
259259
return all_subconfigs[weighted_subconfig.name]
260260

261261

262-
def _get_specs_from_config(batch_tasks) -> Dict:
262+
def _get_specs_from_config(batch_tasks: List[RemoteTask]) -> Dict:
263263
"""Gets the configured specifications for a batch workload."""
264264
if not batch_tasks:
265265
return {}
@@ -383,19 +383,19 @@ def create_uworker_main_batch_job(self, module: str, job_type: str,
383383
return result
384384
return result[0]
385385

386-
def create_uworker_main_batch_jobs(self, batch_tasks: List[RemoteTask]):
386+
def create_uworker_main_batch_jobs(self, remote_tasks: List[RemoteTask]):
387387
"""Creates a batch job for a list of uworker main tasks.
388388
389389
This method groups the tasks by their workload specification and creates a
390390
separate batch job for each group. This allows tasks with similar
391391
requirements to be processed together, which can improve efficiency.
392392
"""
393393
job_specs = collections.defaultdict(list)
394-
specs = _get_specs_from_config(batch_tasks)
395-
for batch_task in batch_tasks:
396-
logs.info(f'Scheduling {batch_task.command}, {batch_task.job_type}.')
397-
spec = specs[(batch_task.command, batch_task.job_type)]
398-
job_specs[spec].append(batch_task.input_download_url)
394+
specs = _get_specs_from_config(remote_tasks)
395+
for remote_task in remote_tasks:
396+
logs.info(f'Scheduling {remote_task.command}, {remote_task.job_type}.')
397+
spec = specs[(remote_task.command, remote_task.job_type)]
398+
job_specs[spec].append(remote_task.input_download_url)
399399

400400
logs.info('Creating batch jobs.')
401401
jobs = []

src/clusterfuzz/_internal/k8s/service.py

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -59,13 +59,14 @@ def _create_job_client_wrapper(self, container_image: str, job_spec: dict,
5959
return job_name
6060

6161
def create_job(self, remote_task: RemoteTaskInterface,
62-
input_urls: List[str]) -> str:
62+
input_urls: List[str], docker_image: str) -> str:
6363
"""Creates a Kubernetes job.
6464
6565
Args:
6666
remote_task: The remote task specification.
6767
input_urls: A list of URLs to be passed as environment variables to the
6868
job's container.
69+
docker_image: The Docker image to use for the job.
6970
Returns:
7071
The name of the created Kubernetes job.
7172
"""
@@ -75,7 +76,8 @@ def create_job(self, remote_task: RemoteTaskInterface,
7576
'kind': 'Job',
7677
'metadata': {
7778
'name':
78-
remote_task.job_type # Use job_type as base name
79+
getattr(remote_task, 'job_type',
80+
'clusterfuzz-job') # Use job_type as base name
7981
},
8082
'spec': {
8183
'template': {
@@ -92,7 +94,7 @@ def create_job(self, remote_task: RemoteTaskInterface,
9294
'backoffLimit': 0
9395
}
9496
}
95-
return self._create_job_client_wrapper(remote_task.docker_image, job_spec,
97+
return self._create_job_client_wrapper(docker_image, job_spec,
9698
input_urls)
9799

98100
def create_uworker_main_batch_job(self, module: str, job_type: str,
@@ -105,19 +107,19 @@ def create_uworker_main_batch_job(self, module: str, job_type: str,
105107
return result
106108
return result[0]
107109

108-
def create_uworker_main_batch_jobs(self, batch_tasks: List[RemoteTask]):
110+
def create_uworker_main_batch_jobs(self, remote_tasks: List[RemoteTask]):
109111
"""Creates a batch job for a list of uworker main tasks.
110112
111113
This method groups the tasks by their workload specification and creates a
112114
separate batch job for each group. This allows tasks with similar
113115
requirements to be processed together, which can improve efficiency.
114116
"""
115117
job_specs = collections.defaultdict(list)
116-
specs = _get_specs_from_config(batch_tasks)
117-
for batch_task in batch_tasks:
118-
logs.info(f'Scheduling {batch_task.command}, {batch_task.job_type}.')
119-
spec = specs[(batch_task.command, batch_task.job_type)]
120-
job_specs[spec].append(batch_task.input_download_url)
118+
specs = _get_specs_from_config(remote_tasks)
119+
for remote_task in remote_tasks:
120+
logs.info(f'Scheduling {remote_task.command}, {remote_task.job_type}.')
121+
spec = specs[(remote_task.command, remote_task.job_type)]
122+
job_specs[spec].append(remote_task.input_download_url)
121123

122124
logs.info('Creating batch jobs.')
123125
jobs = []
@@ -126,7 +128,7 @@ def create_uworker_main_batch_jobs(self, batch_tasks: List[RemoteTask]):
126128
for spec, input_urls in job_specs.items():
127129
for input_urls_portion in utils.batched(input_urls,
128130
MAX_CONCURRENT_VMS_PER_JOB - 1):
129-
jobs.append(self.create_job(spec, input_urls_portion))
131+
jobs.append(self.create_job(spec, input_urls_portion, spec.docker_image))
130132

131133
return jobs
132134

src/clusterfuzz/_internal/remote_task/__init__.py

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,11 @@
1919
the task creation logic to a specific implementation.
2020
"""
2121
import abc
22+
import random
2223
from typing import List
2324

25+
from clusterfuzz._internal.remote_task import job_frequency
26+
2427

2528
class RemoteTask:
2629
"""Represents a single ClusterFuzz task to be executed on a remote worker.
@@ -45,22 +48,18 @@ class RemoteTaskInterface(abc.ABC):
4548
"""
4649

4750
@abc.abstractmethod
48-
def create_job(self, remote_task: RemoteTask, input_urls: List[str]):
49-
"""Creates a remote job.
50-
51-
This method is responsible for creating a new job in the remote execution
52-
environment. It takes a workload specification and a list of input URLs,
53-
and returns a representation of the created job.
54-
"""
51+
def create_uworker_main_batch_job(self, module: str, job_type: str,
52+
input_download_url: str):
53+
"""Creates a single remote task for a uworker main task."""
5554
raise NotImplementedError
5655

57-
58-
import random
59-
60-
from clusterfuzz._internal.remote_task import job_frequency
56+
@abc.abstractmethod
57+
def create_uworker_main_batch_jobs(self, remote_tasks: List[RemoteTask]):
58+
"""Creates many remote tasks for uworker main tasks."""
59+
raise NotImplementedError
6160

6261

63-
class RemoteTaskGate:
62+
class RemoteTaskGate(RemoteTaskInterface):
6463
"""A gatekeeper for remote task execution.
6564
6665
This class is responsible for choosing the remote execution backend (GCP Batch
@@ -95,15 +94,15 @@ def create_uworker_main_batch_job(self, module: str, job_type: str,
9594
return self._gcp_batch_service.create_uworker_main_batch_job(
9695
module, job_type, input_download_url)
9796

98-
def create_uworker_main_batch_jobs(self, batch_tasks: List[RemoteTask]):
97+
def create_uworker_main_batch_jobs(self, remote_tasks: List[RemoteTask]):
9998
"""Creates batch jobs on either GCP Batch or Kubernetes.
10099
101100
The tasks are grouped by their target backend (GCP Batch or Kubernetes) and
102101
then created in separate batches.
103102
"""
104103
gcp_batch_tasks = []
105104
kubernetes_tasks = []
106-
for task in batch_tasks:
105+
for task in remote_tasks:
107106
if self._should_use_kubernetes(task.job_type):
108107
kubernetes_tasks.append(task)
109108
else:

src/clusterfuzz/_internal/tests/core/batch/service_test.py renamed to src/clusterfuzz/_internal/tests/core/batch/batch_service_test.py

File renamed without changes.

src/clusterfuzz/_internal/tests/core/k8s/__init__.py

Whitespace-only changes.

0 commit comments

Comments
 (0)