
Commit 07f787c

ref(multi-process): Double block count to saturate sub processes (#447)
* ref(multi-process): Double block count to saturate sub processes
* test: Update tests that assert back pressure
* ref: Introduce an option and add docs
1 parent: fb0e12e


1 file changed: +26 -3 lines changed


arroyo/processing/strategies/run_task_with_multiprocessing.py

Lines changed: 26 additions & 3 deletions
@@ -196,7 +196,7 @@ def build(self) -> MessageBatch[TBatchValue]:
 
 
 def parallel_worker_initializer(
-    custom_initialize_func: Optional[Callable[[], None]] = None
+    custom_initialize_func: Optional[Callable[[], None]] = None,
 ) -> None:
     # Worker process should ignore ``SIGINT`` so that processing is not
     # interrupted by ``KeyboardInterrupt`` during graceful shutdown.
@@ -468,6 +468,20 @@ class RunTaskWithMultiprocessing(
     is applying backpressure. You can likely reduce ``num_processes`` and won't
     notice a performance regression.
 
+    Prefetching
+    ~~~~~~~~~~~
+
+    If you set ``prefetch_batches`` to `True`, Arroyo will allocate twice as
+    many input blocks as processes, and will prefetch the next batch while the
+    current batch is being processed. This can help saturate the process pool to
+    increase throughput, but it also increases memory usage.
+
+    Use this option if your consumer is bottlenecked on the multiprocessing step
+    but also runs time-consuming tasks in the other steps, like ``Produce`` or
+    ``Unfold``. By prefetching batches, the pool can immediately start working
+    on the next batch while the current batch is being sent through the next
+    steps.
+
     How to tune your consumer
     ~~~~~~~~~~~~~~~~~~~~~~~~~
 
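For context, a minimal sketch (not part of this commit) of how the new option would be enabled when building the strategy. The ``transform`` function and the commit callback are placeholders, and ``max_batch_size``/``max_batch_time`` are assumed from the strategy's existing constructor rather than shown in this diff:

from typing import Any

from arroyo.processing.strategies import CommitOffsets
from arroyo.processing.strategies.run_task_with_multiprocessing import (
    RunTaskWithMultiprocessing,
)
from arroyo.types import Message


def transform(message: Message[Any]) -> Any:
    # Hypothetical CPU-bound work that runs in the worker processes.
    return message.payload


strategy = RunTaskWithMultiprocessing(
    function=transform,
    next_step=CommitOffsets(lambda offsets: None),  # placeholder commit callback
    num_processes=4,
    max_batch_size=500,     # assumed existing batching parameters,
    max_batch_time=2.0,     # not shown in this diff
    prefetch_batches=True,  # new in this commit: allocate 2x blocks and prefetch
)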
@@ -507,6 +521,7 @@ def __init__(
         output_block_size: Optional[int] = None,
         max_input_block_size: Optional[int] = None,
         max_output_block_size: Optional[int] = None,
+        prefetch_batches: bool = False,
     ) -> None:
         self.__transform_function = function
         self.__next_step = next_step
@@ -525,18 +540,26 @@ def __init__(
         self.__shared_memory_manager = SharedMemoryManager()
         self.__shared_memory_manager.start()
 
+        block_count = num_processes
+        if prefetch_batches:
+            # Allocate twice as many blocks as processes to ensure that every
+            # process can immediately continue to handle another batch while the
+            # main strategy is busy to submit the transformed messages to the
+            # next step.
+            block_count *= 2
+
         self.__input_blocks = [
             self.__shared_memory_manager.SharedMemory(
                 input_block_size or DEFAULT_INPUT_BLOCK_SIZE
             )
-            for _ in range(num_processes)
+            for _ in range(block_count)
         ]
 
         self.__output_blocks = [
             self.__shared_memory_manager.SharedMemory(
                 output_block_size or DEFAULT_OUTPUT_BLOCK_SIZE
             )
-            for _ in range(num_processes)
+            for _ in range(block_count)
         ]
 
         self.__batch_builder: Optional[
