Skip to content

Commit 52e5e05

Browse files
committed
Add async materialized view refresh jobs
Adds refresh jobs for existing materialized views and invokes them: - Refreshes the published variants materialized view whenever a new score set is published. (We cannot delete a published score set, nor edit its variants, so publication time is the only moment this view becomes outdated). - Refreshes all materialized views at 03:00. Adds tests for newly placed publication view refresh. Clarifies connection -> inspector flow during mat view refresh. Opens #405.
1 parent 44a25e5 commit 52e5e05

File tree

8 files changed

+425
-172
lines changed

8 files changed

+425
-172
lines changed

src/mavedb/db/view.py

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ def _drop_view(element: DropView, compiler, **kw):
5050
return "DROP %s %s" % ("MATERIALIZED VIEW" if element.materialized else "VIEW", element.name)
5151

5252

53-
def view_exists(ddl: CreateView, target, connection: Session, materialized: bool, **kw):
53+
def view_exists(ddl: CreateView, target, connection: sa.Connection, materialized: bool, **kw):
5454
inspector = sa.inspect(connection)
5555
if inspector is None:
5656
return False
@@ -59,7 +59,7 @@ def view_exists(ddl: CreateView, target, connection: Session, materialized: bool
5959
return ddl.name in view_names
6060

6161

62-
def view_doesnt_exist(ddl: CreateView, target, connection: Session, materialized: bool, **kw):
62+
def view_doesnt_exist(ddl: CreateView, target, connection: sa.Connection, materialized: bool, **kw):
6363
return not view_exists(ddl, target, connection, materialized, **kw)
6464

6565

@@ -121,7 +121,7 @@ class MyView(Base):
121121
return t
122122

123123

124-
def refresh_mat_view(session, name, concurrently=True):
124+
def refresh_mat_view(session: Session, name: str, concurrently=True):
125125
"""
126126
Refreshes a single materialized view, given by `name`.
127127
"""
@@ -132,12 +132,15 @@ def refresh_mat_view(session, name, concurrently=True):
132132
session.execute(sa.text("REFRESH MATERIALIZED VIEW " + _con + name))
133133

134134

135-
# TODO: untested.
136-
def refresh_all_mat_views(session, concurrently=True):
135+
def refresh_all_mat_views(session: Session, concurrently=True):
137136
"""
138137
Refreshes all materialized views. Views are refreshed in non-deterministic order,
139138
so view definitions can't depend on each other.
140139
"""
141-
mat_views = session.inspect(session.engine).get_view_names()
142-
for v in mat_views:
143-
refresh_mat_view(session, v, concurrently)
140+
inspector = sa.inspect(session.connection())
141+
142+
if not inspector:
143+
return
144+
145+
for mv in inspector.get_materialized_view_names():
146+
refresh_mat_view(session, mv, concurrently)

src/mavedb/routers/score_sets.py

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -658,7 +658,6 @@ async def upload_score_set_variant_data(
658658
except UnicodeDecodeError as e:
659659
raise HTTPException(status_code=400, detail=f"Error decoding file: {e}. Ensure the file has correct values.")
660660

661-
662661
if scores_file:
663662
# Although this is also updated within the variant creation job, update it here
664663
# as well so that we can display the proper UI components (queue invocation delay
@@ -1016,11 +1015,12 @@ async def delete_score_set(
10161015
response_model=score_set.ScoreSet,
10171016
response_model_exclude_none=True,
10181017
)
1019-
def publish_score_set(
1018+
async def publish_score_set(
10201019
*,
10211020
urn: str,
10221021
db: Session = Depends(deps.get_db),
10231022
user_data: UserData = Depends(require_current_user),
1023+
worker: ArqRedis = Depends(deps.get_worker),
10241024
) -> Any:
10251025
"""
10261026
Publish a score set.
@@ -1097,4 +1097,18 @@ def publish_score_set(
10971097
db.commit()
10981098
db.refresh(item)
10991099

1100+
# await the insertion of this job into the worker queue, not the job itself.
1101+
job = await worker.enqueue_job(
1102+
"refresh_published_variants_view",
1103+
correlation_id_for_context(),
1104+
user_data.user.id,
1105+
)
1106+
if job is not None:
1107+
save_to_logging_context({"worker_job_id": job.job_id})
1108+
logger.info(msg="Enqueud published variant materialized view refresh job.", extra=logging_context())
1109+
else:
1110+
logger.warning(
1111+
msg="Failed to enqueue published variant materialized view refresh job.", extra=logging_context()
1112+
)
1113+
11001114
return item

src/mavedb/worker/jobs.py

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
from sqlalchemy.orm import Session
1515

1616
from mavedb.data_providers.services import vrs_mapper
17+
from mavedb.db.view import refresh_all_mat_views
1718
from mavedb.lib.exceptions import MappingEnqueueError, NonexistentMappingReferenceError, NonexistentMappingResultsError
1819
from mavedb.lib.logging.context import format_raised_exception_info_as_dict
1920
from mavedb.lib.score_sets import (
@@ -29,6 +30,7 @@
2930
from mavedb.models.enums.mapping_state import MappingState
3031
from mavedb.models.enums.processing_state import ProcessingState
3132
from mavedb.models.mapped_variant import MappedVariant
33+
from mavedb.models.published_variant import PublishedVariantsMV
3234
from mavedb.models.score_set import ScoreSet
3335
from mavedb.models.user import User
3436
from mavedb.models.variant import Variant
@@ -50,7 +52,9 @@ async def mapping_in_execution(redis: ArqRedis, job_id: str):
5052
await redis.set(MAPPING_CURRENT_ID_NAME, "")
5153

5254

53-
def setup_job_state(ctx, invoker: int, resource: Optional[str], correlation_id: str):
55+
def setup_job_state(
56+
ctx, invoker: Optional[int], resource: Optional[str], correlation_id: Optional[str]
57+
) -> dict[str, Any]:
5458
ctx["state"][ctx["job_id"]] = {
5559
"application": "mavedb-worker",
5660
"user": invoker,
@@ -653,3 +657,20 @@ async def variant_mapper_manager(ctx: dict, correlation_id: str, updater_id: int
653657
db.commit()
654658

655659
return {"success": False, "enqueued_job": new_job_id}
660+
661+
662+
# TODO#405: Refresh materialized views within an executor.
663+
async def refresh_materialized_views(ctx: dict):
664+
logging_context = setup_job_state(ctx, None, None, None)
665+
logger.debug(msg="Began refresh materialized views.", extra=logging_context)
666+
refresh_all_mat_views(ctx["db"])
667+
logger.debug(msg="Done refreshing materialized views.", extra=logging_context)
668+
return {"success": True}
669+
670+
671+
async def refresh_published_variants_view(ctx: dict, correlation_id: str):
672+
logging_context = setup_job_state(ctx, None, None, correlation_id)
673+
logger.debug(msg="Began refresh of published variants materialized view.", extra=logging_context)
674+
PublishedVariantsMV.refresh(ctx["db"])
675+
logger.debug(msg="Done refreshing of published variants materialized view.", extra=logging_context)
676+
return {"success": True}

src/mavedb/worker/settings.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,20 +3,29 @@
33
from typing import Callable
44

55
from arq.connections import RedisSettings
6-
from arq.cron import CronJob
6+
from arq.cron import CronJob, cron
77

88
from mavedb.data_providers.services import cdot_rest
99
from mavedb.db.session import SessionLocal
1010
from mavedb.lib.logging.canonical import log_job
11-
from mavedb.worker.jobs import create_variants_for_score_set, map_variants_for_score_set, variant_mapper_manager
11+
from mavedb.worker.jobs import (
12+
create_variants_for_score_set,
13+
map_variants_for_score_set,
14+
variant_mapper_manager,
15+
refresh_materialized_views,
16+
refresh_published_variants_view,
17+
)
1218

1319
# ARQ requires at least one task on startup.
1420
BACKGROUND_FUNCTIONS: list[Callable] = [
1521
create_variants_for_score_set,
1622
variant_mapper_manager,
1723
map_variants_for_score_set,
24+
refresh_published_variants_view,
25+
]
26+
BACKGROUND_CRONJOBS: list[CronJob] = [
27+
cron(refresh_materialized_views, name="refresh_all_materialized_views", hour=3, minute=0)
1828
]
19-
BACKGROUND_CRONJOBS: list[CronJob] = []
2029

2130
REDIS_IP = os.getenv("REDIS_IP") or "localhost"
2231
REDIS_PORT = int(os.getenv("REDIS_PORT") or 6379)

tests/helpers/util.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -240,8 +240,10 @@ def create_acc_score_set_with_variants(
240240

241241

242242
def publish_score_set(client, score_set_urn):
243-
response = client.post(f"/api/v1/score-sets/{score_set_urn}/publish")
244-
assert response.status_code == 200, f"Could not publish score set {score_set_urn}"
243+
with patch.object(ArqRedis, "enqueue_job", return_value=None) as worker_queue:
244+
response = client.post(f"/api/v1/score-sets/{score_set_urn}/publish")
245+
assert response.status_code == 200, f"Could not publish score set {score_set_urn}"
246+
worker_queue.assert_called_once()
245247

246248
response_data = response.json()
247249
jsonschema.validate(instance=response_data, schema=ScoreSet.schema())

0 commit comments

Comments
 (0)