20
20
import tqdm
21
21
import zarr
22
22
23
- import bed_reader
24
-
25
23
from . import core
26
24
27
25
logger = logging .getLogger (__name__ )
38
36
[0x7F800001 , 0x7F800002 ], dtype = np .int32
39
37
)
40
38
41
- numcodecs .blosc .use_threads = False
42
-
43
- default_compressor = numcodecs .Blosc (
44
- cname = "zstd" , clevel = 7 , shuffle = numcodecs .Blosc .AUTOSHUFFLE
45
- )
46
-
47
39
48
40
def assert_all_missing_float (a ):
49
41
v = np .array (a , dtype = np .float32 ).view (np .int32 )
@@ -437,6 +429,8 @@ def __init__(self, vcf_field, base_path):
437
429
else :
438
430
self .path = base_path / vcf_field .category / vcf_field .name
439
431
432
+ # TODO Check if other compressors would give reasonable compression
433
+ # with significantly faster times
440
434
self .compressor = numcodecs .Blosc (cname = "zstd" , clevel = 7 )
441
435
# TODO have a clearer way of defining this state between
442
436
# read and write mode.
@@ -905,7 +899,7 @@ def generate(pcvcf, chunk_length=None, chunk_width=None):
905
899
if chunk_length is None :
906
900
chunk_length = 10_000
907
901
908
- compressor = default_compressor .get_config ()
902
+ compressor = core . default_compressor .get_config ()
909
903
910
904
def fixed_field_spec (
911
905
name , dtype , vcf_field = None , shape = (m ,), dimensions = ("variants" ,)
@@ -1136,7 +1130,7 @@ def encode_samples(self, pcvcf, sample_id, chunk_width):
1136
1130
"sample_id" ,
1137
1131
sample_id ,
1138
1132
dtype = "str" ,
1139
- compressor = default_compressor ,
1133
+ compressor = core . default_compressor ,
1140
1134
chunks = (chunk_width ,),
1141
1135
)
1142
1136
array .attrs ["_ARRAY_DIMENSIONS" ] = ["samples" ]
@@ -1147,7 +1141,7 @@ def encode_contig(self, pcvcf, contig_names, contig_lengths):
1147
1141
"contig_id" ,
1148
1142
contig_names ,
1149
1143
dtype = "str" ,
1150
- compressor = default_compressor ,
1144
+ compressor = core . default_compressor ,
1151
1145
)
1152
1146
array .attrs ["_ARRAY_DIMENSIONS" ] = ["contigs" ]
1153
1147
@@ -1181,7 +1175,7 @@ def encode_filters(self, pcvcf, filter_names):
1181
1175
"filter_id" ,
1182
1176
filter_names ,
1183
1177
dtype = "str" ,
1184
- compressor = default_compressor ,
1178
+ compressor = core . default_compressor ,
1185
1179
)
1186
1180
array .attrs ["_ARRAY_DIMENSIONS" ] = ["filters" ]
1187
1181
@@ -1339,41 +1333,6 @@ def convert_vcf(
1339
1333
)
1340
1334
1341
1335
1342
- def encode_bed_partition_genotypes (
1343
- bed_path , zarr_path , start_variant , end_variant , encoder_threads = 8
1344
- ):
1345
- bed = bed_reader .open_bed (bed_path , num_threads = 1 )
1346
-
1347
- store = zarr .DirectoryStore (zarr_path )
1348
- root = zarr .group (store = store )
1349
- gt = core .BufferedArray (root ["call_genotype" ])
1350
- gt_mask = core .BufferedArray (root ["call_genotype_mask" ])
1351
- gt_phased = core .BufferedArray (root ["call_genotype_phased" ])
1352
- chunk_length = gt .array .chunks [0 ]
1353
- assert start_variant % chunk_length == 0
1354
-
1355
- buffered_arrays = [gt , gt_phased , gt_mask ]
1356
-
1357
- with core .ThreadedZarrEncoder (buffered_arrays , encoder_threads ) as te :
1358
- start = start_variant
1359
- while start < end_variant :
1360
- stop = min (start + chunk_length , end_variant )
1361
- bed_chunk = bed .read (index = slice (start , stop ), dtype = "int8" ).T
1362
- # Note could do this without iterating over rows, but it's a bit
1363
- # simpler and the bottleneck is in the encoding step anyway. It's
1364
- # also nice to have updates on the progress monitor.
1365
- for values in bed_chunk :
1366
- j = te .next_buffer_row ()
1367
- dest = gt .buff [j ]
1368
- dest [values == - 127 ] = - 1
1369
- dest [values == 2 ] = 1
1370
- dest [values == 1 , 0 ] = 1
1371
- gt_phased .buff [j ] = False
1372
- gt_mask .buff [j ] = dest == - 1
1373
- core .update_progress (1 )
1374
- start = stop
1375
-
1376
-
1377
1336
def validate (vcf_path , zarr_path , show_progress = False ):
1378
1337
store = zarr .DirectoryStore (zarr_path )
1379
1338
@@ -1508,89 +1467,3 @@ def validate(vcf_path, zarr_path, show_progress=False):
1508
1467
print (vcf_val )
1509
1468
print (zarr_val )
1510
1469
assert False
1511
-
1512
-
1513
- def convert_plink (
1514
- bed_path ,
1515
- zarr_path ,
1516
- * ,
1517
- show_progress ,
1518
- worker_processes = 1 ,
1519
- chunk_length = None ,
1520
- chunk_width = None ,
1521
- ):
1522
- bed = bed_reader .open_bed (bed_path , num_threads = 1 )
1523
- n = bed .iid_count
1524
- m = bed .sid_count
1525
- del bed
1526
-
1527
- # FIXME
1528
- if chunk_width is None :
1529
- chunk_width = 1000
1530
- if chunk_length is None :
1531
- chunk_length = 10_000
1532
-
1533
- store = zarr .DirectoryStore (zarr_path )
1534
- root = zarr .group (store = store , overwrite = True )
1535
-
1536
- ploidy = 2
1537
- shape = [m , n ]
1538
- chunks = [chunk_length , chunk_width ]
1539
- dimensions = ["variants" , "samples" ]
1540
-
1541
- a = root .empty (
1542
- "call_genotype_phased" ,
1543
- dtype = "bool" ,
1544
- shape = list (shape ),
1545
- chunks = list (chunks ),
1546
- compressor = default_compressor ,
1547
- )
1548
- a .attrs ["_ARRAY_DIMENSIONS" ] = list (dimensions )
1549
-
1550
- shape += [ploidy ]
1551
- dimensions += ["ploidy" ]
1552
- a = root .empty (
1553
- "call_genotype" ,
1554
- dtype = "i8" ,
1555
- shape = list (shape ),
1556
- chunks = list (chunks ),
1557
- compressor = default_compressor ,
1558
- )
1559
- a .attrs ["_ARRAY_DIMENSIONS" ] = list (dimensions )
1560
-
1561
- a = root .empty (
1562
- "call_genotype_mask" ,
1563
- dtype = "bool" ,
1564
- shape = list (shape ),
1565
- chunks = list (chunks ),
1566
- compressor = default_compressor ,
1567
- )
1568
- a .attrs ["_ARRAY_DIMENSIONS" ] = list (dimensions )
1569
-
1570
- num_chunks = max (1 , m // chunk_length )
1571
- worker_processes = min (worker_processes , num_chunks )
1572
- if num_chunks == 1 or worker_processes == 1 :
1573
- partitions = [(0 , m )]
1574
- else :
1575
- # Generate num_workers partitions
1576
- # TODO finer grained might be better.
1577
- partitions = []
1578
- chunk_boundaries = [
1579
- p [0 ] for p in np .array_split (np .arange (num_chunks ), worker_processes )
1580
- ]
1581
- for j in range (len (chunk_boundaries ) - 1 ):
1582
- start = chunk_boundaries [j ] * chunk_length
1583
- end = chunk_boundaries [j + 1 ] * chunk_length
1584
- end = min (end , m )
1585
- partitions .append ((start , end ))
1586
- last_stop = partitions [- 1 ][- 1 ]
1587
- if last_stop != m :
1588
- partitions .append ((last_stop , m ))
1589
- # print(partitions)
1590
-
1591
- progress_config = core .ProgressConfig (
1592
- total = m , title = "Convert" , units = "vars" , show = show_progress
1593
- )
1594
- with core .ParallelWorkManager (worker_processes , progress_config ) as pwm :
1595
- for start , end in partitions :
1596
- pwm .submit (encode_bed_partition_genotypes , bed_path , zarr_path , start , end )
0 commit comments