Skip to content
Merged
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ tracing = "0.1"
tracing-subscriber = "0.3"
utoipa-swagger-ui = { version = "8.0", features = ["actix-web"] }
utoipa = { version = "5.2", features = ["actix_extras", "chrono", "indexmap", "preserve_order", "yaml"] }
tempfile = "3.10.1"

[dependencies.noodles]
version = "0.77.0"
Expand All @@ -78,5 +79,3 @@ tracing-test = "0.2.4"
# Compile insta with full optimization.
[profile.dev.package.insta]
opt-level = 3
[profile.dev.package.similar]
opt-level = 3
273 changes: 201 additions & 72 deletions src/clinvar_genes/cli/import.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
//! Import of minimal ClinVar data.

use std::{collections::HashSet, io::BufRead, sync::Arc};

use clap::Parser;
use prost::Message;

use crate::common;
use crate::pbs::clinvar::per_gene::{ClinvarPerGeneRecord, ExtractedVariantsPerRelease};
use crate::pbs::clinvar_data::class_by_freq::GeneCoarseClinsigFrequencyCounts;
use crate::pbs::clinvar_data::extracted_vars::ExtractedVcvRecord;
use crate::pbs::clinvar_data::gene_impact::GeneImpactCounts;
use clap::Parser;
use itertools::Itertools;
use prost::Message;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::io::{BufReader, Read, Write};
use std::iter::from_fn;
use std::path::{Path, PathBuf};
use std::{collections::HashSet, io::BufRead, sync::Arc};

/// Command line arguments for `tsv import` sub command.
#[derive(Parser, Debug, Clone)]
Expand Down Expand Up @@ -86,74 +90,140 @@ fn load_per_frequency_jsonl(
Ok(result)
}

type PerRelease = indexmap::IndexMap<String, Vec<ExtractedVcvRecord>>;
type PerGene = indexmap::IndexMap<String, PerRelease>;

