Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
SEE https://www.postgresql.org/docs/current/errcodes-appendix.html
"""

import warnings

# NOTE: psycopg2.errors are created dynamically
# pylint: disable=no-name-in-module
from psycopg2 import (
Expand All @@ -46,6 +48,19 @@

assert issubclass(UniqueViolation, IntegrityError) # nosec


warnings.warn(
(
"DEPRECATED: The aiopg DBAPI exceptions in this module are no longer used. "
"Please use exceptions from the `sqlalchemy.exc` module instead. "
"See https://docs.sqlalchemy.org/en/21/core/exceptions.html for details. "
"This change is part of the migration to SQLAlchemy async support with asyncpg. "
"See migration issue: https://github.com/ITISFoundation/osparc-simcore/issues/4529"
),
DeprecationWarning,
stacklevel=2,
)

__all__: tuple[str, ...] = (
"CheckViolation",
"DatabaseError",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,5 +85,5 @@ def _do_assert_error(

for msg in list_expected_msg:
assert any(
re.search(msg, e) for e in details
msg == e or re.search(msg, e) for e in details
), f"could not find {msg=} in {details=}"
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import datetime

import simcore_postgres_database.aiopg_errors as db_errors
import sqlalchemy as sa
from arrow import utcnow
from models_library.api_schemas_payments.errors import (
Expand All @@ -16,6 +15,7 @@
InitPromptAckFlowState,
payments_methods,
)
from sqlalchemy.exc import IntegrityError

from ..models.db import PaymentsMethodsDB
from .base import BaseRepository
Expand All @@ -42,7 +42,7 @@ async def insert_init_payment_method(
)
return payment_method_id

except db_errors.UniqueViolation as err:
except IntegrityError as err:
raise PaymentMethodUniqueViolationError(
payment_method_id=payment_method_id
) from err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@
from models_library.users import UserID
from models_library.wallets import WalletID
from pydantic import HttpUrl, PositiveInt, TypeAdapter
from simcore_postgres_database import aiopg_errors as pg_errors
from simcore_postgres_database.models.payments_transactions import (
PaymentTransactionState,
payments_transactions,
)
from sqlalchemy.exc import IntegrityError

from ..models.db import PaymentsTransactionsDB
from .base import BaseRepository
Expand Down Expand Up @@ -58,7 +58,7 @@ async def insert_init_payment_transaction(
)
)
return payment_id
except pg_errors.UniqueViolation as exc:
except IntegrityError as exc:
raise PaymentAlreadyExistsError(payment_id=f"{payment_id}") from exc

async def update_ack_payment_transaction(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import logging

from asyncpg.exceptions import PostgresError
from aws_library.s3 import S3AccessError, S3KeyNotFoundError
from fastapi import FastAPI, status
from servicelib.fastapi.http_error import (
make_http_error_handler_for_exception,
set_app_default_http_error_handlers,
)
from sqlalchemy.exc import DBAPIError

from ..modules.datcore_adapter.datcore_adapter_exceptions import (
DatcoreAdapterFileNotFoundError,
Expand Down Expand Up @@ -70,7 +70,7 @@ def set_exception_handlers(app: FastAPI) -> None:
),
)
for exc_3rd_party in (
PostgresError,
DBAPIError,
S3AccessError,
):
app.add_exception_handler(
Expand Down
9 changes: 7 additions & 2 deletions services/storage/tests/unit/test_utils_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

import httpx
import pytest
from asyncpg import PostgresError
from aws_library.s3._errors import S3AccessError, S3KeyNotFoundError
from fastapi import FastAPI, HTTPException, status
from fastapi.exceptions import RequestValidationError
Expand All @@ -28,6 +27,7 @@
DatcoreAdapterTimeoutError,
)
from simcore_service_storage.modules.db.access_layer import InvalidFileIdentifierError
from sqlalchemy.exc import DBAPIError


@pytest.fixture
Expand Down Expand Up @@ -85,7 +85,12 @@ async def client(initialized_app: FastAPI) -> AsyncIterator[AsyncClient]:
status.HTTP_422_UNPROCESSABLE_ENTITY,
),
(
PostgresError("pytest postgres error"),
DBAPIError.instance(
statement="pytest statement",
params={},
orig=Exception("pytest original"),
dbapi_base_err=Exception,
),
status.HTTP_503_SERVICE_UNAVAILABLE,
),
(
Expand Down
11 changes: 11 additions & 0 deletions services/web/server/src/simcore_service_webserver/db/_aiopg.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"""

