Skip to content
1 change: 1 addition & 0 deletions create_db.sh
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,5 @@ psql --quiet -h "$db" -U $PGUSER -d genomic -a -f data/pr_339.sql >>setup_out.tx
# Apply per-PR schema migration scripts in order; all psql output is
# appended to setup_out.txt for later inspection.
psql --quiet -h "$db" -U $PGUSER -d genomic -a -f data/pr_341.sql >>setup_out.txt
psql --quiet -h "$db" -U $PGUSER -d genomic -a -f data/pr_352.sql >>setup_out.txt
psql --quiet -h "$db" -U $PGUSER -d genomic -a -f data/pr_374.sql >>setup_out.txt
psql --quiet -h "$db" -U $PGUSER -d genomic -a -f data/pr_388.sql >>setup_out.txt
echo "...done"
1 change: 1 addition & 0 deletions data/files.sql
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ CREATE TABLE variantfile (
indexed INTEGER,
chr_prefix VARCHAR,
reference_genome VARCHAR,
analysis_date DATE,
PRIMARY KEY (id)
-- FOREIGN KEY(drs_object_id) REFERENCES drs_object (id)
);
Expand Down
7 changes: 7 additions & 0 deletions data/pr_388.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- Add the analysis_date column if it doesn't already exist.
-- ALTER TABLE ... ADD COLUMN IF NOT EXISTS (PostgreSQL 9.6+) is already
-- idempotent, so no PL/pgSQL DO wrapper is needed.
ALTER TABLE variantfile ADD COLUMN IF NOT EXISTS analysis_date DATE;
64 changes: 64 additions & 0 deletions htsget_server/analysis_date.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import htsget_operations
import database
import os
import sys
import hashlib
import re
import datetime
from candigv2_logging.logging import initialize, CanDIGLogger
import requests
from authx.auth import create_service_token
import json
from server import Session
from sqlalchemy import Column, Integer, String, JSON, Boolean, MetaData, Date, ForeignKey, Table, select


logger = CanDIGLogger(__file__)

initialize()


def find_analysis_dates():
    """Backfill analysis dates for indexed variantfiles.

    Collects stored VCF header lines that contain a date-like entry
    (``...ate=``), derives an analysis date per variantfile via
    ``database.get_analysis_date_from_headers``, and writes it into the
    corresponding DRS object's metadata.

    Returns a dict with:
      - "date_added_to": list of ``{variantfile_id: "YYYY-MM-DD"}`` entries
        that were successfully written
      - "errors": list of ``"id: status text"`` strings for failed updates
    """
    service_headers = {
        "X-Service-Token": create_service_token()
    }

    # Group candidate header lines by variantfile id.
    headers_by_vf = {}
    with Session() as session:
        q = select(database.VariantFile.id, database.Header.text)
        q = q.join(database.VariantFile, database.Header.associated_variantfiles)
        q = q.where(database.Header.text.contains("%ate=%")).order_by(database.VariantFile.id)
        for row in session.execute(q):
            result = row._mapping
            headers_by_vf.setdefault(result["id"], []).append(result["text"])

    result = {
        "date_added_to": [],
        "errors": []
    }
    for vf, header_texts in headers_by_vf.items():
        analysis_date = database.get_analysis_date_from_headers(header_texts)
        # The helper may return a date/datetime object; normalize to the
        # ISO date string stored in DRS metadata. isinstance against
        # datetime.date covers datetime.datetime too (it subclasses date),
        # matching the original string-based type check.
        if isinstance(analysis_date, datetime.date):
            analysis_date = analysis_date.strftime("%Y-%m-%d")
        response = write_analysis_date(vf, service_headers, analysis_date)
        if response.status_code == 200:
            result["date_added_to"].append({vf: analysis_date})
        else:
            result["errors"].append(f"{vf}: {response.status_code} {response.text}")

    return result

def write_analysis_date(drs_obj_id, headers, analysis_date):
    """Store *analysis_date* in the DRS object's metadata.

    Fetches DRS object *drs_obj_id*, sets ``metadata["analysis_date"]``, and
    posts the updated object back.

    Returns the ``requests.Response`` of the last call made: the GET when the
    fetch fails, otherwise the POST.
    """
    # Hoisted so the f-strings don't nest double quotes inside double quotes:
    # f"{os.getenv("DRS_URL")}" is a SyntaxError on Python < 3.12 (PEP 701).
    drs_url = os.getenv("DRS_URL")
    response = requests.get(url=f"{drs_url}/ga4gh/drs/v1/objects/{drs_obj_id}", headers=headers)
    if response.status_code != 200:
        return response
    obj = response.json()
    obj["metadata"]["analysis_date"] = analysis_date
    return requests.post(url=f"{drs_url}/ga4gh/drs/v1/objects", headers=headers, json=obj)


