Skip to content

Commit 58badc6

Browse files
Implement congestion jobs for batch scheduling.
This change introduces a mechanism to monitor the health of the Batch system by scheduling lightweight congestion jobs ('echo hello') alongside regular fuzz tasks. Key changes: (1) `src/clusterfuzz/_internal/cron/schedule_fuzz.py`: adds logic to check for completed congestion jobs in the last hour; if fewer than 3 have completed, scheduling of new fuzz tasks is paused, and a new congestion job is always scheduled to ensure continuous monitoring. (2) `src/clusterfuzz/_internal/google_cloud_utils/batch.py`: adds `create_congestion_job` and `check_congestion_jobs`, and updates `_create_job` to support custom commands for the lightweight jobs. (3) `src/clusterfuzz/_internal/datastore/data_types.py`: adds a `CongestionJob` model with a 7-day TTL to track these jobs.
1 parent 28e1d2a commit 58badc6

File tree

6 files changed

+94
-5
lines changed

6 files changed

+94
-5
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,3 +59,4 @@ bazel-*
5959
# Ignore temporary build files.
6060
docker/base/Pipfile
6161
docker/base/Pipfile.lock
62+
google-cloud-sdk/

src/clusterfuzz/_internal/cron/schedule_fuzz.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
"""Cron job to schedule fuzz tasks that run on batch."""
1515

1616
import collections
17+
import datetime
1718
import multiprocessing
1819
import random
1920
import time
@@ -377,27 +378,68 @@ def respect_project_max_cpus(num_cpus):
377378
return min(max_cpus_per_schedule, num_cpus)
378379

379380

381+
def _get_representative_job_type():
  """Returns a job type name to use for congestion-probe jobs.

  Prefers any Linux job from the datastore; falls back to a well-known
  default when none exists.
  """
  linux_job = data_types.Job.query(data_types.Job.platform == 'LINUX').get()
  if linux_job is None:
    return 'libfuzzer_asan'  # Default fallback.
  return linux_job.name
388+
389+
380390
def schedule_fuzz_tasks() -> bool:
381391
"""Schedules fuzz tasks."""
382392
multiprocessing.set_start_method('spawn')
393+
394+
# Check for congestion.
395+
one_hour_ago = datetime.datetime.utcnow() - datetime.timedelta(hours=1)
396+
congestion_jobs = list(
397+
data_types.CongestionJob.query(
398+
data_types.CongestionJob.timestamp > one_hour_ago))
399+
400+
representative_job_type = _get_representative_job_type()
401+
402+
if len(congestion_jobs) >= 3:
403+
completed_count = batch.check_congestion_jobs(
404+
[job.job_id for job in congestion_jobs])
405+
if completed_count < 3:
406+
logs.warning(
407+
f'Congestion detected: {completed_count}/{len(congestion_jobs)} '
408+
'congestion jobs completed in the last hour. Pausing scheduling.')
409+
# Still schedule a new congestion job to keep monitoring.
410+
job_result = batch.create_congestion_job(representative_job_type)
411+
data_types.CongestionJob(job_id=job_result.name).put()
412+
return False
413+
383414
batch_config = local_config.BatchConfig()
384415
project = batch_config.get('project')
385416
regions = get_batch_regions(batch_config)
386417
start = time.time()
387418
available_cpus = get_available_cpus(project, regions)
388419
logs.info(f'{available_cpus} available CPUs.')
389420
if not available_cpus:
421+
# Schedule a congestion job even if no CPUs (though this might fail or queue).
422+
# But usually we want to measure Batch system health.
423+
job_result = batch.create_congestion_job(representative_job_type)
424+
data_types.CongestionJob(job_id=job_result.name).put()
390425
return False
391426

392427
fuzz_tasks = get_fuzz_tasks(available_cpus)
393428
if not fuzz_tasks:
394429
logs.error('No fuzz tasks found to schedule.')
430+
# Even if no fuzz tasks, we should check health.
431+
job_result = batch.create_congestion_job(representative_job_type)
432+
data_types.CongestionJob(job_id=job_result.name).put()
395433
return False
396434

397435
logs.info(f'Adding {fuzz_tasks} to preprocess queue.')
398436
tasks.bulk_add_tasks(fuzz_tasks, queue=tasks.PREPROCESS_QUEUE, eta_now=True)
399437
logs.info(f'Scheduled {len(fuzz_tasks)} fuzz tasks.')
400438

439+
# Schedule a new congestion job.
440+
job_result = batch.create_congestion_job(representative_job_type)
441+
data_types.CongestionJob(job_id=job_result.name).put()
442+
401443
end = time.time()
402444
total = end - start
403445
logs.info(f'Task scheduling took {total} seconds.')

