Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions e2e/pages.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
https://playwright.dev/docs/pom
"""

import typing as t

from playwright.sync_api import Page

from piccolo_admin.example.app import OrderBy
Expand Down Expand Up @@ -89,7 +87,7 @@ def get_column_count(self) -> int:
"""
return self.column_selects.count()

def populate_form(self, order_by_list: t.List[OrderBy]):
def populate_form(self, order_by_list: list[OrderBy]):
"""
Make sure we have enough column select elements, and populate them.
"""
Expand Down
159 changes: 79 additions & 80 deletions piccolo_admin/endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@
import json
import logging
import os
import typing as t
from collections.abc import Callable, Coroutine, Sequence
from dataclasses import dataclass
from datetime import timedelta
from functools import partial
from typing import Any, Optional, TypeVar, Union

import typing_extensions
from fastapi import FastAPI, File, Form, UploadFile
Expand Down Expand Up @@ -100,15 +101,15 @@ class GroupItem(BaseModel):


class GroupedTableNamesResponseModel(BaseModel):
grouped: t.Dict[str, t.List[str]] = Field(default_factory=dict)
ungrouped: t.List[str] = Field(default_factory=list)
grouped: dict[str, list[str]] = Field(default_factory=dict)
ungrouped: list[str] = Field(default_factory=list)


