Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 123 additions & 5 deletions src/mavedb/db/view.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@

import sqlalchemy as sa
from sqlalchemy.ext import compiler
from sqlalchemy.schema import DDLElement, MetaData
from sqlalchemy.orm import Session
from sqlalchemy.schema import DDLElement, MetaData

from mavedb.db.base import Base

Expand All @@ -32,7 +32,53 @@ class MaterializedView(Base):

@classmethod
def refresh(cls, connection, concurrently=True):
"""Refresh this materialized view."""
"""
Refresh the underlying materialized view for this ORM-mapped class.

This class method delegates to `refresh_mat_view` to issue a database
REFRESH MATERIALIZED VIEW (optionally CONCURRENTLY) statement for the
materialized view backing the current model (`cls.__table__.fullname`).

Parameters
---------
connection : sqlalchemy.engine.Connection | sqlalchemy.orm.Session
An active SQLAlchemy connection or session bound to the target database.
concurrently : bool, default True
If True, performs a concurrent refresh (REFRESH MATERIALIZED VIEW CONCURRENTLY),
allowing reads during the refresh when the database backend supports it.
If False, performs a blocking refresh.

Returns
-------
None

Raises
------
sqlalchemy.exc.DBAPIError
If the database reports an error while refreshing the materialized view.
sqlalchemy.exc.OperationalError
For operational issues such as locks or insufficient privileges.
ValueError
If the connection provided is not a valid SQLAlchemy connection/session.

Notes
-----
- A concurrent refresh typically requires the materialized view to have a unique
index matching all rows; otherwise the database may reject the operation.
- This operation does not return a value; it is executed for its side effect.
- Ensure the connection/session is in a clean transactional state if you rely on
consistent snapshot semantics.
- This function commits no changes; it is the caller's responsibility to
commit the session if needed.

Examples
--------
# Refresh with concurrent mode (default)
MyMaterializedView.refresh(connection)

# Perform a blocking refresh
MyMaterializedView.refresh(connection, concurrently=False)
"""
refresh_mat_view(connection, cls.__table__.fullname, concurrently)


Expand Down Expand Up @@ -123,19 +169,91 @@ class MyView(Base):

def refresh_mat_view(session: Session, name: str, concurrently=True):
"""
Refreshes a single materialized view, given by `name`.
Refresh a PostgreSQL materialized view within the current SQLAlchemy session.

This helper issues a REFRESH MATERIALIZED VIEW statement for the specified
materialized view name. It first explicitly flushes the session because
session.execute() bypasses SQLAlchemy's autoflush mechanism; without the flush,
pending changes (e.g., newly inserted/updated rows that the view depends on)
might not be reflected in the refreshed view.

Parameters
----------
session : sqlalchemy.orm.Session
An active SQLAlchemy session bound to a PostgreSQL database.
name : str
The exact name (optionally schema-qualified) of the materialized view to refresh.
concurrently : bool, default True
If True, uses REFRESH MATERIALIZED VIEW CONCURRENTLY allowing reads during
the refresh and requiring a unique index on the materialized view. If False,
performs a blocking refresh.

Raises
------
sqlalchemy.exc.SQLAlchemyError
Propagates any database errors encountered during execution (e.g.,
insufficient privileges, missing view, lack of required unique index for
CONCURRENTLY).

Notes
-----
- Using CONCURRENTLY requires the materialized view to have at least one
unique index; otherwise PostgreSQL will raise an error.
- The operation does not return a value; it is executed for its side effect.
- Ensure the session is in a clean transactional state if you rely on
consistent snapshot semantics.
- This function commits no changes; it is the caller's responsibility to
commit the session if needed.

Examples
--------
refresh_mat_view(session, "public.my_materialized_view")
refresh_mat_view(session, "reports.daily_stats", concurrently=False)
"""
# since session.execute() bypasses autoflush, must manually flush in order
# to include newly-created/modified objects in the refresh
session.flush()

_con = "CONCURRENTLY " if concurrently else ""
session.execute(sa.text("REFRESH MATERIALIZED VIEW " + _con + name))


