 import dataclasses
 import multiprocessing
 import functools
+import logging
+import os
 import threading
 import pathlib
 import time
@@ -13,7 +15,7 @@
 import tempfile
 from typing import Any

-import humanize
+import humanfriendly
 import cyvcf2
 import numcodecs
 import numpy as np
@@ -23,6 +25,8 @@

 import bed_reader

+logger = logging.getLogger(__name__)
+
 INT_MISSING = -1
 INT_FILL = -2
 STR_MISSING = "."
@@ -271,8 +275,10 @@ def make_field_def(name, vcf_type, vcf_number):
 def scan_vcfs(paths, show_progress):
     partitions = []
     vcf_metadata = None
+    logger.info(f"Scanning {len(paths)} VCFs")
     for path in tqdm.tqdm(paths, desc="Scan ", disable=not show_progress):
         vcf = cyvcf2.VCF(path)
+        logger.debug(f"Scanning {path}")

         filters = [
             h["ID"]
@@ -459,8 +465,12 @@ def __repr__(self):
         # TODO add class name
         return repr({"path": str(self.path), **self.vcf_field.summary.asdict()})

+    def chunk_path(self, partition_index, chunk_index):
+        return self.path / f"p{partition_index}" / f"c{chunk_index}"
+
     def write_chunk(self, partition_index, chunk_index, data):
-        path = self.path / f"p{partition_index}" / f"c{chunk_index}"
+        path = self.chunk_path(partition_index, chunk_index)
+        logger.debug(f"Start write: {path}")
         pkl = pickle.dumps(data)
         # NOTE assuming that reusing the same compressor instance
         # from multiple threads is OK!
@@ -472,9 +482,10 @@ def write_chunk(self, partition_index, chunk_index, data):
         self.vcf_field.summary.num_chunks += 1
         self.vcf_field.summary.compressed_size += len(compressed)
         self.vcf_field.summary.uncompressed_size += len(pkl)
+        logger.debug(f"Finish write: {path}")

     def read_chunk(self, partition_index, chunk_index):
-        path = self.path / f"p{partition_index}" / f"c{chunk_index}"
+        path = self.chunk_path(partition_index, chunk_index)
         with open(path, "rb") as f:
             pkl = self.compressor.decode(f.read())
         return pickle.loads(pkl), len(pkl)
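For readers following the chunk I/O changes above: the write path pickles the buffered values, compresses the pickle, and stores it under p<partition>/c<chunk>, and the read path reverses that. A minimal standalone sketch of the same round trip, assuming a numcodecs compressor such as Blosc (the class's actual compressor configuration is not shown in this diff) and using hypothetical helper names:

import pathlib
import pickle

import numcodecs

# Illustration only; mirrors the layout produced by chunk_path() above.
compressor = numcodecs.Blosc(cname="zstd", clevel=7)

def write_chunk(base_path, partition_index, chunk_index, data):
    path = pathlib.Path(base_path) / f"p{partition_index}" / f"c{chunk_index}"
    path.parent.mkdir(parents=True, exist_ok=True)
    pkl = pickle.dumps(data)
    path.write_bytes(compressor.encode(pkl))

def read_chunk(base_path, partition_index, chunk_index):
    path = pathlib.Path(base_path) / f"p{partition_index}" / f"c{chunk_index}"
    pkl = compressor.decode(path.read_bytes())
    return pickle.loads(pkl), len(pkl)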
@@ -615,6 +626,8 @@ def append(self, val):

     def flush(self):
         if len(self.buffer) > 0:
+            path = self.column.chunk_path(self.partition_index, self.chunk_index)
+            logger.debug(f"Schedule write: {path}")
             future = self.executor.submit(
                 self.column.write_chunk,
                 self.partition_index,
@@ -649,7 +662,7 @@ def display_number(x):
             return ret

         def display_size(n):
-            return humanize.naturalsize(n, binary=True)
+            return humanfriendly.format_size(n)

         data = []
         for name, col in self.columns.items():
@@ -688,6 +701,10 @@ def num_partitions(self):
     def num_samples(self):
         return len(self.metadata.samples)

+    @property
+    def num_columns(self):
+        return len(self.columns)
+
     def mkdirs(self):
         self.path.mkdir()
         for col in self.columns.values():
@@ -716,6 +733,10 @@ def convert(
             partition.num_records for partition in vcf_metadata.partitions
         )

+        logger.info(
+            f"Exploding {pcvcf.num_columns} columns {total_variants} variants "
+            f"{pcvcf.num_samples} samples"
+        )
         global progress_counter
         progress_counter = multiprocessing.Value("Q", 0)

@@ -774,6 +795,7 @@ def convert_partition(
     partition = vcf_metadata.partitions[partition_index]
     vcf = cyvcf2.VCF(partition.vcf_path)
     futures = set()
+    logger.info(f"Start partition {partition_index} {partition.vcf_path}")

     def service_futures(max_waiting=2 * flush_threads):
         while len(futures) > max_waiting:
@@ -824,12 +846,7 @@ def service_futures(max_waiting=2 * flush_threads):
             gt.append(variant.genotype.array())

         for name, buff in info_fields:
-            val = None
-            try:
-                val = variant.INFO[name]
-            except KeyError:
-                pass
-            buff.append(val)
+            buff.append(variant.INFO.get(name, None))

         for name, buff in format_fields:
             val = None
@@ -841,11 +858,15 @@ def service_futures(max_waiting=2 * flush_threads):

         service_futures()

+        # Note: an issue with updating the progress per variant here like this
+        # is that we get a significant pause at the end of the counter while
+        # all the "small" fields get flushed. Possibly not much to be done about it.
         with progress_counter.get_lock():
             progress_counter.value += 1

     for col in columns.values():
         col.flush()
+    logger.info(f"VCF read finished; waiting on {len(futures)} chunk writes")
     service_futures(0)

     return summaries
@@ -1213,6 +1234,7 @@ def encode_alleles(self, pcvcf):
         with progress_counter.get_lock():
             for col in [ref_col, alt_col]:
                 progress_counter.value += col.vcf_field.summary.uncompressed_size
+        logger.debug("alleles done")

     def encode_samples(self, pcvcf, sample_id, chunk_width):
         if not np.array_equal(sample_id, pcvcf.metadata.samples):
@@ -1225,6 +1247,7 @@ def encode_samples(self, pcvcf, sample_id, chunk_width):
             chunks=(chunk_width,),
         )
         array.attrs["_ARRAY_DIMENSIONS"] = ["samples"]
+        logger.debug("Samples done")

     def encode_contig(self, pcvcf, contig_names, contig_lengths):
         array = self.root.array(
@@ -1258,6 +1281,7 @@ def encode_contig(self, pcvcf, contig_names, contig_lengths):

         with progress_counter.get_lock():
             progress_counter.value += col.vcf_field.summary.uncompressed_size
+        logger.debug("Contig done")

     def encode_filters(self, pcvcf, filter_names):
         self.root.attrs["filters"] = filter_names
@@ -1285,6 +1309,7 @@ def encode_filters(self, pcvcf, filter_names):

         with progress_counter.get_lock():
             progress_counter.value += col.vcf_field.summary.uncompressed_size
+        logger.debug("Filters done")

     def encode_id(self, pcvcf):
         col = pcvcf.columns["ID"]
@@ -1305,14 +1330,21 @@ def encode_id(self, pcvcf):

         with progress_counter.get_lock():
             progress_counter.value += col.vcf_field.summary.uncompressed_size
+        logger.debug("ID done")

     @staticmethod
     def convert(
         pcvcf, path, conversion_spec, *, worker_processes=1, show_progress=False
     ):
-        store = zarr.DirectoryStore(path)
-        # FIXME
-        sgvcf = SgvcfZarr(path)
+        path = pathlib.Path(path)
+        # TODO: we should do this as a future to avoid blocking
+        if path.exists():
+            shutil.rmtree(path)
+        write_path = path.with_suffix(path.suffix + f".{os.getpid()}.build")
+        store = zarr.DirectoryStore(write_path)
+        # FIXME, duplicating logic about the store
+        logger.info(f"Create zarr at {write_path}")
+        sgvcf = SgvcfZarr(write_path)
         sgvcf.root = zarr.group(store=store, overwrite=True)
         for variable in conversion_spec.variables[:]:
             sgvcf.create_array(variable)
@@ -1373,11 +1405,14 @@ def convert(

         flush_futures(futures)

-        zarr.consolidate_metadata(path)
         # FIXME can't join the bar_thread because we never get to the correct
         # number of bytes
         # if bar_thread is not None:
         #     bar_thread.join()
+        zarr.consolidate_metadata(write_path)
+        # Atomic swap, now we've completely finished.
+        logger.info(f"Moving to final path {path}")
+        os.rename(write_path, path)


 def sync_flush_array(np_buffer, zarr_array, offset):
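The build-directory-plus-rename scheme introduced in the two hunks above can be illustrated in isolation. A minimal sketch under stated assumptions: build_then_swap is a hypothetical helper, not part of the commit, and os.rename is only atomic when the scratch path and the final path sit on the same filesystem:

import os
import pathlib
import shutil

def build_then_swap(final_path, build):
    # Illustration only: write everything to a per-process scratch directory,
    # then move it into place so readers never see a half-written store.
    final_path = pathlib.Path(final_path)
    if final_path.exists():
        shutil.rmtree(final_path)
    write_path = final_path.with_suffix(final_path.suffix + f".{os.getpid()}.build")
    if write_path.exists():
        shutil.rmtree(write_path)
    build(write_path)  # e.g. populate and consolidate the zarr store here
    # Atomic on POSIX when both paths are on the same filesystem.
    os.rename(write_path, final_path)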
@@ -1388,6 +1423,7 @@ def async_flush_array(executor, np_buffer, zarr_array, offset):
     """
     Flush the specified chunk aligned buffer to the specified zarr array.
     """
+    logger.debug(f"Schedule flush {zarr_array} @ {offset}")
     assert zarr_array.shape[1:] == np_buffer.shape[1:]
     # print("sync", zarr_array, np_buffer)

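One usage note: the commit only calls logging.getLogger(__name__) and never installs a handler, so the new info/debug messages stay silent unless the calling program configures logging itself. A minimal sketch of how a caller might surface them (the module name is assumed for illustration):

import logging

# Install a stderr handler with timestamps; WARNING keeps other libraries quiet.
logging.basicConfig(
    level=logging.WARNING,
    format="%(asctime)s %(levelname)s %(name)s %(message)s",
)

# Turn on the converter's own messages ("vcf2zarr" is an assumed module name
# here; use whatever __name__ resolves to for this file).
logging.getLogger("vcf2zarr").setLevel(logging.DEBUG)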