Skip to content

Commit bf317ff

Browse files
Change default worker processes to 0
Closes #404
1 parent 3165f58 commit bf317ff

File tree

7 files changed

+55
-23
lines changed

7 files changed

+55
-23
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@
44

55
- Make format-specific dependencies optional (#385)
66

7+
- Change default number of worker processes to zero (#404) to simplify
8+
debugging
9+
710
Breaking changes
811

912
- Remove explicit sample, contig and filter lists from the schema.

bio2zarr/cli.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
import numcodecs
99
import tabulate
1010

11-
from . import plink, provenance, vcf_utils
11+
from . import core, plink, provenance, vcf_utils
1212
from . import tskit as tskit_mod
1313
from . import vcf as vcf_mod
1414

@@ -89,7 +89,12 @@ def list_commands(self, ctx):
8989
version = click.version_option(version=f"{provenance.__version__}")
9090

9191
worker_processes = click.option(
92-
"-p", "--worker-processes", type=int, default=1, help="Number of worker processes"
92+
"-p",
93+
"--worker-processes",
94+
type=int,
95+
default=core.DEFAULT_WORKER_PROCESSES,
96+
help="Number of worker processes",
97+
show_default=True,
9398
)
9499

95100
column_chunk_size = click.option(

bio2zarr/core.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -130,12 +130,20 @@ def du(path):
130130
return total
131131

132132

133+
# We set the default number of worker processes to 0 because it avoids
134+
# complexity in the call chain and makes things easier to debug by
135+
# default. However, it does use the SynchronousExecutor here, which
136+
# is technically not recommended by the Python docs.
137+
DEFAULT_WORKER_PROCESSES = 0
138+
139+
133140
class SynchronousExecutor(cf.Executor):
134-
# Arguably we should use workers=0 as the default and use this
141+
# Since https://github.com/sgkit-dev/bio2zarr/issues/404 we
142+
# set worker_processses=0 as the default and use this
135143
# executor implementation. However, the docs are fairly explicit
136144
# about saying we shouldn't instantiate Future objects directly,
137-
# so it's best to keep this as a semi-secret debugging interface
138-
# for now.
145+
# so we may need to revisit this is obscure problems start to
146+
# arise.
139147
def submit(self, fn, /, *args, **kwargs):
140148
future = cf.Future()
141149
future.set_result(fn(*args, **kwargs))

bio2zarr/plink.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -291,7 +291,7 @@ def convert(
291291
*,
292292
variants_chunk_size=None,
293293
samples_chunk_size=None,
294-
worker_processes=1,
294+
worker_processes=core.DEFAULT_WORKER_PROCESSES,
295295
show_progress=False,
296296
):
297297
plink_format = PlinkFormat(prefix)

bio2zarr/tskit.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ def __init__(
2020
):
2121
import tskit
2222

23-
self._path = None # Not sure what we're using this for?
23+
self._path = None
2424
# Future versions here will need to deal with the complexities of
2525
# having lists of tree sequences for multiple chromosomes.
2626
if isinstance(ts, tskit.TreeSequence):
@@ -256,7 +256,7 @@ def convert(
256256
isolated_as_missing=False,
257257
variants_chunk_size=None,
258258
samples_chunk_size=None,
259-
worker_processes=1,
259+
worker_processes=core.DEFAULT_WORKER_PROCESSES,
260260
show_progress=False,
261261
):
262262
"""
@@ -265,6 +265,15 @@ def convert(
265265
266266
.. todo:: Document parameters
267267
"""
268+
# FIXME there's some tricky details here in how we're handling
269+
# parallelism that we'll need to tackle properly, and maybe
270+
# review the current structures a bit. Basically, it looks like
271+
# we're pickling/unpickling the format object when we have
272+
# multiple workers, and this results in several copies of the
273+
# tree sequence object being pass around. This is fine most
274+
# of the time, but results in lots of memory being used when
275+
# we're dealing with really massive files.
276+
# See https://github.com/sgkit-dev/bio2zarr/issues/403
268277
tskit_format = TskitFormat(
269278
ts_or_path,
270279
model_mapping=model_mapping,

bio2zarr/vcf.py

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -285,7 +285,12 @@ def scan_vcf(path, target_num_partitions):
285285
return metadata, vcf.raw_header
286286

287287

288-
def scan_vcfs(paths, show_progress, target_num_partitions, worker_processes=1):
288+
def scan_vcfs(
289+
paths,
290+
show_progress,
291+
target_num_partitions,
292+
worker_processes=core.DEFAULT_WORKER_PROCESSES,
293+
):
289294
logger.info(
290295
f"Scanning {len(paths)} VCFs attempting to split into {target_num_partitions}"
291296
f" partitions."
@@ -1298,7 +1303,7 @@ def init(
12981303
vcfs,
12991304
*,
13001305
column_chunk_size=16,
1301-
worker_processes=1,
1306+
worker_processes=core.DEFAULT_WORKER_PROCESSES,
13021307
target_num_partitions=None,
13031308
show_progress=False,
13041309
compressor=None,
@@ -1450,7 +1455,9 @@ def process_partition(self, partition_index):
14501455
f"{num_records} records last_pos={last_position}"
14511456
)
14521457

1453-
def explode(self, *, worker_processes=1, show_progress=False):
1458+
def explode(
1459+
self, *, worker_processes=core.DEFAULT_WORKER_PROCESSES, show_progress=False
1460+
):
14541461
self.load_metadata()
14551462
num_records = self.metadata.num_records
14561463
if np.isinf(num_records):
@@ -1518,7 +1525,7 @@ def explode(
15181525
vcfs,
15191526
*,
15201527
column_chunk_size=16,
1521-
worker_processes=1,
1528+
worker_processes=core.DEFAULT_WORKER_PROCESSES,
15221529
show_progress=False,
15231530
compressor=None,
15241531
):
@@ -1543,7 +1550,7 @@ def explode_init(
15431550
*,
15441551
column_chunk_size=16,
15451552
target_num_partitions=1,
1546-
worker_processes=1,
1553+
worker_processes=core.DEFAULT_WORKER_PROCESSES,
15471554
show_progress=False,
15481555
compressor=None,
15491556
):
@@ -1605,7 +1612,7 @@ def convert(
16051612
*,
16061613
variants_chunk_size=None,
16071614
samples_chunk_size=None,
1608-
worker_processes=1,
1615+
worker_processes=core.DEFAULT_WORKER_PROCESSES,
16091616
local_alleles=None,
16101617
show_progress=False,
16111618
icf_path=None,
@@ -1649,7 +1656,7 @@ def encode(
16491656
dimension_separator=None,
16501657
max_memory=None,
16511658
local_alleles=None,
1652-
worker_processes=1,
1659+
worker_processes=core.DEFAULT_WORKER_PROCESSES,
16531660
show_progress=False,
16541661
):
16551662
# Rough heuristic to split work up enough to keep utilisation high
@@ -1687,7 +1694,7 @@ def encode_init(
16871694
max_variant_chunks=None,
16881695
dimension_separator=None,
16891696
max_memory=None,
1690-
worker_processes=1,
1697+
worker_processes=core.DEFAULT_WORKER_PROCESSES,
16911698
show_progress=False,
16921699
):
16931700
icf_store = IntermediateColumnarFormat(icf_path)

tests/test_cli.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,19 +7,19 @@
77
import pytest
88

99
from bio2zarr import __main__ as main
10-
from bio2zarr import cli, provenance
10+
from bio2zarr import cli, core, provenance
1111

1212
DEFAULT_EXPLODE_ARGS = dict(
1313
column_chunk_size=64,
1414
compressor=None,
15-
worker_processes=1,
15+
worker_processes=core.DEFAULT_WORKER_PROCESSES,
1616
show_progress=True,
1717
)
1818

1919
DEFAULT_DEXPLODE_PARTITION_ARGS = dict()
2020

2121
DEFAULT_DEXPLODE_INIT_ARGS = dict(
22-
worker_processes=1,
22+
worker_processes=core.DEFAULT_WORKER_PROCESSES,
2323
column_chunk_size=64,
2424
compressor=None,
2525
show_progress=True,
@@ -30,7 +30,7 @@
3030
variants_chunk_size=None,
3131
samples_chunk_size=None,
3232
max_variant_chunks=None,
33-
worker_processes=1,
33+
worker_processes=core.DEFAULT_WORKER_PROCESSES,
3434
max_memory=None,
3535
show_progress=True,
3636
)
@@ -57,7 +57,7 @@
5757
variants_chunk_size=None,
5858
samples_chunk_size=None,
5959
show_progress=True,
60-
worker_processes=1,
60+
worker_processes=core.DEFAULT_WORKER_PROCESSES,
6161
local_alleles=False,
6262
)
6363

@@ -67,14 +67,14 @@
6767
variants_chunk_size=None,
6868
samples_chunk_size=None,
6969
show_progress=True,
70-
worker_processes=1,
70+
worker_processes=core.DEFAULT_WORKER_PROCESSES,
7171
)
7272

7373
DEFAULT_PLINK_CONVERT_ARGS = dict(
7474
variants_chunk_size=None,
7575
samples_chunk_size=None,
7676
show_progress=True,
77-
worker_processes=1,
77+
worker_processes=core.DEFAULT_WORKER_PROCESSES,
7878
)
7979

8080

0 commit comments

Comments
 (0)