Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,11 @@ jobs:
python -m venv env-tskit
source env-tskit/bin/activate
python -m pip install .
python -m bio2zarr tskit2zarr convert tests/data/ts/example.trees ts.vcz > ts.txt 2>&1 || echo $? > ts_exit.txt
python -m bio2zarr tskit2zarr convert tests/data/tskit/example.trees ts.vcz > ts.txt 2>&1 || echo $? > ts_exit.txt
test "$(cat ts_exit.txt)" = "1"
grep -q "This process requires the optional tskit module. Install it with: pip install bio2zarr\[tskit\]" ts.txt
python -m pip install '.[tskit]'
python -m bio2zarr tskit2zarr convert tests/data/ts/example.trees ts.vcz
python -m bio2zarr tskit2zarr convert tests/data/tskit/example.trees ts.vcz
deactivate

python -m venv env-vcf
Expand Down
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@

- Make format-specific dependencies optional (#385)

- Change default number of worker processes to zero (#404) to simplify
debugging

Breaking changes

- Remove explicit sample, contig and filter lists from the schema.
Expand Down
9 changes: 7 additions & 2 deletions bio2zarr/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import numcodecs
import tabulate

from . import plink, provenance, vcf_utils
from . import core, plink, provenance, vcf_utils
from . import tskit as tskit_mod
from . import vcf as vcf_mod

Expand Down Expand Up @@ -89,7 +89,12 @@ def list_commands(self, ctx):
version = click.version_option(version=f"{provenance.__version__}")

worker_processes = click.option(
"-p", "--worker-processes", type=int, default=1, help="Number of worker processes"
"-p",
"--worker-processes",
type=int,
default=core.DEFAULT_WORKER_PROCESSES,
help="Number of worker processes",
show_default=True,
)

column_chunk_size = click.option(
Expand Down
14 changes: 11 additions & 3 deletions bio2zarr/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,12 +130,20 @@ def du(path):
return total


# We set the default number of worker processes to 0 because it avoids
# complexity in the call chain and makes things easier to debug by
# default. However, it does use the SynchronousExecutor here, which
# is technically not recommended by the Python docs.
DEFAULT_WORKER_PROCESSES = 0


class SynchronousExecutor(cf.Executor):
# Arguably we should use workers=0 as the default and use this
# Since https://github.com/sgkit-dev/bio2zarr/issues/404 we
# set worker_processses=0 as the default and use this
# executor implementation. However, the docs are fairly explicit
# about saying we shouldn't instantiate Future objects directly,
# so it's best to keep this as a semi-secret debugging interface
# for now.
# so we may need to revisit this is obscure problems start to
# arise.
def submit(self, fn, /, *args, **kwargs):
future = cf.Future()
future.set_result(fn(*args, **kwargs))
Expand Down
2 changes: 1 addition & 1 deletion bio2zarr/plink.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ def convert(
*,
variants_chunk_size=None,
samples_chunk_size=None,
worker_processes=1,
worker_processes=core.DEFAULT_WORKER_PROCESSES,
show_progress=False,
):
plink_format = PlinkFormat(prefix)
Expand Down
72 changes: 51 additions & 21 deletions bio2zarr/tskit.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,31 +12,46 @@ class TskitFormat(vcz.Source):
@core.requires_optional_dependency("tskit", "tskit")
def __init__(
self,
ts_path,
individuals_nodes=None,
sample_ids=None,
ts,
*,
model_mapping=None,
contig_id=None,
isolated_as_missing=False,
):
import tskit

self._path = ts_path
self.ts = tskit.load(ts_path)
self._path = None
# Future versions here will need to deal with the complexities of
# having lists of tree sequences for multiple chromosomes.
if isinstance(ts, tskit.TreeSequence):
self.ts = ts
else:
# input 'ts' is a path.
self._path = ts
logger.info(f"Loading from {ts}")
self.ts = tskit.load(ts)
logger.info(
f"Input has {self.ts.num_individuals} individuals and "
f"{self.ts.num_sites} sites"
)

self.contig_id = contig_id if contig_id is not None else "1"
self.isolated_as_missing = isolated_as_missing

self.positions = self.ts.sites_position

if individuals_nodes is None:
individuals_nodes = self.ts.individuals_nodes
if model_mapping is None:
model_mapping = self.ts.map_to_vcf_model()

individuals_nodes = model_mapping.individuals_nodes
sample_ids = model_mapping.individuals_name

self._num_samples = individuals_nodes.shape[0]
logger.info(f"Converting for {self._num_samples} samples")
if self._num_samples < 1:
raise ValueError("individuals_nodes must have at least one sample")
self.max_ploidy = individuals_nodes.shape[1]
if sample_ids is None:
sample_ids = [f"tsk_{j}" for j in range(self._num_samples)]
elif len(sample_ids) != self._num_samples:
if len(sample_ids) != self._num_samples:
raise ValueError(
f"Length of sample_ids ({len(sample_ids)}) does not match "
f"number of samples ({self._num_samples})"
Expand Down Expand Up @@ -91,6 +106,7 @@ def iter_field(self, field_name, shape, start, stop):
def iter_alleles_and_genotypes(self, start, stop, shape, num_alleles):
# All genotypes in tskit are considered phased
phased = np.ones(shape[:-1], dtype=bool)
logger.debug(f"Getting genotpes start={start} stop={stop}")

for variant in self.ts.variants(
isolated_as_missing=self.isolated_as_missing,
Expand All @@ -101,14 +117,15 @@ def iter_alleles_and_genotypes(self, start, stop, shape, num_alleles):
):
gt = np.full(shape, constants.INT_FILL, dtype=np.int8)
alleles = np.full(num_alleles, constants.STR_FILL, dtype="O")
variant_length = 0
# length is the length of the REF allele unless other fields
# are included.
variant_length = len(variant.alleles[0])
for i, allele in enumerate(variant.alleles):
# None is returned by tskit in the case of a missing allele
if allele is None:
continue
assert i < num_alleles
alleles[i] = allele
variant_length = max(variant_length, len(allele))
gt[self.sample_indices, self.ploidy_indices] = variant.genotypes[
self.genotype_indices
]
Expand Down Expand Up @@ -231,30 +248,43 @@ def generate_schema(


def convert(
ts_path,
zarr_path,
ts_or_path,
vcz_path,
*,
individuals_nodes=None,
sample_ids=None,
model_mapping=None,
contig_id=None,
isolated_as_missing=False,
variants_chunk_size=None,
samples_chunk_size=None,
worker_processes=1,
worker_processes=core.DEFAULT_WORKER_PROCESSES,
show_progress=False,
):
"""
Convert a :class:`tskit.TreeSequence` (or path to a tree sequence
file) to VCF Zarr format stored at the specified path.

.. todo:: Document parameters
"""
# FIXME there's some tricky details here in how we're handling
# parallelism that we'll need to tackle properly, and maybe
# review the current structures a bit. Basically, it looks like
# we're pickling/unpickling the format object when we have
# multiple workers, and this results in several copies of the
# tree sequence object being pass around. This is fine most
# of the time, but results in lots of memory being used when
# we're dealing with really massive files.
# See https://github.com/sgkit-dev/bio2zarr/issues/403
tskit_format = TskitFormat(
ts_path,
individuals_nodes=individuals_nodes,
sample_ids=sample_ids,
ts_or_path,
model_mapping=model_mapping,
contig_id=contig_id,
isolated_as_missing=isolated_as_missing,
)
schema_instance = tskit_format.generate_schema(
variants_chunk_size=variants_chunk_size,
samples_chunk_size=samples_chunk_size,
)
zarr_path = pathlib.Path(zarr_path)
zarr_path = pathlib.Path(vcz_path)
vzw = vcz.VcfZarrWriter(TskitFormat, zarr_path)
# Rough heuristic to split work up enough to keep utilisation high
target_num_partitions = max(1, worker_processes * 4)
Expand Down
27 changes: 19 additions & 8 deletions bio2zarr/vcf.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,12 @@ def scan_vcf(path, target_num_partitions):
return metadata, vcf.raw_header


def scan_vcfs(paths, show_progress, target_num_partitions, worker_processes=1):
def scan_vcfs(
paths,
show_progress,
target_num_partitions,
worker_processes=core.DEFAULT_WORKER_PROCESSES,
):
logger.info(
f"Scanning {len(paths)} VCFs attempting to split into {target_num_partitions}"
f" partitions."
Expand Down Expand Up @@ -1051,6 +1056,10 @@ def iter_genotypes(self, shape, start, stop):
phased = value[:, -1] if value is not None else None
sanitised_genotypes = sanitise_value_int_2d(shape, genotypes)
sanitised_phased = sanitise_value_int_1d(shape[:-1], phased)
# Force haploids to always be phased
# https://github.com/sgkit-dev/bio2zarr/issues/399
if sanitised_genotypes.shape[1] == 1:
sanitised_phased[:] = True
yield sanitised_genotypes, sanitised_phased

def iter_alleles_and_genotypes(self, start, stop, shape, num_alleles):
Expand Down Expand Up @@ -1294,7 +1303,7 @@ def init(
vcfs,
*,
column_chunk_size=16,
worker_processes=1,
worker_processes=core.DEFAULT_WORKER_PROCESSES,
target_num_partitions=None,
show_progress=False,
compressor=None,
Expand Down Expand Up @@ -1446,7 +1455,9 @@ def process_partition(self, partition_index):
f"{num_records} records last_pos={last_position}"
)

def explode(self, *, worker_processes=1, show_progress=False):
def explode(
self, *, worker_processes=core.DEFAULT_WORKER_PROCESSES, show_progress=False
):
self.load_metadata()
num_records = self.metadata.num_records
if np.isinf(num_records):
Expand Down Expand Up @@ -1514,7 +1525,7 @@ def explode(
vcfs,
*,
column_chunk_size=16,
worker_processes=1,
worker_processes=core.DEFAULT_WORKER_PROCESSES,
show_progress=False,
compressor=None,
):
Expand All @@ -1539,7 +1550,7 @@ def explode_init(
*,
column_chunk_size=16,
target_num_partitions=1,
worker_processes=1,
worker_processes=core.DEFAULT_WORKER_PROCESSES,
show_progress=False,
compressor=None,
):
Expand Down Expand Up @@ -1601,7 +1612,7 @@ def convert(
*,
variants_chunk_size=None,
samples_chunk_size=None,
worker_processes=1,
worker_processes=core.DEFAULT_WORKER_PROCESSES,
local_alleles=None,
show_progress=False,
icf_path=None,
Expand Down Expand Up @@ -1645,7 +1656,7 @@ def encode(
dimension_separator=None,
max_memory=None,
local_alleles=None,
worker_processes=1,
worker_processes=core.DEFAULT_WORKER_PROCESSES,
show_progress=False,
):
# Rough heuristic to split work up enough to keep utilisation high
Expand Down Expand Up @@ -1683,7 +1694,7 @@ def encode_init(
max_variant_chunks=None,
dimension_separator=None,
max_memory=None,
worker_processes=1,
worker_processes=core.DEFAULT_WORKER_PROCESSES,
show_progress=False,
):
icf_store = IntermediateColumnarFormat(icf_path)
Expand Down
6 changes: 5 additions & 1 deletion docs/_config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,15 @@ html:
extra_footer: |
<p>
Documentation available under the terms of the
<a href="https://creativecommons.org/publicdomain/zero/1.0/">CC0 1.0</a>
<a href="https://creativecommons.org/publicdomain/zero/1.0/">CC0 1.0</a>
license.
</p>

sphinx:
extra_extensions:
- sphinx_click.ext
- sphinx.ext.todo
- sphinx.ext.autodoc
config:
html_show_copyright: false
# This is needed to make sure that text is output in single block from
Expand All @@ -40,3 +41,6 @@ sphinx:
todo_include_todos: true
myst_enable_extensions:
- colon_fence
intersphinx_mapping:
python: ["https://docs.python.org/3/", null]
tskit: ["https://tskit.dev/tskit/docs/stable", null]
4 changes: 4 additions & 0 deletions docs/_toc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ chapters:
- file: plink2zarr/overview
sections:
- file: plink2zarr/cli_ref
- file: tskit2zarr/overview
sections:
- file: tskit2zarr/python_api
- file: tskit2zarr/cli_ref
- file: vcfpartition/overview
sections:
- file: vcfpartition/cli_ref
2 changes: 1 addition & 1 deletion docs/plink2zarr/cli_ref.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@
.. click:: bio2zarr.cli:convert_plink
:prog: plink2zarr convert
:nested: full

```
18 changes: 18 additions & 0 deletions docs/tskit2zarr/cli_ref.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
(sec-tskit2zarr-cli-ref)=
# CLI Reference

% A note on cross references... There's some weird long-standing problem with
% cross referencing program values in Sphinx, which means that we can't use
% the built-in labels generated by sphinx-click. We can make our own explicit
% targets, but these have to have slightly weird names to avoid conflicting
% with what sphinx-click is doing. So, hence the cmd- prefix.
% Based on: https://github.com/skypilot-org/skypilot/pull/2834

```{eval-rst}

.. _cmd-tskit2zarr-convert:
.. click:: bio2zarr.cli:convert_tskit
:prog: tskit2zarr convert
:nested: full

```
10 changes: 10 additions & 0 deletions docs/tskit2zarr/overview.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
(sec-tskit2zarr)=
# tskit2zarr

Convert tskit data to the
[VCF Zarr specification](https://github.com/sgkit-dev/vcf-zarr-spec/)
reliably in parallel.

See {ref}`sec-tskit2zarr-cli-ref` for detailed documentation on
command line options.

Loading