Skip to content

Commit 824bcac

Browse files
authored
Merge pull request #557 from VariantEffect/bugfix/bencap/556/mat-views-not-refreshing
Enhance docs for mat view refresh methods and commit their transactions
2 parents c6a2014 + 3b90ab8 commit 824bcac

File tree

2 files changed

+134
-15
lines changed

2 files changed

+134
-15
lines changed

src/mavedb/db/view.py

Lines changed: 123 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@
66

77
import sqlalchemy as sa
88
from sqlalchemy.ext import compiler
9-
from sqlalchemy.schema import DDLElement, MetaData
109
from sqlalchemy.orm import Session
10+
from sqlalchemy.schema import DDLElement, MetaData
1111

1212
from mavedb.db.base import Base
1313

@@ -32,7 +32,53 @@ class MaterializedView(Base):
3232

3333
@classmethod
3434
def refresh(cls, connection, concurrently=True):
35-
"""Refresh this materialized view."""
35+
"""
36+
Refresh the underlying materialized view for this ORM-mapped class.
37+
38+
This class method delegates to `refresh_mat_view` to issue a database
39+
REFRESH MATERIALIZED VIEW (optionally CONCURRENTLY) statement for the
40+
materialized view backing the current model (`cls.__table__.fullname`).
41+
42+
Parameters
43+
---------
44+
connection : sqlalchemy.engine.Connection | sqlalchemy.orm.Session
45+
An active SQLAlchemy connection or session bound to the target database.
46+
concurrently : bool, default True
47+
If True, performs a concurrent refresh (REFRESH MATERIALIZED VIEW CONCURRENTLY),
48+
allowing reads during the refresh when the database backend supports it.
49+
If False, performs a blocking refresh.
50+
51+
Returns
52+
-------
53+
None
54+
55+
Raises
56+
------
57+
sqlalchemy.exc.DBAPIError
58+
If the database reports an error while refreshing the materialized view.
59+
sqlalchemy.exc.OperationalError
60+
For operational issues such as locks or insufficient privileges.
61+
ValueError
62+
If the connection provided is not a valid SQLAlchemy connection/session.
63+
64+
Notes
65+
-----
66+
- A concurrent refresh typically requires the materialized view to have a unique
67+
index matching all rows; otherwise the database may reject the operation.
68+
- This operation does not return a value; it is executed for its side effect.
69+
- Ensure the connection/session is in a clean transactional state if you rely on
70+
consistent snapshot semantics.
71+
- This function commits no changes; it is the caller's responsibility to
72+
commit the session if needed.
73+
74+
Examples
75+
--------
76+
# Refresh with concurrent mode (default)
77+
MyMaterializedView.refresh(connection)
78+
79+
# Perform a blocking refresh
80+
MyMaterializedView.refresh(connection, concurrently=False)
81+
"""
3682
refresh_mat_view(connection, cls.__table__.fullname, concurrently)
3783

3884

@@ -123,19 +169,91 @@ class MyView(Base):
123169

124170
def refresh_mat_view(session: Session, name: str, concurrently=True):
125171
"""
126-
Refreshes a single materialized view, given by `name`.
172+
Refresh a PostgreSQL materialized view within the current SQLAlchemy session.
173+
174+
This helper issues a REFRESH MATERIALIZED VIEW statement for the specified
175+
materialized view name. It first explicitly flushes the session because
176+
session.execute() bypasses SQLAlchemy's autoflush mechanism; without the flush,
177+
pending changes (e.g., newly inserted/updated rows that the view depends on)
178+
might not be reflected in the refreshed view.
179+
180+
Parameters
181+
----------
182+
session : sqlalchemy.orm.Session
183+
An active SQLAlchemy session bound to a PostgreSQL database.
184+
name : str
185+
The exact name (optionally schema-qualified) of the materialized view to refresh.
186+
concurrently : bool, default True
187+
If True, uses REFRESH MATERIALIZED VIEW CONCURRENTLY allowing reads during
188+
the refresh and requiring a unique index on the materialized view. If False,
189+
performs a blocking refresh.
190+
191+
Raises
192+
------
193+
sqlalchemy.exc.SQLAlchemyError
194+
Propagates any database errors encountered during execution (e.g.,
195+
insufficient privileges, missing view, lack of required unique index for
196+
CONCURRENTLY).
197+
198+
Notes
199+
-----
200+
- Using CONCURRENTLY requires the materialized view to have at least one
201+
unique index; otherwise PostgreSQL will raise an error.
202+
- The operation does not return a value; it is executed for its side effect.
203+
- Ensure the session is in a clean transactional state if you rely on
204+
consistent snapshot semantics.
205+
- This function commits no changes; it is the caller's responsibility to
206+
commit the session if needed.
207+
208+
Examples
209+
--------
210+
refresh_mat_view(session, "public.my_materialized_view")
211+
refresh_mat_view(session, "reports.daily_stats", concurrently=False)
127212
"""
128213
# since session.execute() bypasses autoflush, must manually flush in order
129214
# to include newly-created/modified objects in the refresh
130215
session.flush()
216+
131217
_con = "CONCURRENTLY " if concurrently else ""
132218
session.execute(sa.text("REFRESH MATERIALIZED VIEW " + _con + name))
133219

134220