def refresh_all_mat_views(session: Session, concurrently=True):
"""
Refreshes all materialized views. Views are refreshed in non-deterministic order,
so view definitions can't depend on each other.
Refreshes all PostgreSQL materialized views visible to the given SQLAlchemy session.

The function inspects the current database connection for registered materialized
views and issues a REFRESH MATERIALIZED VIEW command for each one using the helper
function `refresh_mat_view`. After all refresh operations complete, the session
is committed to persist any transactional side effects of the refresh statements.

Parameters
----------
session : sqlalchemy.orm.Session
An active SQLAlchemy session bound to a PostgreSQL connection.
concurrently : bool, default True
If True, each materialized view is refreshed using the CONCURRENTLY option
(only supported when the view has a unique index that satisfies PostgreSQL
requirements). If False, a standard blocking refresh is performed.

Behavior
--------
- If inspection of the connection fails or returns no inspector, the function
exits without performing any work.
- Each materialized view name returned by the inspector is passed to
`refresh_mat_view(session, name, concurrently)`.

Notes
-----
- Using CONCURRENTLY allows reads during refresh at the cost of requiring an
appropriate unique index and potentially being slower.
- Exceptions raised during individual refresh operations will propagate unless
`refresh_mat_view` handles them internally; in such a case the commit will
not be reached.
- Ensure the session is in a clean transactional state if you rely on
consistent snapshot semantics.
- This function commits no changes; it is the caller's responsibility to
commit the session if needed.
"""
inspector = sa.inspect(session.connection())

Expand Down
21 changes: 11 additions & 10 deletions src/mavedb/worker/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,42 +17,41 @@
from mavedb.db.view import refresh_all_mat_views
from mavedb.lib.clingen.constants import (
CAR_SUBMISSION_ENDPOINT,
CLIN_GEN_SUBMISSION_ENABLED,
DEFAULT_LDH_SUBMISSION_BATCH_SIZE,
LDH_SUBMISSION_ENDPOINT,
LINKED_DATA_RETRY_THRESHOLD,
CLIN_GEN_SUBMISSION_ENABLED,
)
from mavedb.lib.clingen.content_constructors import construct_ldh_submission
from mavedb.lib.clingen.services import (
ClinGenAlleleRegistryService,
ClinGenLdhService,
get_clingen_variation,
clingen_allele_id_from_ldh_variation,
get_allele_registry_associations,
get_clingen_variation,
)
from mavedb.lib.exceptions import (
MappingEnqueueError,
SubmissionEnqueueError,
LinkingEnqueueError,
MappingEnqueueError,
NonexistentMappingReferenceError,
NonexistentMappingResultsError,
SubmissionEnqueueError,
UniProtIDMappingEnqueueError,
UniProtPollingEnqueueError,
)
from mavedb.lib.gnomad import gnomad_variant_data_for_caids, link_gnomad_variants_to_mapped_variants
from mavedb.lib.logging.context import format_raised_exception_info_as_dict
from mavedb.lib.mapping import ANNOTATION_LAYERS
from mavedb.lib.mapping import ANNOTATION_LAYERS, extract_ids_from_post_mapped_metadata
from mavedb.lib.score_sets import (
get_hgvs_from_post_mapped,
columns_for_dataset,
create_variants,
create_variants_data,
get_hgvs_from_post_mapped,
)
from mavedb.lib.slack import send_slack_error, send_slack_message, log_and_send_slack_message
from mavedb.lib.slack import log_and_send_slack_message, send_slack_error, send_slack_message
from mavedb.lib.uniprot.constants import UNIPROT_ID_MAPPING_ENABLED
from mavedb.lib.uniprot.id_mapping import UniProtIDMappingAPI
from mavedb.lib.uniprot.utils import infer_db_name_from_sequence_accession
from mavedb.lib.uniprot.constants import UNIPROT_ID_MAPPING_ENABLED
from mavedb.lib.mapping import extract_ids_from_post_mapped_metadata
from mavedb.lib.validation.dataframe.dataframe import (
validate_and_standardize_dataframe_pair,
)
Expand Down Expand Up @@ -790,6 +789,7 @@ async def refresh_materialized_views(ctx: dict):
logging_context = setup_job_state(ctx, None, None, None)
logger.debug(msg="Began refresh materialized views.", extra=logging_context)
refresh_all_mat_views(ctx["db"])
ctx["db"].commit()
logger.debug(msg="Done refreshing materialized views.", extra=logging_context)
return {"success": True}

Expand All @@ -798,7 +798,8 @@ async def refresh_published_variants_view(ctx: dict, correlation_id: str):
logging_context = setup_job_state(ctx, None, None, correlation_id)
logger.debug(msg="Began refresh of published variants materialized view.", extra=logging_context)
PublishedVariantsMV.refresh(ctx["db"])
logger.debug(msg="Done refreshing of published variants materialized view.", extra=logging_context)
ctx["db"].commit()
logger.debug(msg="Done refreshing published variants materialized view.", extra=logging_context)
return {"success": True}


Expand Down