+import logging
+
 import numpy as np
 import zarr
 import bed_reader

 from . import core


-def encode_bed_partition_genotypes(
-    bed_path, zarr_path, start_variant, end_variant, encoder_threads=8
-):
-    bed = bed_reader.open_bed(bed_path, num_threads=1)
+logger = logging.getLogger(__name__)
+
+
+def encode_genotypes_slice(bed_path, zarr_path, start, stop):
+    bed = bed_reader.open_bed(bed_path, num_threads=1)
     store = zarr.DirectoryStore(zarr_path)
     root = zarr.group(store=store)
-    gt = core.BufferedArray(root["call_genotype"])
-    gt_mask = core.BufferedArray(root["call_genotype_mask"])
-    gt_phased = core.BufferedArray(root["call_genotype_phased"])
+    gt = core.BufferedArray(root["call_genotype"], start)
+    gt_mask = core.BufferedArray(root["call_genotype_mask"], start)
+    gt_phased = core.BufferedArray(root["call_genotype_phased"], start)
     chunk_length = gt.array.chunks[0]
-    assert start_variant % chunk_length == 0
-
-    buffered_arrays = [gt, gt_phased, gt_mask]
-
-    with core.ThreadedZarrEncoder(buffered_arrays, encoder_threads) as te:
-        start = start_variant
-        while start < end_variant:
-            stop = min(start + chunk_length, end_variant)
-            bed_chunk = bed.read(index=slice(start, stop), dtype="int8").T
-            # Note could do this without iterating over rows, but it's a bit
-            # simpler and the bottleneck is in the encoding step anyway. It's
-            # also nice to have updates on the progress monitor.
-            for values in bed_chunk:
-                j = te.next_buffer_row()
-                dest = gt.buff[j]
-                dest[values == -127] = -1
-                dest[values == 2] = 1
-                dest[values == 1, 0] = 1
-                gt_phased.buff[j] = False
-                gt_mask.buff[j] = dest == -1
-                core.update_progress(1)
-            start = stop
+    assert start % chunk_length == 0
+
+    chunk_start = start
+    while chunk_start < stop:
+        chunk_stop = min(chunk_start + chunk_length, stop)
+        bed_chunk = bed.read(index=np.s_[:, chunk_start:chunk_stop], dtype=np.int8).T
+        # Probably should do this without iterating over rows, but it's a bit
+        # simpler and lines up better with the array buffering API. The bottleneck
+        # is in the encoding anyway.
+        for values in bed_chunk:
+            j = gt.next_buffer_row()
+            g = np.zeros_like(gt.buff[j])
+            g[values == -127] = -1
+            g[values == 2] = 1
+            g[values == 1, 0] = 1
+            gt.buff[j] = g
+            j = gt_phased.next_buffer_row()
+            gt_phased.buff[j] = False
+            j = gt_mask.next_buffer_row()
+            gt_mask.buff[j] = gt.buff[j] == -1
+        chunk_start = chunk_stop
+    gt.flush()
+    gt_phased.flush()
+    gt_mask.flush()
+    logger.debug(f"GT slice {start}:{stop} done")


 def convert(
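For reference, the per-row decoding in encode_genotypes_slice above expands bed_reader's int8 codes into diploid calls, matching the checks in validate further down: -127 -> [-1, -1], 0 -> [0, 0], 1 -> [1, 0], 2 -> [1, 1]. A minimal standalone sketch of that mapping, outside the BufferedArray machinery (decode_plink_row is a hypothetical helper, not part of this change):

    import numpy as np

    def decode_plink_row(values):
        # values: int8 codes for one variant, shape (n_samples,)
        g = np.zeros((values.shape[0], 2), dtype=np.int8)
        g[values == -127] = -1  # missing call -> [-1, -1]
        g[values == 2] = 1      # code 2 -> [1, 1]
        g[values == 1, 0] = 1   # code 1 -> [1, 0]
        return g                # code 0 stays [0, 0]

    # decode_plink_row(np.array([0, 1, 2, -127], dtype=np.int8))
    # -> array([[0, 0], [1, 0], [1, 1], [-1, -1]], dtype=int8)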
@@ -81,7 +89,7 @@ def convert(
         dimensions += ["ploidy"]
     a = root.empty(
         "call_genotype",
-        dtype="i8",
+        dtype="i1",
         shape=list(shape),
         chunks=list(chunks),
         compressor=core.default_compressor,
@@ -97,22 +105,52 @@
     )
     a.attrs["_ARRAY_DIMENSIONS"] = list(dimensions)

-    chunks_per_future = 2  # FIXME - make a parameter
-    start = 0
-    partitions = []
-    while start < m:
-        stop = min(m, start + chunk_length * chunks_per_future)
-        partitions.append((start, stop))
-        start = stop
-    assert start == m
+    num_slices = max(1, worker_processes * 4)
+    slices = core.chunk_aligned_slices(a, num_slices)
+
+    total_chunks = sum(a.nchunks for a in root.values())

     progress_config = core.ProgressConfig(
-        total=m, title="Convert", units="vars", show=show_progress
+        total=total_chunks, title="Convert", units="chunks", show=show_progress
     )
     with core.ParallelWorkManager(worker_processes, progress_config) as pwm:
-        for start, end in partitions:
-            pwm.submit(encode_bed_partition_genotypes, bed_path, zarr_path, start, end)
+        for start, stop in slices:
+            pwm.submit(encode_genotypes_slice, bed_path, zarr_path, start, stop)

     # TODO also add atomic swap like VCF. Should be abstracted to
     # share basic code for setting up the variation dataset zarr
     zarr.consolidate_metadata(zarr_path)
+
+
+# FIXME do this more efficiently - currently reading the whole thing
+# in for convenience, and also comparing call-by-call
+def validate(bed_path, zarr_path):
+    store = zarr.DirectoryStore(zarr_path)
+    root = zarr.group(store=store)
+    call_genotype = root["call_genotype"][:]
+
+    bed = bed_reader.open_bed(bed_path, num_threads=1)
+
+    assert call_genotype.shape[0] == bed.sid_count
+    assert call_genotype.shape[1] == bed.iid_count
+    bed_genotypes = bed.read(dtype="int8").T
+    assert call_genotype.shape[0] == bed_genotypes.shape[0]
+    assert call_genotype.shape[1] == bed_genotypes.shape[1]
+    assert call_genotype.shape[2] == 2
+
+    row_id = 0
+    for bed_row, zarr_row in zip(bed_genotypes, call_genotype):
+        # print("ROW", row_id)
+        # print(bed_row, zarr_row)
+        row_id += 1
+        for bed_call, zarr_call in zip(bed_row, zarr_row):
+            if bed_call == -127:
+                assert list(zarr_call) == [-1, -1]
+            elif bed_call == 0:
+                assert list(zarr_call) == [0, 0]
+            elif bed_call == 1:
+                assert list(zarr_call) == [1, 0]
+            elif bed_call == 2:
+                assert list(zarr_call) == [1, 1]
+            else:  # pragma no cover
+                assert False
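A note on the parallelisation in convert: the work is split with core.chunk_aligned_slices so that every worker's start offset lands on a chunk boundary of call_genotype, which is what makes the assert start % chunk_length == 0 in encode_genotypes_slice safe and means no two processes ever write to the same Zarr chunk. The helper's implementation isn't shown in this diff; a rough sketch of the behaviour assumed from its call site, chunk_aligned_slices(a, num_slices) returning (start, stop) pairs:

    def chunk_aligned_slices(a, n):
        # Split [0, a.shape[0]) into at most n (start, stop) ranges whose
        # start offsets fall on chunk boundaries of the zarr array `a`.
        chunk_length = a.chunks[0]
        m = a.shape[0]
        total_chunks = -(-m // chunk_length)      # ceil(m / chunk_length)
        chunks_per_slice = -(-total_chunks // n)  # ceil(total_chunks / n)
        slices = []
        for chunk_index in range(0, total_chunks, chunks_per_slice):
            start = chunk_index * chunk_length
            stop = min(start + chunks_per_slice * chunk_length, m)
            slices.append((start, stop))
        return slices

Each slice is then handed to encode_genotypes_slice in a separate worker via ParallelWorkManager, with progress now reported in chunks rather than variants.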