@@ -1,29 +1,27 @@
import collections
+import contextlib
import dataclasses
import functools
+import json
import logging
+import math
import os
import pathlib
import pickle
-import sys
import shutil
-import json
-import math
+import sys
import tempfile
-import contextlib
from typing import Any, List

-import humanfriendly
import cyvcf2
+import humanfriendly
import numcodecs
import numpy as np
import numpy.testing as nt
import tqdm
import zarr

-from . import core
-from . import provenance
-from . import vcf_utils
+from . import core, provenance, vcf_utils

logger = logging.getLogger(__name__)

@@ -301,7 +299,8 @@ def check_overlap(partitions):

def scan_vcfs(paths, show_progress, target_num_partitions, worker_processes=1):
    logger.info(
-        f"Scanning {len(paths)} VCFs attempting to split into {target_num_partitions} partitions."
+        f"Scanning {len(paths)} VCFs attempting to split into "
+        f"{target_num_partitions} partitions."
    )
    # An easy mistake to make is to pass the same file twice. Check this early on.
    for path, count in collections.Counter(paths).items():
@@ -850,7 +849,7 @@ def __init__(self, path):
            partition.num_records for partition in self.metadata.partitions
        ]
        # Allow us to find which partition a given record is in
-        self.partition_record_index = np.cumsum([0] + partition_num_records)
+        self.partition_record_index = np.cumsum([0, *partition_num_records])
        for field in self.metadata.fields:
            self.columns[field.full_name] = IntermediateColumnarFormatField(self, field)
        logger.info(
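
The cumulative index built here gives cheap record-to-partition lookup. A minimal sketch of the idea with invented partition sizes, assuming the index is consumed via np.searchsorted (the lookup site is not part of this hunk):

    import numpy as np

    partition_num_records = [100, 250, 50]  # hypothetical sizes
    partition_record_index = np.cumsum([0, *partition_num_records])
    # -> array([  0, 100, 350, 400])

    def find_partition(record_id):
        # Records 0..99 fall in partition 0, 100..349 in partition 1, ...
        return np.searchsorted(partition_record_index, record_id, side="right") - 1

    assert find_partition(99) == 0
    assert find_partition(350) == 2
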
@@ -860,7 +859,8 @@ def __init__(self, path):

    def __repr__(self):
        return (
-            f"IntermediateColumnarFormat(fields={len(self)}, partitions={self.num_partitions}, "
+            f"IntermediateColumnarFormat(fields={len(self)}, "
+            f"partitions={self.num_partitions}, "
            f"records={self.num_records}, path={self.path})"
        )

@@ -956,11 +956,11 @@ def init(
        # probably going to be dropped.
        # https://github.com/pystatgen/vcf-zarr-spec/issues/15
        # May be useful to keep lying around still though?
-        logger.info(f"Writing VCF header")
+        logger.info("Writing VCF header")
        with open(self.path / "header.txt", "w") as f:
            f.write(header)

-        logger.info(f"Writing WIP metadata")
+        logger.info("Writing WIP metadata")
        with open(self.wip_path / "metadata.json", "w") as f:
            json.dump(self.metadata.asdict(), f, indent=4)
        return self.num_partitions
@@ -988,13 +988,14 @@ def load_partition_summaries(self):
                not_found.append(j)
        if len(not_found) > 0:
            raise FileNotFoundError(
-                f"Partition metadata not found for {len(not_found)} partitions: {not_found}"
+                f"Partition metadata not found for {len(not_found)} "
+                f"partitions: {not_found}"
            )
        return summaries

    def load_metadata(self):
        if self.metadata is None:
-            with open(self.wip_path / f"metadata.json") as f:
+            with open(self.wip_path / "metadata.json") as f:
                self.metadata = IcfMetadata.fromdict(json.load(f))

    def process_partition(self, partition_index):
@@ -1043,12 +1044,14 @@ def process_partition(self, partition_index):
            for field in format_fields:
                val = variant.format(field.name)
                tcw.append(field.full_name, val)
-            # Note: an issue with updating the progress per variant here like this
-            # is that we get a significant pause at the end of the counter while
-            # all the "small" fields get flushed. Possibly not much to be done about it.
+            # Note: an issue with updating the progress per variant here like
+            # this is that we get a significant pause at the end of the counter
+            # while all the "small" fields get flushed. Possibly not much to be
+            # done about it.
            core.update_progress(1)
        logger.info(
-            f"Finished reading VCF for partition {partition_index}, flushing buffers"
+            f"Finished reading VCF for partition {partition_index}, "
+            "flushing buffers"
        )

        partition_metadata = {
@@ -1130,11 +1133,11 @@ def finalise(self):
            for summary in partition_summaries:
                field.summary.update(summary["field_summaries"][field.full_name])

-        logger.info(f"Finalising metadata")
+        logger.info("Finalising metadata")
        with open(self.path / "metadata.json", "w") as f:
            json.dump(self.metadata.asdict(), f, indent=4)

-        logger.debug(f"Removing WIP directory")
+        logger.debug("Removing WIP directory")
        shutil.rmtree(self.wip_path)

@@ -1148,7 +1151,7 @@ def explode(
    compressor=None,
):
    writer = IntermediateColumnarFormatWriter(icf_path)
-    num_partitions = writer.init(
+    writer.init(
        vcfs,
        # Heuristic to get reasonable worker utilisation with lumpy partition sizing
        target_num_partitions=max(1, worker_processes * 4),
@@ -1381,7 +1384,7 @@ def fixed_field_spec(
        if field.category == "FORMAT":
            prefix = "call_"
            shape.append(n)
-            chunks.append(samples_chunk_size),
+            chunks.append(samples_chunk_size)
            dimensions.append("samples")
        # TODO make an option to add in the empty extra dimension
        if field.summary.max_number > 1:
@@ -1633,7 +1636,9 @@ def encode_filters_slice(self, lookup, start, stop):
                try:
                    var_filter.buff[j, lookup[f]] = True
                except KeyError:
-                    raise ValueError(f"Filter '{f}' was not defined in the header.")
+                    raise ValueError(
+                        f"Filter '{f}' was not defined in the header."
+                    ) from None
        var_filter.flush()
        logger.debug(f"Encoded FILTERS slice {start}:{stop}")

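
The ") from None" added here suppresses implicit exception chaining, so the user-facing ValueError is not followed by the internal KeyError traceback. A standalone illustration of the pattern (names invented):

    lookup = {"PASS": 0, "q10": 1}

    def filter_index(f):
        try:
            return lookup[f]
        except KeyError:
            # Without "from None", the KeyError would also be printed.
            raise ValueError(f"Filter '{f}' was not defined in the header.") from None
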
@@ -1736,7 +1741,8 @@ def encode(
            variant_chunk_size = array.blocks[0].nbytes
            encoding_memory_requirements[col.name] = variant_chunk_size
            logger.debug(
-                f"{col.name} requires at least {display_size(variant_chunk_size)} per worker"
+                f"{col.name} requires at least {display_size(variant_chunk_size)} "
+                "per worker"
            )
            total_bytes += array.nbytes

@@ -1845,8 +1851,9 @@ def service_completed_futures():
                or len(future_to_work) > max_queued
            ):
                logger.debug(
-                    f"Wait: mem_required={used_memory + wp.memory} max_mem={max_memory} "
-                    f"queued={len(future_to_work)} max_queued={max_queued}"
+                    f"Wait: mem_required={used_memory + wp.memory} "
+                    f"max_mem={max_memory} queued={len(future_to_work)} "
+                    f"max_queued={max_queued}"
                )
                service_completed_futures()
            future = pwm.submit(wp.func, wp.start, wp.stop)
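
For context, the surrounding code is a memory-aware backpressure loop: a work item is only submitted once the estimated memory of in-flight futures fits the budget and the queue is short enough. A simplified, self-contained sketch of that pattern; the names mirror the diff, but this is not the module's actual implementation:

    import concurrent.futures as cf

    def submit_with_backpressure(executor, work, max_memory, max_queued):
        used_memory = 0
        future_to_work = {}

        def service_completed_futures():
            nonlocal used_memory
            done, _ = cf.wait(future_to_work, return_when=cf.FIRST_COMPLETED)
            for future in done:
                used_memory -= future_to_work.pop(future).memory
                future.result()  # re-raise any worker exception

        for wp in work:
            while (
                used_memory + wp.memory > max_memory
                or len(future_to_work) > max_queued
            ):
                service_completed_futures()
            future = executor.submit(wp.func, wp.start, wp.stop)
            used_memory += wp.memory
            future_to_work[future] = wp
        while future_to_work:
            service_completed_futures()
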
@@ -1890,7 +1897,7 @@ def encode(
            raise ValueError(
                "Cannot specify schema along with chunk sizes"
            )  # NEEDS TEST
-        with open(schema_path, "r") as f:
+        with open(schema_path) as f:
            schema = VcfZarrSchema.fromjson(f.read())
    zarr_path = pathlib.Path(zarr_path)
    if zarr_path.exists():
@@ -1971,7 +1978,7 @@ def assert_all_fill(zarr_val, vcf_type):
    elif vcf_type == "Float":
        assert_all_fill_float(zarr_val)
    else:  # pragma: no cover
-        assert False
+        assert False  # noqa: PT015


def assert_all_missing(zarr_val, vcf_type):
@@ -1984,7 +1991,7 @@ def assert_all_missing(zarr_val, vcf_type):
    elif vcf_type == "Float":
        assert_all_missing_float(zarr_val)
    else:  # pragma: no cover
-        assert False
+        assert False  # noqa: PT015


def assert_info_val_missing(zarr_val, vcf_type):
@@ -2123,7 +2130,7 @@ def validate(vcf_path, zarr_path, show_progress=False):
        assert vid[j] == ("." if row.ID is None else row.ID)
        assert allele[j, 0] == row.REF
        k = len(row.ALT)
-        nt.assert_array_equal(allele[j, 1 : k + 1], row.ALT),
+        nt.assert_array_equal(allele[j, 1 : k + 1], row.ALT)
        assert np.all(allele[j, k + 1 :] == "")
        # TODO FILTERS

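
The assertions above pin down the fixed-width allele layout in the Zarr array: column 0 holds REF, columns 1..k hold the ALT alleles, and any remaining columns are padded with empty strings. A tiny worked example (contents invented):

    import numpy as np

    # Room for three ALT alleles: REF="A", ALT=["T", "G"], one padding column.
    allele_row = np.array(["A", "T", "G", ""], dtype="U10")
    row_alt = ["T", "G"]
    k = len(row_alt)
    assert allele_row[0] == "A"
    np.testing.assert_array_equal(allele_row[1 : k + 1], row_alt)
    assert np.all(allele_row[k + 1 :] == "")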