diff --git a/src/mavedb/db/view.py b/src/mavedb/db/view.py index 64655686..f2ba899a 100644 --- a/src/mavedb/db/view.py +++ b/src/mavedb/db/view.py @@ -6,8 +6,8 @@ import sqlalchemy as sa from sqlalchemy.ext import compiler -from sqlalchemy.schema import DDLElement, MetaData from sqlalchemy.orm import Session +from sqlalchemy.schema import DDLElement, MetaData from mavedb.db.base import Base @@ -32,7 +32,53 @@ class MaterializedView(Base): @classmethod def refresh(cls, connection, concurrently=True): - """Refresh this materialized view.""" + """ + Refresh the underlying materialized view for this ORM-mapped class. + + This class method delegates to `refresh_mat_view` to issue a database + REFRESH MATERIALIZED VIEW (optionally CONCURRENTLY) statement for the + materialized view backing the current model (`cls.__table__.fullname`). + + Parameters + ---------- + connection : sqlalchemy.orm.Session + An active SQLAlchemy session bound to the target database; it is flushed before the refresh. + concurrently : bool, default True + If True, performs a concurrent refresh (REFRESH MATERIALIZED VIEW CONCURRENTLY), + allowing reads during the refresh when the database backend supports it. + If False, performs a blocking refresh. + + Returns + ------- + None + + Raises + ------ + sqlalchemy.exc.DBAPIError + If the database reports an error while refreshing the materialized view. + sqlalchemy.exc.OperationalError + For operational issues such as locks or insufficient privileges. + AttributeError + If the object provided does not expose the Session `flush()` and `execute()` methods. + + Notes + ----- + - A concurrent refresh typically requires the materialized view to have a unique + index matching all rows; otherwise the database may reject the operation. + - This operation does not return a value; it is executed for its side effect. + - Ensure the connection/session is in a clean transactional state if you rely on + consistent snapshot semantics. 
+ - This function commits no changes; it is the caller's responsibility to + commit the session if needed. + + Examples + -------- + # Refresh with concurrent mode (default) + MyMaterializedView.refresh(connection) + + # Perform a blocking refresh + MyMaterializedView.refresh(connection, concurrently=False) + """ refresh_mat_view(connection, cls.__table__.fullname, concurrently) @@ -123,19 +169,91 @@ class MyView(Base): def refresh_mat_view(session: Session, name: str, concurrently=True): """ - Refreshes a single materialized view, given by `name`. + Refresh a PostgreSQL materialized view within the current SQLAlchemy session. + + This helper issues a REFRESH MATERIALIZED VIEW statement for the specified + materialized view name. It first explicitly flushes the session because + session.execute() bypasses SQLAlchemy's autoflush mechanism; without the flush, + pending changes (e.g., newly inserted/updated rows that the view depends on) + might not be reflected in the refreshed view. + + Parameters + ---------- + session : sqlalchemy.orm.Session + An active SQLAlchemy session bound to a PostgreSQL database. + name : str + The exact name (optionally schema-qualified) of the materialized view to refresh. + concurrently : bool, default True + If True, uses REFRESH MATERIALIZED VIEW CONCURRENTLY allowing reads during + the refresh and requiring a unique index on the materialized view. If False, + performs a blocking refresh. + + Raises + ------ + sqlalchemy.exc.SQLAlchemyError + Propagates any database errors encountered during execution (e.g., + insufficient privileges, missing view, lack of required unique index for + CONCURRENTLY). + + Notes + ----- + - Using CONCURRENTLY requires the materialized view to have at least one + unique index; otherwise PostgreSQL will raise an error. + - The operation does not return a value; it is executed for its side effect. + - Ensure the session is in a clean transactional state if you rely on + consistent snapshot semantics. 
+ - This function commits no changes; it is the caller's responsibility to + commit the session if needed. + + Examples + -------- + refresh_mat_view(session, "public.my_materialized_view") + refresh_mat_view(session, "reports.daily_stats", concurrently=False) """ # since session.execute() bypasses autoflush, must manually flush in order # to include newly-created/modified objects in the refresh session.flush() + _con = "CONCURRENTLY " if concurrently else "" session.execute(sa.text("REFRESH MATERIALIZED VIEW " + _con + name)) def refresh_all_mat_views(session: Session, concurrently=True): """ - Refreshes all materialized views. Views are refreshed in non-deterministic order, - so view definitions can't depend on each other. + Refreshes all PostgreSQL materialized views visible to the given SQLAlchemy session. + + The function inspects the current database connection for registered materialized + views and issues a REFRESH MATERIALIZED VIEW command for each one using the helper + function `refresh_mat_view`. The session is not committed here; after all refresh + operations complete, the caller is responsible for committing the session. + + Parameters + ---------- + session : sqlalchemy.orm.Session + An active SQLAlchemy session bound to a PostgreSQL connection. + concurrently : bool, default True + If True, each materialized view is refreshed using the CONCURRENTLY option + (only supported when the view has a unique index that satisfies PostgreSQL + requirements). If False, a standard blocking refresh is performed. + + Behavior + -------- + - If inspection of the connection fails or returns no inspector, the function + exits without performing any work. + - Each materialized view name returned by the inspector is passed to + `refresh_mat_view(session, name, concurrently)`. + + Notes + ----- + - Using CONCURRENTLY allows reads during refresh at the cost of requiring an + appropriate unique index and potentially being slower. 
+ - Exceptions raised during individual refresh operations will propagate unless + `refresh_mat_view` handles them internally; when one propagates, any commit the + caller performs after this function is not reached. + - Ensure the session is in a clean transactional state if you rely on + consistent snapshot semantics. + - This function commits no changes; it is the caller's responsibility to + commit the session if needed. """ inspector = sa.inspect(session.connection()) diff --git a/src/mavedb/worker/jobs.py b/src/mavedb/worker/jobs.py index 9020d947..c119a360 100644 --- a/src/mavedb/worker/jobs.py +++ b/src/mavedb/worker/jobs.py @@ -17,42 +17,41 @@ from mavedb.db.view import refresh_all_mat_views from mavedb.lib.clingen.constants import ( CAR_SUBMISSION_ENDPOINT, + CLIN_GEN_SUBMISSION_ENABLED, DEFAULT_LDH_SUBMISSION_BATCH_SIZE, LDH_SUBMISSION_ENDPOINT, LINKED_DATA_RETRY_THRESHOLD, - CLIN_GEN_SUBMISSION_ENABLED, ) from mavedb.lib.clingen.content_constructors import construct_ldh_submission from mavedb.lib.clingen.services import ( ClinGenAlleleRegistryService, ClinGenLdhService, - get_clingen_variation, clingen_allele_id_from_ldh_variation, get_allele_registry_associations, + get_clingen_variation, ) from mavedb.lib.exceptions import ( - MappingEnqueueError, - SubmissionEnqueueError, LinkingEnqueueError, + MappingEnqueueError, NonexistentMappingReferenceError, NonexistentMappingResultsError, + SubmissionEnqueueError, UniProtIDMappingEnqueueError, UniProtPollingEnqueueError, ) from mavedb.lib.gnomad import gnomad_variant_data_for_caids, link_gnomad_variants_to_mapped_variants from mavedb.lib.logging.context import format_raised_exception_info_as_dict -from mavedb.lib.mapping import ANNOTATION_LAYERS +from mavedb.lib.mapping import ANNOTATION_LAYERS, extract_ids_from_post_mapped_metadata from mavedb.lib.score_sets import ( - get_hgvs_from_post_mapped, columns_for_dataset, create_variants, create_variants_data, + get_hgvs_from_post_mapped, ) -from mavedb.lib.slack import send_slack_error, 
send_slack_message, log_and_send_slack_message +from mavedb.lib.slack import log_and_send_slack_message, send_slack_error, send_slack_message +from mavedb.lib.uniprot.constants import UNIPROT_ID_MAPPING_ENABLED from mavedb.lib.uniprot.id_mapping import UniProtIDMappingAPI from mavedb.lib.uniprot.utils import infer_db_name_from_sequence_accession -from mavedb.lib.uniprot.constants import UNIPROT_ID_MAPPING_ENABLED -from mavedb.lib.mapping import extract_ids_from_post_mapped_metadata from mavedb.lib.validation.dataframe.dataframe import ( validate_and_standardize_dataframe_pair, ) @@ -790,6 +789,7 @@ async def refresh_materialized_views(ctx: dict): logging_context = setup_job_state(ctx, None, None, None) logger.debug(msg="Began refresh materialized views.", extra=logging_context) refresh_all_mat_views(ctx["db"]) + ctx["db"].commit() logger.debug(msg="Done refreshing materialized views.", extra=logging_context) return {"success": True} @@ -798,7 +798,8 @@ async def refresh_published_variants_view(ctx: dict, correlation_id: str): logging_context = setup_job_state(ctx, None, None, correlation_id) logger.debug(msg="Began refresh of published variants materialized view.", extra=logging_context) PublishedVariantsMV.refresh(ctx["db"]) - logger.debug(msg="Done refreshing of published variants materialized view.", extra=logging_context) + ctx["db"].commit() + logger.debug(msg="Done refreshing published variants materialized view.", extra=logging_context) return {"success": True}