Skip to content

Commit 8d52cf7

Browse files
Batch progression task (#3634)
Batch the progression task so that preprocess is run first, and then a group of preprocessed tasks is put on batch together.
1 parent 3720158 commit 8d52cf7

File tree

4 files changed

+84
-41
lines changed

4 files changed

+84
-41
lines changed

src/clusterfuzz/_internal/bot/tasks/task_creation.py

Lines changed: 21 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -299,27 +299,31 @@ def _preprocess(task: Task) -> None:
299299

300300
def start_utask_mains(tasks: List[Task]) -> None:
301301
"""Start utask_main of multiple tasks as batch tasks on batch."""
302-
batch_tasks = [
303-
batch.BatchTask(task.name, task.job, task.uworker_input) for task in tasks
304-
]
305-
batch.create_uworker_main_batch_jobs(batch_tasks)
302+
batch_tasks = (
303+
batch.BatchTask(task.name, task.job, task.uworker_input)
304+
for task in tasks)
305+
return batch.create_uworker_main_batch_jobs_bunched(batch_tasks)
306306

307307

308-
def schedule_tasks(tasks: List[Task]):
309-
"""Starts tasks as defined by task objects. If the tasks are not executed
310-
remotely, then they are put on the queue. If they are executed remotely, then
311-
the utask_mains are scheduled on batch, since preprocess has already been done
312-
in this module on this bot."""
313-
uworker_tasks = []
314-
tasks = [task for task in tasks if task is not None]
308+
def preprocess_utasks_and_queue_ttasks(tasks: List[Optional[Task]]):
309+
"""If a task is a utask, then it is preprocessed and yielded, otherwise it is
310+
added to the queue (when it is a ttask)."""
315311
for task in tasks:
312+
if task is None:
313+
continue
316314
if not task_types.is_remote_utask(task.name, task.job):
317-
taskslib.add_task(task.name, task.argument, task.job,
318-
task.queue_for_platform)
315+
taskslib.add_task(
316+
task.name, task.argument, task.job, queue=task.queue_for_platform)
319317
logs.log(f'UTask {task.name} not remote.')
320318
continue
321319
_preprocess(task)
322-
uworker_tasks.append(task)
323-
logs.log(f'Starting utask_mains: {len(uworker_tasks)}.')
324-
if uworker_tasks:
325-
start_utask_mains(uworker_tasks)
320+
yield task
321+
322+
323+
def schedule_tasks(tasks: List[Task]):
324+
"""Starts tasks as defined by task objects. If the tasks are not executed
325+
remotely, then they are put on the queue. If they are executed remotely, then
326+
the utask_mains are scheduled on batch, since preprocess has already been done
327+
in this module on this bot."""
328+
uworker_tasks = preprocess_utasks_and_queue_ttasks(tasks)
329+
return start_utask_mains(uworker_tasks)

src/clusterfuzz/_internal/cron/helpers/tasks_scheduler.py

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -13,29 +13,30 @@
1313
# limitations under the License.
1414
"""Task scheduler used to recreate recurring tasks."""
1515

16-
from clusterfuzz._internal.base import tasks
16+
from clusterfuzz._internal.base import tasks as taskslib
17+
from clusterfuzz._internal.bot.tasks import task_creation
1718
from clusterfuzz._internal.datastore import data_types
1819
from clusterfuzz._internal.datastore import ndb_utils
19-
from clusterfuzz._internal.metrics import logs
2020

2121

2222
def schedule(task):
2323
"""Creates tasks for open reproducible testcases."""
2424

25+
testcases = []
2526
for status in ['Processed', 'Duplicate']:
26-
testcases = data_types.Testcase.query(
27-
ndb_utils.is_true(data_types.Testcase.open),
28-
ndb_utils.is_false(data_types.Testcase.one_time_crasher_flag),
29-
data_types.Testcase.status == status)
27+
testcases.extend(
28+
data_types.Testcase.query(
29+
ndb_utils.is_true(data_types.Testcase.open),
30+
ndb_utils.is_false(data_types.Testcase.one_time_crasher_flag),
31+
data_types.Testcase.status == status))
3032

31-
for testcase in testcases:
32-
try:
33-
tasks.add_task(
34-
task,
35-
testcase.key.id(),
36-
testcase.job_type,
37-
queue=tasks.queue_for_testcase(testcase))
38-
except Exception:
39-
logs.log_error('Failed to add task.')
40-
continue
41-
logs.log(f'Task {task} added successfully')
33+
tasks = [
34+
task_creation.Task(
35+
task,
36+
testcase.key.id(),
37+
testcase.job_type,
38+
queue_for_platform=taskslib.queue_for_testcase(testcase))
39+
for testcase in testcases
40+
]
41+
42+
task_creation.schedule_tasks(tasks)

src/clusterfuzz/_internal/google_cloud_utils/batch.py

Lines changed: 41 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
# limitations under the License.
1414
"""Cloud Batch helpers."""
1515
import collections
16+
import itertools
1617
import threading
1718
import uuid
1819

@@ -34,6 +35,8 @@
3435
MAX_DURATION = f'{int(60 * 60 * 2.5)}s'
3536
RETRY_COUNT = 0
3637

38+
TASK_BUNCH_SIZE = 20
39+
3740
# Controls how many containers (ClusterFuzz tasks) can run on a single VM.
3841
# THIS SHOULD BE 1 OR THERE WILL BE SECURITY PROBLEMS.
3942
TASK_COUNT_PER_NODE = 1
@@ -96,17 +99,52 @@ def create_uworker_main_batch_job(module, job_type, input_download_url):
9699
return result[0]
97100

98101

102+
def _bunched(iterator, bunch_size):
103+
"""Implementation of itertools.batched, which was only added in Python 3.12."""
104+
# TODO(metzman): Replace this with itertools.batched.
105+
assert bunch_size > -1
106+
idx = 0
107+
bunch = []
108+
for item in iterator:
109+
idx += 1
110+
bunch.append(item)
111+
if idx == bunch_size:
112+
idx = 0
113+
yield bunch
114+
bunch = []
115+
116+
if bunch:
117+
yield bunch
118+
119+
99120
def create_uworker_main_batch_jobs(batch_tasks):
100-
# Define what will be done as part of the job.
121+
"""Creates batch jobs."""
101122
job_specs = collections.defaultdict(list)
102123
for batch_task in batch_tasks:
103124
spec = _get_spec_from_config(batch_task.command, batch_task.job_type)
104125
job_specs[spec].append(batch_task.input_download_url)
105126

106127
logs.log('Creating batch jobs.')
107-
return [
108-
_create_job(spec, input_urls) for spec, input_urls in job_specs.items()
128+
jobs = []
129+
130+
logs.log(f'Starting utask_mains: {job_specs}.')
131+
for spec, input_urls in job_specs.items():
132+
for input_urls_portion in _bunched(input_urls, MAX_CONCURRENT_VMS_PER_JOB):
133+
jobs.append(_create_job(spec, input_urls_portion))
134+
135+
return jobs
136+
137+
138+
def create_uworker_main_batch_jobs_bunched(batch_tasks):
139+
"""Creates batch jobs 20 tasks at a time, lazily. This is helpful to use when
140+
batch_tasks takes a very long time to create."""
141+
# Use term bunch instead of "batch" since "batch" has nothing to do with the
142+
# cloud service and is thus very confusing in this context.
143+
jobs = [
144+
create_uworker_main_batch_jobs(bunch)
145+
for bunch in _bunched(batch_tasks, TASK_BUNCH_SIZE)
109146
]
147+
return list(itertools.chain(jobs))
110148

111149

112150
def _get_task_spec(batch_workload_spec):

src/clusterfuzz/_internal/tests/appengine/handlers/cron/recurring_tasks_test.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -91,8 +91,8 @@ def test_execute(self):
9191
"""Tests scheduling of progression tasks."""
9292
schedule_progression_tasks.main()
9393
self.mock.add_task.assert_has_calls([
94-
mock.call('progression', 1, 'job', queue='jobs-linux'),
95-
mock.call('progression', 5, 'job_windows', queue='jobs-windows')
94+
mock.call('progression', '1', 'job', queue='jobs-linux'),
95+
mock.call('progression', '5', 'job_windows', queue='jobs-windows'),
9696
])
9797

9898

@@ -109,6 +109,6 @@ def test_execute(self):
109109
"""Tests scheduling of progression tasks."""
110110
schedule_impact_tasks.main()
111111
self.mock.add_task.assert_has_calls([
112-
mock.call('impact', 1, 'job', queue='jobs-linux'),
113-
mock.call('impact', 5, 'job_windows', queue='jobs-windows'),
112+
mock.call('impact', '1', 'job', queue='jobs-linux'),
113+
mock.call('impact', '5', 'job_windows', queue='jobs-windows'),
114114
])

0 commit comments

Comments
 (0)