|
8 | 8 | import pickle
|
9 | 9 | import shutil
|
10 | 10 | import sys
|
| 11 | +import tempfile |
11 | 12 | from functools import partial
|
12 | 13 | from typing import Any
|
13 | 14 |
|
|
16 | 17 |
|
17 | 18 | from bio2zarr import schema
|
18 | 19 |
|
19 |
| -from .. import constants, core, provenance, vcf_utils |
| 20 | +from .. import constants, core, provenance, vcf_utils, writer |
20 | 21 |
|
21 | 22 | logger = logging.getLogger(__name__)
|
22 | 23 |
|
@@ -1517,3 +1518,161 @@ def explode_partition(icf_path, partition):
|
1517 | 1518 | def explode_finalise(icf_path):
|
1518 | 1519 | writer = IntermediateColumnarFormatWriter(icf_path)
|
1519 | 1520 | writer.finalise()
|
| 1521 | + |
| 1522 | + |
| 1523 | +def inspect(path): |
| 1524 | + path = pathlib.Path(path) |
| 1525 | + if not path.exists(): |
| 1526 | + raise ValueError(f"Path not found: {path}") |
| 1527 | + if (path / "metadata.json").exists(): |
| 1528 | + obj = IntermediateColumnarFormat(path) |
| 1529 | + # NOTE: this is too strict, we should support more general Zarrs, see #276 |
| 1530 | + elif (path / ".zmetadata").exists(): |
| 1531 | + obj = writer.VcfZarr(path) |
| 1532 | + else: |
| 1533 | + raise ValueError(f"{path} not in ICF or VCF Zarr format") |
| 1534 | + return obj.summary_table() |
| 1535 | + |
| 1536 | + |
| 1537 | +def mkschema( |
| 1538 | + if_path, |
| 1539 | + out, |
| 1540 | + *, |
| 1541 | + variants_chunk_size=None, |
| 1542 | + samples_chunk_size=None, |
| 1543 | + local_alleles=None, |
| 1544 | +): |
| 1545 | + store = IntermediateColumnarFormat(if_path) |
| 1546 | + spec = store.generate_schema( |
| 1547 | + variants_chunk_size=variants_chunk_size, |
| 1548 | + samples_chunk_size=samples_chunk_size, |
| 1549 | + local_alleles=local_alleles, |
| 1550 | + ) |
| 1551 | + out.write(spec.asjson()) |
| 1552 | + |
| 1553 | + |
| 1554 | +def convert( |
| 1555 | + vcfs, |
| 1556 | + out_path, |
| 1557 | + *, |
| 1558 | + variants_chunk_size=None, |
| 1559 | + samples_chunk_size=None, |
| 1560 | + worker_processes=1, |
| 1561 | + local_alleles=None, |
| 1562 | + show_progress=False, |
| 1563 | + icf_path=None, |
| 1564 | +): |
| 1565 | + if icf_path is None: |
| 1566 | + cm = temp_icf_path(prefix="vcf2zarr") |
| 1567 | + else: |
| 1568 | + cm = contextlib.nullcontext(icf_path) |
| 1569 | + |
| 1570 | + with cm as icf_path: |
| 1571 | + explode( |
| 1572 | + icf_path, |
| 1573 | + vcfs, |
| 1574 | + worker_processes=worker_processes, |
| 1575 | + show_progress=show_progress, |
| 1576 | + ) |
| 1577 | + encode( |
| 1578 | + icf_path, |
| 1579 | + out_path, |
| 1580 | + variants_chunk_size=variants_chunk_size, |
| 1581 | + samples_chunk_size=samples_chunk_size, |
| 1582 | + worker_processes=worker_processes, |
| 1583 | + show_progress=show_progress, |
| 1584 | + local_alleles=local_alleles, |
| 1585 | + ) |
| 1586 | + |
| 1587 | + |
| 1588 | +@contextlib.contextmanager |
| 1589 | +def temp_icf_path(prefix=None): |
| 1590 | + with tempfile.TemporaryDirectory(prefix=prefix) as tmp: |
| 1591 | + yield pathlib.Path(tmp) / "icf" |
| 1592 | + |
| 1593 | + |
| 1594 | +def encode( |
| 1595 | + icf_path, |
| 1596 | + zarr_path, |
| 1597 | + schema_path=None, |
| 1598 | + variants_chunk_size=None, |
| 1599 | + samples_chunk_size=None, |
| 1600 | + max_variant_chunks=None, |
| 1601 | + dimension_separator=None, |
| 1602 | + max_memory=None, |
| 1603 | + local_alleles=None, |
| 1604 | + worker_processes=1, |
| 1605 | + show_progress=False, |
| 1606 | +): |
| 1607 | + # Rough heuristic to split work up enough to keep utilisation high |
| 1608 | + target_num_partitions = max(1, worker_processes * 4) |
| 1609 | + encode_init( |
| 1610 | + icf_path, |
| 1611 | + zarr_path, |
| 1612 | + target_num_partitions, |
| 1613 | + schema_path=schema_path, |
| 1614 | + variants_chunk_size=variants_chunk_size, |
| 1615 | + samples_chunk_size=samples_chunk_size, |
| 1616 | + local_alleles=local_alleles, |
| 1617 | + max_variant_chunks=max_variant_chunks, |
| 1618 | + dimension_separator=dimension_separator, |
| 1619 | + ) |
| 1620 | + vzw = writer.VcfZarrWriter(IntermediateColumnarFormat, zarr_path) |
| 1621 | + vzw.encode_all_partitions( |
| 1622 | + worker_processes=worker_processes, |
| 1623 | + show_progress=show_progress, |
| 1624 | + max_memory=max_memory, |
| 1625 | + ) |
| 1626 | + vzw.finalise(show_progress) |
| 1627 | + vzw.create_index() |
| 1628 | + |
| 1629 | + |
| 1630 | +def encode_init( |
| 1631 | + icf_path, |
| 1632 | + zarr_path, |
| 1633 | + target_num_partitions, |
| 1634 | + *, |
| 1635 | + schema_path=None, |
| 1636 | + variants_chunk_size=None, |
| 1637 | + samples_chunk_size=None, |
| 1638 | + local_alleles=None, |
| 1639 | + max_variant_chunks=None, |
| 1640 | + dimension_separator=None, |
| 1641 | + max_memory=None, |
| 1642 | + worker_processes=1, |
| 1643 | + show_progress=False, |
| 1644 | +): |
| 1645 | + icf_store = IntermediateColumnarFormat(icf_path) |
| 1646 | + if schema_path is None: |
| 1647 | + schema_instance = icf_store.generate_schema( |
| 1648 | + variants_chunk_size=variants_chunk_size, |
| 1649 | + samples_chunk_size=samples_chunk_size, |
| 1650 | + local_alleles=local_alleles, |
| 1651 | + ) |
| 1652 | + else: |
| 1653 | + logger.info(f"Reading schema from {schema_path}") |
| 1654 | + if variants_chunk_size is not None or samples_chunk_size is not None: |
| 1655 | + raise ValueError( |
| 1656 | + "Cannot specify schema along with chunk sizes" |
| 1657 | + ) # NEEDS TEST |
| 1658 | + with open(schema_path) as f: |
| 1659 | + schema_instance = schema.VcfZarrSchema.fromjson(f.read()) |
| 1660 | + zarr_path = pathlib.Path(zarr_path) |
| 1661 | + vzw = writer.VcfZarrWriter("icf", zarr_path) |
| 1662 | + return vzw.init( |
| 1663 | + icf_store, |
| 1664 | + target_num_partitions=target_num_partitions, |
| 1665 | + schema=schema_instance, |
| 1666 | + dimension_separator=dimension_separator, |
| 1667 | + max_variant_chunks=max_variant_chunks, |
| 1668 | + ) |
| 1669 | + |
| 1670 | + |
| 1671 | +def encode_partition(zarr_path, partition): |
| 1672 | + writer_instance = writer.VcfZarrWriter(IntermediateColumnarFormat, zarr_path) |
| 1673 | + writer_instance.encode_partition(partition) |
| 1674 | + |
| 1675 | + |
| 1676 | +def encode_finalise(zarr_path, show_progress=False): |
| 1677 | + writer_instance = writer.VcfZarrWriter(IntermediateColumnarFormat, zarr_path) |
| 1678 | + writer_instance.finalise(show_progress=show_progress) |
0 commit comments