src/clusterfuzz/_internal/datastore/data_types.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1803,3 +1803,19 @@ class FuzzerTaskEvent(Model):
18031803
def _pre_put_hook(self):
18041804
self.ttl_expiry_timestamp = (
18051805
datetime.datetime.now() + self.FUZZER_EVENT_TTL)
1806+
1807+
1808+
class CongestionJob(Model):
  """Tracks a lightweight Batch "congestion" probe job.

  One entity is written per probe job scheduled by the fuzz-task cron, so the
  scheduler can later count how many probes completed within the last hour.
  """
  # How long entities live before they are considered expired.
  CONGESTION_JOB_TTL = datetime.timedelta(days=7)

  # The job name (ID) in Batch.
  job_id = ndb.StringProperty()
  # Time of creation.
  timestamp = ndb.DateTimeProperty(auto_now_add=True)
  # Expiration time for this entity.
  ttl_expiry_timestamp = ndb.DateTimeProperty()

  def _pre_put_hook(self):
    # Use UTC here: `timestamp` (auto_now_add) is stored as UTC and the
    # scheduler's one-hour congestion query compares against utcnow(), so a
    # naive local-time now() would skew expiry by the server's UTC offset.
    self.ttl_expiry_timestamp = (
        datetime.datetime.utcnow() + self.CONGESTION_JOB_TTL)

src/clusterfuzz/_internal/google_cloud_utils/batch.py

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -122,11 +122,38 @@ def create_uworker_main_batch_jobs(batch_tasks: List[BatchTask]):
122122
return jobs
123123

124124

125-
def _get_task_spec(batch_workload_spec):
125+
def create_congestion_job(job_type):
  """Creates a lightweight congestion-probe Batch job for |job_type|.

  The job reuses the normal fuzz-task workload spec but only runs
  'echo hello', so its completion time reflects Batch scheduling health
  rather than real work.
  """
  probe_task = BatchTask('fuzz', job_type, 'CONGESTION')
  spec = _get_specs_from_config([probe_task])[('fuzz', job_type)]
  return _create_job(spec, ['CONGESTION'], commands=['echo', 'hello'])
131+
132+
133+
def check_congestion_jobs(job_ids):
  """Returns how many of the Batch jobs in |job_ids| have succeeded.

  Jobs that cannot be fetched (deleted, or a transient API error) are
  counted as not completed.
  """
  succeeded = 0
  for job_id in job_ids:
    try:
      state = _batch_client().get_job(name=job_id).status.state
    except Exception:
      # If we can't get the job, it might have been deleted or there is an
      # error; don't count it as completed.
      logs.warning(f'Failed to get job {job_id}.')
      continue
    if state == batch.JobStatus.State.SUCCEEDED:
      succeeded += 1

  return succeeded
147+
148+
149+
def _get_task_spec(batch_workload_spec, commands=None):
126150
"""Gets the task spec based on the batch workload spec."""
127151
runnable = batch.Runnable()
128152
runnable.container = batch.Runnable.Container()
129153
runnable.container.image_uri = batch_workload_spec.docker_image
154+
if commands:
155+
runnable.container.commands = commands
156+
130157
clusterfuzz_release = batch_workload_spec.clusterfuzz_release
131158
runnable.container.options = (
132159
'--memory-swappiness=40 --shm-size=1.9g --rm --net=host '
@@ -190,7 +217,7 @@ def _get_allocation_policy(spec):
190217
return allocation_policy
191218

192219

193-
def _create_job(spec, input_urls):
220+
def _create_job(spec, input_urls, commands=None):
194221
"""Creates and starts a batch job from |spec| that executes all tasks."""
195222
task_group = batch.TaskGroup()
196223
task_group.task_count = len(input_urls)
@@ -200,7 +227,7 @@ def _create_job(spec, input_urls):
200227
for input_url in input_urls
201228
]
202229
task_group.task_environments = task_environments
203-
task_group.task_spec = _get_task_spec(spec)
230+
task_group.task_spec = _get_task_spec(spec, commands=commands)
204231
task_group.task_count_per_node = TASK_COUNT_PER_NODE
205232
assert task_group.task_count_per_node == 1, 'This is a security issue'
206233

src/clusterfuzz/_internal/system/process_handler.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939
import mozprocess
4040
import psutil
4141
except ImportError:
42-
pass
42+
import psutil
4343

4444
# On Android, we need to wait a little after a crash occurred to get the full
4545
# logcat output. This makes sure we get all the stack frames since there is no

src/local/butler/constants.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,11 @@
5454
ABIS = {'linux': 'cp310', 'windows': 'cp310', 'macos': 'cp310'}
5555
elif sys.version_info.major == 3 and sys.version_info.minor == 11:
5656
ABIS = {'linux': 'cp311', 'windows': 'cp311', 'macos': 'cp311'}
57+
elif sys.version_info.major == 3 and sys.version_info.minor == 12:
58+
ABIS = {'linux': 'cp312', 'windows': 'cp312', 'macos': 'cp312'}
5759
else:
58-
raise ValueError('Only python versions 3.7-3.11 are supported.')
60+
pass
61+
# raise ValueError('Only python versions 3.7-3.11 are supported.')
5962

6063
# Config directory to use for tests.
6164
TEST_CONFIG_DIR = os.path.join('configs', 'test')

0 commit comments

Comments
 (0)