
Commit 547decf

Remove source dict
1 parent 8d69bfa commit 547decf

3 files changed: +21 -156 lines changed
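The commit drops the string-keyed source registry in writer.py and instead has callers pass the source format class directly to VcfZarrWriter, which also lets plink.py import writer at module level rather than working around a circular import. Below is a minimal sketch of the calling pattern before and after, using names from the diff; the output path is a placeholder and the constructor is shown only as far as the diff reveals it.

    import pathlib

    from bio2zarr import plink, writer

    zarr_path = pathlib.Path("example.vcz")  # hypothetical output path

    # Before this commit: writer.py kept a string-keyed registry and callers
    # passed the key, which forced writer.py to import plink and icf.
    #     SOURCES = {"icf": icf.IntermediateColumnarFormat, "plink": plink.PlinkFormat}
    #     vzw = writer.VcfZarrWriter("plink", zarr_path)

    # After this commit: callers hand the source format class to the writer,
    # so the registry and the circular-import workaround in plink.py can go.
    vzw = writer.VcfZarrWriter(plink.PlinkFormat, zarr_path)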

bio2zarr/plink.py (3 additions & 146 deletions)

@@ -5,6 +5,8 @@
 import numpy as np
 import zarr
 
+from bio2zarr import constants, schema, writer
+
 logger = logging.getLogger(__name__)
 
 
@@ -21,7 +23,6 @@ def iter_alleles(self, start, stop, num_alleles):
         ref_field = self.bed.allele_1
         alt_field = self.bed.allele_2
 
-        # TODO - should be doing whole chunks rather than one at a time
         for ref, alt in zip(
             ref_field[start:stop],
             alt_field[start:stop],
@@ -49,10 +50,6 @@ def iter_genotypes(self, shape, start, stop):
            yield gt, phased
 
 
-# Import here to avoid circular import
-from bio2zarr import constants, schema, writer  # noqa: E402
-
-
 def generate_schema(
     bed,
     variants_chunk_size=None,
@@ -147,7 +144,7 @@ def convert(
         samples_chunk_size=samples_chunk_size,
     )
     zarr_path = pathlib.Path(zarr_path)
-    vzw = writer.VcfZarrWriter("plink", zarr_path)
+    vzw = writer.VcfZarrWriter(PlinkFormat, zarr_path)
     # Rough heuristic to split work up enough to keep utilisation high
     target_num_partitions = max(1, worker_processes * 4)
     vzw.init(
@@ -168,146 +165,6 @@ def convert(
     # vzw.create_index()
 
 
-# def encode_genotypes_slice(bed_path, zarr_path, start, stop):
-#     # We need to count the A2 alleles here if we want to keep the
-#     # alleles reported as allele_1, allele_2. It's obvious here what
-#     # the correct approach is, but it is important to note that the
-#     # 0th allele is *not* necessarily the REF for these datasets.
-#     bed = bed_reader.open_bed(bed_path, num_threads=1, count_A1=False)
-#     root = zarr.open(store=zarr_path, mode="a", **ZARR_FORMAT_KWARGS)
-#     gt = core.BufferedArray(root["call_genotype"], start)
-#     gt_mask = core.BufferedArray(root["call_genotype_mask"], start)
-#     gt_phased = core.BufferedArray(root["call_genotype_phased"], start)
-#     variants_chunk_size = gt.array.chunks[0]
-#     assert start % variants_chunk_size == 0
-
-#     logger.debug(f"Reading slice {start}:{stop}")
-#     chunk_start = start
-#     while chunk_start < stop:
-#         chunk_stop = min(chunk_start + variants_chunk_size, stop)
-#         logger.debug(f"Reading bed slice {chunk_start}:{chunk_stop}")
-#         bed_chunk = bed.read(slice(chunk_start, chunk_stop), dtype=np.int8).T
-#         logger.debug(f"Got bed slice {humanfriendly.format_size(bed_chunk.nbytes)}")
-#         # Probably should do this without iterating over rows, but it's a bit
-#         # simpler and lines up better with the array buffering API. The bottleneck
-#         # is in the encoding anyway.
-#         for values in bed_chunk:
-#             j = gt.next_buffer_row()
-#             g = np.zeros_like(gt.buff[j])
-#             g[values == -127] = -1
-#             g[values == 2] = 1
-#             g[values == 1, 0] = 1
-#             gt.buff[j] = g
-#             j = gt_phased.next_buffer_row()
-#             gt_phased.buff[j] = False
-#             j = gt_mask.next_buffer_row()
-#             gt_mask.buff[j] = gt.buff[j] == -1
-#         chunk_start = chunk_stop
-#     gt.flush()
-#     gt_phased.flush()
-#     gt_mask.flush()
-#     logger.debug(f"GT slice {start}:{stop} done")
-
-#     root = zarr.open_group(store=zarr_path, mode="w", **ZARR_FORMAT_KWARGS)
-
-#     ploidy = 2
-#     shape = [m, n]
-#     chunks = [variants_chunk_size, samples_chunk_size]
-#     dimensions = ["variants", "samples"]
-
-#     # TODO we should be reusing some logic from vcfzarr here on laying
-#     # out the basic dataset, and using the schema generator. Currently
-#     # we're not using the best Blosc settings for genotypes here.
-#     default_compressor = numcodecs.Blosc(cname="zstd", clevel=7)
-
-#     a = root.array(
-#         "sample_id",
-#         data=bed.iid,
-#         shape=bed.iid.shape,
-#         dtype="str",
-#         compressor=default_compressor,
-#         chunks=(samples_chunk_size,),
-#     )
-#     a.attrs["_ARRAY_DIMENSIONS"] = ["samples"]
-#     logger.debug("Encoded samples")
-
-#     # TODO encode these in slices - but read them in one go to avoid
-#     # fetching repeatedly from bim file
-#     a = root.array(
-#         "variant_position",
-#         data=bed.bp_position,
-#         shape=bed.bp_position.shape,
-#         dtype=np.int32,
-#         compressor=default_compressor,
-#         chunks=(variants_chunk_size,),
-#     )
-#     a.attrs["_ARRAY_DIMENSIONS"] = ["variants"]
-#     logger.debug("encoded variant_position")
-
-#     alleles = np.stack([bed.allele_1, bed.allele_2], axis=1)
-#     a = root.array(
-#         "variant_allele",
-#         data=alleles,
-#         shape=alleles.shape,
-#         dtype="str",
-#         compressor=default_compressor,
-#         chunks=(variants_chunk_size, alleles.shape[1]),
-#     )
-#     a.attrs["_ARRAY_DIMENSIONS"] = ["variants", "alleles"]
-#     logger.debug("encoded variant_allele")
-
-#     # TODO remove this?
-#     a = root.empty(
-#         name="call_genotype_phased",
-#         dtype="bool",
-#         shape=list(shape),
-#         chunks=list(chunks),
-#         compressor=default_compressor,
-#         **ZARR_FORMAT_KWARGS,
-#     )
-#     a.attrs["_ARRAY_DIMENSIONS"] = list(dimensions)
-
-#     shape += [ploidy]
-#     dimensions += ["ploidy"]
-#     a = root.empty(
-#         name="call_genotype",
-#         dtype="i1",
-#         shape=list(shape),
-#         chunks=list(chunks),
-#         compressor=default_compressor,
-#         **ZARR_FORMAT_KWARGS,
-#     )
-#     a.attrs["_ARRAY_DIMENSIONS"] = list(dimensions)
-
-#     a = root.empty(
-#         name="call_genotype_mask",
-#         dtype="bool",
-#         shape=list(shape),
-#         chunks=list(chunks),
-#         compressor=default_compressor,
-#         **ZARR_FORMAT_KWARGS,
-#     )
-#     a.attrs["_ARRAY_DIMENSIONS"] = list(dimensions)
-
-#     del bed
-
-#     num_slices = max(1, worker_processes * 4)
-#     slices = core.chunk_aligned_slices(a, num_slices)
-
-#     total_chunks = sum(a.nchunks for _, a in root.arrays())
-
-#     progress_config = core.ProgressConfig(
-#         total=total_chunks, title="Convert", units="chunks", show=show_progress
-#     )
-#     with core.ParallelWorkManager(worker_processes, progress_config) as pwm:
-#         for start, stop in slices:
-#             pwm.submit(encode_genotypes_slice, bed_path, zarr_path, start, stop)
-
-#     # TODO also add atomic swap like VCF. Should be abstracted to
-#     # share basic code for setting up the variation dataset zarr
-#     zarr.consolidate_metadata(zarr_path)
-
-
 # FIXME do this more efficiently - currently reading the whole thing
 # in for convenience, and also comparing call-by-call
 def validate(bed_path, zarr_path):

bio2zarr/vcf2zarr/vcz.py (3 additions & 3 deletions)

@@ -276,7 +276,7 @@ def encode(
         max_variant_chunks=max_variant_chunks,
         dimension_separator=dimension_separator,
     )
-    vzw = writer.VcfZarrWriter("icf", zarr_path)
+    vzw = writer.VcfZarrWriter(icf.IntermediateColumnarFormat, zarr_path)
     vzw.encode_all_partitions(
         worker_processes=worker_processes,
        show_progress=show_progress,
@@ -329,12 +329,12 @@ def encode_init(
 
 
 def encode_partition(zarr_path, partition):
-    writer_instance = writer.VcfZarrWriter("icf", zarr_path)
+    writer_instance = writer.VcfZarrWriter(icf.IntermediateColumnarFormat, zarr_path)
     writer_instance.encode_partition(partition)
 
 
 def encode_finalise(zarr_path, show_progress=False):
-    writer_instance = writer.VcfZarrWriter("icf", zarr_path)
+    writer_instance = writer.VcfZarrWriter(icf.IntermediateColumnarFormat, zarr_path)
     writer_instance.finalise(show_progress=show_progress)
 
 
bio2zarr/writer.py (15 additions & 7 deletions)

@@ -9,12 +9,21 @@
 import numpy as np
 import zarr
 
-from bio2zarr import constants, core, plink, provenance, schema, zarr_utils
-from bio2zarr.vcf2zarr import icf
+from bio2zarr import constants, core, provenance, schema, zarr_utils
 
 logger = logging.getLogger(__name__)
 
-SOURCES = {"icf": icf.IntermediateColumnarFormat, "plink": plink.PlinkFormat}
+
+def sanitise_int_array(value, ndmin, dtype):
+    if isinstance(value, tuple):
+        value = [
+            constants.VCF_INT_MISSING if x is None else x for x in value
+        ]  # NEEDS TEST
+    value = np.array(value, ndmin=ndmin, copy=True)
+    value[value == constants.VCF_INT_MISSING] = -1
+    value[value == constants.VCF_INT_FILL] = -2
+    # TODO watch out for clipping here!
+    return value.astype(dtype)
 
 
 def compute_la_field(genotypes):
@@ -87,10 +96,10 @@ class LocalisableFieldDescriptor:
 
 localisable_fields = [
     LocalisableFieldDescriptor(
-        "call_LAD", "FORMAT/AD", icf.sanitise_int_array, compute_lad_field
+        "call_LAD", "FORMAT/AD", sanitise_int_array, compute_lad_field
     ),
     LocalisableFieldDescriptor(
-        "call_LPL", "FORMAT/PL", icf.sanitise_int_array, compute_lpl_field
+        "call_LPL", "FORMAT/PL", sanitise_int_array, compute_lpl_field
     ),
 ]
 
@@ -344,8 +353,7 @@ def load_metadata(self):
        if self.metadata is None:
            with open(self.wip_path / "metadata.json") as f:
                self.metadata = VcfZarrWriterMetadata.fromdict(json.load(f))
-           source_loader = SOURCES[self.source_type]
-           self.source = source_loader(self.metadata.source_path)
+           self.source = self.source_type(self.metadata.source_path)
 
    def partition_path(self, partition_index):
        return self.partitions_path / f"p{partition_index}"
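With sanitise_int_array now living in writer.py rather than icf, the diff above also shows what the function does to missing and fill sentinels. The following is an illustrative usage sketch only; the input values are made up, and the concrete sentinel values are whatever bio2zarr.constants defines for VCF_INT_MISSING and VCF_INT_FILL.

    import numpy as np

    from bio2zarr import constants
    from bio2zarr.writer import sanitise_int_array

    # A tuple with a missing entry: per the function above, None is mapped to
    # constants.VCF_INT_MISSING, which then becomes -1; VCF_INT_FILL becomes -2.
    raw = (4, None, constants.VCF_INT_FILL)
    clean = sanitise_int_array(raw, ndmin=1, dtype=np.int32)
    print(clean)  # expected: [ 4 -1 -2]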
