Merged
156 changes: 156 additions & 0 deletions src/hubmap_translator.py
@@ -27,6 +27,12 @@

logger = logging.getLogger(__name__)

config = {}
app = Flask(__name__, instance_path=os.path.join(os.path.abspath(os.path.dirname(__file__)), 'instance'),
            instance_relative_config=True)
app.config.from_pyfile('app.cfg')
config['INDICES'] = safe_load((Path(__file__).absolute().parent / 'instance/search-config.yaml').read_text())

# This list contains fields that are added to the top-level at index runtime
entity_properties_list = [
    'donor',
@@ -685,6 +691,144 @@ def _transform_and_write_entity_to_index_group(self, entity:dict, index_group:st
f" entity['uuid']={entity['uuid']},"
f" entity['entity_type']={entity['entity_type']}")

    def enqueue_reindex(self, entity_id, reindex_queue, priority):
        try:
            logger.info(f"Start executing enqueue_reindex() on entity_id: {entity_id}")
            entity = self.call_entity_api(entity_id=entity_id, endpoint_base='documents')
            logger.info(f"Enqueueing reindex for {entity['entity_type']} of uuid: {entity_id}")
            subsequent_priority = max(priority, 2)

            # Enqueue the reindex job for the entity itself at the caller's priority
            job_id = reindex_queue.enqueue(
                task_func=reindex_entity_queued_wrapper,
                entity_id=entity_id,
                args=[entity_id, self.token],
                priority=priority
            )
            collection_associations = []
            upload_associations = []
            if entity['entity_type'] in ['Collection', 'Epicollection']:
                collection = self.get_collection_doc(entity_id=entity_id)
                if 'datasets' in collection:
                    logger.info(f"Enqueueing {len(collection['datasets'])} datasets for {entity['entity_type']} {entity_id}")
                    dataset_ids = [ds['uuid'] for ds in collection['datasets']]
                    for dataset_id in dataset_ids:
                        collection_associations.append(dataset_id)
                if 'associated_publication' in collection and collection['associated_publication']:
                    logger.info(f"Enqueueing associated_publication for {entity['entity_type']} {entity_id}")
                    collection_associations.append(collection['associated_publication'])

                logger.info(f"Finished executing enqueue_reindex() for {entity['entity_type']} of uuid: {entity_id}")
                return job_id

            if entity['entity_type'] == 'Upload':
                if 'datasets' in entity:
                    logger.info(f"Enqueueing {len(entity['datasets'])} datasets for Upload {entity_id}")
                    for dataset in entity['datasets']:
                        upload_associations.append(dataset['uuid'])
                logger.info(f"Finished executing enqueue_reindex() for Upload of uuid: {entity_id}")
                return job_id

            logger.info(f"Calculating related entities for {entity_id}")

            neo4j_ancestor_ids = self.call_entity_api(
                entity_id=entity_id,
                endpoint_base='ancestors',
                endpoint_suffix=None,
                url_property='uuid'
            )

            neo4j_descendant_ids = self.call_entity_api(
                entity_id=entity_id,
                endpoint_base='descendants',
                endpoint_suffix=None,
                url_property='uuid'
            )

            previous_revision_ids = []
            next_revision_ids = []
            neo4j_collection_ids = []
            neo4j_upload_ids = []

            # Only Dataset/Publication entities have revisions, collections and uploads to follow
            if entity['entity_type'] in ['Dataset', 'Publication']:
                previous_revision_ids = self.call_entity_api(
                    entity_id=entity_id,
                    endpoint_base='previous_revisions',
                    endpoint_suffix=None,
                    url_property='uuid'
                )

                next_revision_ids = self.call_entity_api(
                    entity_id=entity_id,
                    endpoint_base='next_revisions',
                    endpoint_suffix=None,
                    url_property='uuid'
                )

                neo4j_collection_ids = self.call_entity_api(
                    entity_id=entity_id,
                    endpoint_base='entities',
                    endpoint_suffix='collections',
                    url_property='uuid'
                )

                neo4j_upload_ids = self.call_entity_api(
                    entity_id=entity_id,
                    endpoint_base='entities',
                    endpoint_suffix='uploads',
                    url_property='uuid'
                )

            target_ids = set(
                neo4j_ancestor_ids +
                neo4j_descendant_ids +
                previous_revision_ids +
                next_revision_ids +
                neo4j_collection_ids +
                neo4j_upload_ids +
                upload_associations +
                collection_associations
            )

            logger.info(f"Enqueueing {len(target_ids)} related entities for {entity_id}")

            # Queue follow-up reindex jobs for every related entity at the subsequent priority
            for related_entity_id in target_ids:
                reindex_queue.enqueue(
                    task_func=reindex_entity_queued_wrapper,
                    entity_id=related_entity_id,
                    args=[entity_id, self.token],
                    priority=subsequent_priority
                )
            logger.info(f"Finished executing enqueue_reindex() on {entity['entity_type']} of uuid: {entity_id}")
            return job_id

        except Exception:
            msg = "Exception while executing enqueue_reindex()"
            logger.exception(msg)
            raise

    def reindex_entity_queued(self, entity_id):
        try:
            logger.info(f"Start executing reindex_entity_queued() on uuid: {entity_id}")
            entity = self.call_entity_api(entity_id=entity_id, endpoint_base='documents')
            logger.info(f"Reindexing {entity['entity_type']} of uuid: {entity_id}")

            if entity['entity_type'] in ['Collection', 'Epicollection']:
                self.translate_collection(entity_id, reindex=True)

            elif entity['entity_type'] == 'Upload':
                self.translate_upload(entity_id, reindex=True)

            else:
                self._call_indexer(entity=entity, delete_existing_doc_first=True)

            logger.info(f"Finished executing reindex_entity_queued() on {entity['entity_type']} of uuid: {entity_id}")

        except Exception as e:
            msg = f"Exception during reindex_entity_queued() for uuid: {entity_id}"
            logger.exception(msg)

            raise

    # Used by individual live reindex call
    def translate(self, entity_id):
        try:
@@ -2027,6 +2171,18 @@ def get_organ_types(self):
# Running full reindex script in command line
# This approach is different from the live /reindex-all PUT call
# It'll delete all the existing indices, recreate them, and then index everything


def reindex_entity_queued_wrapper(entity_id, token):
    # Task function handed to the job queue; builds a fresh Translator for each queued reindex
    translator = Translator(
        indices=config['INDICES'],
        app_client_id=app.config['APP_CLIENT_ID'],
        app_client_secret=app.config['APP_CLIENT_SECRET'],
        token=token,
        ontology_api_base_url=app.config['ONTOLOGY_API_BASE_URL']
    )
    translator.reindex_entity_queued(entity_id)
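
Taken together, enqueue_reindex(), reindex_entity_queued(), and this wrapper form the queued-reindex path: the caller enqueues one job for the entity itself plus follow-up jobs for its related entities, and each worker rebuilds a Translator and reindexes one uuid. A minimal sketch of how a caller might wire this up, assuming the JobQueue constructor and enqueue() usage shown in this diff (worker.py below and enqueue_reindex() above); the token, entity_id, and priority values here are illustrative, not part of the change:

# Illustrative wiring only -- JobQueue construction mirrors worker.py; enqueue_reindex() is defined above.
from atlas_consortia_jobq import JobQueue

reindex_queue = JobQueue(
    redis_host=app.config.get('REDIS_HOST', 'localhost'),
    redis_port=int(app.config.get('REDIS_PORT', 6379)),
    redis_db=int(app.config.get('REDIS_DB', 0))
)

translator = Translator(
    indices=config['INDICES'],
    app_client_id=app.config['APP_CLIENT_ID'],
    app_client_secret=app.config['APP_CLIENT_SECRET'],
    token=token,  # assumed: an auth token taken from the incoming request
    ontology_api_base_url=app.config['ONTOLOGY_API_BASE_URL']
)

# entity_id: uuid of the entity that changed (illustrative). The entity itself is enqueued
# at the caller's priority; related entities are enqueued inside enqueue_reindex() at
# the subsequent priority, max(priority, 2).
job_id = translator.enqueue_reindex(entity_id, reindex_queue, priority=1)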

if __name__ == "__main__":
    # Specify the absolute path of the instance folder and use the config file relative to the instance path
    app = Flask(__name__, instance_path=os.path.join(os.path.abspath(os.path.dirname(__file__)), '../src/instance'),
9 changes: 8 additions & 1 deletion src/instance/app.cfg.example
@@ -29,4 +29,11 @@ PARAM_SEARCH_RECOGNIZED_ENTITIES_BY_INDEX = {
}
}

DEBUG_MODE = False
DEBUG_MODE = False
JOB_QUEUE_MODE = False
QUEUE_WORKERS = 4

REDIS_HOST = jobq-redis
REDIS_PORT = 6379
REDIS_DB = 0
REDIS_PASSWORD = None
5 changes: 5 additions & 0 deletions src/main.py
@@ -32,6 +32,11 @@
config['PARAM_SEARCH_RECOGNIZED_ENTITIES_BY_INDEX'] = app.config['PARAM_SEARCH_RECOGNIZED_ENTITIES_BY_INDEX']
config['ONTOLOGY_API_BASE_URL'] = app.config['ONTOLOGY_API_BASE_URL'].strip('/')
config['DEBUG_MODE'] = app.config['DEBUG_MODE']
config['JOB_QUEUE_MODE'] = app.config['JOB_QUEUE_MODE']
config['REDIS_HOST'] = app.config['REDIS_HOST']
config['REDIS_PORT'] = app.config['REDIS_PORT']
config['REDIS_DB'] = app.config['REDIS_DB']
config['REDIS_PASSWORD'] = app.config['REDIS_PASSWORD']

if not config['ONTOLOGY_API_BASE_URL']:
raise Exception(f"Unable retrieve ontology information using"
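
Where JOB_QUEUE_MODE is actually consumed is not visible in this diff; that presumably happens in the search-adaptor submodule, whose pointer is bumped below. A hedged sketch of the kind of gating one would expect, using only the config keys added here; every other name is illustrative:

# Hypothetical consumer of the new keys -- not part of this diff.
from atlas_consortia_jobq import JobQueue

if config['JOB_QUEUE_MODE']:
    reindex_queue = JobQueue(
        redis_host=config['REDIS_HOST'],
        redis_port=int(config['REDIS_PORT']),
        redis_db=int(config['REDIS_DB'])
    )
else:
    reindex_queue = None  # fall back to the existing synchronous reindex path
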
2 changes: 1 addition & 1 deletion src/requirements.txt
@@ -11,6 +11,6 @@ portal-visualization==0.4.20
# Default is main branch specified in search-api's docker-compose.development.yml if not set
# git+https://github.com/hubmapconsortium/commons.git@${COMMONS_BRANCH}#egg=hubmap-commons
hubmap-commons==2.1.22

atlas-consortia-jobq @ git+https://github.com/x-atlas-consortia/jobq.git@dev-integrate
# The use of `-r` lets us specify the transitive requirements in one place
-r search-adaptor/src/requirements.txt
2 changes: 1 addition & 1 deletion src/search-adaptor
17 changes: 17 additions & 0 deletions src/worker.py
@@ -0,0 +1,17 @@
import os
from flask import Flask
from atlas_consortia_jobq import JobQueue

if __name__ == '__main__':
    # Load the same instance config (app.cfg) that the search API uses
    script_dir = os.path.dirname(os.path.abspath(__file__))
    app = Flask(__name__,
                instance_path=os.path.join(script_dir, 'instance'),
                instance_relative_config=True)
    app.config.from_pyfile('app.cfg')
    # Build the Redis-backed job queue and start the worker pool that runs queued reindex jobs
    queue = JobQueue(
        redis_host=app.config.get('REDIS_HOST', 'localhost'),
        redis_port=int(app.config.get('REDIS_PORT', 6379)),
        redis_db=int(app.config.get('REDIS_DB', 0))
    )
    queue_workers = int(app.config.get('QUEUE_WORKERS', 4))
    queue.start_workers(num_workers=queue_workers)