|
2 | 2 | from pathlib import Path
|
3 | 3 | from typing import Any, Dict, List, MutableMapping, Optional, Tuple, Union
|
4 | 4 |
|
| 5 | +import dask |
5 | 6 | import dask.array as da
|
6 | 7 | import dask.dataframe as dd
|
7 | 8 | import numpy as np
|
@@ -245,64 +246,65 @@ def read_plink(
|
245 | 246 | f"{path}.{ext}" for ext in ["bed", "bim", "fam"]
|
246 | 247 | ]
|
247 | 248 |
|
248 |
| - # Load axis data first to determine dimension sizes |
249 |
| - df_fam = read_fam(fam_path, sep=fam_sep) # type: ignore[arg-type] |
250 |
| - df_bim = read_bim(bim_path, sep=bim_sep) # type: ignore[arg-type] |
251 |
| - |
252 |
| - if persist: |
253 |
| - df_fam = df_fam.persist() |
254 |
| - df_bim = df_bim.persist() |
255 |
| - |
256 |
| - arr_fam = dataframe_to_dict(df_fam, dtype=FAM_ARRAY_DTYPE) |
257 |
| - arr_bim = dataframe_to_dict(df_bim, dtype=BIM_ARRAY_DTYPE) |
258 |
| - |
259 |
| - # Load genotyping data |
260 |
| - call_genotype = da.from_array( |
261 |
| - # Make sure to use asarray=False in order for masked arrays to propagate |
262 |
| - BedReader(bed_path, (len(df_bim), len(df_fam)), count_A1=count_a1), # type: ignore[arg-type] |
263 |
| - chunks=chunks, |
264 |
| - # Lock must be true with multiprocessing dask scheduler |
265 |
| - # to not get bed-reader errors (it works w/ threading backend though) |
266 |
| - lock=lock, |
267 |
| - asarray=False, |
268 |
| - name=f"bed_reader:read_plink:{bed_path}", |
269 |
| - ) |
| 249 | + with dask.config.set({"dataframe.convert-string": False}): |
| 250 | + # Load axis data first to determine dimension sizes |
| 251 | + df_fam = read_fam(fam_path, sep=fam_sep) # type: ignore[arg-type] |
| 252 | + df_bim = read_bim(bim_path, sep=bim_sep) # type: ignore[arg-type] |
| 253 | + |
| 254 | + if persist: |
| 255 | + df_fam = df_fam.persist() |
| 256 | + df_bim = df_bim.persist() |
| 257 | + |
| 258 | + arr_fam = dataframe_to_dict(df_fam, dtype=FAM_ARRAY_DTYPE) |
| 259 | + arr_bim = dataframe_to_dict(df_bim, dtype=BIM_ARRAY_DTYPE) |
| 260 | + |
| 261 | + # Load genotyping data |
| 262 | + call_genotype = da.from_array( |
| 263 | + # Make sure to use asarray=False in order for masked arrays to propagate |
| 264 | + BedReader(bed_path, (len(df_bim), len(df_fam)), count_A1=count_a1), # type: ignore[arg-type] |
| 265 | + chunks=chunks, |
| 266 | + # Lock must be true with multiprocessing dask scheduler |
| 267 | + # to not get bed-reader errors (it works w/ threading backend though) |
| 268 | + lock=lock, |
| 269 | + asarray=False, |
| 270 | + name=f"bed_reader:read_plink:{bed_path}", |
| 271 | + ) |
270 | 272 |
|
271 |
| - # If contigs are already integers, use them as-is |
272 |
| - if bim_int_contig: |
273 |
| - variant_contig = arr_bim["contig"].astype("int16") |
274 |
| - variant_contig_names = da.unique(variant_contig).astype(str) |
275 |
| - variant_contig_names = list(variant_contig_names.compute()) |
276 |
| - # Otherwise create index for contig names based |
277 |
| - # on order of appearance in underlying .bim file |
278 |
| - else: |
279 |
| - variant_contig, variant_contig_names = encode_array(arr_bim["contig"].compute()) # type: ignore |
280 |
| - variant_contig = variant_contig.astype("int16") |
281 |
| - variant_contig_names = list(variant_contig_names) |
282 |
| - |
283 |
| - variant_position = arr_bim["pos"] |
284 |
| - a1: ArrayLike = arr_bim["a1"].astype("str") |
285 |
| - a2: ArrayLike = arr_bim["a2"].astype("str") |
286 |
| - |
287 |
| - # Note: column_stack not implemented in Dask, must use [v|h]stack |
288 |
| - variant_allele = da.hstack((a1[:, np.newaxis], a2[:, np.newaxis])) |
289 |
| - variant_allele = variant_allele.astype("S") |
290 |
| - variant_id = arr_bim["variant_id"] |
291 |
| - |
292 |
| - sample_id = arr_fam["member_id"] |
293 |
| - |
294 |
| - ds = create_genotype_call_dataset( |
295 |
| - variant_contig_names=variant_contig_names, |
296 |
| - variant_contig=variant_contig, |
297 |
| - variant_position=variant_position, |
298 |
| - variant_allele=variant_allele, |
299 |
| - sample_id=sample_id, |
300 |
| - call_genotype=call_genotype, |
301 |
| - variant_id=variant_id, |
302 |
| - ) |
| 273 | + # If contigs are already integers, use them as-is |
| 274 | + if bim_int_contig: |
| 275 | + variant_contig = arr_bim["contig"].astype("int16") |
| 276 | + variant_contig_names = da.unique(variant_contig).astype(str) |
| 277 | + variant_contig_names = list(variant_contig_names.compute()) |
| 278 | + # Otherwise create index for contig names based |
| 279 | + # on order of appearance in underlying .bim file |
| 280 | + else: |
| 281 | + variant_contig, variant_contig_names = encode_array(arr_bim["contig"].compute()) # type: ignore |
| 282 | + variant_contig = variant_contig.astype("int16") |
| 283 | + variant_contig_names = list(variant_contig_names) |
| 284 | + |
| 285 | + variant_position = arr_bim["pos"] |
| 286 | + a1: ArrayLike = arr_bim["a1"].astype("str") |
| 287 | + a2: ArrayLike = arr_bim["a2"].astype("str") |
| 288 | + |
| 289 | + # Note: column_stack not implemented in Dask, must use [v|h]stack |
| 290 | + variant_allele = da.hstack((a1[:, np.newaxis], a2[:, np.newaxis])) |
| 291 | + variant_allele = variant_allele.astype("S") |
| 292 | + variant_id = arr_bim["variant_id"] |
| 293 | + |
| 294 | + sample_id = arr_fam["member_id"] |
| 295 | + |
| 296 | + ds = create_genotype_call_dataset( |
| 297 | + variant_contig_names=variant_contig_names, |
| 298 | + variant_contig=variant_contig, |
| 299 | + variant_position=variant_position, |
| 300 | + variant_allele=variant_allele, |
| 301 | + sample_id=sample_id, |
| 302 | + call_genotype=call_genotype, |
| 303 | + variant_id=variant_id, |
| 304 | + ) |
303 | 305 |
|
304 |
| - # Assign PLINK-specific pedigree fields |
305 |
| - return ds.assign({f"sample_{f}": (DIM_SAMPLE, arr_fam[f]) for f in arr_fam}) |
| 306 | + # Assign PLINK-specific pedigree fields |
| 307 | + return ds.assign({f"sample_{f}": (DIM_SAMPLE, arr_fam[f]) for f in arr_fam}) |
306 | 308 |
|
307 | 309 |
|
308 | 310 | def plink_to_zarr(
|
|
0 commit comments