Commit 327d390

Merge pull request #971 from hubmapconsortium/Derek-Furst/implement-jobq

2 parents 1e615f1 + 8f1dd42

File tree: 9 files changed, +236 −5 lines

docker/docker-compose.yml

Lines changed: 32 additions & 0 deletions
@@ -41,6 +41,38 @@ services:
         awslogs-group: ${LOG_GROUP}
         awslogs-stream: ${LOG_STREAM}

+  jobq-redis:
+    image: redis:8.2.3
+    hostname: jobq-redis
+    container_name: jobq-redis
+    ports:
+      - "6379:6379"
+    healthcheck:
+      test: ["CMD", "redis-cli", "ping"]
+      interval: 1m30s
+      timeout: 10s
+      retries: 3
+      start_period: 40s
+    # NOTE: The dhi image doesn't work as easily due to hardened security
+    # image: dhi.io/redis:8.2.3
+    # environment:
+    #   REDIS_PASSWORD: jobq-secret
+    # command:
+    #   [
+    #     "redis-server",
+    #     "--bind", "0.0.0.0",
+    #     "--protected-mode", "yes",
+    #     "--requirepass", "jobq-secret"
+    #   ]
+    # healthcheck:
+    #   test: ["CMD", "redis-cli", "-a", "jobq-secret", "ping"]
+    #   interval: 1m30s
+    #   timeout: 10s
+    #   retries: 3
+    #   start_period: 40s
+    networks:
+      - gateway_hubmap
+
 networks:
   # This is the network created by gateway to enable communicaton between multiple docker-compose projects
   gateway_hubmap:
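Note: as a quick way to confirm the new jobq-redis container is reachable from the API side, the sketch below pings it with the redis-py client that this change already depends on. The host/port/db values mirror app.cfg.example; the connect timeout and the helper's name are illustrative assumptions, not part of this commit.

import sys
from redis import Redis, ConnectionError

def check_jobq_redis(host='jobq-redis', port=6379, db=0):
    # Mirrors the compose healthcheck above: PING returns True when Redis is up
    try:
        return Redis(host=host, port=port, db=db, socket_connect_timeout=5).ping()
    except ConnectionError:
        return False

if __name__ == '__main__':
    sys.exit(0 if check_jobq_redis() else 1)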

docker/search-api/start.sh

Lines changed: 4 additions & 0 deletions
@@ -4,5 +4,9 @@
 # 'daemon off;' is nginx configuration directive
 nginx -g 'daemon off;' &

+# NOTE: Explicitly call the Python executable with its full path instead of just `python` or `python3.13`
+# This is because the api-base-image v1.2.0 uses aliases
+/usr/local/bin/python3.13 /usr/src/app/src/jobq_workers.py &
+
 # Start uwsgi and keep it running in foreground
 /usr/local/python3.13/bin/uwsgi --ini /usr/src/app/src/uwsgi.ini

src/hubmap_translator.py

Lines changed: 160 additions & 0 deletions
@@ -8,6 +8,7 @@
 import re
 import sys
 import time
+from redis import Redis, ConnectionError, RedisError
 from yaml import safe_load, YAMLError
 from http.client import HTTPException
 from enum import Enum
@@ -27,6 +28,12 @@
 
 logger = logging.getLogger(__name__)
 
+config = {}
+app = Flask(__name__, instance_path=os.path.join(os.path.abspath(os.path.dirname(__file__)), 'instance'),
+            instance_relative_config=True)
+app.config.from_pyfile('app.cfg')
+config['INDICES'] = safe_load((Path(__file__).absolute().parent / 'instance/search-config.yaml').read_text())
+
 # This list contains fields that are added to the top-level at index runtime
 entity_properties_list = [
     'donor',
@@ -685,6 +692,147 @@ def _transform_and_write_entity_to_index_group(self, entity:dict, index_group:st
                          f" entity['uuid']={entity['uuid']},"
                          f" entity['entity_type']={entity['entity_type']}")
 
+    def enqueue_reindex(self, entity_id, reindex_queue, priority):
+        try:
+            logger.info(f"Start executing translate() on entity_id: {entity_id}")
+            entity = self.call_entity_api(entity_id=entity_id, endpoint_base='documents')
+            logger.info(f"Enqueueing reindex for {entity['entity_type']} of uuid: {entity_id}")
+            subsequent_priority = max(priority, 2)
+
+            job_id = reindex_queue.enqueue(
+                task_func=reindex_entity_queued_wrapper,
+                entity_id=entity_id,
+                args=[entity_id, self.token],
+                priority=priority
+            )
+            collection_associations = []
+            upload_associations = []
+            if entity['entity_type'] in ['Collection', 'Epicollection']:
+                collection = self.get_collection_doc(entity_id=entity_id)
+                if 'datasets' in collection:
+                    logger.info(f"Enqueing {len(collection['datasets'])} datasets for {entity['entity_type']} {entity_id}")
+                    dataset_ids = [ds['uuid'] for ds in collection['datasets']]
+                    for dataset_id in dataset_ids:
+                        collection_associations.append(dataset_id)
+                if 'associated_publication' in collection and collection['associated_publication']:
+                    logger.info(f"Enqueueing associated_publication for {entity['entity_type']} {entity_id}")
+                    collection_associations.append(collection['associated_publication'])
+
+                logger.info(f"Finished executing enqueue_reindex() for {entity['entity_type']} of uuid: {entity_id}")
+                return job_id
+
+            if entity['entity_type'] == 'Upload':
+                if 'datasets' in entity:
+                    logger.info(f"Enqueueing {len(entity['datasets'])} datasets for Upload {entity_id}")
+                    for dataset in entity['datasets']:
+                        upload_associations.append(dataset['uuid'])
+                logger.info(f"Finished executing enqueue_reindex() for Upload of uuid: {entity_id}")
+                return job_id
+
+            logger.info(f"Calculating related entities for {entity_id}")
+
+            neo4j_ancestor_ids = self.call_entity_api(
+                entity_id=entity_id,
+                endpoint_base='ancestors',
+                endpoint_suffix=None,
+                url_property='uuid'
+            )
+
+            neo4j_descendant_ids = self.call_entity_api(
+                entity_id=entity_id,
+                endpoint_base='descendants',
+                endpoint_suffix=None,
+                url_property='uuid'
+            )
+
+            previous_revision_ids = []
+            next_revision_ids = []
+            neo4j_collection_ids = []
+            neo4j_upload_ids = []
+
+            if entity['entity_type'] in ['Dataset', 'Publication']:
+                previous_revision_ids = self.call_entity_api(
+                    entity_id=entity_id,
+                    endpoint_base='previous_revisions',
+                    endpoint_suffix=None,
+                    url_property='uuid'
+                )
+
+                next_revision_ids = self.call_entity_api(
+                    entity_id=entity_id,
+                    endpoint_base='next_revisions',
+                    endpoint_suffix=None,
+                    url_property='uuid'
+                )
+
+                neo4j_collection_ids = self.call_entity_api(
+                    entity_id=entity_id,
+                    endpoint_base='entities',
+                    endpoint_suffix='collections',
+                    url_property='uuid'
+                )
+
+                neo4j_upload_ids = self.call_entity_api(
+                    entity_id=entity_id,
+                    endpoint_base='entities',
+                    endpoint_suffix='uploads',
+                    url_property='uuid'
+                )
+
+            target_ids = set(
+                neo4j_ancestor_ids +
+                neo4j_descendant_ids +
+                previous_revision_ids +
+                next_revision_ids +
+                neo4j_collection_ids +
+                neo4j_upload_ids +
+                upload_associations +
+                collection_associations
+            )
+
+            logger.info(f"Enqueueing {len(target_ids)} related entities for {entity_id}")
+
+            for related_entity_id in target_ids:
+                reindex_queue.enqueue(
+                    task_func=reindex_entity_queued_wrapper,
+                    entity_id=related_entity_id,
+                    args=[related_entity_id, self.token],
+                    priority=subsequent_priority
+                )
+            logger.info(f"Finished executing translate() on {entity['entity_type']} of uuid: {entity_id}")
+            return job_id
+        except ValueError as e:
+            raise ValueError(e)
+        except RedisError as e:
+            raise RedisError(e)
+        except Exception:
+            msg = "Exception during executing translate()"
+            logger.exception(msg)
+            raise
+
+    def reindex_entity_queued(self, entity_id):
+        try:
+            logger.info(f"Start executing reindex_entity_queued() on uuid: {entity_id}")
+            entity = self.call_entity_api(entity_id=entity_id, endpoint_base='documents')
+            logger.info(f"Reindexing {entity['entity_type']} of uuid: {entity_id}")
+
+            if entity['entity_type'] in ['Collection', 'Epicollection']:
+                self.translate_collection(entity_id, reindex=True)
+
+            elif entity['entity_type'] == 'Upload':
+                self.translate_upload(entity_id, reindex=True)
+
+            else:
+                self._call_indexer(entity=entity, delete_existing_doc_first=True)
+
+            logger.info(f"Finished executing reindex_entity_queued() on {entity['entity_type']} of uuid: {entity_id}")
+
+        except Exception as e:
+            msg = f"Exception during reindex_entity_queued() for uuid: {entity_id}"
+            logger.exception(msg)
+
+            raise
+
     # Used by individual live reindex call
     def translate(self, entity_id):
         try:
@@ -2027,6 +2175,18 @@ def get_organ_types(self):
 # Running full reindex script in command line
 # This approach is different from the live /reindex-all PUT call
 # It'll delete all the existing indices and recreate then then index everything
+
+
+def reindex_entity_queued_wrapper(entity_id, token):
+    translator = Translator(
+        indices=config['INDICES'],
+        app_client_id=app.config['APP_CLIENT_ID'],
+        app_client_secret=app.config['APP_CLIENT_SECRET'],
+        token=token,
+        ontology_api_base_url=app.config['ONTOLOGY_API_BASE_URL']
+    )
+    translator.reindex_entity_queued(entity_id)
+
 if __name__ == "__main__":
     # Specify the absolute path of the instance folder and use the config file relative to the instance path
     app = Flask(__name__, instance_path=os.path.join(os.path.abspath(os.path.dirname(__file__)), '../src/instance'),

src/instance/app.cfg.example

Lines changed: 10 additions & 1 deletion
@@ -1,3 +1,6 @@
+# Set to False to use INFO logging level
+DEBUG_MODE = True
+
 # Globus app client ID and secret
 APP_CLIENT_ID = ''
 APP_CLIENT_SECRET = ''
@@ -29,4 +32,10 @@ PARAM_SEARCH_RECOGNIZED_ENTITIES_BY_INDEX = {
     }
 }
 
-DEBUG_MODE = False
+# Reindex job queue settings
+JOB_QUEUE_MODE = False
+QUEUE_WORKERS = 32
+REDIS_HOST = 'jobq-redis'
+REDIS_PORT = 6379
+REDIS_DB = 0
+REDIS_PASSWORD = None

src/jobq_workers.py

Lines changed: 17 additions & 0 deletions
@@ -0,0 +1,17 @@
+import os
+from flask import Flask
+from atlas_consortia_jobq import JobQueue
+
+if __name__ == '__main__':
+    script_dir = os.path.dirname(os.path.abspath(__file__))
+    app = Flask(__name__,
+                instance_path=os.path.join(script_dir, 'instance'),
+                instance_relative_config=True)
+    app.config.from_pyfile('app.cfg')
+    queue = JobQueue(
+        redis_host=app.config.get('REDIS_HOST', 'localhost'),
+        redis_port=int(app.config.get('REDIS_PORT', 6379)),
+        redis_db=int(app.config.get('REDIS_DB', 0))
+    )
+    queue_workers = int(app.config.get('QUEUE_WORKERS', 4))
+    queue.start_workers(num_workers=queue_workers)
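Note: jobq_workers.py only consumes jobs; the producing side is Translator.enqueue_reindex() in hubmap_translator.py above. The sketch below shows how a caller might wire the two together when JOB_QUEUE_MODE is enabled. It reuses the JobQueue constructor arguments from jobq_workers.py and the enqueue keyword arguments from enqueue_reindex(); the Flask wiring, helper name, and default priority here are illustrative assumptions, not part of this commit.

from flask import Flask
from atlas_consortia_jobq import JobQueue
from hubmap_translator import Translator

app = Flask(__name__, instance_relative_config=True)
app.config.from_pyfile('app.cfg')

# Same Redis instance the workers started by jobq_workers.py are polling
reindex_queue = JobQueue(
    redis_host=app.config.get('REDIS_HOST', 'localhost'),
    redis_port=int(app.config.get('REDIS_PORT', 6379)),
    redis_db=int(app.config.get('REDIS_DB', 0))
)

def queue_reindex(translator: Translator, entity_id: str, priority: int = 1):
    # enqueue_reindex() queues the entity itself at the given priority, then its
    # related entities (ancestors, descendants, revisions, collections, uploads)
    # at max(priority, 2), and returns the job id of the primary job
    return translator.enqueue_reindex(entity_id, reindex_queue, priority)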

src/main.py

Lines changed: 5 additions & 0 deletions
@@ -32,6 +32,11 @@
 config['PARAM_SEARCH_RECOGNIZED_ENTITIES_BY_INDEX'] = app.config['PARAM_SEARCH_RECOGNIZED_ENTITIES_BY_INDEX']
 config['ONTOLOGY_API_BASE_URL'] = app.config['ONTOLOGY_API_BASE_URL'].strip('/')
 config['DEBUG_MODE'] = app.config['DEBUG_MODE']
+config['JOB_QUEUE_MODE'] = app.config['JOB_QUEUE_MODE']
+config['REDIS_HOST'] = app.config['REDIS_HOST']
+config['REDIS_PORT'] = app.config['REDIS_PORT']
+config['REDIS_DB'] = app.config['REDIS_DB']
+config['REDIS_PASSWORD'] = app.config['REDIS_PASSWORD']
 
 if not config['ONTOLOGY_API_BASE_URL']:
     raise Exception(f"Unable retrieve ontology information using"

src/requirements.txt

Lines changed: 2 additions & 0 deletions
@@ -10,7 +10,9 @@ portal-visualization==0.4.22
 # Use the branch name of commons from github for testing new changes made in commons from different branch
 # Default is main branch specified in search-api's docker-compose.development.yml if not set
 # git+https://github.com/hubmapconsortium/commons.git@${COMMONS_BRANCH}#egg=hubmap-commons
+
 hubmap-commons==2.1.22
+atlas-consortia-jobq @ git+https://github.com/x-atlas-consortia/jobq.git@main
 
 # The use of `-r` lets us specify the transitive requirements in one place
 -r search-adaptor/src/requirements.txt

src/uwsgi.ini

Lines changed: 5 additions & 3 deletions
@@ -11,10 +11,12 @@ log-master=true
 
 # Master with 4 worker process (based on CPU number)
 master = true
-processes = 4
+processes = 8
 
-# Enable multithreading for search-api
-enable-threads = true
+# Enable multithreading within uWSGI
+# Launch the application across multiple threads inside each process
+enable-threads = True
+threads = 8
 
 # Use http socket for integration with nginx running on the same machine
 socket = localhost:5000
