|
| 1 | +from typing import Any |
| 2 | + |
| 3 | +from sqlalchemy import cast, func |
| 4 | +from sqlalchemy.dialects.postgresql import JSONB |
| 5 | +from sqlalchemy.exc import DataError, ProgrammingError |
| 6 | +from sqlalchemy.ext.asyncio import AsyncSession |
| 7 | +from sqlalchemy.orm import Session |
| 8 | +from structlog import get_logger |
| 9 | + |
| 10 | +from app.crud import CRUDBase |
| 11 | +from app.models import CMSContent, ContentType, User |
| 12 | + |
| 13 | +logger = get_logger() |
| 14 | + |
| 15 | + |
| 16 | +class CRUDContent(CRUDBase[CMSContent, Any, Any]): |
| 17 | + def get_all_with_optional_filters_query( |
| 18 | + self, |
| 19 | + db: Session, |
| 20 | + content_type: ContentType | None = None, |
| 21 | + query_string: str | None = None, |
| 22 | + user: User | None = None, |
| 23 | + jsonpath_match: str = None, |
| 24 | + ): |
| 25 | + query = self.get_all_query(db=db) |
| 26 | + |
| 27 | + if content_type is not None: |
| 28 | + query = query.where(CMSContent.type == content_type) |
| 29 | + |
| 30 | + if user is not None: |
| 31 | + query = query.where(CMSContent.user == user) |
| 32 | + |
| 33 | + if jsonpath_match is not None: |
| 34 | + # Apply the jsonpath filter to the content field |
| 35 | + query = query.where( |
| 36 | + func.jsonb_path_match( |
| 37 | + cast(CMSContent.content, JSONB), jsonpath_match |
| 38 | + ).is_(True) |
| 39 | + ) |
| 40 | + |
| 41 | + return query |
| 42 | + |
| 43 | + async def aget_all_with_optional_filters( |
| 44 | + self, |
| 45 | + db: AsyncSession, |
| 46 | + content_type: ContentType | None = None, |
| 47 | + query_string: str | None = None, |
| 48 | + user: User | None = None, |
| 49 | + jsonpath_match: str | None = None, |
| 50 | + skip: int = 0, |
| 51 | + limit: int = 100, |
| 52 | + ): |
| 53 | + optional_filters = { |
| 54 | + "query_string": query_string, |
| 55 | + "content_type": content_type, |
| 56 | + "user": user, |
| 57 | + "jsonpath_match": jsonpath_match, |
| 58 | + } |
| 59 | + logger.debug("Querying digital content", **optional_filters) |
| 60 | + |
| 61 | + query = self.apply_pagination( |
| 62 | + self.get_all_with_optional_filters_query(db=db, **optional_filters), |
| 63 | + skip=skip, |
| 64 | + limit=limit, |
| 65 | + ) |
| 66 | + try: |
| 67 | + return (await db.scalars(query)).all() |
| 68 | + except (ProgrammingError, DataError) as e: |
| 69 | + logger.error("Error querying events", error=e, **optional_filters) |
| 70 | + raise ValueError("Problem filtering content") |
| 71 | + |
| 72 | + |
| 73 | +content = CRUDContent(CMSContent) |
0 commit comments