Skip to content

Commit 45879a5

Browse files
authored
Merge pull request #482 from aanil/server_status
Replace the CouchDB package with IBM cloudant
2 parents 1b16f67 + bfee8cc commit 45879a5

File tree

13 files changed: +351 additions, -187 deletions

README.md

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@ As of now, we use:
4040
- [mypy](https://mypy.readthedocs.io/en/stable/) for static type checking and to prevent contradictory type annotation.
4141
- Run with `mypy **/*.py`
4242
- [pipreqs](https://github.com/bndr/pipreqs) to check that the requirement files are up-to-date with the code.
43-
4443
- This is run with a custom Bash script in GitHub Actions which will only compare the list of package names.
4544

4645
```

VERSIONLOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
# TACA Version Log
22

3+
## 20250717.1
4+
5+
Replace the CouchDB package with IBM cloudant
6+
37
## 20250603.1
48

59
Misc. improvements to ONT processing.

requirements.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1-
CouchDB
21
PyYAML
32
click
43
flowcell_parser @ git+https://github.com/SciLifeLab/flowcell_parser
4+
ibmcloudant>=0.9.1
55
pandas
66
python_crontab
77
python_dateutil

taca/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
"""Main TACA module"""
22

3-
__version__ = "1.5.14"
3+
__version__ = "1.6.0"

taca/analysis/analysis.py

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -105,13 +105,13 @@ def upload_to_statusdb(run_dir, software):
105105

106106

107107
def _upload_to_statusdb(run):
108-
"""Triggers the upload to statusdb using the dependency flowcell_parser.
108+
"""Triggers the upload to statusdb.
109109
110110
:param Run run: the object run
111111
"""
112112
couch_conf = CONFIG["statusdb"]
113-
couch_connection = statusdb.StatusdbSession(couch_conf).connection
114-
db = couch_connection[couch_conf["xten_db"]]
113+
couch_connection = statusdb.StatusdbSession(couch_conf)
114+
dbname = couch_conf["xten_db"]
115115
parser = run.runParserObj
116116
# Check if I have NoIndex lanes
117117
for element in parser.obj["samplesheet_csv"]:
@@ -163,13 +163,20 @@ def _upload_to_statusdb(run):
163163
else:
164164
run_date = run_vals[0]
165165
run_fc = f"{run_date}_{run_vals[-1]}"
166-
db_rows = db.view("names/name", reduce=False, include_docs=True)[run_fc].rows
166+
db_rows = couch_connection.connection.post_view(
167+
db=dbname,
168+
ddoc="names",
169+
view="name",
170+
key=run_fc,
171+
reduce=False,
172+
include_docs=True,
173+
).get_result()["rows"]
167174
if db_rows:
168-
doc = db_rows[0].doc
175+
doc = db_rows[0]["doc"]
169176
if doc.get("pdc_archived") and not parser.obj.get("pdc_archived"):
170177
parser.obj["pdc_archived"] = doc.get("pdc_archived")
171178

172-
statusdb.update_doc(db, parser.obj, over_write_db_entry=True)
179+
couch_connection.update_doc(dbname, parser.obj, over_write_db_entry=True)
173180

174181

175182
def transfer_run(run_dir, software):

taca/backup/backup.py

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -281,8 +281,7 @@ def _log_pdc_statusdb(self, run):
281281
element_db_connection = statusdb.ElementRunsConnection(
282282
self.couch_info, dbname="element_runs"
283283
)
284-
run_doc_id = element_db_connection.get_db_entry(run).value
285-
run_doc = element_db_connection.db[run_doc_id]
284+
run_doc = element_db_connection.get_db_entry(run)["doc"]
286285
run_doc["pdc_archived"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
287286
run_doc["run_status"] = "archived"
288287
element_db_connection.upload_to_statusdb(run_doc)
@@ -298,15 +297,19 @@ def _log_pdc_statusdb(self, run):
298297
else:
299298
run_date = run_vals[0]
300299
run_fc = f"{run_date}_{run_vals[-1]}"
301-
couch_connection = statusdb.StatusdbSession(self.couch_info).connection
302-
db = couch_connection[self.couch_info["db"]]
303-
fc_names = {e.key: e.id for e in db.view("names/name", reduce=False)}
304-
d_id = fc_names[run_fc]
305-
doc = db.get(d_id)
300+
couch_connection = statusdb.StatusdbSession(self.couch_info)
301+
doc = couch_connection.connection.post_view(
302+
db=self.couch_info["db"],
303+
ddoc="names",
304+
view="name",
305+
key=run_fc,
306+
reduce=False,
307+
include_docs=True,
308+
).get_result()["rows"][0]["doc"]
306309
doc["pdc_archived"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
307-
db.save(doc)
310+
couch_connection.save_db_doc(doc=doc, db=self.couch_info["db"])
308311
logger.info(
309-
f'Logged "pdc_archived" timestamp for fc {run} in statusdb doc "{d_id}"'
312+
f'Logged "pdc_archived" timestamp for fc {run} in statusdb doc "{doc["_id"]}"'
310313
)
311314
except:
312315
logger.warning(

taca/delivery/delivery_classes.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -169,16 +169,20 @@ def __init__(
169169

170170
def get_order_details(self):
171171
"""Fetch order details from order portal"""
172-
projects_db = self.status_db_connection.connection["projects"]
173-
view = projects_db.view("order_portal/ProjectID_to_PortalID")
174-
rows = view[self.project_id].rows
172+
view = self.status_db_connection.connection.post_view(
173+
db="projects",
174+
ddoc="order_portal",
175+
view="ProjectID_to_PortalID",
176+
key=self.project_id,
177+
).get_result()
178+
rows = view["rows"]
175179
if len(rows) < 1:
176180
raise AssertionError(f"Project {self.project_id} not found in StatusDB")
177181
if len(rows) > 1:
178182
raise AssertionError(
179183
f"Project {self.project_id} has more than one entry in StatusDB orderportal_db"
180184
)
181-
portal_id = rows[0].value
185+
portal_id = rows[0]["value"]
182186
# Get project info from order portal API
183187
get_project_url = "{}/v1/order/{}".format(
184188
self.orderportal.get("orderportal_api_url"), portal_id

taca/server_status/cronjobs.py

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -50,32 +50,37 @@ def update_cronjob_db():
5050
"Connecting to database: {}".format(CONFIG.get("statusdb", {}).get("url"))
5151
)
5252
try:
53-
couch_connection = statusdb.StatusdbSession(statusdb_conf).connection
53+
couch_connection = statusdb.StatusdbSession(statusdb_conf)
5454
except Exception as e:
5555
logging.error(e.message)
5656
else:
5757
# update document
58-
crontab_db = couch_connection["cronjobs"]
59-
view = crontab_db.view("server/alias")
58+
view = couch_connection.connection.post_view(
59+
db="cronjobs",
60+
ddoc="server",
61+
view="alias",
62+
key=server,
63+
include_docs=True,
64+
).get_result()
6065
# to be safe
6166
doc = {}
6267
# create doc if not exist
63-
if not view[server].rows:
68+
if not view["rows"]:
6469
logging.info("Creating a document")
6570
doc = {
6671
"users": {user: cronjobs for user, cronjobs in result.items()},
6772
"Last updated": str(timestamp),
6873
"server": server,
6974
}
7075
# else: get existing doc
71-
for row in view[server]:
76+
else:
7277
logging.info("Updating the document")
73-
doc = crontab_db.get(row.value)
78+
doc = view["rows"][0]["doc"]
7479
doc["users"].update(result)
7580
doc["Last updated"] = str(timestamp)
7681
if doc:
7782
try:
78-
crontab_db.save(doc)
83+
couch_connection.save_db_doc(doc=doc, db="cronjobs")
7984
except Exception as e:
8085
logging.error(e.message)
8186
else:

taca/server_status/server_status.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -98,13 +98,11 @@ def update_status_db(data, server_type=None):
9898
logging.error('"statusdb" must be present in the config file!')
9999
raise RuntimeError('"statusdb" must be present in the config file!')
100100
try:
101-
couch_connection = statusdb.StatusdbSession(db_config).connection
101+
couch_connection = statusdb.StatusdbSession(db_config)
102102
except Exception as e:
103103
logging.error(e.message)
104104
raise
105105

106-
db = couch_connection["server_status"]
107-
logging.info("Connection established")
108106
for key in data.keys(): # data is dict of dicts
109107
server = data[key] # data[key] is dictionary (the command output)
110108
server["name"] = key # key is nas url
@@ -113,7 +111,8 @@ def update_status_db(data, server_type=None):
113111
server["server_type"] = server_type or "unknown"
114112

115113
try:
116-
db.save(server)
114+
logging.info(f"Updating server status for {key}")
115+
couch_connection.save_db_doc(doc=server, db="server_status")
117116
except Exception as e:
118117
logging.error(e.message)
119118
raise

taca/utils/bioinfo_tab.py

Lines changed: 64 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from collections import OrderedDict, defaultdict
77

88
from flowcell_parser.classes import RunParametersParser, SampleSheetParser
9+
from ibmcloudant.cloudant_v1 import BulkDocs
910

1011
from taca.element.Aviti_Runs import Aviti_Run
1112
from taca.nanopore.ONT_run_classes import ONT_RUN_PATTERN, ONT_run
@@ -84,8 +85,6 @@ def update_statusdb(run_dir, inst_brand):
8485
statusdb_conf = CONFIG.get("statusdb")
8586
couch_connection = statusdb.StatusdbSession(statusdb_conf).connection
8687
valueskey = datetime.datetime.now().isoformat()
87-
db = couch_connection["bioinfo_analysis"]
88-
view = db.view("latest_data/sample_id")
8988

9089
if inst_brand == "illumina":
9190
# Fetch individual fields
@@ -128,19 +127,19 @@ def update_statusdb(run_dir, inst_brand):
128127
# If entry exists, append to existing
129128
# Special if case to handle lanes written as int, can be safely removed when old lanes
130129
# is no longer stored as int
131-
try:
132-
if (
133-
len(view[[project, run_id, int(lane), sample]].rows)
134-
>= 1
135-
):
136-
lane = int(lane)
137-
except ValueError:
138-
pass
139-
if len(view[[project, run_id, lane, sample]].rows) >= 1:
140-
remote_id = view[[project, run_id, lane, sample]].rows[0].id
141-
lane = str(lane)
142-
remote_doc = db[remote_id]["values"]
143-
remote_status = db[remote_id]["status"]
130+
num_rows = couch_connection.post_view(
131+
db="bioinfo_analysis",
132+
ddoc="latest_data",
133+
view="sample_id",
134+
key=[project, run_id, lane, sample],
135+
).get_result()["rows"]
136+
if len(num_rows) >= 1:
137+
remote_id = num_rows[0]["id"]
138+
remote_doc = couch_connection.get_document(
139+
db="bioinfo_analysis", doc_id=remote_id
140+
).get_result()
141+
remote_doc_values = remote_doc["values"]
142+
remote_status = remote_doc["status"]
144143
# Only updates the listed statuses
145144
if (
146145
remote_status
@@ -154,7 +153,7 @@ def update_statusdb(run_dir, inst_brand):
154153
and sample_status != remote_status
155154
):
156155
# Appends old entry to new. Essentially merges the two
157-
for k, v in remote_doc.items():
156+
for k, v in remote_doc_values.items():
158157
obj["values"][k] = v
159158
logger.info(
160159
f"Updating {run_id} {project} {flowcell} {lane} {sample} as {sample_status}"
@@ -168,16 +167,22 @@ def update_statusdb(run_dir, inst_brand):
168167
)
169168
)
170169
# Update record cluster
171-
obj["_rev"] = db[remote_id].rev
170+
obj["_rev"] = remote_doc["_rev"]
172171
obj["_id"] = remote_id
173-
db.save(obj)
172+
couch_connection.put_document(
173+
db="bioinfo_analysis",
174+
doc_id=obj["_id"],
175+
document=obj,
176+
)
174177
# Creates new entry
175178
else:
176179
logger.info(
177180
f"Creating {run_id} {project} {flowcell} {lane} {sample} as {sample_status}"
178181
)
179182
# Creates record
180-
db.save(obj)
183+
couch_connection.post_document(
184+
db="bioinfo_analysis", document=obj
185+
)
181186
# Sets FC error flag
182187
if project_info[flowcell].value is not None:
183188
if (
@@ -258,15 +263,18 @@ def get_ss_projects_ont(ont_run, couch_connection):
258263
"""Fetches project, FC, lane & sample (sample-run) status for a given folder for ONT runs"""
259264
proj_tree = Tree()
260265
flowcell_id = ont_run.run_name
261-
flowcell_info = (
262-
couch_connection["nanopore_runs"].view("info/lims")[flowcell_id].rows[0]
263-
)
266+
flowcell_info = couch_connection.post_view(
267+
db="nanopore_runs",
268+
ddoc="info",
269+
view="lims",
270+
key=flowcell_id,
271+
).get_result()["rows"][0]
264272
if (
265-
flowcell_info.value
266-
and flowcell_info.value.get("loading", [])
267-
and "sample_data" in flowcell_info.value["loading"][-1]
273+
flowcell_info["value"]
274+
and flowcell_info["value"].get("loading", [])
275+
and "sample_data" in flowcell_info["value"]["loading"][-1]
268276
):
269-
samples = flowcell_info.value["loading"][-1]["sample_data"]
277+
samples = flowcell_info["value"]["loading"][-1]["sample_data"]
270278
for sample_dict in samples:
271279
sample_id = sample_dict["sample_name"]
272280
project = sample_id.split("_")[0]
@@ -488,39 +496,45 @@ def fail_run(runid, project):
488496
)
489497
logger.error(e)
490498
raise e
491-
bioinfo_db = status_db["bioinfo_analysis"]
492-
if project is not None:
493-
view = bioinfo_db.view("full_doc/pj_run_to_doc")
494-
rows = view[[project, runid]].rows
495-
logger.info(
496-
f"Updating status of {len(rows)} objects with flowcell_id: {runid} and project_id {project}"
497-
)
499+
500+
if project:
501+
view = "pj_run_to_doc"
502+
key = [project, runid]
498503
else:
499-
view = bioinfo_db.view("full_doc/run_id_to_doc")
500-
rows = view[[runid]].rows
501-
logger.info(f"Updating status of {len(rows)} objects with flowcell_id: {runid}")
504+
view = "run_id_to_doc"
505+
key = [runid]
506+
507+
rows = status_db.post_view(
508+
db="bioinfo_analysis",
509+
ddoc="full_doc",
510+
view=view,
511+
key=key,
512+
).get_result()["rows"]
513+
logger.info(
514+
f"Found {len(rows)} objects with flowcell_id: {runid}"
515+
+ (f" and project_id {project}" if project else "")
516+
)
502517

503518
new_timestamp = datetime.datetime.now().isoformat()
504519
updated = 0
520+
to_save = []
505521
for row in rows:
506-
if row.value["status"] != "Failed":
507-
row.value["values"][new_timestamp] = {
522+
if row["value"]["status"] != "Failed":
523+
row["value"]["values"][new_timestamp] = {
508524
"sample_status": "Failed",
509525
"user": "taca",
510526
}
511-
row.value["status"] = "Failed"
512-
try:
513-
bioinfo_db.save(row.value)
527+
row["value"]["status"] = "Failed"
528+
529+
to_save.append(row["value"])
530+
save_result = status_db.post_bulk_docs(
531+
db="bioinfo_analysis", bulk_docs=BulkDocs(docs=to_save, new_edits=True)
532+
).get_result()
533+
for res in save_result:
534+
if "ok" in res:
514535
updated += 1
515-
except Exception as e:
536+
else:
516537
logger.error(
517-
"Cannot update object project-sample-run-lane: {}-{}-{}-{}".format(
518-
row.value.get("project_id"),
519-
row.value.get("sample"),
520-
row.value.get("run_id"),
521-
row.value.get("lane"),
522-
)
538+
f"Failed to save object {res.get('id')}: {res.get('error')} {res.get('reason')}"
523539
)
524-
logger.error(e)
525-
raise e
526540
logger.info(f"Successfully updated {updated} objects")

0 commit comments

Comments (0)