
Commit adc50ff

Don't exceed the number of pubsub messages per pub request. (#4427)
1 parent 9dfcae6 commit adc50ff

4 files changed: +28 -22 lines changed


src/clusterfuzz/_internal/base/tasks/__init__.py

Lines changed: 5 additions & 2 deletions
@@ -48,6 +48,8 @@
 MAX_LEASED_TASKS_LIMIT = 1000
 MAX_TASKS_LIMIT = 100000
 
+MAX_PUBSUB_MESSAGES_PER_REQ = 1000
+
 # Various variables for task leasing and completion times (in seconds).
 TASK_COMPLETION_BUFFER = 90 * 60
 TASK_CREATION_WAIT_INTERVAL = 2 * 60
@@ -653,8 +655,9 @@ def bulk_add_tasks(tasks, queue=None, eta_now=False):
 
   pubsub_client = pubsub.PubSubClient()
   pubsub_messages = [task.to_pubsub_message() for task in tasks]
-  pubsub_client.publish(
-      pubsub.topic_name(utils.get_application_id(), queue), pubsub_messages)
+  topic_name = pubsub.topic_name(utils.get_application_id(), queue)
+  for batch in utils.batched(pubsub_messages, MAX_PUBSUB_MESSAGES_PER_REQ):
+    pubsub_client.publish(topic_name, batch)
 
 
 def add_task(command,
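
For context, a minimal self-contained sketch (mine, not part of the commit) of the batching pattern the hunk above introduces. The fake_publish function, the payload strings, and the topic string are stand-ins rather than the repo's pubsub client, and the local batched helper mirrors the behavior of the utils.batched added by this commit:

# Illustrative standalone sketch of the batched-publish pattern above.
# fake_publish, the payloads, and the topic name are stand-ins, not the
# actual ClusterFuzz pubsub API.
MAX_PUBSUB_MESSAGES_PER_REQ = 1000  # per-request cap, as in the diff


def batched(iterator, batch_size):
  """Yields lists of at most batch_size items (mirrors utils.batched)."""
  batch = []
  for item in iterator:
    batch.append(item)
    if len(batch) == batch_size:
      yield batch
      batch = []
  if batch:
    yield batch


def fake_publish(topic_name, messages):
  print(f'publish request: {len(messages)} messages -> {topic_name}')


messages = [f'message-{i}' for i in range(2500)]
for batch in batched(messages, MAX_PUBSUB_MESSAGES_PER_REQ):
  fake_publish('projects/example-project/topics/jobs-linux', batch)
# Prints three requests of 1000, 1000, and 500 messages, so no single
# publish call exceeds the per-request message limit.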

src/clusterfuzz/_internal/base/utils.py

Lines changed: 19 additions & 0 deletions
@@ -1020,3 +1020,22 @@ def cpu_count():
 
   return environment.get_value('CPU_COUNT_OVERRIDE',
                                multiprocessing.cpu_count())
+
+
+def batched(iterator, batch_size):
+  """Implementation of itertools.py's batched that was added after
+  Python3.11."""
+  # TODO(metzman): Replace this with itertools.batched.
+  assert batch_size > -1
+  idx = 0
+  batch = []
+  for item in iterator:
+    idx += 1
+    batch.append(item)
+    if idx == batch_size:
+      idx = 0
+      yield batch
+      batch = []
+
+  if batch:
+    yield batch
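
A quick usage note (my example, not part of the diff, and it assumes the clusterfuzz package is importable): the new helper groups any iterable into lists of at most batch_size items, the same grouping that itertools.batched performs on Python 3.12+, which is what the TODO above refers to; the stdlib version yields tuples rather than lists.

# Hypothetical usage; the import path assumes the module layout in this commit.
from clusterfuzz._internal.base.utils import batched

print(list(batched(range(7), 3)))  # [[0, 1, 2], [3, 4, 5], [6]]

# On Python 3.12+, the stdlib equivalent yields tuples instead of lists:
# >>> import itertools
# >>> list(itertools.batched(range(7), 3))
# [(0, 1, 2), (3, 4, 5), (6,)]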

src/clusterfuzz/_internal/fuzzing/corpus_manager.py

Lines changed: 2 additions & 1 deletion
@@ -663,7 +663,8 @@ def get_proto_corpus(bucket_name,
   # again.
   if max_download_urls is not None:
     urls = itertools.islice(urls, max_download_urls)
-  corpus_urls = dict(storage.sign_urls_for_existing_files(urls, include_delete_urls))
+  corpus_urls = dict(
+      storage.sign_urls_for_existing_files(urls, include_delete_urls))
 
   upload_urls = storage.get_arbitrary_signed_upload_urls(
       gcs_url, num_uploads=max_upload_urls)

src/clusterfuzz/_internal/google_cloud_utils/batch.py

Lines changed: 2 additions & 19 deletions
@@ -104,24 +104,6 @@ def create_uworker_main_batch_job(module, job_type, input_download_url):
   return result[0]
 
 
-def _bunched(iterator, bunch_size):
-  """Implementation of itertools.py's batched that was added after Python3.7."""
-  # TODO(metzman): Replace this with itertools.batched.
-  assert bunch_size > -1
-  idx = 0
-  bunch = []
-  for item in iterator:
-    idx += 1
-    bunch.append(item)
-    if idx == bunch_size:
-      idx = 0
-      yield bunch
-      bunch = []
-
-  if bunch:
-    yield bunch
-
-
 def create_uworker_main_batch_jobs(batch_tasks: List[BatchTask]):
   """Creates batch jobs."""
   job_specs = collections.defaultdict(list)
@@ -135,7 +117,8 @@ def create_uworker_main_batch_jobs(batch_tasks: List[BatchTask]):
 
   logs.info('Batching utask_mains.')
   for spec, input_urls in job_specs.items():
-    for input_urls_portion in _bunched(input_urls, MAX_CONCURRENT_VMS_PER_JOB):
+    for input_urls_portion in utils.batched(input_urls,
+                                            MAX_CONCURRENT_VMS_PER_JOB):
       jobs.append(_create_job(spec, input_urls_portion))
 
   return jobs
