Skip to content

Commit 922f21f

Browse files
authored
Refactor base_os_version Logic for Task Scheduling Performance (#5031)
## Motivation The previous implementation for determining the `base_os_version` for new tasks introduced a significant performance bottleneck. The `add_task` function (and its underlying `bulk_add_tasks` wrapper) queried the Datastore for the `Job` and `OssFuzzProject` entities *for each individual task* being created. In high-throughput scenarios, such as the `schedule_fuzz.py` cron job which can schedule upwards of 300,000 tasks at once, this behavior results in an equivalent number of Datastore queries. This "N+1" query problem leads to extreme slowness, high operational costs, and a significant risk of timeouts and failed task creation. An alternative approach using a single batch query with an `IN` clause was considered. However, this is also not scalable for a very large number of entities and could hit Datastore limits or result in an unacceptably slow query. This PR refactors the logic to be far more efficient and scalable. ## Solution The core idea of this change is to move the responsibility of determining the `base_os_version` to the point where the necessary information is already available, thus eliminating redundant Datastore lookups. 1. **Logic moved to `schedule_fuzz.py`:** The `schedule_fuzz.py` cron job already queries for all `Job` and `OssFuzzProject` entities to perform its scheduling calculations. We now leverage these in-memory entities to determine the correct `base_os_version` *before* the `Task` object is created. 2. **`base_os_version` Precedence:** The logic for selecting the OS version is now explicitly handled within the schedulers (`OssfuzzFuzzTaskScheduler` and `ChromeFuzzTaskScheduler`) with the following precedence: - Use `OssFuzzProject.base_os_version` if it exists. - Otherwise, use `Job.base_os_version` if it exists. - Otherwise, the value is `None`. 3. **Simplified Task Creation:** The determined `base_os_version` is passed directly into the `Task` constructor via the `extra_info` dictionary. This makes the `add_task` and `bulk_add_tasks` functions in `tasks/__init__.py` "dumb" in this regard; they no longer perform any Datastore queries for this purpose and simply publish the tasks they are given. 4. **Reverting to `add_task`:** The logic has been consolidated back into the `add_task` function, removing the `bulk_add_tasks` implementation to simplify the call chain. The `add_task` function now correctly handles the `base_os_version` logic and uses `job.is_external()` for dispatching to `external_tasks`. ## Benefits * **Drastic Performance Improvement:** Reduces the number of Datastore queries during the scheduling of fuzz tasks from potentially hundreds of thousands to zero. * **Enhanced Scalability:** The system can now schedule extremely large batches of tasks efficiently without overwhelming the Datastore or risking timeouts. * **Improved Code Cohesion:** The logic for determining task properties now resides within the scheduler, where the necessary context is already present. This makes the `add_task` function simpler and more focused on its core responsibility of enqueuing a task.
1 parent d9b2898 commit 922f21f

File tree

4 files changed

+242
-67
lines changed

4 files changed

+242
-67
lines changed

src/clusterfuzz/_internal/base/tasks/__init__.py

Lines changed: 15 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -809,23 +809,6 @@ def bulk_add_tasks(tasks, queue=None, eta_now=False):
809809
for task in tasks:
810810
task.eta = now
811811

812-
for task in tasks:
813-
# Determine base_os_version.
814-
job = data_types.Job.query(data_types.Job.name == task.job).get()
815-
if not job:
816-
logs.warning(f"Job {task.job} not found for bulk task.", task=task)
817-
continue
818-
819-
task.extra_info = task.extra_info or {}
820-
if job.base_os_version:
821-
task.extra_info['base_os_version'] = job.base_os_version
822-
823-
if utils.is_oss_fuzz():
824-
oss_fuzz_project = data_types.OssFuzzProject.query(
825-
data_types.OssFuzzProject.name == job.project).get()
826-
if oss_fuzz_project and oss_fuzz_project.base_os_version:
827-
task.extra_info['base_os_version'] = oss_fuzz_project.base_os_version
828-
829812
pubsub_client = pubsub.PubSubClient()
830813
pubsub_messages = [task.to_pubsub_message() for task in tasks]
831814
topic_name = pubsub.topic_name(utils.get_application_id(), queue)
@@ -843,17 +826,32 @@ def add_task(command,
843826
if wait_time is None:
844827
wait_time = random.randint(1, TASK_CREATION_WAIT_INTERVAL)
845828

829+
base_os_version = None
846830
if job_type != 'none':
847831
job = data_types.Job.query(data_types.Job.name == job_type).get()
848832
if not job:
849833
raise Error(f'Job {job_type} not found.')
850834

835+
if utils.is_oss_fuzz():
836+
project = data_types.OssFuzzProject.query(
837+
data_types.OssFuzzProject.name == job.project).get()
838+
if project and project.base_os_version:
839+
base_os_version = project.base_os_version
840+
elif job.base_os_version:
841+
base_os_version = job.base_os_version
842+
else:
843+
if job.base_os_version:
844+
base_os_version = job.base_os_version
845+
851846
if job.is_external():
852847
external_tasks.add_external_task(command, argument, job)
853848
return
854849

855850
# Add the task.
856851
eta = utils.utcnow() + datetime.timedelta(seconds=wait_time)
852+
extra_info = extra_info or {}
853+
if base_os_version:
854+
extra_info['base_os_version'] = base_os_version
857855
task = Task(command, argument, job_type, eta=eta, extra_info=extra_info)
858856

859857
bulk_add_tasks([task], queue=queue)

src/clusterfuzz/_internal/cron/schedule_fuzz.py

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -135,18 +135,25 @@ class FuzzTaskCandidate:
135135
Something like this would probably not be needed if we were using SQL and
136136
could use joins."""
137137

138-
def __init__(self, job, project, fuzzer=None, weight=None):
138+
def __init__(self,
139+
job,
140+
project,
141+
fuzzer=None,
142+
weight=None,
143+
base_os_version=None):
139144
self.job = job
140145
self.project = project
141146
self.fuzzer = fuzzer
142147
self.weight = weight
148+
self.base_os_version = base_os_version
143149

144150
def copy(self):
145151
return FuzzTaskCandidate(
146152
job=self.job,
147153
project=self.project,
148154
fuzzer=self.fuzzer,
149-
weight=self.weight)
155+
weight=self.weight,
156+
base_os_version=self.base_os_version)
150157

151158

152159
class OssfuzzFuzzTaskScheduler(BaseFuzzTaskScheduler):
@@ -166,13 +173,22 @@ def get_fuzz_tasks(self) -> Dict[str, tasks.Task]:
166173
project_weight = project.cpu_weight / total_cpu_weight
167174
project_weights[project.name] = project_weight
168175

176+
projects_by_name = {project.name: project for project in projects}
177+
169178
# Then get FuzzTaskCandidate weights.
170179
logs.info('Getting jobs.')
171180
# TODO(metzman): Handle cases where jobs are fuzzed by multiple fuzzers.
172181
candidates_by_job = {}
173182
for job in ndb_utils.get_all_from_query(data_types.Job.query()):
183+
project = projects_by_name.get(job.project)
184+
base_os_version = None
185+
if project and project.base_os_version:
186+
base_os_version = project.base_os_version
187+
elif job.base_os_version:
188+
base_os_version = job.base_os_version
189+
174190
candidates_by_job[job.name] = FuzzTaskCandidate(
175-
job=job.name, project=job.project)
191+
job=job.name, project=job.project, base_os_version=base_os_version)
176192

177193
fuzzer_job_weight_by_project = collections.defaultdict(int)
178194
fuzz_task_candidates = []
@@ -213,7 +229,11 @@ def get_fuzz_tasks(self) -> Dict[str, tasks.Task]:
213229
choices = random.choices(
214230
fuzz_task_candidates, weights=weights, k=num_instances)
215231
fuzz_tasks = [
216-
tasks.Task('fuzz', fuzz_task_candidate.fuzzer, fuzz_task_candidate.job)
232+
tasks.Task(
233+
'fuzz',
234+
fuzz_task_candidate.fuzzer,
235+
fuzz_task_candidate.job,
236+
extra_info={'base_os_version': fuzz_task_candidate.base_os_version})
217237
for fuzz_task_candidate in choices
218238
]
219239
# TODO(metzman): Use number of targets even though weight
@@ -236,8 +256,12 @@ def get_fuzz_tasks(self) -> List[tasks.Task]:
236256
# Only consider linux jobs for chrome fuzzing.
237257
job_query = data_types.Job.query(data_types.Job.platform == 'LINUX')
238258
for job in ndb_utils.get_all_from_query(job_query):
259+
base_os_version = None
260+
if job.base_os_version:
261+
base_os_version = job.base_os_version
262+
239263
candidates_by_job[job.name] = FuzzTaskCandidate(
240-
job=job.name, project=job.project)
264+
job=job.name, project=job.project, base_os_version=base_os_version)
241265

242266
fuzz_task_candidates = []
243267
fuzzer_job_query = ndb_utils.get_all_from_query(
@@ -261,7 +285,11 @@ def get_fuzz_tasks(self) -> List[tasks.Task]:
261285
choices = random.choices(
262286
fuzz_task_candidates, weights=weights, k=num_instances)
263287
fuzz_tasks = [
264-
tasks.Task('fuzz', candidate.fuzzer, candidate.job)
288+
tasks.Task(
289+
'fuzz',
290+
candidate.fuzzer,
291+
candidate.job,
292+
extra_info={'base_os_version': candidate.base_os_version})
265293
for candidate in choices
266294
]
267295
return fuzz_tasks

src/clusterfuzz/_internal/tests/appengine/handlers/cron/schedule_fuzz_test.py

Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,163 @@ def test_get_fuzz_tasks(self):
7878
expected_results = [('fuzz', 'libFuzzer', 'myjob')] * 5
7979
self.assertListEqual(comparable_results, expected_results)
8080

81+
def test_os_version_precedence_project_over_job(self):
82+
"""Tests that project version is prioritized over job version."""
83+
job_name = 'myjob'
84+
project_name = 'myproject'
85+
data_types.Job(
86+
name='dead_job',
87+
environment_string=f'PROJECT_NAME = {project_name}',
88+
platform='LINUX',
89+
).put()
90+
data_types.Job(
91+
name=job_name,
92+
environment_string=f'PROJECT_NAME = {project_name}',
93+
platform='LINUX',
94+
base_os_version='job-version',
95+
).put()
96+
data_types.Job(
97+
name='dead_project_job',
98+
environment_string='PROJECT_NAME = dead_project',
99+
platform='LINUX',
100+
).put()
101+
102+
data_types.FuzzerJob(
103+
job='dead_job', weight=0.0, platform='LINUX', fuzzer='libFuzzer').put()
104+
data_types.FuzzerJob(
105+
job=job_name, platform='LINUX', fuzzer='libFuzzer').put()
106+
data_types.FuzzerJob(
107+
job='dead_project_job', platform='LINUX', fuzzer='libFuzzer').put()
108+
109+
data_types.OssFuzzProject(
110+
name=project_name, base_os_version='project-version').put()
111+
data_types.OssFuzzProject(name='dead_project', cpu_weight=0.0).put()
112+
113+
scheduler = schedule_fuzz.OssfuzzFuzzTaskScheduler(num_cpus=2)
114+
tasks = scheduler.get_fuzz_tasks()
115+
self.assertEqual(len(tasks), 1)
116+
task = tasks[0]
117+
118+
self.assertEqual(task.job, job_name)
119+
self.assertEqual(task.extra_info.get('base_os_version'), 'project-version')
120+
121+
def test_os_version_fallback_to_job(self):
122+
"""Tests that job version is used as a fallback."""
123+
job_name = 'myjob'
124+
project_name = 'myproject'
125+
data_types.Job(
126+
name='dead_job',
127+
environment_string=f'PROJECT_NAME = {project_name}',
128+
platform='LINUX',
129+
).put()
130+
data_types.Job(
131+
name=job_name,
132+
environment_string=f'PROJECT_NAME = {project_name}',
133+
platform='LINUX',
134+
base_os_version='job-version',
135+
).put()
136+
data_types.Job(
137+
name='dead_project_job',
138+
environment_string='PROJECT_NAME = dead_project',
139+
platform='LINUX',
140+
).put()
141+
142+
data_types.FuzzerJob(
143+
job='dead_job', weight=0.0, platform='LINUX', fuzzer='libFuzzer').put()
144+
data_types.FuzzerJob(
145+
job=job_name, platform='LINUX', fuzzer='libFuzzer').put()
146+
data_types.FuzzerJob(
147+
job='dead_project_job', platform='LINUX', fuzzer='libFuzzer').put()
148+
149+
data_types.OssFuzzProject(name=project_name).put()
150+
data_types.OssFuzzProject(name='dead_project', cpu_weight=0.0).put()
151+
152+
scheduler = schedule_fuzz.OssfuzzFuzzTaskScheduler(num_cpus=2)
153+
tasks = scheduler.get_fuzz_tasks()
154+
self.assertEqual(len(tasks), 1)
155+
task = tasks[0]
156+
157+
self.assertEqual(task.job, job_name)
158+
self.assertEqual(task.extra_info.get('base_os_version'), 'job-version')
159+
160+
def test_os_version_no_version(self):
161+
"""Tests that no os version is set when neither project nor job has one."""
162+
job_name = 'myjob'
163+
project_name = 'myproject'
164+
data_types.Job(
165+
name='dead_job',
166+
environment_string=f'PROJECT_NAME = {project_name}',
167+
platform='LINUX',
168+
).put()
169+
data_types.Job(
170+
name=job_name,
171+
environment_string=f'PROJECT_NAME = {project_name}',
172+
platform='LINUX',
173+
base_os_version=None,
174+
).put()
175+
data_types.Job(
176+
name='dead_project_job',
177+
environment_string='PROJECT_NAME = dead_project',
178+
platform='LINUX',
179+
).put()
180+
181+
data_types.FuzzerJob(
182+
job='dead_job', weight=0.0, platform='LINUX', fuzzer='libFuzzer').put()
183+
data_types.FuzzerJob(
184+
job=job_name, platform='LINUX', fuzzer='libFuzzer').put()
185+
data_types.FuzzerJob(
186+
job='dead_project_job', platform='LINUX', fuzzer='libFuzzer').put()
187+
188+
data_types.OssFuzzProject(name=project_name).put()
189+
data_types.OssFuzzProject(name='dead_project', cpu_weight=0.0).put()
190+
191+
scheduler = schedule_fuzz.OssfuzzFuzzTaskScheduler(num_cpus=2)
192+
tasks = scheduler.get_fuzz_tasks()
193+
self.assertEqual(len(tasks), 1)
194+
task = tasks[0]
195+
196+
self.assertEqual(task.job, job_name)
197+
self.assertIsNone(task.extra_info.get('base_os_version'))
198+
199+
200+
@test_utils.with_cloud_emulators('datastore')
201+
class ChromeFuzzTaskSchedulerTest(unittest.TestCase):
202+
"""Tests for ChromeFuzzTaskScheduler."""
203+
204+
def setUp(self):
205+
self.maxDiff = None
206+
self.job_name = 'myjob'
207+
208+
def _setup_chrome_entities(self, job_os_version=None):
209+
"""Set up entities for Chrome tests."""
210+
data_types.Job(
211+
name=self.job_name,
212+
project='chrome',
213+
platform='LINUX',
214+
base_os_version=job_os_version).put()
215+
data_types.FuzzerJob(
216+
job=self.job_name, platform='LINUX', fuzzer='libFuzzer',
217+
weight=1.0).put()
218+
219+
def _run_and_get_task(self):
220+
"""Runs the scheduler and returns the single task created."""
221+
scheduler = schedule_fuzz.ChromeFuzzTaskScheduler(num_cpus=2)
222+
tasks = scheduler.get_fuzz_tasks()
223+
self.assertEqual(len(tasks), 1)
224+
return tasks[0]
225+
226+
def test_os_version_from_job(self):
227+
"""Tests that the os version is correctly read from the job."""
228+
self._setup_chrome_entities(job_os_version='job-version')
229+
task = self._run_and_get_task()
230+
self.assertEqual(task.extra_info.get('base_os_version'), 'job-version')
231+
232+
def test_os_version_job_without_version(self):
233+
"""Tests that no os version is set when the job has none."""
234+
self._setup_chrome_entities()
235+
task = self._run_and_get_task()
236+
self.assertIsNone(task.extra_info.get('base_os_version'))
237+
81238

82239
class TestGetCpuUsage(unittest.TestCase):
83240
"""Tests for get_cpu_limit_for_regions."""

0 commit comments

Comments
 (0)