@@ -1,29 +1,27 @@
 import collections
+import contextlib
 import dataclasses
 import functools
+import json
 import logging
+import math
 import os
 import pathlib
 import pickle
-import sys
 import shutil
-import json
-import math
+import sys
 import tempfile
-import contextlib
 from typing import Any, List
 
-import humanfriendly
 import cyvcf2
+import humanfriendly
 import numcodecs
 import numpy as np
 import numpy.testing as nt
 import tqdm
 import zarr
 
-from . import core
-from . import provenance
-from . import vcf_utils
+from . import core, provenance, vcf_utils
 
 logger = logging.getLogger(__name__)
 
@@ -301,7 +299,8 @@ def check_overlap(partitions):
 
 def scan_vcfs(paths, show_progress, target_num_partitions, worker_processes=1):
     logger.info(
-        f"Scanning {len(paths)} VCFs attempting to split into {target_num_partitions} partitions."
+        f"Scanning {len(paths)} VCFs attempting to split into "
+        f"{target_num_partitions} partitions."
     )
     # An easy mistake to make is to pass the same file twice. Check this early on.
     for path, count in collections.Counter(paths).items():
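
Note on the duplicate check in the context lines above: collections.Counter maps each path to its occurrence count, so any count above one signals the same file passed twice. A minimal standalone sketch of the same pattern (the paths list is invented for illustration):

import collections

paths = ["chr1.vcf.gz", "chr2.vcf.gz", "chr1.vcf.gz"]  # hypothetical input
for path, count in collections.Counter(paths).items():
    if count > 1:
        raise ValueError(f"Duplicate path provided: {path} ({count} times)")
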
@@ -850,7 +849,7 @@ def __init__(self, path):
             partition.num_records for partition in self.metadata.partitions
         ]
         # Allow us to find which partition a given record is in
-        self.partition_record_index = np.cumsum([0] + partition_num_records)
+        self.partition_record_index = np.cumsum([0, *partition_num_records])
         for field in self.metadata.fields:
             self.columns[field.full_name] = IntermediateColumnarFormatField(self, field)
         logger.info(
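
The comment above explains why the cumulative sum is built: prefixing 0 turns per-partition record counts into start offsets, so a record index can be mapped to its partition with a binary search. A minimal sketch of how such an index can be used (the counts are invented):

import numpy as np

partition_num_records = [10, 15, 5]  # hypothetical per-partition counts
partition_record_index = np.cumsum([0, *partition_num_records])
# partition_record_index == [0, 10, 25, 30]

def partition_of(record_index):
    # side="right" so a record sitting on a boundary maps to the later partition
    return np.searchsorted(partition_record_index, record_index, side="right") - 1

assert partition_of(0) == 0
assert partition_of(10) == 1
assert partition_of(29) == 2

The switch from [0] + partition_num_records to iterable unpacking is behaviour-preserving; it matches the list-concatenation style rule (RUF005) that the rest of this commit appears to be applying.
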
@@ -860,7 +859,8 @@ def __init__(self, path):
 
     def __repr__(self):
         return (
-            f"IntermediateColumnarFormat(fields={len(self)}, partitions={self.num_partitions}, "
+            f"IntermediateColumnarFormat(fields={len(self)}, "
+            f"partitions={self.num_partitions}, "
             f"records={self.num_records}, path={self.path})"
         )
 
@@ -956,11 +956,11 @@ def init(
         # probably going to be dropped.
         # https://github.com/pystatgen/vcf-zarr-spec/issues/15
         # May be useful to keep lying around still though?
-        logger.info(f"Writing VCF header")
+        logger.info("Writing VCF header")
         with open(self.path / "header.txt", "w") as f:
             f.write(header)
 
-        logger.info(f"Writing WIP metadata")
+        logger.info("Writing WIP metadata")
         with open(self.wip_path / "metadata.json", "w") as f:
             json.dump(self.metadata.asdict(), f, indent=4)
         return self.num_partitions
@@ -988,13 +988,14 @@ def load_partition_summaries(self):
                 not_found.append(j)
         if len(not_found) > 0:
             raise FileNotFoundError(
-                f"Partition metadata not found for {len(not_found)} partitions: {not_found}"
+                f"Partition metadata not found for {len(not_found)} "
+                f"partitions: {not_found}"
             )
         return summaries
 
     def load_metadata(self):
         if self.metadata is None:
-            with open(self.wip_path / f"metadata.json") as f:
+            with open(self.wip_path / "metadata.json") as f:
                 self.metadata = IcfMetadata.fromdict(json.load(f))
 
     def process_partition(self, partition_index):
@@ -1043,12 +1044,14 @@ def process_partition(self, partition_index):
                 for field in format_fields:
                     val = variant.format(field.name)
                     tcw.append(field.full_name, val)
-                # Note: an issue with updating the progress per variant here like this
-                # is that we get a significant pause at the end of the counter while
-                # all the "small" fields get flushed. Possibly not much to be done about it.
+                # Note: an issue with updating the progress per variant here like
+                # this is that we get a significant pause at the end of the counter
+                # while all the "small" fields get flushed. Possibly not much to be
+                # done about it.
                 core.update_progress(1)
         logger.info(
-            f"Finished reading VCF for partition {partition_index}, flushing buffers"
+            f"Finished reading VCF for partition {partition_index}, "
+            "flushing buffers"
         )
 
         partition_metadata = {
@@ -1130,11 +1133,11 @@ def finalise(self):
             for summary in partition_summaries:
                 field.summary.update(summary["field_summaries"][field.full_name])
 
-        logger.info(f"Finalising metadata")
+        logger.info("Finalising metadata")
         with open(self.path / "metadata.json", "w") as f:
             json.dump(self.metadata.asdict(), f, indent=4)
 
-        logger.debug(f"Removing WIP directory")
+        logger.debug("Removing WIP directory")
         shutil.rmtree(self.wip_path)
 
 
@@ -1148,7 +1151,7 @@ def explode(
     compressor=None,
 ):
     writer = IntermediateColumnarFormatWriter(icf_path)
-    num_partitions = writer.init(
+    writer.init(
         vcfs,
         # Heuristic to get reasonable worker utilisation with lumpy partition sizing
         target_num_partitions=max(1, worker_processes * 4),
@@ -1381,7 +1384,7 @@ def fixed_field_spec(
     if field.category == "FORMAT":
         prefix = "call_"
         shape.append(n)
-        chunks.append(samples_chunk_size),
+        chunks.append(samples_chunk_size)
         dimensions.append("samples")
     # TODO make an option to add in the empty extra dimension
     if field.summary.max_number > 1:
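
The removed trailing comma is not a syntax error: it turns the statement into a one-element tuple expression, which is valid but misleading. A quick demonstration:

chunks = []
result = (chunks.append(8192),)  # what "chunks.append(8192)," evaluates to
assert result == (None,)  # a tuple wrapping append()'s None return value
assert chunks == [8192]  # the append still ran, so behaviour was unchanged
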
@@ -1633,7 +1636,9 @@ def encode_filters_slice(self, lookup, start, stop):
                 try:
                     var_filter.buff[j, lookup[f]] = True
                 except KeyError:
-                    raise ValueError(f"Filter '{f}' was not defined in the header.")
+                    raise ValueError(
+                        f"Filter '{f}' was not defined in the header."
+                    ) from None
         var_filter.flush()
         logger.debug(f"Encoded FILTERS slice {start}:{stop}")
 
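
Adding "from None" here suppresses implicit exception chaining, so the traceback shows only the ValueError rather than the internal KeyError followed by "During handling of the above exception, another exception occurred". A self-contained sketch of the idiom (the lookup dict is hypothetical):

lookup = {"PASS": 0}  # hypothetical filter-name -> column index lookup

def check_filter(name):
    try:
        return lookup[name]
    except KeyError:
        # Without "from None" the KeyError would be chained into the traceback.
        raise ValueError(f"Filter '{name}' was not defined in the header.") from None
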
@@ -1736,7 +1741,8 @@ def encode(
             variant_chunk_size = array.blocks[0].nbytes
             encoding_memory_requirements[col.name] = variant_chunk_size
             logger.debug(
-                f"{col.name} requires at least {display_size(variant_chunk_size)} per worker"
+                f"{col.name} requires at least "
+                f"{display_size(variant_chunk_size)} per worker"
             )
             total_bytes += array.nbytes
 
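
The per-worker figure logged above is the decoded size of one variant chunk. Assuming array exposes its chunking the way a dask array does (blocks[0] being the first chunk), the same number follows from chunk shape and dtype; a sketch with made-up dimensions:

import numpy as np

chunk_shape = (10_000, 1_000)  # hypothetical (variants, samples) chunk
dtype = np.dtype(np.int32)
variant_chunk_size = int(np.prod(chunk_shape)) * dtype.itemsize
print(f"{variant_chunk_size} bytes per worker")  # 40000000, i.e. ~40 MB
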
@@ -1845,8 +1851,9 @@ def service_completed_futures():
                 or len(future_to_work) > max_queued
             ):
                 logger.debug(
-                    f"Wait: mem_required={used_memory + wp.memory} max_mem={max_memory} "
-                    f"queued={len(future_to_work)} max_queued={max_queued}"
+                    f"Wait: mem_required={used_memory + wp.memory} "
+                    f"max_mem={max_memory} queued={len(future_to_work)} "
+                    f"max_queued={max_queued}"
                 )
                 service_completed_futures()
             future = pwm.submit(wp.func, wp.start, wp.stop)
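
The loop above applies backpressure: new work is only submitted once both the estimated memory and the number of queued futures are under their limits. A minimal standalone sketch of the queue-length half of that pattern using concurrent.futures (max_queued is taken from the diff; everything else is invented):

import concurrent.futures

def run_bounded(tasks, max_queued=4):
    """Run callables, keeping at most max_queued futures in flight."""
    pending = set()
    with concurrent.futures.ThreadPoolExecutor() as executor:
        for task in tasks:
            while len(pending) > max_queued:
                done, pending = concurrent.futures.wait(
                    pending, return_when=concurrent.futures.FIRST_COMPLETED
                )
                for future in done:
                    future.result()  # re-raise any worker exception
            pending.add(executor.submit(task))
        for future in concurrent.futures.as_completed(pending):
            future.result()
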
@@ -1890,7 +1897,7 @@ def encode(
             raise ValueError(
                 "Cannot specify schema along with chunk sizes"
             )  # NEEDS TEST
-        with open(schema_path, "r") as f:
+        with open(schema_path) as f:
             schema = VcfZarrSchema.fromjson(f.read())
     zarr_path = pathlib.Path(zarr_path)
     if zarr_path.exists():
@@ -1971,7 +1978,7 @@ def assert_all_fill(zarr_val, vcf_type):
     elif vcf_type == "Float":
         assert_all_fill_float(zarr_val)
     else:  # pragma: no cover
-        assert False
+        assert False  # noqa: PT015
 
 
 def assert_all_missing(zarr_val, vcf_type):
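
The noqa comment silences the lint rule that flags always-failing assertions (ruff's PT015) on this unreachable branch. An alternative that needs no suppression, shown only as a hypothetical sketch of the dispatch shape, is an explicit raise:

def dispatch(vcf_type):
    if vcf_type == "Integer":
        return "int"
    elif vcf_type == "Float":
        return "float"
    else:  # pragma: no cover
        raise AssertionError(f"Unhandled VCF type: {vcf_type}")
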
@@ -1984,7 +1991,7 @@ def assert_all_missing(zarr_val, vcf_type):
     elif vcf_type == "Float":
         assert_all_missing_float(zarr_val)
     else:  # pragma: no cover
-        assert False
+        assert False  # noqa: PT015
 
 
 def assert_info_val_missing(zarr_val, vcf_type):
@@ -2123,7 +2130,7 @@ def validate(vcf_path, zarr_path, show_progress=False):
         assert vid[j] == ("." if row.ID is None else row.ID)
         assert allele[j, 0] == row.REF
         k = len(row.ALT)
-        nt.assert_array_equal(allele[j, 1 : k + 1], row.ALT),
+        nt.assert_array_equal(allele[j, 1 : k + 1], row.ALT)
         assert np.all(allele[j, k + 1 :] == "")
         # TODO FILTERS
 