class GroupedFormsResponseModel(BaseModel):
grouped: t.Dict[str, t.List[FormConfigResponseModel]] = Field(
grouped: dict[str, list[FormConfigResponseModel]] = Field(
default_factory=dict
)
ungrouped: t.List[FormConfigResponseModel] = Field(default_factory=list)
ungrouped: list[FormConfigResponseModel] = Field(default_factory=list)


@dataclass
Expand Down Expand Up @@ -210,20 +211,20 @@ async def manager_only(

"""

table_class: t.Type[Table]
visible_columns: t.Optional[t.List[Column]] = None
exclude_visible_columns: t.Optional[t.List[Column]] = None
visible_filters: t.Optional[t.List[Column]] = None
exclude_visible_filters: t.Optional[t.List[Column]] = None
rich_text_columns: t.Optional[t.List[Column]] = None
hooks: t.Optional[t.List[Hook]] = None
media_storage: t.Optional[t.Sequence[MediaStorage]] = None
validators: t.Optional[Validators] = None
menu_group: t.Optional[str] = None
link_column: t.Optional[Column] = None
order_by: t.Optional[t.List[OrderBy]] = None
time_resolution: t.Optional[
t.Dict[t.Union[Timestamp, Timestamptz, Time], t.Union[float, int]]
table_class: type[Table]
visible_columns: Optional[list[Column]] = None
exclude_visible_columns: Optional[list[Column]] = None
visible_filters: Optional[list[Column]] = None
exclude_visible_filters: Optional[list[Column]] = None
rich_text_columns: Optional[list[Column]] = None
hooks: Optional[list[Hook]] = None
media_storage: Optional[Sequence[MediaStorage]] = None
validators: Optional[Validators] = None
menu_group: Optional[str] = None
link_column: Optional[Column] = None
order_by: Optional[list[OrderBy]] = None
time_resolution: Optional[
dict[Union[Timestamp, Timestamptz, Time], Union[float, int]]
] = None

def __post_init__(self):
Expand Down Expand Up @@ -253,10 +254,10 @@ def __post_init__(self):

def _get_columns(
self,
include_columns: t.Optional[t.List[Column]],
exclude_columns: t.Optional[t.List[Column]],
all_columns: t.List[Column],
) -> t.List[Column]:
include_columns: Optional[list[Column]],
exclude_columns: Optional[list[Column]],
all_columns: list[Column],
) -> list[Column]:
if include_columns and not exclude_columns:
return include_columns

Expand All @@ -266,34 +267,34 @@ def _get_columns(

return all_columns

def get_visible_columns(self) -> t.List[Column]:
def get_visible_columns(self) -> list[Column]:
return self._get_columns(
include_columns=self.visible_columns,
exclude_columns=self.exclude_visible_columns,
all_columns=self.table_class._meta.columns,
)

def get_visible_column_names(self) -> t.Tuple[str, ...]:
def get_visible_column_names(self) -> tuple[str, ...]:
return tuple(i._meta.name for i in self.get_visible_columns())

def get_visible_filters(self) -> t.List[Column]:
def get_visible_filters(self) -> list[Column]:
return self._get_columns(
include_columns=self.visible_filters,
exclude_columns=self.exclude_visible_filters,
all_columns=self.table_class._meta.columns,
)

def get_visible_filter_names(self) -> t.Tuple[str, ...]:
def get_visible_filter_names(self) -> tuple[str, ...]:
return tuple(i._meta.name for i in self.get_visible_filters())

def get_rich_text_columns_names(self) -> t.Tuple[str, ...]:
def get_rich_text_columns_names(self) -> tuple[str, ...]:
return (
tuple(i._meta.name for i in self.rich_text_columns)
if self.rich_text_columns
else ()
)

def get_media_columns_names(self) -> t.Tuple[str, ...]:
def get_media_columns_names(self) -> tuple[str, ...]:
return (
tuple(i._meta.name for i in self.media_columns)
if self.media_columns
Expand All @@ -303,12 +304,12 @@ def get_media_columns_names(self) -> t.Tuple[str, ...]:
def get_link_column(self) -> Column:
return self.link_column or self.table_class._meta.primary_key

def get_order_by(self) -> t.List[OrderBy]:
def get_order_by(self) -> list[OrderBy]:
return self.order_by or [
OrderBy(column=self.table_class._meta.primary_key, ascending=True)
]

def get_time_resolution(self) -> t.Dict[str, t.Union[int, float]]:
def get_time_resolution(self) -> dict[str, Union[int, float]]:
return (
{
column._meta.name: resolution
Expand All @@ -319,17 +320,17 @@ def get_time_resolution(self) -> t.Dict[str, t.Union[int, float]]:
)


PydanticModel = t.TypeVar("PydanticModel", bound=BaseModel)
PydanticModel = TypeVar("PydanticModel", bound=BaseModel)


@dataclass
class FileResponse:
contents: t.Union[io.StringIO, io.BytesIO]
contents: Union[io.StringIO, io.BytesIO]
file_name: str
media_type: str


FormResponse: typing_extensions.TypeAlias = t.Union[str, FileResponse, None]
FormResponse: typing_extensions.TypeAlias = Union[str, FileResponse, None]


@dataclass
Expand Down Expand Up @@ -389,13 +390,13 @@ def my_endpoint(request: Request, data: MyModel):
def __init__(
self,
name: str,
pydantic_model: t.Type[PydanticModel],
endpoint: t.Callable[
pydantic_model: type[PydanticModel],
endpoint: Callable[
[Request, PydanticModel],
t.Union[FormResponse, t.Coroutine[None, None, FormResponse]],
Union[FormResponse, Coroutine[None, None, FormResponse]],
],
description: t.Optional[str] = None,
form_group: t.Optional[str] = None,
description: Optional[str] = None,
form_group: Optional[str] = None,
):
self.name = name
self.pydantic_model = pydantic_model
Expand All @@ -408,7 +409,7 @@ def __init__(
class FormConfigResponseModel(BaseModel):
name: str
slug: str
description: t.Optional[str] = None
description: Optional[str] = None


def handle_auth_exception(request: Request, exc: Exception):
Expand Down Expand Up @@ -441,30 +442,30 @@ class AdminRouter(FastAPI):
The root returns a single page app. The other URLs are REST endpoints.
"""

table: t.List[Table] = []
auth_table: t.Type[BaseUser] = BaseUser
table: list[Table] = []
auth_table: type[BaseUser] = BaseUser
template: str = ""

def __init__(
self,
*tables: t.Union[t.Type[Table], TableConfig],
forms: t.List[FormConfig] = [],
auth_table: t.Type[BaseUser] = BaseUser,
session_table: t.Type[SessionsBase] = SessionsBase,
*tables: Union[type[Table], TableConfig],
forms: list[FormConfig] = [],
auth_table: type[BaseUser] = BaseUser,
session_table: type[SessionsBase] = SessionsBase,
session_expiry: timedelta = timedelta(hours=1),
max_session_expiry: timedelta = timedelta(days=7),
increase_expiry: t.Optional[timedelta] = timedelta(minutes=20),
increase_expiry: Optional[timedelta] = timedelta(minutes=20),
page_size: int = 15,
read_only: bool = False,
rate_limit_provider: t.Optional[RateLimitProvider] = None,
rate_limit_provider: Optional[RateLimitProvider] = None,
production: bool = False,
site_name: str = "Piccolo Admin",
default_language_code: str = "auto",
translations: t.Optional[t.List[Translation]] = None,
allowed_hosts: t.Sequence[str] = [],
translations: Optional[list[Translation]] = None,
allowed_hosts: Sequence[str] = [],
debug: bool = False,
sidebar_links: t.Dict[str, str] = {},
mfa_providers: t.Optional[t.Sequence[MFAProvider]] = None,
sidebar_links: dict[str, str] = {},
mfa_providers: Optional[Sequence[MFAProvider]] = None,
) -> None:
super().__init__(
title=site_name,
Expand All @@ -486,7 +487,7 @@ def __init__(
# Convert any table arguments which are plain ``Table`` classes into
# ``TableConfig`` instances.

table_configs: t.List[TableConfig] = []
table_configs: list[TableConfig] = []

for table in tables:
if isinstance(table, TableConfig):
Expand Down Expand Up @@ -614,7 +615,7 @@ def __init__(
path="/tables/",
endpoint=self.get_table_list, # type: ignore
methods=["GET"],
response_model=t.List[str],
response_model=list[str],
tags=["Tables"],
)

Expand All @@ -638,7 +639,7 @@ def __init__(
endpoint=self.get_forms, # type: ignore
methods=["GET"],
tags=["Forms"],
response_model=t.List[FormConfigResponseModel],
response_model=list[FormConfigResponseModel],
)

private_app.add_api_route(
Expand Down Expand Up @@ -953,7 +954,7 @@ def get_user(self, request: Request) -> UserResponseModel:
###########################################################################
# Custom forms

def get_forms(self) -> t.List[FormConfigResponseModel]:
def get_forms(self) -> list[FormConfigResponseModel]:
"""
Returns a list of all forms registered with the admin.
"""
Expand Down Expand Up @@ -1006,17 +1007,15 @@ def get_single_form(self, form_slug: str) -> FormConfigResponseModel:
description=form.description,
)

def get_single_form_schema(self, form_slug: str) -> t.Dict[str, t.Any]:
def get_single_form_schema(self, form_slug: str) -> dict[str, Any]:
form_config = self.form_config_map.get(form_slug)

if form_config is None:
raise HTTPException(status_code=404, detail="No such form found")
else:
return form_config.pydantic_model.model_json_schema()

async def post_single_form(
self, request: Request, form_slug: str
) -> t.Any:
async def post_single_form(self, request: Request, form_slug: str) -> Any:
"""
Handles posting of custom forms.
"""
Expand Down Expand Up @@ -1077,15 +1076,15 @@ def get_meta(self) -> MetaResponseModel:

###########################################################################

def get_sidebar_links(self) -> t.Dict[str, str]:
def get_sidebar_links(self) -> dict[str, str]:
"""
Returns the custom links registered with the admin.
"""
return self.sidebar_links

###########################################################################

def get_table_list(self) -> t.List[str]:
def get_table_list(self) -> list[str]:
"""
Returns the list of table groups registered with the admin.
"""
Expand Down Expand Up @@ -1148,15 +1147,15 @@ def get_translation(self, language_code: str = "en") -> Translation:


def get_all_tables(
tables: t.Sequence[t.Type[Table]],
) -> t.Sequence[t.Type[Table]]:
tables: Sequence[type[Table]],
) -> Sequence[type[Table]]:
"""
Fetch any related tables, and include them.
"""
output: t.List[t.Type[Table]] = []
output: list[type[Table]] = []

def get_references(table: t.Type[Table]):
references: t.List[t.Union[t.Type[Table], t.Any]] = [
def get_references(table: type[Table]):
references: list[Union[type[Table], Any]] = [
i._foreign_key_meta.references
for i in table._meta.foreign_key_columns
]
Expand All @@ -1180,25 +1179,25 @@ def get_references(table: t.Type[Table]):


def create_admin(
tables: t.Sequence[t.Union[t.Type[Table], TableConfig]],
forms: t.List[FormConfig] = [],
auth_table: t.Optional[t.Type[BaseUser]] = None,
session_table: t.Optional[t.Type[SessionsBase]] = None,
tables: Sequence[Union[type[Table], TableConfig]],
forms: list[FormConfig] = [],
auth_table: Optional[type[BaseUser]] = None,
session_table: Optional[type[SessionsBase]] = None,
session_expiry: timedelta = timedelta(hours=1),
max_session_expiry: timedelta = timedelta(days=7),
increase_expiry: t.Optional[timedelta] = timedelta(minutes=20),
increase_expiry: Optional[timedelta] = timedelta(minutes=20),
page_size: int = 15,
read_only: bool = False,
rate_limit_provider: t.Optional[RateLimitProvider] = None,
rate_limit_provider: Optional[RateLimitProvider] = None,
production: bool = False,
site_name: str = "Piccolo Admin",
default_language_code: str = "auto",
translations: t.Optional[t.List[Translation]] = None,
translations: Optional[list[Translation]] = None,
auto_include_related: bool = True,
allowed_hosts: t.Sequence[str] = [],
allowed_hosts: Sequence[str] = [],
debug: bool = False,
sidebar_links: t.Dict[str, str] = {},
mfa_providers: t.Optional[t.Sequence[MFAProvider]] = None,
sidebar_links: dict[str, str] = {},
mfa_providers: Optional[Sequence[MFAProvider]] = None,
):
"""
:param tables:
Expand Down Expand Up @@ -1328,7 +1327,7 @@ def create_admin(
session_table = session_table or SessionsBase

if auto_include_related:
table_config_map: t.Dict[t.Type[Table], t.Optional[TableConfig]] = {}
table_config_map: dict[type[Table], Optional[TableConfig]] = {}

for i in tables:
if isinstance(i, TableConfig):
Expand All @@ -1338,8 +1337,8 @@ def create_admin(

all_table_classes = get_all_tables(tuple(table_config_map.keys()))

all_table_classes_with_configs: t.List[
t.Union[t.Type[Table], TableConfig]
all_table_classes_with_configs: list[
Union[type[Table], TableConfig]
] = []
for i in all_table_classes:
table_config = table_config_map.get(i)
Expand Down
Loading
Loading