/// Load per-gene sequence variants.
fn load_variants_jsonl(
variant_jsonls: &[String],
) -> Result<indexmap::IndexMap<String, Vec<ExtractedVariantsPerRelease>>, anyhow::Error> {
// Build intermediate data structure using nested maps.
let mut per_gene: PerGene = Default::default();

for path_jsonl in variant_jsonls {
let reader: Box<dyn std::io::Read> = if path_jsonl.ends_with(".gz") {
Box::new(flate2::read::GzDecoder::new(std::fs::File::open(
path_jsonl,
)?))
} else {
Box::new(std::fs::File::open(path_jsonl)?)
};
#[derive(Debug, Serialize, Deserialize)]
struct SortableVcvRecord {
hgnc_id: String,
record: ExtractedVcvRecord,
}

let reader = std::io::BufReader::new(reader);
struct ClinvarVariants {
paths: Vec<String>,
tempdir: PathBuf,
hgnc_ids: Option<HashSet<String>>,
}

for line in reader.lines() {
let line = line?;
let input_record = serde_json::from_str::<ExtractedVcvRecord>(&line);
match input_record {
Err(e) => {
tracing::warn!("skipping line because of error: {}", e);
continue;
}
Ok(input_record) => {
for hgnc_id in &input_record.hgnc_ids {
let per_gene_entry = per_gene.entry(hgnc_id.clone()).or_default();
let release = input_record
.sequence_location
.as_ref()
.expect("missing sequence_location")
.assembly
.clone();
let per_release_entry = per_gene_entry.entry(release.clone()).or_default();
per_release_entry.push(input_record.clone());
}
}
}
}
impl ClinvarVariants {
fn from_paths(variant_jsonls: &[String], directory: impl AsRef<Path>) -> anyhow::Result<Self> {
Ok(Self {
paths: variant_jsonls.to_vec(),
tempdir: PathBuf::from(directory.as_ref()),
hgnc_ids: None,
})
}

// Convert into final data structure that uses lists of entry records rather than nested maps.
let mut result = indexmap::IndexMap::new();
for (hgnc_id, per_release) in per_gene {
let mut per_gene_out = Vec::new();
for (release, extracted_vars) in per_release {
let mut variants = extracted_vars;
variants.sort_by(|a, b| {
a.accession
.as_ref()
.expect("no accession")
.accession
.cmp(&b.accession.as_ref().expect("no accession").accession)
});
per_gene_out.push(ExtractedVariantsPerRelease {
release: Some(release),
variants,
fn _iter(&self) -> impl Iterator<Item = SortableVcvRecord> + use<'_> {
self.paths
.iter()
.map(|path| {
let reader: Box<dyn Read> = if path.ends_with(".gz") {
Box::new(flate2::bufread::MultiGzDecoder::new(BufReader::new(
std::fs::File::open(path).expect(&format!("failed to open file: {}", path)),
)))
} else {
Box::new(
std::fs::File::open(path).expect(&format!("failed to open file: {}", path)),
)
};
reader
})
.map(BufReader::new)
.flat_map(|reader| reader.lines())
.map(Result::unwrap)
.map(|line| serde_json::from_str::<ExtractedVcvRecord>(&line))
.map(Result::unwrap)
.flat_map(|record| {
let hgnc_ids = record.hgnc_ids.clone();
hgnc_ids.into_iter().map(move |hgnc_id| SortableVcvRecord {
hgnc_id,
record: record.clone(),
})
})
}

/// Distribute the records to temporary files.
/// Returns the set of HGNC IDs for which records were distributed.
///
/// Writes the records to temporary compressed jsonl files, one file per HGNC ID.
pub(crate) fn distribute_records(&mut self) -> anyhow::Result<&HashSet<String>> {
let mut vars_per_gene_hgnc_ids = HashSet::new();
let mut writers = HashMap::new();
for (i, record) in self._iter().enumerate() {
let hgnc_id = &record.hgnc_id;
vars_per_gene_hgnc_ids.insert(hgnc_id.clone());

// Open file in append mode and write the record, compressed.
// This will require usage of the MultiGzDecoder in subsequent steps.
let mut writer = writers.entry(hgnc_id.clone()).or_insert_with(|| {
let path = self.tempdir.join(&format!(
"{}.tmp.jsonl.gz",
hgnc_id.strip_prefix("HGNC:").unwrap_or(hgnc_id)
));

std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(&path)
.map(std::io::BufWriter::new)
.map(|w| flate2::write::GzEncoder::new(w, flate2::Compression::fast()))
.expect(&format!("failed to open file: {:?}", path))
});
serde_json::to_writer(&mut writer, &record.record)?;
writeln!(writer)?;
}
for (_, mut writer) in writers.drain() {
writer.flush()?;
}
result.insert(hgnc_id, per_gene_out);
drop(writers);
self.hgnc_ids = Some(vars_per_gene_hgnc_ids);
Ok(self.hgnc_ids.as_ref().unwrap())
}

Ok(result)
/// Returns an iterator over records, sorted by HGNC ID.
///
/// This method requires that `distribute_records` has been called before.
/// Records are read from the temporary files in sorted order.
pub(crate) fn sorted_records(
&mut self,
) -> anyhow::Result<impl Iterator<Item = SortableVcvRecord> + use<'_>> {
if self.hgnc_ids.is_none() {
tracing::warn!("Records have not been distributed yet, doing so now.");
self.distribute_records()?;
}

let mut hgnc_ids = self.hgnc_ids.as_ref().unwrap().iter().collect_vec();
hgnc_ids.sort();

let records = hgnc_ids.into_iter().flat_map(|hgnc_id| {
let path = self.tempdir.join(&format!(
"{}.tmp.jsonl.gz",
hgnc_id.strip_prefix("HGNC:").unwrap_or(hgnc_id)
));

let reader = std::fs::File::open(&path)
.map(BufReader::new)
.map(flate2::bufread::MultiGzDecoder::new)
.map(BufReader::new)
.expect(&format!("failed to open file: {:?}", path));
let mut lines = reader.lines();

from_fn(move || match lines.next() {
None => None,
Some(Ok(line)) => match serde_json::from_str::<ExtractedVcvRecord>(&line) {
Ok(r) => Some(SortableVcvRecord {
hgnc_id: hgnc_id.clone(),
record: r,
}),
Err(e) => {
panic!("failed to deserialize line: {:?} ({})", line, e)
}
},
Some(Err(e)) => {
panic!("failed to read line: {}", e);
}
})
});
Ok(records)
}
}

/// Perform import of the JSONL files.
Expand All @@ -177,29 +247,85 @@ fn jsonl_import(
"... done loading impact frequency per significance gene in {:?}",
&before_per_freq.elapsed()
);

tracing::info!("Loading variants per gene ...");
let before_vars = std::time::Instant::now();
let vars_per_gene = load_variants_jsonl(&args.paths_variant_jsonl)?;
// Temporary directory for storing the distributed records.
let tempdir = tempfile::tempdir()?;
let mut vars_per_gene = ClinvarVariants::from_paths(&args.paths_variant_jsonl, &tempdir)?;

// Distribute the records to files, such that we can later read them in sorted order.
let vars_per_gene_hgnc_ids = vars_per_gene.distribute_records()?;
tracing::info!(
"... done loading variants per gene in {:?}",
"... done preparing variants per gene in {:?}",
&before_vars.elapsed()
);

tracing::info!("Writing to database ...");
let before_write_to_db = std::time::Instant::now();
tracing::info!("Merging gene lists ...");
let before_merge = std::time::Instant::now();
let mut hgnc_ids = counts_per_impact
.keys()
.cloned()
.chain(counts_per_freq.keys().cloned())
.chain(vars_per_gene.keys().cloned())
.chain(vars_per_gene_hgnc_ids.iter().cloned())
.collect::<HashSet<_>>()
.into_iter()
.collect::<Vec<_>>();
hgnc_ids.sort();
tracing::info!(
"... done merging gene lists in {:?}",
&before_merge.elapsed()
);

tracing::info!("Writing to database ...");
let before_write_to_db = std::time::Instant::now();

// We need to check if there are any genes in the impact and frequency data that are not in the variants data,
// such that we can skip advancing the iterator for the variants data in those cases.

let hgnc_ids_not_in_vars_per_gene: HashSet<String> = hgnc_ids
.iter()
.filter(|hgnc_id| !vars_per_gene_hgnc_ids.contains(*hgnc_id))
.cloned()
.collect();

let vars_per_gene_records = vars_per_gene.sorted_records()?;
let vars_per_gene_records_by_hgnc_id = vars_per_gene_records.chunk_by(|r| r.hgnc_id.clone());
let mut vars_per_gene_records_by_hgnc_id = vars_per_gene_records_by_hgnc_id.into_iter();

// Read through all records and insert each into the database.
for hgnc_id in hgnc_ids.iter() {
let per_release_vars = vars_per_gene.get(hgnc_id).cloned().unwrap_or_default();
for (i, hgnc_id) in hgnc_ids.iter().enumerate() {
let per_release_vars = if hgnc_ids_not_in_vars_per_gene.contains(hgnc_id) {
tracing::warn!("No variants found for gene {}", hgnc_id);
vec![]
} else {
if let Some((group_hgnc_id, records)) = vars_per_gene_records_by_hgnc_id.next() {
if *hgnc_id != group_hgnc_id {
tracing::warn!("Iterators out of sync ({} vs {})", hgnc_id, &group_hgnc_id);
vec![]
} else {
let key = |r: &SortableVcvRecord| -> String {
r.record
.sequence_location
.as_ref()
.expect("Missing sequence location")
.assembly
.clone()
};
let by_assembly = records.into_group_map_by(|a| key(a));
by_assembly
.into_iter()
.map(|(assembly, group)| ExtractedVariantsPerRelease {
release: Some(assembly),
variants: group.into_iter().map(|r| r.record).collect(),
})
.collect()
}
} else {
panic!("No more records in vars_per_gene_records_by_hgnc_id, even though there should be.");
}
};

let record = ClinvarPerGeneRecord {
per_impact_counts: Some(counts_per_impact.get(hgnc_id).cloned().unwrap_or_default()),
per_freq_counts: Some(counts_per_freq.get(hgnc_id).cloned().unwrap_or_default()),
Expand All @@ -214,6 +340,9 @@ fn jsonl_import(
&before_write_to_db.elapsed()
);

tracing::info!("Cleaning up temporary files ...");
drop(tempdir);

Ok(())
}

Expand Down