import logging
import warnings
from collections.abc import AsyncIterator
from typing import Any, cast

Expand All @@ -29,6 +30,16 @@

_logger = logging.getLogger(__name__)

warnings.warn(
(
"simcore_service_webserver.db._aiopg is deprecated and will be removed in a future release. "
"Please use simcore_service_webserver.db._asyncpg instead. "
"See migration details: https://github.com/ITISFoundation/osparc-simcore/issues/4529"
),
DeprecationWarning,
stacklevel=2,
)


@retry(**PostgresRetryPolicyUponInitialization(_logger).kwargs)
async def _ensure_pg_ready(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
)
def setup_db(app: web.Application):

# ensures keys exist
# ensures keys exist DEPRECATED
app[APP_AIOPG_ENGINE_KEY] = None
assert get_database_engine_legacy(app) is None # nosec

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
from models_library.groups import Group, GroupID, GroupType
from models_library.projects import ProjectID
from models_library.users import UserID
from simcore_postgres_database.aiopg_errors import DatabaseError
from simcore_postgres_database.aiopg_errors import DatabaseError as AiopgDatabaseError
from sqlalchemy.exc import DatabaseError

from ..groups.api import get_group_by_gid
from ..projects._projects_repository_legacy import (
Expand Down Expand Up @@ -187,8 +188,8 @@ async def replace_current_owner(
)

except (
AiopgDatabaseError,
DatabaseError,
asyncpg.exceptions.PostgresError,
ProjectNotFoundError,
UserNotFoundError,
):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@
from models_library.products import ProductName
from models_library.users import UserID
from servicelib.aiohttp.db_asyncpg_engine import get_async_engine
from simcore_postgres_database.aiopg_errors import DatabaseError as AiopgDatabaseError
from sqlalchemy.exc import DatabaseError as SQLAlchemyDatabaseError
from sqlalchemy.exc import DatabaseError

from . import _authz_repository
from ._authz_access_model import (
Expand Down Expand Up @@ -48,11 +47,12 @@
def _handle_exceptions_as_503():
try:
yield
except (AiopgDatabaseError, SQLAlchemyDatabaseError) as err:
except DatabaseError as err:
_logger.exception(
**create_troubleshooting_log_kwargs(
"Auth unavailable due to database error",
f"{MSG_AUTH_NOT_AVAILABLE}: Auth unavailable due to database error.",
error=err,
error_context={"origin": str(err.orig) if err.orig else None},
tip="Check database connection",
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,6 @@
from dataclasses import dataclass

import sqlalchemy as sa
from aiohttp import web
from aiopg.sa.connection import SAConnection
from aiopg.sa.engine import Engine
from models_library.groups import EVERYONE_GROUP_ID
from models_library.services import ServiceKey, ServiceVersion
from models_library.services_constants import (
Expand All @@ -22,11 +19,12 @@
from simcore_postgres_database.models.services_consume_filetypes import (
services_consume_filetypes,
)
from simcore_postgres_database.utils_repos import pass_or_acquire_connection
from simcore_postgres_database.utils_services import create_select_latest_services_query
from sqlalchemy.ext.asyncio import AsyncConnection, AsyncEngine

from ..db.plugin import get_database_engine_legacy
from ._errors import ServiceNotFoundError
from .settings import StudiesDispatcherSettings, get_plugin_settings
from .settings import StudiesDispatcherSettings

LARGEST_PAGE_SIZE = 1000

Expand All @@ -44,22 +42,28 @@ class ServiceMetaData:
file_extensions: list[str]


async def _get_service_filetypes(conn: SAConnection) -> dict[ServiceKey, list[str]]:
async def _get_service_filetypes(
engine: AsyncEngine,
connection: AsyncConnection | None = None,
) -> dict[ServiceKey, list[str]]:
query = sa.select(
services_consume_filetypes.c.service_key,
sa.func.array_agg(
sa.func.distinct(services_consume_filetypes.c.filetype)
).label("list_of_file_types"),
).group_by(services_consume_filetypes.c.service_key)

result = await conn.execute(query)
rows = await result.fetchall()
async with pass_or_acquire_connection(engine, connection) as conn:
result = await conn.execute(query)
rows = result.fetchall()

return {row.service_key: row.list_of_file_types for row in rows}
return {row.service_key: row.list_of_file_types for row in rows}


async def iter_latest_product_services(
app: web.Application,
settings: StudiesDispatcherSettings,
engine: AsyncEngine,
connection: AsyncConnection | None = None,
*,
product_name: str,
page_number: PositiveInt = 1, # 1-based
Expand All @@ -68,9 +72,6 @@ async def iter_latest_product_services(
assert page_number >= 1 # nosec
assert ((page_number - 1) * page_size) >= 0 # nosec

engine: Engine = get_database_engine_legacy(app)
settings: StudiesDispatcherSettings = get_plugin_settings(app)

# Select query for latest version of the service
latest_services = create_select_latest_services_query().alias("latest_services")

Expand Down Expand Up @@ -109,10 +110,10 @@ async def iter_latest_product_services(
# pagination
query = query.limit(page_size).offset((page_number - 1) * page_size)

async with engine.acquire() as conn:
service_filetypes = await _get_service_filetypes(conn)
async with pass_or_acquire_connection(engine, connection) as conn:
service_filetypes = await _get_service_filetypes(engine, conn)

async for row in await conn.execute(query):
async for row in await conn.stream(query):
yield ServiceMetaData(
key=row.key,
version=row.version,
Expand All @@ -135,14 +136,13 @@ class ValidService:

@log_decorator(_logger, level=logging.DEBUG)
async def validate_requested_service(
app: web.Application,
engine: AsyncEngine,
connection: AsyncConnection | None = None,
*,
service_key: ServiceKey,
service_version: ServiceVersion,
) -> ValidService:
engine: Engine = get_database_engine_legacy(app)

async with engine.acquire() as conn:
async with pass_or_acquire_connection(engine, connection) as conn:
query = sa.select(
services_meta_data.c.name,
services_meta_data.c.key,
Expand All @@ -153,7 +153,7 @@ async def validate_requested_service(
)

result = await conn.execute(query)
row = await result.fetchone()
row = result.one_or_none()

if row is None:
raise ServiceNotFoundError(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@
)

from ...._meta import API_VTAG
from ....db.plugin import get_asyncpg_engine
from ....products import products_web
from ....utils_aiohttp import envelope_json_response
from ... import _service
from ..._catalog import iter_latest_product_services
from ...settings import get_plugin_settings
from .nih_schemas import ServiceGet, Viewer

_logger = logging.getLogger(__name__)
Expand All @@ -26,9 +28,12 @@ async def list_latest_services(request: Request):
"""Returns a list latest version of services"""
product_name = products_web.get_product_name(request)

plugin_settings = get_plugin_settings(request.app)
engine = get_asyncpg_engine(request.app)

services = []
async for service_data in iter_latest_product_services(
request.app, product_name=product_name
plugin_settings, engine, product_name=product_name
):
try:
service = ServiceGet.create(service_data, request)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from models_library.projects_nodes_io import NodeID
from servicelib.aiohttp.requests_validation import parse_request_query_parameters_as

from ....db.plugin import get_asyncpg_engine
from ....dynamic_scheduler import api as dynamic_scheduler_service
from ....products import products_web
from ....utils_aiohttp import create_redirect_to_page_response, get_api_base_url
Expand Down Expand Up @@ -133,7 +134,7 @@ async def get_redirection_to_viewer(request: web.Request):
service_params_ = query_params

valid_service: ValidService = await validate_requested_service(
app=request.app,
get_asyncpg_engine(request.app),
service_key=service_params_.viewer_key,
service_version=service_params_.viewer_version,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import pytest
from aiocache.base import BaseCache
from aiohttp import web
from psycopg2 import DatabaseError
from pytest_mock import MockerFixture
from simcore_service_webserver.projects.models import ProjectDict
from simcore_service_webserver.security._authz_access_model import (
Expand All @@ -30,6 +29,7 @@
)
from simcore_service_webserver.security._authz_policy import AuthorizationPolicy
from simcore_service_webserver.security._authz_repository import ActiveUserIdAndRole
from sqlalchemy.exc import DatabaseError


@pytest.fixture
Expand Down Expand Up @@ -277,7 +277,9 @@ async def _fake_db(engine, email):
assert engine == "FAKE-ENGINE"

if "db-failure" in email:
raise DatabaseError
raise DatabaseError(
statement="SELECT 1", params=None, orig=Exception("fake db error")
)

# inactive user or not found
return copy.deepcopy(users_db.get(email))
Expand Down
Loading
Loading