# CLI entry point: scan all variantfiles for header-derived analysis dates,
# write them to DRS, and print a JSON summary of updates and errors.
if __name__ == "__main__":
    print(json.dumps(find_analysis_dates(), indent=2))
46 changes: 42 additions & 4 deletions htsget_server/database.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from sqlalchemy.orm import relationship, aliased
from sqlalchemy import Column, Integer, String, JSON, Boolean, MetaData, ForeignKey, Table, select
from sqlalchemy import Column, Integer, String, JSON, Boolean, MetaData, Date, ForeignKey, Table, select
import json
import os
import re
from datetime import datetime
import dateparser
import dateparser.search
from datetime import datetime, date
from random import randint
from time import sleep
from config import BUCKET_SIZE, HTSGET_URL, MAX_TRIES
Expand Down Expand Up @@ -117,6 +119,7 @@ class VariantFile(ObjectDBBase):
indexed = Column(Integer)
chr_prefix = Column(String)
reference_genome = Column(String)
analysis_date = Column(Date)

# a variantfile maps to a drs object
drs_object_id = Column(String)
Expand Down Expand Up @@ -156,7 +159,8 @@ def __repr__(self):
}
for sample in self.samples:
result['samples'].append(sample.sample_id)

if self.analysis_date is not None:
result['analysis_date'] = self.analysis_date.strftime("%Y-%m-%d")
return json.dumps(result)


