-
Notifications
You must be signed in to change notification settings - Fork 3
Server should Accept Recipe JSON #419
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 23 commits
78ca86d
78b9b0a
a9b056b
f5f7a69
f87915a
1f2d2e3
bd8ec42
358158e
f0beaa1
a54ffa1
3d01db3
529e15b
63514c9
b2440cd
470e3a1
8a34898
45d438a
c8fe120
ecc645d
17ba17c
79e77e8
653285e
84d13c4
5770826
162ef12
64c60c8
c1d5718
3baff49
558b753
57468b0
4addc65
8ccacc3
497747f
a136b57
fdea7f1
d1686ee
3934ebe
5fba3b1
8f49b0e
728c19d
8f0c468
3fd95a4
2784825
e140122
cf3b9ed
071aaf9
fa148cc
ac34219
86b3104
45a10ab
801b86f
7a42705
c2259af
d3c9c33
e86069e
bcdc065
e667517
f9570ff
b2db8ec
4170c1e
a8ab9ad
fd12619
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
This file was deleted.
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Major changes here:
|
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,7 +1,6 @@ | ||
| import copy | ||
| import logging | ||
| import shutil | ||
| from datetime import datetime, timezone | ||
| from enum import Enum | ||
| from pathlib import Path | ||
|
|
||
|
|
@@ -10,7 +9,6 @@ | |
|
|
||
| import hashlib | ||
| import json | ||
| import requests | ||
|
|
||
| from cellpack.autopack.utils import deep_merge | ||
|
|
||
|
|
@@ -321,36 +319,6 @@ def __init__(self, settings): | |
| self.settings = settings | ||
|
|
||
|
|
||
| class ResultDoc: | ||
| def __init__(self, db): | ||
| self.db = db | ||
|
|
||
| def handle_expired_results(self): | ||
| """ | ||
| Check if the results in the database are expired and delete them if the linked object expired. | ||
| """ | ||
| current_utc = datetime.now(timezone.utc) | ||
| results = self.db.get_all_docs("results") | ||
| if results: | ||
| for result in results: | ||
| result_data = self.db.doc_to_dict(result) | ||
| result_age = current_utc - result_data["timestamp"] | ||
| if result_age.days > 180 and not self.validate_existence( | ||
| result_data["url"] | ||
| ): | ||
| self.db.delete_doc("results", self.db.doc_id(result)) | ||
| logging.info("Results cleanup complete.") | ||
| else: | ||
| logging.info("No results found in the database.") | ||
|
|
||
| def validate_existence(self, url): | ||
| """ | ||
| Validate the existence of an S3 object by checking if the URL is accessible. | ||
| Returns True if the URL is accessible. | ||
| """ | ||
| return requests.head(url).status_code == requests.codes.ok | ||
|
|
||
|
|
||
| class DBUploader(object): | ||
| """ | ||
| Handles the uploading of data to the database. | ||
|
|
@@ -529,42 +497,34 @@ def upload_config(self, config_data, source_path): | |
| self.db.update_doc("configs", id, config_data) | ||
| return id | ||
|
|
||
| def upload_result_metadata(self, file_name, url, job_id=None): | ||
| """ | ||
| Upload the metadata of the result file to the database. | ||
| """ | ||
| if self.db: | ||
| username = self.db.get_username() | ||
| timestamp = self.db.create_timestamp() | ||
| self.db.update_or_create( | ||
| "results", | ||
| file_name, | ||
| { | ||
| "user": username, | ||
| "timestamp": timestamp, | ||
| "url": url, | ||
| "batch_job_id": job_id, | ||
| }, | ||
| ) | ||
| if job_id: | ||
| self.upload_job_status(job_id, "DONE", result_path=url) | ||
|
|
||
| def upload_job_status(self, job_id, status, result_path=None, error_message=None): | ||
| def upload_job_status( | ||
| self, | ||
| dedup_hash, | ||
| status, | ||
| result_path=None, | ||
| error_message=None, | ||
| outputs_directory=None, | ||
| ): | ||
| """ | ||
| Update status for a given job ID | ||
| Update status for a given dedup_hash | ||
| """ | ||
| if self.db: | ||
| timestamp = self.db.create_timestamp() | ||
| self.db.update_or_create( | ||
| "job_status", | ||
| job_id, | ||
| { | ||
| "timestamp": timestamp, | ||
| "status": str(status), | ||
| "result_path": result_path, | ||
| "error_message": error_message, | ||
| }, | ||
| ) | ||
| db_handler = self.db | ||
| # If db is AWSHandler, switch to firebase handler for job status updates | ||
| if hasattr(self.db, "s3_client"): | ||
|
||
| handler = DATABASE_IDS.handlers().get(DATABASE_IDS.FIREBASE) | ||
| db_handler = handler(default_db="staging") | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Would this overwrite the |
||
| timestamp = db_handler.create_timestamp() | ||
| data = { | ||
| "timestamp": timestamp, | ||
| "status": str(status), | ||
| "error_message": error_message, | ||
| } | ||
| if result_path: | ||
| data["result_path"] = result_path | ||
| if outputs_directory: | ||
| data["outputs_directory"] = outputs_directory | ||
| db_handler.update_or_create("job_status", dedup_hash, data) | ||
|
|
||
| def save_recipe_and_config_to_output(self, output_folder, config_data, recipe_data): | ||
| output_path = Path(output_folder) | ||
|
|
@@ -583,15 +543,15 @@ def upload_packing_results_workflow( | |
| self, | ||
| source_folder, | ||
| recipe_name, | ||
| job_id, | ||
| dedup_hash, | ||
| config_data, | ||
| recipe_data, | ||
| ): | ||
| """ | ||
| Complete packing results upload workflow including folder preparation and s3 upload | ||
| """ | ||
| try: | ||
| if job_id: | ||
| if dedup_hash: | ||
|
|
||
| source_path = Path(source_folder) | ||
| if not source_path.exists(): | ||
|
|
@@ -601,7 +561,7 @@ def upload_packing_results_workflow( | |
|
|
||
| # prepare unique S3 upload folder | ||
| parent_folder = source_path.parent | ||
| unique_folder_name = f"{source_path.name}_run_{job_id}" | ||
| unique_folder_name = f"{source_path.name}_run_{dedup_hash}" | ||
| s3_upload_folder = parent_folder / unique_folder_name | ||
|
|
||
| logging.debug(f"outputs will be copied to: {s3_upload_folder}") | ||
|
|
@@ -618,7 +578,7 @@ def upload_packing_results_workflow( | |
| upload_result = self.upload_outputs_to_s3( | ||
| output_folder=s3_upload_folder, | ||
| recipe_name=recipe_name, | ||
| job_id=job_id, | ||
| dedup_hash=dedup_hash, | ||
| ) | ||
|
|
||
| # clean up temporary folder after upload | ||
|
|
@@ -628,9 +588,12 @@ def upload_packing_results_workflow( | |
| f"Cleaned up temporary upload folder: {s3_upload_folder}" | ||
| ) | ||
|
|
||
| # update outputs directory in firebase | ||
| self.update_outputs_directory( | ||
| job_id, upload_result.get("outputs_directory") | ||
| # update outputs directory in job status | ||
| self.upload_job_status( | ||
| dedup_hash, | ||
| "DONE", | ||
| result_path=upload_result.get("simularium_url"), | ||
| outputs_directory=upload_result.get("outputs_directory"), | ||
| ) | ||
|
|
||
| return upload_result | ||
|
|
@@ -639,15 +602,15 @@ def upload_packing_results_workflow( | |
| logging.error(e) | ||
| return {"success": False, "error": e} | ||
|
|
||
| def upload_outputs_to_s3(self, output_folder, recipe_name, job_id): | ||
| def upload_outputs_to_s3(self, output_folder, recipe_name, dedup_hash): | ||
| """ | ||
| Upload packing outputs to S3 bucket | ||
| """ | ||
|
|
||
| bucket_name = self.db.bucket_name | ||
| region_name = self.db.region_name | ||
| sub_folder_name = self.db.sub_folder_name | ||
| s3_prefix = f"{sub_folder_name}/{recipe_name}/{job_id}" | ||
| s3_prefix = f"{sub_folder_name}/{recipe_name}/{dedup_hash}" | ||
|
|
||
| try: | ||
| upload_result = self.db.upload_directory( | ||
|
|
@@ -661,8 +624,11 @@ def upload_outputs_to_s3(self, output_folder, recipe_name, job_id): | |
| f"{base_url}/{file_info['s3_key']}" | ||
| for file_info in upload_result["uploaded_files"] | ||
| ] | ||
| simularium_url = None | ||
| for url in public_urls: | ||
| if url.endswith(".simularium"): | ||
| simularium_url = url | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We had previously uploaded the result .simularium file twice for server-run packings! To avoid that, now we're finding it in the uploaded outputs directory and keeping track of its path to specifically reference in the
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Nice! |
||
| outputs_directory = f"https://us-west-2.console.aws.amazon.com/s3/buckets/{bucket_name}/{s3_prefix}/" | ||
|
|
||
| logging.info( | ||
| f"Successfully uploaded {upload_result['total_files']} files to {outputs_directory}" | ||
| ) | ||
|
|
@@ -671,7 +637,7 @@ def upload_outputs_to_s3(self, output_folder, recipe_name, job_id): | |
|
|
||
| return { | ||
| "success": True, | ||
| "run_id": job_id, | ||
| "dedup_hash": dedup_hash, | ||
| "s3_bucket": bucket_name, | ||
| "s3_prefix": s3_prefix, | ||
| "public_url_base": f"{base_url}/{s3_prefix}/", | ||
|
|
@@ -680,30 +646,12 @@ def upload_outputs_to_s3(self, output_folder, recipe_name, job_id): | |
| "total_size": upload_result["total_size"], | ||
| "urls": public_urls, | ||
| "outputs_directory": outputs_directory, | ||
| "simularium_url": simularium_url, | ||
| } | ||
| except Exception as e: | ||
| logging.error(e) | ||
| return {"success": False, "error": e} | ||
|
|
||
| def update_outputs_directory(self, job_id, outputs_directory): | ||
| if not self.db or self.db.s3_client: | ||
| # switch to firebase handler to update job status | ||
| handler = DATABASE_IDS.handlers().get("firebase") | ||
| initialized_db = handler(default_db="staging") | ||
| if job_id: | ||
| timestamp = initialized_db.create_timestamp() | ||
| initialized_db.update_or_create( | ||
| "job_status", | ||
| job_id, | ||
| { | ||
| "timestamp": timestamp, | ||
| "outputs_directory": outputs_directory, | ||
| }, | ||
| ) | ||
| logging.debug( | ||
| f"Updated outputs s3 location {outputs_directory} for job ID: {job_id}" | ||
| ) | ||
|
|
||
|
|
||
| class DBRecipeLoader(object): | ||
| """ | ||
|
|
@@ -890,23 +838,4 @@ def compile_db_recipe_data(db_recipe_data, obj_dict, grad_dict, comp_dict): | |
| return recipe_data | ||
|
|
||
|
|
||
| class DBMaintenance(object): | ||
| """ | ||
| Handles the maintenance of the database. | ||
| """ | ||
|
|
||
| def __init__(self, db_handler): | ||
| self.db = db_handler | ||
| self.result_doc = ResultDoc(self.db) | ||
|
|
||
| def cleanup_results(self): | ||
| """ | ||
| Check if the results in the database are expired and delete them if the linked object expired. | ||
| """ | ||
| self.result_doc.handle_expired_results() | ||
|
|
||
| def readme_url(self): | ||
| """ | ||
| Return the URL to the README file for the database setup section. | ||
| """ | ||
| return "https://github.com/mesoscope/cellpack?tab=readme-ov-file#introduction-to-remote-databases" | ||
| DB_SETUP_README_URL = "https://github.com/mesoscope/cellpack?tab=readme-ov-file#introduction-to-remote-databases" | ||
|
||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We realized the results collection in firebase is not necessary. For cellpack studio purposes, it is redundant with job_status and we only look to job_status. For locally run packings, we can skip uploading the result path to firebase at all and just directly open the simularium file. |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -9,7 +9,6 @@ | |
| "objects", | ||
| "gradients", | ||
| "recipes", | ||
| "results", | ||
| "configs", | ||
| "recipes_edited", | ||
| ] | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. RecipeLoader now needs to accept a dictionary representing a JSON recipe, while maintaining the previous functionality for accepting an input_file_path. Fortunately it didn't take much to make this work! But a few things of note
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. One suggestion to handle multiple input streams:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We realized the results collection in firebase is not necessary. For cellpack studio purposes, it is redundant with job_status and we only look to job_status. For locally run packings, we can skip uploading the result path to firebase at all and just directly open the simularium file. This allowed us to remove the cleanup code for the results collection (which was the only firebase cleanup we were doing in this repo anyways)