Skip to content

Commit 4108718

Browse files
Copilot authored and forsyth2 committed
Restore heap-based worker allocation in multiprocess_extract
Co-authored-by: forsyth2 <30700190+forsyth2@users.noreply.github.com>
1 parent 0ebd7a5 commit 4108718

File tree

1 file changed

+18
-3
lines changed

1 file changed

+18
-3
lines changed

zstash/extract.py

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import argparse
44
import collections
55
import hashlib
6+
import heapq
67
import logging
78
import multiprocessing
89
import os.path
@@ -327,11 +328,25 @@ def multiprocess_extract(
327328

328329
# For worker i, workers_to_tars[i] is a set of tars
329330
# that worker i will work on.
330-
# Round-robin assignment for predictable ordering
331331
workers_to_tars: List[set] = [set() for _ in range(num_workers)]
332+
# A min heap of (work, worker_idx) tuples, where work is the size of data
333+
# that worker_idx needs to work on.
334+
# We can efficiently get the worker with the least amount of work.
335+
work_to_workers: List[Tuple[float, int]] = [(0.0, i) for i in range(num_workers)]
336+
heapq.heapify(work_to_workers)
337+
338+
# Using a greedy approach, populate workers_to_tars.
332339
tar: str
333-
for idx, tar in enumerate(sorted(tar_to_size.keys())):
334-
workers_to_tars[idx % num_workers].add(tar)
340+
for tar in tar_to_size:
341+
# The worker with the least accumulated work gets the next tar.
342+
workers_work: float
343+
worker_idx: int
344+
workers_work, worker_idx = heapq.heappop(work_to_workers)
345+
workers_to_tars[worker_idx].add(tar)
346+
# Add this worker back to the heap, with the new amount of work.
347+
worker_tuple: Tuple[float, int] = (workers_work + tar_to_size[tar], worker_idx)
348+
# FIXME: error: Cannot infer type argument 1 of "heappush"
349+
heapq.heappush(work_to_workers, worker_tuple) # type: ignore
335350

336351
workers_to_matches: List[List[FilesRow]] = [[] for _ in range(num_workers)]
337352
workers_idx: int

0 commit comments

Comments
 (0)