135221
def refresh_all_mat_views(session: Session, concurrently=True):
136222
"""
137-
Refreshes all materialized views. Views are refreshed in non-deterministic order,
138-
so view definitions can't depend on each other.
223+
Refreshes all PostgreSQL materialized views visible to the given SQLAlchemy session.
224+
225+
The function inspects the current database connection for registered materialized
226+
views and issues a REFRESH MATERIALIZED VIEW command for each one using the helper
227+
function `refresh_mat_view`. After all refresh operations complete, the session
228+
is committed to persist any transactional side effects of the refresh statements.
229+
230+
Parameters
231+
----------
232+
session : sqlalchemy.orm.Session
233+
An active SQLAlchemy session bound to a PostgreSQL connection.
234+
concurrently : bool, default True
235+
If True, each materialized view is refreshed using the CONCURRENTLY option
236+
(only supported when the view has a unique index that satisfies PostgreSQL
237+
requirements). If False, a standard blocking refresh is performed.
238+
239+
Behavior
240+
--------
241+
- If inspection of the connection fails or returns no inspector, the function
242+
exits without performing any work.
243+
- Each materialized view name returned by the inspector is passed to
244+
`refresh_mat_view(session, name, concurrently)`.
245+
246+
Notes
247+
-----
248+
- Using CONCURRENTLY allows reads during refresh at the cost of requiring an
249+
appropriate unique index and potentially being slower.
250+
- Exceptions raised during individual refresh operations will propagate unless
251+
`refresh_mat_view` handles them internally; in such a case the commit will
252+
not be reached.
253+
- Ensure the session is in a clean transactional state if you rely on
254+
consistent snapshot semantics.
255+
- This function commits no changes; it is the caller's responsibility to
256+
commit the session if needed.
139257
"""
140258
inspector = sa.inspect(session.connection())
141259

src/mavedb/worker/jobs.py

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,42 +17,41 @@
1717
from mavedb.db.view import refresh_all_mat_views
1818
from mavedb.lib.clingen.constants import (
1919
CAR_SUBMISSION_ENDPOINT,
20+
CLIN_GEN_SUBMISSION_ENABLED,
2021
DEFAULT_LDH_SUBMISSION_BATCH_SIZE,
2122
LDH_SUBMISSION_ENDPOINT,
2223
LINKED_DATA_RETRY_THRESHOLD,
23-
CLIN_GEN_SUBMISSION_ENABLED,
2424
)
2525
from mavedb.lib.clingen.content_constructors import construct_ldh_submission
2626
from mavedb.lib.clingen.services import (
2727
ClinGenAlleleRegistryService,
2828
ClinGenLdhService,
29-
get_clingen_variation,
3029
clingen_allele_id_from_ldh_variation,
3130
get_allele_registry_associations,
31+
get_clingen_variation,
3232
)
3333
from mavedb.lib.exceptions import (
34-
MappingEnqueueError,
35-
SubmissionEnqueueError,
3634
LinkingEnqueueError,
35+
MappingEnqueueError,
3736
NonexistentMappingReferenceError,
3837
NonexistentMappingResultsError,
38+
SubmissionEnqueueError,
3939
UniProtIDMappingEnqueueError,
4040
UniProtPollingEnqueueError,
4141
)
4242
from mavedb.lib.gnomad import gnomad_variant_data_for_caids, link_gnomad_variants_to_mapped_variants
4343
from mavedb.lib.logging.context import format_raised_exception_info_as_dict
44-
from mavedb.lib.mapping import ANNOTATION_LAYERS
44+
from mavedb.lib.mapping import ANNOTATION_LAYERS, extract_ids_from_post_mapped_metadata
4545
from mavedb.lib.score_sets import (
46-
get_hgvs_from_post_mapped,
4746
columns_for_dataset,
4847
create_variants,
4948
create_variants_data,
49+
get_hgvs_from_post_mapped,
5050
)
51-
from mavedb.lib.slack import send_slack_error, send_slack_message, log_and_send_slack_message
51+
from mavedb.lib.slack import log_and_send_slack_message, send_slack_error, send_slack_message
52+
from mavedb.lib.uniprot.constants import UNIPROT_ID_MAPPING_ENABLED
5253
from mavedb.lib.uniprot.id_mapping import UniProtIDMappingAPI
5354
from mavedb.lib.uniprot.utils import infer_db_name_from_sequence_accession
54-
from mavedb.lib.uniprot.constants import UNIPROT_ID_MAPPING_ENABLED
55-
from mavedb.lib.mapping import extract_ids_from_post_mapped_metadata
5655
from mavedb.lib.validation.dataframe.dataframe import (
5756
validate_and_standardize_dataframe_pair,
5857
)
@@ -790,6 +789,7 @@ async def refresh_materialized_views(ctx: dict):
790789
logging_context = setup_job_state(ctx, None, None, None)
791790
logger.debug(msg="Began refresh materialized views.", extra=logging_context)
792791
refresh_all_mat_views(ctx["db"])
792+
ctx["db"].commit()
793793
logger.debug(msg="Done refreshing materialized views.", extra=logging_context)
794794
return {"success": True}
795795

@@ -798,7 +798,8 @@ async def refresh_published_variants_view(ctx: dict, correlation_id: str):
798798
logging_context = setup_job_state(ctx, None, None, correlation_id)
799799
logger.debug(msg="Began refresh of published variants materialized view.", extra=logging_context)
800800
PublishedVariantsMV.refresh(ctx["db"])
801-
logger.debug(msg="Done refreshing of published variants materialized view.", extra=logging_context)
801+
ctx["db"].commit()
802+
logger.debug(msg="Done refreshing published variants materialized view.", extra=logging_context)
802803
return {"success": True}
803804

804805

0 commit comments

Comments
 (0)