Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions docs/src/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,10 @@ Give me an ASGI web app!

piccolo asgi new

FastAPI, Starlette, BlackSheep, Litestar, Esmerald and Lilya are currently supported,
with more coming soon.
FastAPI, Starlette, BlackSheep, Litestar, Esmerald, Lilya, Quart, Falcon and Sanic
are currently supported, with more coming soon.

-------------------------------------------------------------------------------
----------------------------------------------------------------------------------

Videos
------
Expand Down
86 changes: 57 additions & 29 deletions piccolo/apps/asgi/commands/templates/app/_blacksheep_app.py.jinja
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import typing as t
from typing import Any

from blacksheep.exceptions import HTTPException
from blacksheep.server import Application
from blacksheep.server.bindings import FromJSON
from blacksheep.server.openapi.v3 import OpenAPIHandler
from blacksheep.server.responses import json
from openapidocs.v3 import Info
from blacksheep.server.openapi.v3 import OpenAPIHandler, Info
from piccolo.engine import engine_finder
from piccolo_admin.endpoints import create_admin
from piccolo_api.crud.serializers import create_pydantic_model
Expand Down Expand Up @@ -34,68 +33,97 @@ app.serve_files("static", root_path="/static")
app.router.add_get("/", home)


TaskModelIn: t.Any = create_pydantic_model(table=Task, model_name="TaskModelIn")
TaskModelOut: t.Any = create_pydantic_model(
table=Task, include_default_columns=True, model_name="TaskModelOut"
TaskModelIn: Any = create_pydantic_model(
table=Task,
model_name="TaskModelIn",
)
TaskModelPartial: t.Any = create_pydantic_model(
table=Task, model_name="TaskModelPartial", all_optional=True
TaskModelOut: Any = create_pydantic_model(
table=Task,
include_default_columns=True,
model_name="TaskModelOut",
)
TaskModelPartial: Any = (
create_pydantic_model(
table=Task,
model_name="TaskModelPartial",
all_optional=True,
),
)


# Check if the record is None. Use for query callback
def check_record_not_found(result: dict[str, Any]) -> dict[str, Any]:
if result is None:
raise HTTPException(status=404)
return result


@app.router.get("/tasks/")
async def tasks() -> t.List[TaskModelOut]:
return await Task.select().order_by(Task._meta.primary_key, ascending=False)
async def tasks() -> list[TaskModelOut]:
tasks = await Task.select().order_by(Task._meta.primary_key, ascending=False)
return [TaskModelOut(**task) for task in tasks]


@app.router.get("/tasks/{task_id}/")
async def single_task(task_id: int) -> TaskModelOut:
task = (
await Task.select()
.where(Task._meta.primary_key == task_id)
.first()
.callback(check_record_not_found)
)
return TaskModelOut(**task)


@app.router.post("/tasks/")
async def create_task(task_model: FromJSON[TaskModelIn]) -> TaskModelOut:
task = Task(**task_model.value.dict())
task = Task(**task_model.value.model_dump())
await task.save()
return TaskModelOut(**task.to_dict())


@app.router.put("/tasks/{task_id}/")
async def put_task(task_id: int, task_model: FromJSON[TaskModelIn]) -> TaskModelOut:
task = await Task.objects().get(Task._meta.primary_key == task_id)
if not task:
return json({}, status=404)
task = (
await Task.objects()
.get(Task._meta.primary_key == task_id)
.callback(check_record_not_found)
)

for key, value in task_model.value.dict().items():
for key, value in task_model.value.model_dump().items():
setattr(task, key, value)

await task.save()

return TaskModelOut(**task.to_dict())


@app.router.patch("/tasks/{task_id}/")
async def patch_task(
task_id: int, task_model: FromJSON[TaskModelPartial]
) -> TaskModelOut:
task = await Task.objects().get(Task._meta.primary_key == task_id)
if not task:
return json({}, status=404)
task = (
await Task.objects()
.get(Task._meta.primary_key == task_id)
.callback(check_record_not_found)
)

for key, value in task_model.value.dict().items():
for key, value in task_model.value.model_dump().items():
if value is not None:
setattr(task, key, value)

await task.save()

return TaskModelOut(**task.to_dict())


@app.router.delete("/tasks/{task_id}/")
async def delete_task(task_id: int):
task = await Task.objects().get(Task._meta.primary_key == task_id)
if not task:
return json({}, status=404)

async def delete_task(task_id: int) -> None:
task = (
await Task.objects()
.get(Task._meta.primary_key == task_id)
.callback(check_record_not_found)
)
await task.remove()

return json({})


async def open_database_connection_pool(application):
try:
Expand Down
69 changes: 48 additions & 21 deletions piccolo/apps/asgi/commands/templates/app/_esmerald_app.py.jinja
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import typing as t
from typing import Any
from pathlib import Path

from esmerald import (
APIView,
Esmerald,
Gateway,
HTTPException,
Include,
JSONResponse,
delete,
get,
post,
Expand Down Expand Up @@ -38,45 +38,72 @@ async def close_database_connection_pool():
print("Unable to connect to the database")


TaskModelIn: t.Any = create_pydantic_model(table=Task, model_name="TaskModelIn")
TaskModelOut: t.Any = create_pydantic_model(
table=Task, include_default_columns=True, model_name="TaskModelOut"
TaskModelIn: Any = create_pydantic_model(
table=Task,
model_name="TaskModelIn",
)
TaskModelOut: Any = create_pydantic_model(
table=Task,
include_default_columns=True,
model_name="TaskModelOut",
)


# Check if the record is None. Use for query callback
def check_record_not_found(result: dict[str, Any]) -> dict[str, Any]:
if result is None:
raise HTTPException(
detail="Record not found",
status_code=404,
)
return result


class TaskAPIView(APIView):
path: str = "/"
tags: str = ["Task"]
tags: list[str] = ["Task"]

@get("/")
async def tasks(self) -> t.List[TaskModelOut]:
return await Task.select().order_by(Task._meta.primary_key, ascending=False)
async def tasks(self) -> list[TaskModelOut]:
tasks = await Task.select().order_by(Task._meta.primary_key, ascending=False)
return [TaskModelOut(**task) for task in tasks]

@get("/{task_id}")
async def single_task(self, task_id: int) -> TaskModelOut:
task = (
await Task.select()
.where(Task._meta.primary_key == task_id)
.first()
.callback(check_record_not_found)
)
return TaskModelOut(**task)

@post("/")
async def create_task(self, payload: TaskModelIn) -> TaskModelOut:
task = Task(**payload.dict())
task = Task(**payload.model_dump())
await task.save()
return task.to_dict()
return TaskModelOut(**task.to_dict())

@put("/{task_id}")
async def update_task(self, payload: TaskModelIn, task_id: int) -> TaskModelOut:
task = await Task.objects().get(Task._meta.primary_key == task_id)
if not task:
return JSONResponse({}, status_code=404)

for key, value in payload.dict().items():
task = (
await Task.objects()
.get(Task._meta.primary_key == task_id)
.callback(check_record_not_found)
)
for key, value in payload.model_dump().items():
setattr(task, key, value)

await task.save()

return task.to_dict()
return TaskModelOut(**task.to_dict())

@delete("/{task_id}")
async def delete_task(self, task_id: int) -> None:
task = await Task.objects().get(Task._meta.primary_key == task_id)
if not task:
return JSONResponse({}, status_code=404)

task = (
await Task.objects()
.get(Task._meta.primary_key == task_id)
.callback(check_record_not_found)
)
await task.remove()


Expand Down
71 changes: 63 additions & 8 deletions piccolo/apps/asgi/commands/templates/app/_falcon_app.py.jinja
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import os
import typing as t
from typing import Any

import falcon.asgi
from hypercorn.middleware import DispatcherMiddleware
from piccolo.engine import engine_finder
from piccolo_admin.endpoints import create_admin
from piccolo_api.crud.endpoints import PiccoloCRUD

from home.endpoints import HomeEndpoint
from home.piccolo_app import APP_CONFIG
Expand All @@ -28,33 +27,89 @@ async def close_database_connection_pool():
print("Unable to connect to the database")


# Check if the record is None. Use for query callback
def check_record_not_found(result: dict[str, Any]) -> dict[str, Any]:
if result is None:
raise falcon.HTTPNotFound()
return result


class LifespanMiddleware:
async def process_startup(
self, scope: t.Dict[str, t.Any], event: t.Dict[str, t.Any]
self, scope: dict[str, Any], event: dict[str, Any]
) -> None:
await open_database_connection_pool()

async def process_shutdown(
self, scope: t.Dict[str, t.Any], event: t.Dict[str, t.Any]
self, scope: dict[str, Any], event: dict[str, Any]
) -> None:
await close_database_connection_pool()


app: t.Any = falcon.asgi.App(middleware=LifespanMiddleware())
class TaskCollectionResource:
async def on_get(self, req, resp):
tasks = await Task.select().order_by(Task._meta.primary_key, ascending=False)
resp.media = tasks

async def on_post(self, req, resp):
data = await req.media
task = Task(**data)
await task.save()
resp.status = falcon.HTTP_201
resp.media = task.to_dict()


class TaskItemResource:
async def on_get(self, req, resp, task_id):
task = (
await Task.select()
.where(Task._meta.primary_key == task_id)
.first()
.callback(check_record_not_found)
)
resp.status = falcon.HTTP_200
resp.media = task

async def on_put(self, req, resp, task_id):
task = (
await Task.objects()
.get(Task._meta.primary_key == task_id)
.callback(check_record_not_found)
)

data = await req.media
for key, value in data.items():
setattr(task, key, value)

await task.save()
resp.status = falcon.HTTP_200
resp.media = task.to_dict()

async def on_delete(self, req, resp, task_id):
task = (
await Task.objects()
.get(Task._meta.primary_key == task_id)
.callback(check_record_not_found)
)
resp.status = falcon.HTTP_204
await task.remove()


app: Any = falcon.asgi.App(middleware=LifespanMiddleware())
app.add_static_route("/static", directory=os.path.abspath("static"))
app.add_route("/", HomeEndpoint())
app.add_route("/tasks/", TaskCollectionResource())
app.add_route("/tasks/{task_id:int}", TaskItemResource())

PICCOLO_CRUD: t.Any = PiccoloCRUD(table=Task)

# enable the Admin and PiccoloCrud app using DispatcherMiddleware
# enable the admin application using DispatcherMiddleware
app = DispatcherMiddleware( # type: ignore
{
"/admin": create_admin(
tables=APP_CONFIG.table_classes,
# Required when running under HTTPS:
# allowed_hosts=['my_site.com']
),
"/tasks": PICCOLO_CRUD,
"": app,
}
)
Loading