Skip to content

Commit 594713a

Browse files
committed
Refactor: Remove batch.py and introduce RemoteTaskGate
This refactor removes the `batch.py` file and introduces a `RemoteTaskGate` that proportionally distributes tasks between GCP Batch and Kubernetes. This allows for A/B testing and performance comparisons between the two platforms.
1 parent 1065de0 commit 594713a

File tree

8 files changed

+176
-87
lines changed

8 files changed

+176
-87
lines changed

local/bin/kind

-6.16 MB
Binary file not shown.

src/clusterfuzz/_internal/batch/service.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -335,7 +335,7 @@ class GcpBatchService(RemoteTaskInterface):
335335
provides a way to check if a task is configured to run remotely.
336336
"""
337337

338-
def create_job(self, spec, input_urls: List[str]):
338+
def create_job(self, spec: BatchWorkloadSpec, input_urls: List[str]):
339339
"""Creates and starts a batch job from |spec| that executes all tasks.
340340
341341
This method creates a new GCP Batch job with a single task group. The

src/clusterfuzz/_internal/google_cloud_utils/batch.py

Lines changed: 0 additions & 31 deletions
This file was deleted.

src/clusterfuzz/_internal/k8s/service.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,19 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414
"""Kubernetes batch client."""
15+
import collections
1516
from typing import List
1617
import uuid
1718

1819
from kubernetes import client as k8s_client
1920
from kubernetes import config as k8s_config
2021

22+
from clusterfuzz._internal.base import utils
23+
from clusterfuzz._internal.base.tasks import task_utils
24+
from clusterfuzz._internal.batch.service import _get_specs_from_config
25+
from clusterfuzz._internal.batch.service import MAX_CONCURRENT_VMS_PER_JOB
26+
from clusterfuzz._internal.metrics import logs
27+
from clusterfuzz._internal.remote_task import RemoteTask
2128
from clusterfuzz._internal.remote_task import RemoteTaskInterface
2229

2330

