Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ As of now, we use:
- [mypy](https://mypy.readthedocs.io/en/stable/) for static type checking and to prevent contradictory type annotation.
- Run with `mypy **/*.py`
- [pipreqs](https://github.com/bndr/pipreqs) to check that the requirement files are up-to-date with the code.

- This is run with a custom Bash script in GitHub Actions which only compares the list of package names.

```
Expand Down
4 changes: 4 additions & 0 deletions VERSIONLOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# TACA Version Log

## 20250717.1

Replaced the CouchDB package with IBM Cloudant.

## 20250603.1

Misc. improvements to ONT processing.
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
CouchDB
PyYAML
click
flowcell_parser @ git+https://github.com/SciLifeLab/flowcell_parser
ibmcloudant>=0.9.1
pandas
python_crontab
python_dateutil
Expand Down
2 changes: 1 addition & 1 deletion taca/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
"""Main TACA module"""

__version__ = "1.5.14"
__version__ = "1.6.0"
19 changes: 13 additions & 6 deletions taca/analysis/analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,13 +105,13 @@ def upload_to_statusdb(run_dir, software):


def _upload_to_statusdb(run):
"""Triggers the upload to statusdb using the dependency flowcell_parser.
"""Triggers the upload to statusdb.

:param Run run: the object run
"""
couch_conf = CONFIG["statusdb"]
couch_connection = statusdb.StatusdbSession(couch_conf).connection
db = couch_connection[couch_conf["xten_db"]]
couch_connection = statusdb.StatusdbSession(couch_conf)
dbname = couch_conf["xten_db"]
parser = run.runParserObj
# Check if I have NoIndex lanes
for element in parser.obj["samplesheet_csv"]:
Expand Down Expand Up @@ -163,13 +163,20 @@ def _upload_to_statusdb(run):
else:
run_date = run_vals[0]
run_fc = f"{run_date}_{run_vals[-1]}"
db_rows = db.view("names/name", reduce=False, include_docs=True)[run_fc].rows
db_rows = couch_connection.connection.post_view(
db=dbname,
ddoc="names",
view="name",
key=run_fc,
reduce=False,
include_docs=True,
).get_result()["rows"]
if db_rows:
doc = db_rows[0].doc
doc = db_rows[0]["doc"]
if doc.get("pdc_archived") and not parser.obj.get("pdc_archived"):
parser.obj["pdc_archived"] = doc.get("pdc_archived")

statusdb.update_doc(db, parser.obj, over_write_db_entry=True)
couch_connection.update_doc(dbname, parser.obj, over_write_db_entry=True)


def transfer_run(run_dir, software):
Expand Down
21 changes: 12 additions & 9 deletions taca/backup/backup.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,8 +281,7 @@ def _log_pdc_statusdb(self, run):
element_db_connection = statusdb.ElementRunsConnection(
self.couch_info, dbname="element_runs"
)
run_doc_id = element_db_connection.get_db_entry(run).value
run_doc = element_db_connection.db[run_doc_id]
run_doc = element_db_connection.get_db_entry(run)["doc"]
run_doc["pdc_archived"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
run_doc["run_status"] = "archived"
element_db_connection.upload_to_statusdb(run_doc)
Expand All @@ -298,15 +297,19 @@ def _log_pdc_statusdb(self, run):
else:
run_date = run_vals[0]
run_fc = f"{run_date}_{run_vals[-1]}"
couch_connection = statusdb.StatusdbSession(self.couch_info).connection
db = couch_connection[self.couch_info["db"]]
fc_names = {e.key: e.id for e in db.view("names/name", reduce=False)}
d_id = fc_names[run_fc]
doc = db.get(d_id)
couch_connection = statusdb.StatusdbSession(self.couch_info)
doc = couch_connection.connection.post_view(
db=self.couch_info["db"],
ddoc="names",
view="name",
key=run_fc,
reduce=False,
include_docs=True,
).get_result()["rows"][0]["doc"]
doc["pdc_archived"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
db.save(doc)
couch_connection.save_db_doc(doc=doc, db=self.couch_info["db"])
logger.info(
f'Logged "pdc_archived" timestamp for fc {run} in statusdb doc "{d_id}"'
f'Logged "pdc_archived" timestamp for fc {run} in statusdb doc "{doc["_id"]}"'
)
except:
logger.warning(
Expand Down
12 changes: 8 additions & 4 deletions taca/delivery/delivery_classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,16 +169,20 @@ def __init__(

def get_order_details(self):
"""Fetch order details from order portal"""
projects_db = self.status_db_connection.connection["projects"]
view = projects_db.view("order_portal/ProjectID_to_PortalID")
rows = view[self.project_id].rows
view = self.status_db_connection.connection.post_view(
db="projects",
ddoc="order_portal",
view="ProjectID_to_PortalID",
key=self.project_id,
).get_result()
rows = view["rows"]
if len(rows) < 1:
raise AssertionError(f"Project {self.project_id} not found in StatusDB")
if len(rows) > 1:
raise AssertionError(
f"Project {self.project_id} has more than one entry in StatusDB orderportal_db"
)
portal_id = rows[0].value
portal_id = rows[0]["value"]
# Get project info from order portal API
get_project_url = "{}/v1/order/{}".format(
self.orderportal.get("orderportal_api_url"), portal_id
Expand Down
19 changes: 12 additions & 7 deletions taca/server_status/cronjobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,32 +50,37 @@ def update_cronjob_db():
"Connecting to database: {}".format(CONFIG.get("statusdb", {}).get("url"))
)
try:
couch_connection = statusdb.StatusdbSession(statusdb_conf).connection
couch_connection = statusdb.StatusdbSession(statusdb_conf)
except Exception as e:
logging.error(e.message)
else:
# update document
crontab_db = couch_connection["cronjobs"]
view = crontab_db.view("server/alias")
view = couch_connection.connection.post_view(
db="cronjobs",
ddoc="server",
view="alias",
key=server,
include_docs=True,
).get_result()
# to be safe
doc = {}
# create doc if not exist
if not view[server].rows:
if not view["rows"]:
logging.info("Creating a document")
doc = {
"users": {user: cronjobs for user, cronjobs in result.items()},
"Last updated": str(timestamp),
"server": server,
}
# else: get existing doc
for row in view[server]:
else:
logging.info("Updating the document")
doc = crontab_db.get(row.value)
doc = view["rows"][0]["doc"]
doc["users"].update(result)
doc["Last updated"] = str(timestamp)
if doc:
try:
crontab_db.save(doc)
couch_connection.save_db_doc(doc=doc, db="cronjobs")
except Exception as e:
logging.error(e.message)
else:
Expand Down
7 changes: 3 additions & 4 deletions taca/server_status/server_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,13 +98,11 @@ def update_status_db(data, server_type=None):
logging.error('"statusdb" must be present in the config file!')
raise RuntimeError('"statusdb" must be present in the config file!')
try:
couch_connection = statusdb.StatusdbSession(db_config).connection
couch_connection = statusdb.StatusdbSession(db_config)
except Exception as e:
logging.error(e.message)
raise

db = couch_connection["server_status"]
logging.info("Connection established")
for key in data.keys(): # data is dict of dicts
server = data[key] # data[key] is dictionary (the command output)
server["name"] = key # key is nas url
Expand All @@ -113,7 +111,8 @@ def update_status_db(data, server_type=None):
server["server_type"] = server_type or "unknown"

try:
db.save(server)
logging.info(f"Updating server status for {key}")
couch_connection.save_db_doc(doc=server, db="server_status")
except Exception as e:
logging.error(e.message)
raise
Expand Down
114 changes: 64 additions & 50 deletions taca/utils/bioinfo_tab.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from collections import OrderedDict, defaultdict

from flowcell_parser.classes import RunParametersParser, SampleSheetParser
from ibmcloudant.cloudant_v1 import BulkDocs

from taca.element.Aviti_Runs import Aviti_Run
from taca.nanopore.ONT_run_classes import ONT_RUN_PATTERN, ONT_run
Expand Down Expand Up @@ -84,8 +85,6 @@ def update_statusdb(run_dir, inst_brand):
statusdb_conf = CONFIG.get("statusdb")
couch_connection = statusdb.StatusdbSession(statusdb_conf).connection
valueskey = datetime.datetime.now().isoformat()
db = couch_connection["bioinfo_analysis"]
view = db.view("latest_data/sample_id")

if inst_brand == "illumina":
# Fetch individual fields
Expand Down Expand Up @@ -128,19 +127,19 @@ def update_statusdb(run_dir, inst_brand):
# If entry exists, append to existing
# Special if case to handle lanes written as int, can be safely removed when old lanes
# is no longer stored as int
try:
if (
len(view[[project, run_id, int(lane), sample]].rows)
>= 1
):
lane = int(lane)
except ValueError:
pass
if len(view[[project, run_id, lane, sample]].rows) >= 1:
remote_id = view[[project, run_id, lane, sample]].rows[0].id
lane = str(lane)
remote_doc = db[remote_id]["values"]
remote_status = db[remote_id]["status"]
num_rows = couch_connection.post_view(
db="bioinfo_analysis",
ddoc="latest_data",
view="sample_id",
key=[project, run_id, lane, sample],
).get_result()["rows"]
if len(num_rows) >= 1:
remote_id = num_rows[0]["id"]
remote_doc = couch_connection.get_document(
db="bioinfo_analysis", doc_id=remote_id
).get_result()
remote_doc_values = remote_doc["values"]
remote_status = remote_doc["status"]
# Only updates the listed statuses
if (
remote_status
Expand All @@ -154,7 +153,7 @@ def update_statusdb(run_dir, inst_brand):
and sample_status != remote_status
):
# Appends old entry to new. Essentially merges the two
for k, v in remote_doc.items():
for k, v in remote_doc_values.items():
obj["values"][k] = v
logger.info(
f"Updating {run_id} {project} {flowcell} {lane} {sample} as {sample_status}"
Expand All @@ -168,16 +167,22 @@ def update_statusdb(run_dir, inst_brand):
)
)
# Update record cluster
obj["_rev"] = db[remote_id].rev
obj["_rev"] = remote_doc["_rev"]
obj["_id"] = remote_id
db.save(obj)
couch_connection.put_document(
db="bioinfo_analysis",
doc_id=obj["_id"],
document=obj,
)
# Creates new entry
else:
logger.info(
f"Creating {run_id} {project} {flowcell} {lane} {sample} as {sample_status}"
)
# Creates record
db.save(obj)
couch_connection.post_document(
db="bioinfo_analysis", document=obj
)
# Sets FC error flag
if project_info[flowcell].value is not None:
if (
Expand Down Expand Up @@ -258,15 +263,18 @@ def get_ss_projects_ont(ont_run, couch_connection):
"""Fetches project, FC, lane & sample (sample-run) status for a given folder for ONT runs"""
proj_tree = Tree()
flowcell_id = ont_run.run_name
flowcell_info = (
couch_connection["nanopore_runs"].view("info/lims")[flowcell_id].rows[0]
)
flowcell_info = couch_connection.post_view(
db="nanopore_runs",
ddoc="info",
view="lims",
key=flowcell_id,
).get_result()["rows"][0]
if (
flowcell_info.value
and flowcell_info.value.get("loading", [])
and "sample_data" in flowcell_info.value["loading"][-1]
flowcell_info["value"]
and flowcell_info["value"].get("loading", [])
and "sample_data" in flowcell_info["value"]["loading"][-1]
):
samples = flowcell_info.value["loading"][-1]["sample_data"]
samples = flowcell_info["value"]["loading"][-1]["sample_data"]
for sample_dict in samples:
sample_id = sample_dict["sample_name"]
project = sample_id.split("_")[0]
Expand Down Expand Up @@ -488,39 +496,45 @@ def fail_run(runid, project):
)
logger.error(e)
raise e
bioinfo_db = status_db["bioinfo_analysis"]
if project is not None:
view = bioinfo_db.view("full_doc/pj_run_to_doc")
rows = view[[project, runid]].rows
logger.info(
f"Updating status of {len(rows)} objects with flowcell_id: {runid} and project_id {project}"
)

if project:
view = "pj_run_to_doc"
key = [project, runid]
else:
view = bioinfo_db.view("full_doc/run_id_to_doc")
rows = view[[runid]].rows
logger.info(f"Updating status of {len(rows)} objects with flowcell_id: {runid}")
view = "run_id_to_doc"
key = [runid]

rows = status_db.post_view(
db="bioinfo_analysis",
ddoc="full_doc",
view=view,
key=key,
).get_result()["rows"]
logger.info(
f"Found {len(rows)} objects with flowcell_id: {runid}"
+ (f" and project_id {project}" if project else "")
)

new_timestamp = datetime.datetime.now().isoformat()
updated = 0
to_save = []
for row in rows:
if row.value["status"] != "Failed":
row.value["values"][new_timestamp] = {
if row["value"]["status"] != "Failed":
row["value"]["values"][new_timestamp] = {
"sample_status": "Failed",
"user": "taca",
}
row.value["status"] = "Failed"
try:
bioinfo_db.save(row.value)
row["value"]["status"] = "Failed"

to_save.append(row["value"])
save_result = status_db.post_bulk_docs(
db="bioinfo_analysis", bulk_docs=BulkDocs(docs=to_save, new_edits=True)
).get_result()
for res in save_result:
if "ok" in res:
updated += 1
except Exception as e:
else:
logger.error(
"Cannot update object project-sample-run-lane: {}-{}-{}-{}".format(
row.value.get("project_id"),
row.value.get("sample"),
row.value.get("run_id"),
row.value.get("lane"),
)
f"Failed to save object {res.get('id')}: {res.get('error')} {res.get('reason')}"
)
logger.error(e)
raise e
logger.info(f"Successfully updated {updated} objects")
Loading