Expand Down Expand Up @@ -328,6 +332,7 @@ def create_variantfile(obj, tries=1):
new_variantfile = VariantFile()
new_variantfile.indexed = 0
new_variantfile.chr_prefix = ''
new_variantfile.analysis_date = None
new_variantfile.id = obj['id']
new_variantfile.reference_genome = obj['reference_genome']
headers = {
Expand Down Expand Up @@ -455,6 +460,11 @@ def add_header_for_variantfile(obj):
headertexts = map(lambda x: x.strip(), obj['texts'])
with Session() as session:
new_variantfile = session.query(VariantFile).filter_by(id=obj['variantfile_id']).one_or_none()
analysis_date = get_analysis_date_from_headers(headertexts)
# save the analysis date
new_variantfile.analysis_date = analysis_date
session.add(new_variantfile)

for headertext in headertexts:
if headertext == '' or headertext.startswith("#CHROM"):
continue
Expand All @@ -466,8 +476,9 @@ def add_header_for_variantfile(obj):
new_header.text = headertext
new_header.associated_variantfiles.append(new_variantfile)
session.add(new_header)

session.commit()
return None
return json.loads(str(new_variantfile))


def delete_header(text):
Expand All @@ -478,6 +489,33 @@ def delete_header(text):
return json.loads(str(new_object))


def get_analysis_date_from_headers(headertexts):
    """Derive an analysis date from a list of VCF header lines.

    Header lines of the form ``<something>Date=<value>`` (case-insensitive
    ``D``) are collected as candidates; a ``##fileDate`` entry is tried
    first. Each candidate is parsed with dateparser (hinting the compact
    ``%Y%m%d`` format), falling back to a free-text date search. The first
    successfully parsed date is returned, or None when nothing parses.
    """
    candidates = []
    for line in headertexts:
        matched = re.match(r"(.+[Dd]ate)=(.+)", line)
        if matched is None:
            continue
        # ##fileDate is the most authoritative source, so it goes first.
        if matched.group(1) == "##fileDate":
            candidates.insert(0, matched.group(2))
        else:
            candidates.append(matched.group(2))

    logger.debug(candidates)
    for candidate in candidates:
        parsed = dateparser.parse(candidate, date_formats=['%Y%m%d'])
        if parsed is None:
            # Fall back to scanning the text for any embedded date.
            found = dateparser.search.search_dates(candidate)
            if found is not None:
                parsed = found[0][1]
        if parsed is not None:
            logger.debug(parsed)
            return parsed
    return None


# for efficiency, positions are bucketed into 10 bp sets: pos_bucket_id == base pair position/10, rounded down
def get_bucket_for_position(pos):
return int(pos/BUCKET_SIZE) * BUCKET_SIZE
Expand Down
3 changes: 3 additions & 0 deletions htsget_server/htsget_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -597,6 +597,9 @@ def _verify_analysis_drs_object(id_):
# the AnalysisDrsObject's listed ExperimentContentsObjects should match the samples in the VCF file.
if len(test) > 0:
raise Exception(f"AnalysisDrsObject {id_} lists experiments {test} that are not in the linked analysis file")
# variant files should have an analysis_date
if database.get_analysis_date_from_headers(str(gen_obj['file'].header).split('\n')) is None:
raise Exception(f"AnalysisDrsObject {id_} does not have any associated analysis date")
else:
# for read files, we can test whether the linked file is readable by checking for references in the header.
try:
Expand Down
27 changes: 14 additions & 13 deletions htsget_server/indexing.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@


def index_variants(drs_obj_id, program):
headers = {
service_headers = {
"X-Service-Token": create_service_token()
}

gen_obj = htsget_operations.get_pysam_obj(drs_obj_id, headers=headers)
gen_obj = htsget_operations.get_pysam_obj(drs_obj_id, headers=service_headers)
if gen_obj is None:
return {"message": f"No id {drs_obj_id} exists"}, 404
if "message" in gen_obj:
Expand All @@ -35,13 +35,20 @@ def index_variants(drs_obj_id, program):
return {"message": f"Read object {drs_obj_id} stats calculated"}, 200

logger.info(f"{drs_obj_id} starting indexing")
write_index_status(drs_obj_id, f"{datetime.datetime.today()} starting indexing")
write_index_status(drs_obj_id, service_headers, f"{datetime.datetime.today()} starting indexing")

headers = str(gen_obj['file'].header).split('\n')

database.add_header_for_variantfile({'texts': headers, 'variantfile_id': drs_obj_id})
variantfile = database.add_header_for_variantfile({'texts': headers, 'variantfile_id': drs_obj_id})
logger.info(f"{drs_obj_id} indexed {len(headers)} headers")

response = requests.get(url=f"{os.getenv("DRS_URL")}/ga4gh/drs/v1/objects/{drs_obj_id}", headers=service_headers)
if response.status_code == 200:
obj = response.json()
if "analysis_date" not in obj["metadata"] and "analysis_date" in variantfile:
obj["metadata"]["analysis_date"] = variantfile["analysis_date"]
response = requests.post(url=f"{os.getenv("DRS_URL")}/ga4gh/drs/v1/objects", headers=service_headers, json=obj)

samples = list(gen_obj['file'].header.samples)
for sample in samples:
if database.create_sample({'id': sample, 'variantfile_id': drs_obj_id}) is None:
Expand Down Expand Up @@ -74,16 +81,13 @@ def index_variants(drs_obj_id, program):

logger.info(f"{drs_obj_id} writing {len(res['bucket_counts'])} entries to db")
write_pos_bucket(res, drs_obj_id)
mark_as_indexed(drs_obj_id)
mark_as_indexed(drs_obj_id, service_headers)
logger.info(f"{drs_obj_id} indexing done")

return {"message": f"Indexing complete for variantfile {drs_obj_id}"}, 200


def mark_as_indexed(drs_obj_id):
headers = {
"X-Service-Token": create_service_token()
}
def mark_as_indexed(drs_obj_id, headers):
response = requests.get(url=f"{os.getenv("DRS_URL")}/ga4gh/drs/v1/objects/{drs_obj_id}", headers=headers)
if response.status_code == 200:
obj = response.json()
Expand Down Expand Up @@ -166,10 +170,7 @@ def index_touch_file(file_path):
logger.warning(f"indexing error! {type(e)} {str(e)}")


def write_index_status(drs_obj_id, message):
headers = {
"X-Service-Token": create_service_token()
}
def write_index_status(drs_obj_id, headers, message):
response = requests.get(url=f"{os.getenv("DRS_URL")}/ga4gh/drs/v1/objects/{drs_obj_id}", headers=headers)
if response.status_code == 200:
obj = response.json()
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@ werkzeug>=3.1.0 # not directly required, pinned by Snyk to avoid a vulnerability
zipp>=3.19.1 # not directly required, pinned by Snyk to avoid a vulnerability
urllib3>=2.2.2 # not directly required, pinned by Snyk to avoid a vulnerability
requests==2.32.4
dateparser>=1.2.2