@@ -88,6 +95,41 @@ def create_job(self, remote_task: RemoteTaskInterface,
8895
return self._create_job_client_wrapper(remote_task.docker_image, job_spec,
8996
input_urls)
9097

98+
def create_uworker_main_batch_job(self, module: str, job_type: str,
                                  input_download_url: str):
  """Creates a single batch job for a uworker main task.

  Args:
    module: Task module; converted to a command with
      `task_utils.get_command_from_module`.
    job_type: Job type passed to the `RemoteTask` that is scheduled.
    input_download_url: Input URL passed to the `RemoteTask` unchanged.

  Returns:
    The first job returned by `create_uworker_main_batch_jobs`, or None when
    that call produced no job.
  """
  command = task_utils.get_command_from_module(module)
  batch_tasks = [RemoteTask(command, job_type, input_download_url)]
  jobs = self.create_uworker_main_batch_jobs(batch_tasks)
  # Treat an empty result like None: indexing an empty list would raise
  # IndexError instead of reporting that nothing was scheduled.
  if not jobs:
    return None
  return jobs[0]
107+
108+
def create_uworker_main_batch_jobs(self, batch_tasks: List[RemoteTask]):
  """Creates batch jobs for a list of uworker main tasks.

  Every task is bucketed under the workload spec that
  `_get_specs_from_config` reports for its (command, job_type) pair. The input
  URLs collected for each spec are then split with `utils.batched`, using a
  batch size of MAX_CONCURRENT_VMS_PER_JOB - 1, and `create_job` is called
  once per resulting portion.

  Args:
    batch_tasks: The tasks to schedule.

  Returns:
    A list holding the result of each `create_job` call.
  """
  specs = _get_specs_from_config(batch_tasks)
  urls_by_spec = collections.defaultdict(list)
  for task in batch_tasks:
    logs.info(f'Scheduling {task.command}, {task.job_type}.')
    urls_by_spec[specs[(task.command, task.job_type)]].append(
        task.input_download_url)

  logs.info('Creating batch jobs.')
  logs.info('Batching utask_mains.')
  portion_size = MAX_CONCURRENT_VMS_PER_JOB - 1
  return [
      self.create_job(spec, url_portion)
      for spec, urls in urls_by_spec.items()
      for url_portion in utils.batched(urls, portion_size)
  ]
132+
91133
def create_kata_container_job(self, container_image: str,
92134
input_urls: List[str]) -> str:
93135
"""Creates a Kubernetes job that runs in a Kata container."""

src/clusterfuzz/_internal/remote_task/__init__.py

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,3 +53,69 @@ def create_job(self, remote_task: RemoteTask, input_urls: List[str]):
5353
and returns a representation of the created job.
5454
"""
5555
raise NotImplementedError
56+
57+
58+
import random
59+
60+
from clusterfuzz._internal.remote_task import job_frequency
61+
62+
63+
class RemoteTaskGate:
  """Routes remote tasks to either GCP Batch or Kubernetes.

  For each task the gate makes a random roll and compares it with the
  Kubernetes frequency that the `job_frequency` module reports for the task's
  job type. Tasks that are not routed to Kubernetes go to GCP Batch.
  """

  def __init__(self):
    # Backend services are imported here instead of at module level.
    # NOTE(review): presumably to avoid a circular import with the service
    # modules - confirm before moving these to the top of the file.
    from clusterfuzz._internal.batch.service import GcpBatchService
    from clusterfuzz._internal.k8s.service import KubernetesService
    self._gcp_batch_service = GcpBatchService()
    self._kubernetes_service = KubernetesService()

  def _should_use_kubernetes(self, job_type: str) -> bool:
    """Returns True when a random roll selects Kubernetes for |job_type|.

    The roll is `random.random()`; Kubernetes is chosen when it is strictly
    below the 'kubernetes' entry returned by `job_frequency.get_job_frequency`.
    """
    kubernetes_share = job_frequency.get_job_frequency(job_type)['kubernetes']
    return random.random() < kubernetes_share

  def create_uworker_main_batch_job(self, module: str, job_type: str,
                                    input_download_url: str):
    """Creates one batch job on the backend chosen for |job_type|.

    The arguments are forwarded unchanged to the selected service's
    `create_uworker_main_batch_job`, and its result is returned.
    """
    if self._should_use_kubernetes(job_type):
      service = self._kubernetes_service
    else:
      service = self._gcp_batch_service
    return service.create_uworker_main_batch_job(module, job_type,
                                                 input_download_url)

  def create_uworker_main_batch_jobs(self, batch_tasks: List[RemoteTask]):
    """Creates batch jobs, splitting |batch_tasks| between the two backends.

    Each task gets its own backend decision. The GCP Batch group is submitted
    first, then the Kubernetes group; a backend that received no tasks is not
    called.

    Returns:
      A list with the GCP Batch results followed by the Kubernetes results.
    """
    gcp_batch_tasks, kubernetes_tasks = [], []
    for task in batch_tasks:
      if self._should_use_kubernetes(task.job_type):
        bucket = kubernetes_tasks
      else:
        bucket = gcp_batch_tasks
      bucket.append(task)

    jobs = []
    if gcp_batch_tasks:
      jobs += self._gcp_batch_service.create_uworker_main_batch_jobs(
          gcp_batch_tasks)
    if kubernetes_tasks:
      jobs += self._kubernetes_service.create_uworker_main_batch_jobs(
          kubernetes_tasks)
    return jobs
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
# Copyright 2025 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
"""Configurable job frequencies for remote task execution.
15+
16+
This module provides a way to define how frequently certain jobs are executed
17+
on different remote backends, such as GCP Batch and Kubernetes. This allows for
18+
A/B testing and performance comparisons between the two platforms.
19+
"""
20+
21+
from clusterfuzz._internal.system import environment
22+
23+
# By default, all jobs are sent to the GCP Batch backend. This can be
24+
# overridden on a per-job basis by setting the `K8S_JOBS_FREQUENCY`
25+
# environment variable.
26+
DEFAULT_FREQUENCY = {'gcp_batch': 1.0, 'kubernetes': 0.0}
27+
28+
29+
def _get_job_frequencies_from_env():
  """Parses the `K8S_JOBS_FREQUENCY` environment variable.

  The variable should be a comma-separated list of key-value pairs, where the
  key is the job name and the value is the frequency (a float between 0 and 1).
  For example: `libfuzzer_asan_chrome=0.5,libfuzzer_msan_chrome=0.2`.

  Whitespace around entries and job names is ignored, and empty entries (for
  example from a trailing comma) are skipped.

  Returns:
    A dict mapping job name to its frequency as a float. Empty when the
    variable is unset or empty.

  Raises:
    ValueError: If an entry has no `=` separator or its value is not a float.
  """
  frequency_string = environment.get_value('K8S_JOBS_FREQUENCY')
  if not frequency_string:
    return {}

  job_frequencies = {}
  for item in frequency_string.split(','):
    item = item.strip()
    if not item:
      # Tolerate trailing or doubled commas instead of failing on unpacking.
      continue
    # partition() never fails to unpack; an entry without '=' leaves |value|
    # empty and is rejected by float() below with a ValueError.
    key, _, value = item.partition('=')
    # Strip the key so 'a=0.5, b=0.2' registers 'b' rather than ' b', which
    # would never match a job name.
    job_frequencies[key.strip()] = float(value)
  return job_frequencies
45+
46+
47+
def get_job_frequency(job_name):
  """Returns the backend frequencies for a given job.

  If the job is not explicitly listed in the `K8S_JOBS_FREQUENCY` environment
  variable, a copy of the default frequency is returned.

  Args:
    job_name: Name of the job to look up.

  Returns:
    A new dict with the keys 'gcp_batch' and 'kubernetes'. For a configured
    job, 'kubernetes' is the configured value and 'gcp_batch' is one minus
    that value.
  """
  kubernetes_frequency = _get_job_frequencies_from_env().get(job_name)
  if kubernetes_frequency is None:
    # Return a copy so a caller that mutates the result cannot alter the
    # module-level default seen by every later call.
    return dict(DEFAULT_FREQUENCY)
  return {
      'gcp_batch': 1.0 - kubernetes_frequency,
      'kubernetes': kubernetes_frequency,
  }

src/clusterfuzz/_internal/tests/core/batch/service_test.py

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ class GcpBatchServiceTest(unittest.TestCase):
146146

147147
def setUp(self):
148148
helpers.patch(self, [
149-
'clusterfuzz._internal.batch.batch_service._batch_client',
149+
'clusterfuzz._internal.batch.service._batch_client',
150150
'clusterfuzz._internal.base.tasks.task_utils.get_command_from_module',
151151
'uuid.uuid4',
152152
])
@@ -190,9 +190,8 @@ def test_create_uworker_main_batch_jobs(self):
190190
priority=0,
191191
max_run_duration='2s',
192192
retry=True)
193-
with mock.patch(
194-
'clusterfuzz._internal.batch.batch_service._get_specs_from_config'
195-
) as mock_get_specs_from_config:
193+
with mock.patch('clusterfuzz._internal.batch.service._get_specs_from_config'
194+
) as mock_get_specs_from_config:
196195
mock_get_specs_from_config.return_value = {
197196
('command1', 'job1'): spec1,
198197
('command2', 'job2'): spec2,
@@ -235,9 +234,8 @@ def test_create_uworker_main_batch_job(self):
235234
priority=1,
236235
max_run_duration='1s',
237236
retry=False)
238-
with mock.patch(
239-
'clusterfuzz._internal.batch.batch_service._get_specs_from_config'
240-
) as mock_get_specs_from_config:
237+
with mock.patch('clusterfuzz._internal.batch.service._get_specs_from_config'
238+
) as mock_get_specs_from_config:
241239
mock_get_specs_from_config.return_value = {
242240
('fuzz', 'job1'): spec1,
243241
}
@@ -262,7 +260,7 @@ class IsRemoteTaskTest(unittest.TestCase):
262260

263261
def setUp(self):
264262
helpers.patch(self, [
265-
'clusterfuzz._internal.batch.batch_service._get_specs_from_config',
263+
'clusterfuzz._internal.batch.service._get_specs_from_config',
266264
])
267265
data_types.Job(name='job', platform='LINUX').put()
268266

@@ -408,7 +406,7 @@ def test_get_specs_from_config_with_disk_size_override(self):
408406
[batch_service.RemoteTask('fuzz', job_name, None)])
409407
self.assertEqual(spec['fuzz', job_name].disk_size_gb, overridden_size)
410408

411-
@mock.patch('clusterfuzz._internal.batch.batch_service.utils.is_oss_fuzz')
409+
@mock.patch('clusterfuzz._internal.batch.service.utils.is_oss_fuzz')
412410
@mock.patch('clusterfuzz._internal.datastore.data_types.OssFuzzProject.query')
413411
@mock.patch('clusterfuzz._internal.datastore.ndb_utils.get_all_from_query')
414412
def test_get_config_names_os_version(self, mock_get_all_from_query,

src/clusterfuzz/_internal/tests/core/google_cloud_utils/use_batch.py

Lines changed: 0 additions & 46 deletions
This file was deleted.

0 commit comments

Comments
 (0)