@@ -1127,77 +1127,44 @@ def create_array(self, variable):
1127
1127
)
1128
1128
a .attrs ["_ARRAY_DIMENSIONS" ] = variable .dimensions
1129
1129
1130
- def encode_column (self , pcvcf , column ):
1130
+ def encode_column (self , pcvcf , column , encoder_threads = 4 ):
1131
1131
source_col = pcvcf .columns [column .vcf_field ]
1132
1132
array = self .root [column .name ]
1133
1133
ba = core .BufferedArray (array )
1134
1134
sanitiser = source_col .sanitiser_factory (ba .buff .shape )
1135
- chunk_length = array .chunks [0 ]
1136
1135
1137
- with cf .ThreadPoolExecutor (max_workers = 4 ) as executor :
1138
- futures = []
1139
- chunk_start = 0
1140
- j = 0
1136
+ with core .ThreadedZarrEncoder ([ba ], encoder_threads ) as te :
1141
1137
last_bytes_read = 0
1142
1138
for value , bytes_read in source_col .iter_values_bytes ():
1139
+ j = te .next_buffer_row ()
1143
1140
sanitiser (ba .buff , j , value )
1144
- j += 1
1145
- if j == chunk_length :
1146
- flush_futures (futures )
1147
- futures .extend (ba .async_flush (executor , chunk_start ))
1148
- ba .swap_buffers ()
1149
- j = 0
1150
- chunk_start += chunk_length
1151
1141
if last_bytes_read != bytes_read :
1152
1142
with progress_counter .get_lock ():
1153
1143
progress_counter .value += bytes_read - last_bytes_read
1154
1144
last_bytes_read = bytes_read
1155
1145
1156
- if j != 0 :
1157
- flush_futures (futures )
1158
- futures .extend (ba .async_flush (executor , chunk_start , j ))
1159
- flush_futures (futures )
1160
-
1161
- def encode_genotypes (self , pcvcf ):
1146
+ def encode_genotypes (self , pcvcf , encoder_threads = 4 ):
1162
1147
source_col = pcvcf .columns ["FORMAT/GT" ]
1163
1148
gt = core .BufferedArray (self .root ["call_genotype" ])
1164
1149
gt_mask = core .BufferedArray (self .root ["call_genotype_mask" ])
1165
1150
gt_phased = core .BufferedArray (self .root ["call_genotype_phased" ])
1166
- chunk_length = gt .array .chunks [0 ]
1167
-
1168
1151
buffered_arrays = [gt , gt_phased , gt_mask ]
1169
1152
1170
- with cf .ThreadPoolExecutor (max_workers = 4 ) as executor :
1171
- futures = []
1172
- chunk_start = 0
1173
- j = 0
1153
+ with core .ThreadedZarrEncoder (buffered_arrays , encoder_threads ) as te :
1174
1154
last_bytes_read = 0
1175
1155
for value , bytes_read in source_col .iter_values_bytes ():
1156
+ j = te .next_buffer_row ()
1176
1157
sanitise_value_int_2d (gt .buff , j , value [:, :- 1 ])
1177
1158
sanitise_value_int_1d (gt_phased .buff , j , value [:, - 1 ])
1178
1159
# TODO check whether this is the correct semantics when we are padding
1179
1160
# with mixed ploidies?
1180
1161
gt_mask .buff [j ] = gt .buff [j ] < 0
1181
1162
1182
- j += 1
1183
- if j == chunk_length :
1184
- flush_futures (futures )
1185
- for ba in buffered_arrays :
1186
- futures .extend (ba .async_flush (executor , chunk_start ))
1187
- ba .swap_buffers ()
1188
- j = 0
1189
- chunk_start += chunk_length
1190
1163
if last_bytes_read != bytes_read :
1191
1164
with progress_counter .get_lock ():
1192
1165
progress_counter .value += bytes_read - last_bytes_read
1193
1166
last_bytes_read = bytes_read
1194
1167
1195
- if j != 0 :
1196
- flush_futures (futures )
1197
- for ba in buffered_arrays :
1198
- futures .extend (ba .async_flush (executor , chunk_start , j ))
1199
- flush_futures (futures )
1200
-
1201
1168
def encode_alleles (self , pcvcf ):
1202
1169
ref_col = pcvcf .columns ["REF" ]
1203
1170
alt_col = pcvcf .columns ["ALT" ]
@@ -1451,7 +1418,7 @@ def convert_vcf(
1451
1418
)
1452
1419
1453
1420
1454
- def encode_bed_partition_genotypes (bed_path , zarr_path , start_variant , end_variant ):
1421
+ def encode_bed_partition_genotypes (bed_path , zarr_path , start_variant , end_variant , encoder_threads = 8 ):
1455
1422
bed = bed_reader .open_bed (bed_path , num_threads = 1 )
1456
1423
1457
1424
store = zarr .DirectoryStore (zarr_path )
@@ -1464,8 +1431,7 @@ def encode_bed_partition_genotypes(bed_path, zarr_path, start_variant, end_varia
1464
1431
1465
1432
buffered_arrays = [gt , gt_phased , gt_mask ]
1466
1433
1467
- with cf .ThreadPoolExecutor (max_workers = 8 ) as executor :
1468
- futures = []
1434
+ with core .ThreadedZarrEncoder (buffered_arrays , encoder_threads ) as te :
1469
1435
1470
1436
start = start_variant
1471
1437
while start < end_variant :
@@ -1474,7 +1440,8 @@ def encode_bed_partition_genotypes(bed_path, zarr_path, start_variant, end_varia
1474
1440
# Note: we could do this without iterating over rows, but it's a bit
1475
1441
# simpler and the bottleneck is in the encoding step anyway. It's
1476
1442
# also nice to have updates on the progress monitor.
1477
- for j , values in enumerate (bed_chunk ):
1443
+ for values in bed_chunk :
1444
+ j = te .next_buffer_row ()
1478
1445
dest = gt .buff [j ]
1479
1446
dest [values == - 127 ] = - 1
1480
1447
dest [values == 2 ] = 1
@@ -1483,14 +1450,7 @@ def encode_bed_partition_genotypes(bed_path, zarr_path, start_variant, end_varia
1483
1450
gt_mask .buff [j ] = dest == - 1
1484
1451
with progress_counter .get_lock ():
1485
1452
progress_counter .value += 1
1486
-
1487
- assert j <= chunk_length
1488
- flush_futures (futures )
1489
- for ba in buffered_arrays :
1490
- ba .async_flush (extend , start , j )
1491
- ba .swap_buffers ()
1492
1453
start = stop
1493
- flush_futures (futures )
1494
1454
1495
1455
1496
1456
def validate (vcf_path , zarr_path , show_progress = False ):
0 commit comments