Skip to content

Commit c3fcce7

Browse files
Merge pull request #205 from jeromekelleher/fix-mac-progress
Fix mac progress
2 parents 65ad5cf + f793224 commit c3fcce7

File tree

3 files changed

+19
-23
lines changed

3 files changed

+19
-23
lines changed

bio2zarr/core.py

Lines changed: 12 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ class ProgressConfig:
187187
# progressable thing happening per source process. This is
188188
# probably fine in practise, but there could be corner cases
189189
# where it's not. Something to watch out for.
190-
_progress_counter = multiprocessing.Value("Q", 0)
190+
_progress_counter = None
191191

192192

193193
def update_progress(inc):
@@ -201,23 +201,30 @@ def get_progress():
201201
return val
202202

203203

204-
def set_progress(value):
205-
with _progress_counter.get_lock():
206-
_progress_counter.value = value
204+
def setup_progress_counter(counter):
205+
global _progress_counter
206+
_progress_counter = counter
207207

208208

209209
class ParallelWorkManager(contextlib.AbstractContextManager):
210210
def __init__(self, worker_processes=1, progress_config=None):
211+
# Need to specify this explicitly to suppport Macs and
212+
# for future proofing.
213+
ctx = multiprocessing.get_context("spawn")
214+
global _progress_counter
215+
_progress_counter = ctx.Value("Q", 0)
211216
if worker_processes <= 0:
212217
# NOTE: this is only for testing, not for production use!
213218
self.executor = SynchronousExecutor()
214219
else:
215220
self.executor = cf.ProcessPoolExecutor(
216221
max_workers=worker_processes,
222+
mp_context=ctx,
223+
initializer=setup_progress_counter,
224+
initargs=(_progress_counter,),
217225
)
218226
self.futures = set()
219227

220-
set_progress(0)
221228
if progress_config is None:
222229
progress_config = ProgressConfig()
223230
self.progress_config = progress_config
@@ -258,16 +265,6 @@ def submit(self, *args, **kwargs):
258265
self.futures.add(future)
259266
return future
260267

261-
def wait_for_completed(self, timeout=None):
262-
done, not_done = cf.wait(self.futures, timeout, cf.FIRST_COMPLETED)
263-
for future in done:
264-
exception = future.exception()
265-
# TODO do the check for BrokenProcessPool here
266-
if exception is not None:
267-
raise exception
268-
self.futures = not_done
269-
return done
270-
271268
def results_as_completed(self):
272269
for future in cf.as_completed(self.futures):
273270
yield future.result()

tests/test_core.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,6 @@ def test_bad_min_max(self, min_value, max_value):
5656
core.min_int_dtype(min_value, max_value)
5757

5858

59-
@pytest.mark.skipif(sys.platform == "darwin", reason="Issue #75")
6059
class TestParallelWorkManager:
6160
@pytest.mark.parametrize("total", [1, 10, 2**63])
6261
@pytest.mark.parametrize("workers", [0, 1])

validation.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
import click
88

9-
from bio2zarr import icf, vcf, verification
9+
from bio2zarr import vcf2zarr
1010

1111
# TODO add support here for split vcfs. Perhaps simplest to take a
1212
# directory provided as input as indicating this, and then having
@@ -40,11 +40,11 @@ def cli(vcfs, worker_processes, force):
4040
else:
4141
files = [f]
4242
source_file = f
43-
exploded = tmp_path / (f.name + ".exploded")
43+
exploded = tmp_path / (f.name + ".icf")
4444
if force and exploded.exists():
4545
shutil.rmtree(exploded)
4646
if not exploded.exists():
47-
icf.explode(
47+
vcf2zarr.explode(
4848
exploded,
4949
files,
5050
worker_processes=worker_processes,
@@ -53,21 +53,21 @@ def cli(vcfs, worker_processes, force):
5353
spec = tmp_path / (f.name + ".schema")
5454
if force or not spec.exists():
5555
with open(spec, "w") as specfile:
56-
vcf.mkschema(exploded, specfile)
56+
vcf2zarr.mkschema(exploded, specfile)
5757

58-
zarr = tmp_path / (f.name + ".zarr")
58+
zarr = tmp_path / (f.name + ".vcz")
5959
if force and zarr.exists():
6060
shutil.rmtree(zarr)
6161
if not zarr.exists():
62-
vcf.encode(
62+
vcf2zarr.encode(
6363
exploded,
6464
zarr,
6565
spec,
6666
worker_processes=worker_processes,
6767
show_progress=True,
6868
)
6969

70-
verification.validate(source_file, zarr, show_progress=True)
70+
vcf2zarr.verify(source_file, zarr, show_progress=True)
7171

7272

7373
if __name__ == "__main__":

0 commit comments

Comments
 (0)