Skip to content

Commit 0df9d9c

Browse files
committed
feat: add async pymongo support with new doc feeder and client methods
1 parent 0cf3a49 commit 0df9d9c

File tree

10 files changed

+639
-45
lines changed

10 files changed

+639
-45
lines changed

biothings/hub/databuild/backend.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ def validate_sources(self, sources=None):
122122

123123
def get_src_master_docs(self):
124124
if self.src_masterdocs is None:
125-
self.src_masterdocs = dict([(src["_id"], src) for src in list(self.master.find())])
125+
self.src_masterdocs = {src["_id"]: src for src in mongo.doc_feeder(self.master)}
126126
return self.src_masterdocs
127127

128128
def get_src_metadata(self):
@@ -145,10 +145,10 @@ def get_src_metadata(self):
145145
srcs.append(doc["_id"])
146146
srcs = list(set(srcs))
147147
else:
148-
srcs = [d["_id"] for d in self.dump.find()]
148+
srcs = [d["_id"] for d in mongo.doc_feeder(self.dump, fields={"_id": 1})]
149149
# we need to return the main source name, but when accessed, it's been through sub-source names;
150150
# query is different in that case
151-
for src in self.dump.find({"_id": {"$in": srcs}}):
151+
for src in mongo.doc_feeder(self.dump, query={"_id": {"$in": srcs}}):
152152
# now merge other extra information from src_master (src_meta key). src_master _id
153153
# are sub-source names, not main source so we have to deal with src_dump as well
154154
# in order to resolve/map main/sub source name

biothings/hub/databuild/builder.py

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,15 @@
2020

2121
from biothings import config as btconfig
2222
from biothings.hub import BUILDER_CATEGORY, UPLOADER_CATEGORY
23+
from biothings.hub.databuild.backend import (
24+
LinkTargetDocMongoBackend,
25+
SourceDocMongoBackend,
26+
TargetDocMongoBackend,
27+
create_backend,
28+
)
29+
from biothings.hub.databuild.buildconfig import AutoBuildConfig
30+
from biothings.hub.databuild.mapper import TransparentMapper
31+
from biothings.hub.dataload.uploader import ResourceNotReady
2332
from biothings.hub.manager import BaseManager
2433
from biothings.utils import mongo
2534
from biothings.utils.backend import DocMongoBackend
@@ -36,24 +45,14 @@
3645
get_source_fullname,
3746
get_src_build,
3847
get_src_build_config,
48+
get_src_db,
3949
get_src_dump,
4050
get_src_master,
41-
get_src_db,
4251
)
4352
from biothings.utils.loggers import get_logger
4453
from biothings.utils.manager import JobManager
4554
from biothings.utils.mongo import doc_feeder, id_feeder
4655

47-
from biothings.hub.databuild.backend import (
48-
LinkTargetDocMongoBackend,
49-
SourceDocMongoBackend,
50-
TargetDocMongoBackend,
51-
create_backend,
52-
)
53-
from biothings.hub.databuild.buildconfig import AutoBuildConfig
54-
from biothings.hub.databuild.mapper import TransparentMapper
55-
from biothings.hub.dataload.uploader import ResourceNotReady
56-
5756
logging = btconfig.logger
5857

5958

@@ -1141,7 +1140,7 @@ def __init__(
11411140

11421141
def clean_stale_status(self):
11431142
src_build = get_src_build()
1144-
for build in src_build.find():
1143+
for build in doc_feeder(src_build):
11451144
dirty = False
11461145
for job in build.get("jobs", []):
11471146
if job.get("status") == "building":

biothings/hub/databuild/differ.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
from biothings.utils.hub_db import get_src_build
2020
from biothings.utils.jsondiff import make as jsondiff
2121
from biothings.utils.loggers import get_logger
22-
from biothings.utils.mongo import get_previous_collection, get_target_db, id_feeder
22+
from biothings.utils.mongo import doc_feeder, get_previous_collection, get_target_db, id_feeder
2323
from biothings.utils.serializer import to_json_file
2424

2525
from .backend import create_backend, merge_src_build_metadata
@@ -1004,7 +1004,7 @@ def __init__(self, poll_schedule=None, *args, **kwargs):
10041004

10051005
def clean_stale_status(self):
10061006
src_build = get_src_build()
1007-
for build in src_build.find():
1007+
for build in doc_feeder(src_build):
10081008
dirty = False
10091009
for job in build.get("jobs", []):
10101010
if job.get("status") == "diffing":

biothings/hub/databuild/syncer.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -836,7 +836,7 @@ def __init__(self, *args, **kwargs):
836836

837837
def clean_stale_status(self):
838838
src_build = get_src_build()
839-
for build in src_build.find():
839+
for build in doc_feeder(src_build):
840840
dirty = False
841841
for job in build.get("jobs", []):
842842
if job.get("status") == "syncing":

biothings/hub/default_config.py

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -83,12 +83,8 @@
8383
import biothings.utils.jsondiff
8484

8585
# set_default_folder is needed for evaluating some default values below
86-
from biothings.utils.configuration import (
87-
ConfigurationDefault,
88-
ConfigurationError,
89-
ConfigurationValue,
90-
set_default_folder, # pylint: disable=unused-import # noqa
91-
)
86+
from biothings.utils.configuration import set_default_folder # pylint: disable=unused-import # noqa
87+
from biothings.utils.configuration import ConfigurationDefault, ConfigurationError, ConfigurationValue
9288

9389
# * 1. General *#
9490
# Hub name/icon url/version, for display purpose
@@ -174,6 +170,31 @@
174170
desc="Define path to folder which will contain cache files, set to None to disable",
175171
)
176172

173+
# Experimental: use async pymongo-backed feeder in `biothings.utils.mongo.doc_feeder`
174+
# This code path is currently opt-in and only applies to Mongo source/target/hubdb collections.
175+
USE_ASYNC_DOC_FEEDER = ConfigurationDefault(
176+
default=False,
177+
desc="Use async pymongo doc feeder (experimental, defaults to synchronous behavior)",
178+
)
179+
180+
# Size of the async producer -> sync consumer prefetch queue used by the doc_feeder bridge.
181+
ASYNC_DOC_FEEDER_BUFFER_SIZE = ConfigurationDefault(
182+
default=4,
183+
desc="Prefetch queue size for async doc feeder bridge (higher may improve throughput at the cost of memory)",
184+
)
185+
186+
# Emit comparable feeder benchmark lines with counts and elapsed time.
187+
DOC_FEEDER_BENCHMARK = ConfigurationDefault(
188+
default=False,
189+
desc="Enable doc_feeder benchmark summary logs (DOC_FEEDER_BENCH)",
190+
)
191+
192+
# Optional benchmark label to group A/B runs in logs.
193+
DOC_FEEDER_BENCHMARK_TAG = ConfigurationDefault(
194+
default="",
195+
desc="Optional label attached to DOC_FEEDER_BENCH logs (for example baseline/async/module-swap)",
196+
)
197+
177198
# Path to a folder to store all 3rd party parsers, dumpers, etc...
178199
DATA_PLUGIN_FOLDER = ConfigurationDefault(
179200
default="./plugins", desc="Define path to folder which will contain all 3rd party parsers, dumpers, etc..."

0 commit comments

Comments
 (0)