@@ -1289,6 +1289,7 @@ class EncodingWork:
1289
1289
func : callable
1290
1290
start : int
1291
1291
stop : int
1292
+ columns : list [str ]
1292
1293
memory : int = 0
1293
1294
1294
1295
@@ -1525,50 +1526,65 @@ def encode(
1525
1526
f = functools .partial (self .encode_array_slice , col )
1526
1527
work .append (
1527
1528
EncodingWork (
1528
- f , start , stop , encoding_memory_requirements [col .name ]
1529
+ f ,
1530
+ start ,
1531
+ stop ,
1532
+ [col .name ],
1533
+ encoding_memory_requirements [col .name ],
1529
1534
)
1530
1535
)
1531
- work .append (EncodingWork (self .encode_alleles_slice , start , stop ))
1532
- work .append (EncodingWork (self .encode_id_slice , start , stop ))
1536
+ work .append (
1537
+ EncodingWork (self .encode_alleles_slice , start , stop , ["variant_allele" ])
1538
+ )
1539
+ work .append (EncodingWork (self .encode_id_slice , start , stop , ["variant_id" ]))
1533
1540
work .append (
1534
1541
EncodingWork (
1535
1542
functools .partial (self .encode_filters_slice , filter_id_map ),
1536
1543
start ,
1537
1544
stop ,
1545
+ ["variant_filters" ],
1538
1546
)
1539
1547
)
1540
1548
work .append (
1541
1549
EncodingWork (
1542
1550
functools .partial (self .encode_contig_slice , contig_id_map ),
1543
1551
start ,
1544
1552
stop ,
1553
+ ["variant_contig_id" ],
1545
1554
)
1546
1555
)
1547
1556
if "call_genotype" in self .schema .columns :
1557
+ variables = [
1558
+ "call_genotype" ,
1559
+ "call_genotype_phased" ,
1560
+ "call_genotype_mask" ,
1561
+ ]
1548
1562
gt_memory = sum (
1549
- encoding_memory_requirements [name ]
1550
- for name in [
1551
- "call_genotype" ,
1552
- "call_genotype_phased" ,
1553
- "call_genotype_mask" ,
1554
- ]
1563
+ encoding_memory_requirements [name ] for name in variables
1555
1564
)
1556
1565
work .append (
1557
- EncodingWork (self .encode_genotypes_slice , start , stop , gt_memory )
1566
+ EncodingWork (
1567
+ self .encode_genotypes_slice , start , stop , variables , gt_memory
1568
+ )
1558
1569
)
1559
1570
# Fail early if we can't fit a particular column into memory
1560
1571
for wp in work :
1561
1572
if wp .memory >= max_memory :
1562
- raise ValueError (f"Insufficient memory for { wp .func } : "
1563
- f"{ display_size (wp .memory )} > { display_size (max_memory )} " )
1564
-
1573
+ raise ValueError (
1574
+ f"Insufficient memory for { wp .columns } : "
1575
+ f"{ display_size (wp .memory )} > { display_size (max_memory )} "
1576
+ )
1565
1577
1566
1578
progress_config = core .ProgressConfig (
1567
1579
total = total_bytes ,
1568
1580
title = "Encode" ,
1569
1581
units = "B" ,
1570
1582
show = show_progress ,
1571
1583
)
1584
+ # TODO add a map of slices completed to column here, so that we can
1585
+ # finalise the arrays as they get completed. We'll have to service
1586
+ # the futures more, though, not just when we exceed the memory budget
1587
+
1572
1588
used_memory = 0
1573
1589
with core .ParallelWorkManager (worker_processes , progress_config ) as pwm :
1574
1590
future = pwm .submit (self .encode_samples )
0 commit comments